diff options
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_sql_upgrade.py')
-rw-r--r-- | keystone-moon/keystone/tests/unit/test_sql_upgrade.py | 1234 |
1 files changed, 770 insertions, 464 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_sql_upgrade.py b/keystone-moon/keystone/tests/unit/test_sql_upgrade.py index d617d445..5ca12f66 100644 --- a/keystone-moon/keystone/tests/unit/test_sql_upgrade.py +++ b/keystone-moon/keystone/tests/unit/test_sql_upgrade.py @@ -29,11 +29,13 @@ WARNING:: all data will be lost. """ -import copy import json import uuid +import migrate from migrate.versioning import api as versioning_api +from migrate.versioning import repository +import mock from oslo_config import cfg from oslo_db import exception as db_exception from oslo_db.sqlalchemy import migration @@ -41,12 +43,10 @@ from oslo_db.sqlalchemy import session as db_session from sqlalchemy.engine import reflection import sqlalchemy.exc from sqlalchemy import schema +from testtools import matchers from keystone.common import sql -from keystone.common.sql import migrate_repo from keystone.common.sql import migration_helpers -from keystone.contrib import federation -from keystone.contrib import revoke from keystone import exception from keystone.tests import unit from keystone.tests.unit import default_fixtures @@ -54,7 +54,6 @@ from keystone.tests.unit.ksfixtures import database CONF = cfg.CONF -DEFAULT_DOMAIN_ID = CONF.identity.default_domain_id # NOTE(morganfainberg): This should be updated when each DB migration collapse # is done to mirror the expected structure of the DB in the format of @@ -67,8 +66,8 @@ INITIAL_TABLE_STRUCTURE = { 'id', 'name', 'enabled', 'extra', ], 'endpoint': [ - 'id', 'legacy_endpoint_id', 'interface', 'region', 'service_id', 'url', - 'enabled', 'extra', + 'id', 'legacy_endpoint_id', 'interface', 'region_id', 'service_id', + 'url', 'enabled', 'extra', ], 'group': [ 'id', 'domain_id', 'name', 'description', 'extra', @@ -78,6 +77,7 @@ INITIAL_TABLE_STRUCTURE = { ], 'project': [ 'id', 'name', 'extra', 'description', 'enabled', 'domain_id', + 'parent_id', ], 'role': [ 'id', 'name', 'extra', @@ -108,23 +108,82 @@ INITIAL_TABLE_STRUCTURE = { 'assignment': [ 'type', 'actor_id', 'target_id', 'role_id', 'inherited', ], -} - - -INITIAL_EXTENSION_TABLE_STRUCTURE = { - 'revocation_event': [ - 'id', 'domain_id', 'project_id', 'user_id', 'role_id', - 'trust_id', 'consumer_id', 'access_token_id', - 'issued_before', 'expires_at', 'revoked_at', 'audit_id', - 'audit_chain_id', + 'id_mapping': [ + 'public_id', 'domain_id', 'local_id', 'entity_type', + ], + 'whitelisted_config': [ + 'domain_id', 'group', 'option', 'value', + ], + 'sensitive_config': [ + 'domain_id', 'group', 'option', 'value', ], } -EXTENSIONS = {'federation': federation, - 'revoke': revoke} + +# Test migration_helpers.get_init_version separately to ensure it works before +# using in the SqlUpgrade tests. +class MigrationHelpersGetInitVersionTests(unit.TestCase): + @mock.patch.object(repository, 'Repository') + def test_get_init_version_no_path(self, repo): + migrate_versions = mock.MagicMock() + # make a version list starting with zero. `get_init_version` will + # return None for this value. + migrate_versions.versions.versions = list(range(0, 5)) + repo.return_value = migrate_versions + + # os.path.isdir() is called by `find_migrate_repo()`. Mock it to avoid + # an exception. + with mock.patch('os.path.isdir', return_value=True): + # since 0 is the smallest version expect None + version = migration_helpers.get_init_version() + self.assertIsNone(version) + + # check that the default path was used as the first argument to the + # first invocation of repo. Cannot match the full path because it is + # based on where the test is run. + param = repo.call_args_list[0][0][0] + self.assertTrue(param.endswith('/sql/migrate_repo')) + + @mock.patch.object(repository, 'Repository') + def test_get_init_version_with_path_initial_version_0(self, repo): + migrate_versions = mock.MagicMock() + # make a version list starting with zero. `get_init_version` will + # return None for this value. + migrate_versions.versions.versions = list(range(0, 5)) + repo.return_value = migrate_versions + + # os.path.isdir() is called by `find_migrate_repo()`. Mock it to avoid + # an exception. + with mock.patch('os.path.isdir', return_value=True): + path = '/keystone/migrate_repo/' + + # since 0 is the smallest version expect None + version = migration_helpers.get_init_version(abs_path=path) + self.assertIsNone(version) + + @mock.patch.object(repository, 'Repository') + def test_get_init_version_with_path(self, repo): + initial_version = 10 + + migrate_versions = mock.MagicMock() + migrate_versions.versions.versions = list(range(initial_version + 1, + initial_version + 5)) + repo.return_value = migrate_versions + + # os.path.isdir() is called by `find_migrate_repo()`. Mock it to avoid + # an exception. + with mock.patch('os.path.isdir', return_value=True): + path = '/keystone/migrate_repo/' + + version = migration_helpers.get_init_version(abs_path=path) + self.assertEqual(initial_version, version) class SqlMigrateBase(unit.SQLDriverOverrides, unit.TestCase): + # override this in subclasses. The default of zero covers tests such + # as extensions upgrades. + _initial_db_version = 0 + def initialize_sql(self): self.metadata = sqlalchemy.MetaData() self.metadata.bind = self.engine @@ -139,6 +198,7 @@ class SqlMigrateBase(unit.SQLDriverOverrides, unit.TestCase): def setUp(self): super(SqlMigrateBase, self).setUp() + self.load_backends() database.initialize_sql_session() conn_str = CONF.database.connection if (conn_str != unit.IN_MEM_DB_CONN_STRING and @@ -155,7 +215,9 @@ class SqlMigrateBase(unit.SQLDriverOverrides, unit.TestCase): connection='sqlite:///%s' % db_file) # create and share a single sqlalchemy engine for testing - self.engine = sql.get_engine() + with sql.session_for_write() as session: + self.engine = session.get_bind() + self.addCleanup(self.cleanup_instance('engine')) self.Session = db_session.get_maker(self.engine, autocommit=False) self.addCleanup(sqlalchemy.orm.session.Session.close_all) @@ -164,7 +226,8 @@ class SqlMigrateBase(unit.SQLDriverOverrides, unit.TestCase): self.repo_package()) self.schema = versioning_api.ControlledSchema.create( self.engine, - self.repo_path, self.initial_db_version) + self.repo_path, + self._initial_db_version) # auto-detect the highest available schema version in the migrate_repo self.max_version = self.schema.repository.version().version @@ -229,6 +292,23 @@ class SqlMigrateBase(unit.SQLDriverOverrides, unit.TestCase): else: raise AssertionError('Table "%s" already exists' % table_name) + def assertTableCountsMatch(self, table1_name, table2_name): + try: + table1 = self.select_table(table1_name) + except sqlalchemy.exc.NoSuchTableError: + raise AssertionError('Table "%s" does not exist' % table1_name) + try: + table2 = self.select_table(table2_name) + except sqlalchemy.exc.NoSuchTableError: + raise AssertionError('Table "%s" does not exist' % table2_name) + session = self.Session() + table1_count = session.execute(table1.count()).scalar() + table2_count = session.execute(table2.count()).scalar() + if table1_count != table2_count: + raise AssertionError('Table counts do not match: {0} ({1}), {2} ' + '({3})'.format(table1_name, table1_count, + table2_name, table2_count)) + def upgrade(self, *args, **kwargs): self._migrate(*args, **kwargs) @@ -257,50 +337,30 @@ class SqlMigrateBase(unit.SQLDriverOverrides, unit.TestCase): self.assertItemsEqual(expected_cols, actual_cols, '%s table' % table_name) - @property - def initial_db_version(self): - return getattr(self, '_initial_db_version', 0) - class SqlUpgradeTests(SqlMigrateBase): - - _initial_db_version = migrate_repo.DB_INIT_VERSION + _initial_db_version = migration_helpers.get_init_version() def test_blank_db_to_start(self): self.assertTableDoesNotExist('user') def test_start_version_db_init_version(self): - version = migration.db_version(sql.get_engine(), self.repo_path, - migrate_repo.DB_INIT_VERSION) + with sql.session_for_write() as session: + version = migration.db_version(session.get_bind(), self.repo_path, + self._initial_db_version) self.assertEqual( - migrate_repo.DB_INIT_VERSION, + self._initial_db_version, version, - 'DB is not at version %s' % migrate_repo.DB_INIT_VERSION) + 'DB is not at version %s' % self._initial_db_version) def test_upgrade_add_initial_tables(self): - self.upgrade(migrate_repo.DB_INIT_VERSION + 1) + self.upgrade(self._initial_db_version + 1) self.check_initial_table_structure() def check_initial_table_structure(self): for table in INITIAL_TABLE_STRUCTURE: self.assertTableColumns(table, INITIAL_TABLE_STRUCTURE[table]) - # Ensure the default domain was properly created. - default_domain = migration_helpers.get_default_domain() - - meta = sqlalchemy.MetaData() - meta.bind = self.engine - - domain_table = sqlalchemy.Table('domain', meta, autoload=True) - - session = self.Session() - q = session.query(domain_table) - refs = q.all() - - self.assertEqual(1, len(refs)) - for k in default_domain.keys(): - self.assertEqual(default_domain[k], getattr(refs[0], k)) - def insert_dict(self, session, table_name, d, table=None): """Naively inserts key-value pairs into a table, given a dictionary.""" if table is None: @@ -312,127 +372,43 @@ class SqlUpgradeTests(SqlMigrateBase): session.execute(insert) session.commit() - def test_id_mapping(self): - self.upgrade(50) - self.assertTableDoesNotExist('id_mapping') - self.upgrade(51) - self.assertTableExists('id_mapping') - - def test_region_url_upgrade(self): - self.upgrade(52) - self.assertTableColumns('region', - ['id', 'description', 'parent_region_id', - 'extra', 'url']) - - def test_endpoint_region_upgrade_columns(self): - self.upgrade(53) - self.assertTableColumns('endpoint', - ['id', 'legacy_endpoint_id', 'interface', - 'service_id', 'url', 'extra', 'enabled', - 'region_id']) - region_table = sqlalchemy.Table('region', self.metadata, autoload=True) - self.assertEqual(255, region_table.c.id.type.length) - self.assertEqual(255, region_table.c.parent_region_id.type.length) - endpoint_table = sqlalchemy.Table('endpoint', - self.metadata, - autoload=True) - self.assertEqual(255, endpoint_table.c.region_id.type.length) - - def test_endpoint_region_migration(self): - self.upgrade(52) - session = self.Session() - _small_region_name = '0' * 30 - _long_region_name = '0' * 255 - _clashing_region_name = '0' * 70 - - def add_service(): - service_id = uuid.uuid4().hex - - service = { - 'id': service_id, - 'type': uuid.uuid4().hex - } - - self.insert_dict(session, 'service', service) - - return service_id - - def add_endpoint(service_id, region): - endpoint_id = uuid.uuid4().hex - - endpoint = { - 'id': endpoint_id, - 'interface': uuid.uuid4().hex[:8], - 'service_id': service_id, - 'url': uuid.uuid4().hex, - 'region': region - } - self.insert_dict(session, 'endpoint', endpoint) - - return endpoint_id - - _service_id_ = add_service() - add_endpoint(_service_id_, region=_long_region_name) - add_endpoint(_service_id_, region=_long_region_name) - add_endpoint(_service_id_, region=_clashing_region_name) - add_endpoint(_service_id_, region=_small_region_name) - add_endpoint(_service_id_, region=None) - - # upgrade to 53 - session.close() - self.upgrade(53) - session = self.Session() - self.metadata.clear() + def test_kilo_squash(self): + self.upgrade(67) - region_table = sqlalchemy.Table('region', self.metadata, autoload=True) - self.assertEqual(1, session.query(region_table). - filter_by(id=_long_region_name).count()) - self.assertEqual(1, session.query(region_table). - filter_by(id=_clashing_region_name).count()) - self.assertEqual(1, session.query(region_table). - filter_by(id=_small_region_name).count()) + # In 053 the size of ID and parent region ID columns were changed + table = sqlalchemy.Table('region', self.metadata, autoload=True) + self.assertEqual(255, table.c.id.type.length) + self.assertEqual(255, table.c.parent_region_id.type.length) + table = sqlalchemy.Table('endpoint', self.metadata, autoload=True) + self.assertEqual(255, table.c.region_id.type.length) - endpoint_table = sqlalchemy.Table('endpoint', - self.metadata, - autoload=True) - self.assertEqual(5, session.query(endpoint_table).count()) - self.assertEqual(2, session.query(endpoint_table). - filter_by(region_id=_long_region_name).count()) - self.assertEqual(1, session.query(endpoint_table). - filter_by(region_id=_clashing_region_name).count()) - self.assertEqual(1, session.query(endpoint_table). - filter_by(region_id=_small_region_name).count()) - - def test_add_actor_id_index(self): - self.upgrade(53) - self.upgrade(54) + # In 054 an index was created for the actor_id of the assignment table table = sqlalchemy.Table('assignment', self.metadata, autoload=True) index_data = [(idx.name, list(idx.columns.keys())) for idx in table.indexes] self.assertIn(('ix_actor_id', ['actor_id']), index_data) - def test_token_user_id_and_trust_id_index_upgrade(self): - self.upgrade(54) - self.upgrade(55) + # In 055 indexes were created for user and trust IDs in the token table table = sqlalchemy.Table('token', self.metadata, autoload=True) index_data = [(idx.name, list(idx.columns.keys())) for idx in table.indexes] self.assertIn(('ix_token_user_id', ['user_id']), index_data) self.assertIn(('ix_token_trust_id', ['trust_id']), index_data) - def test_project_parent_id_upgrade(self): - self.upgrade(61) - self.assertTableColumns('project', - ['id', 'name', 'extra', 'description', - 'enabled', 'domain_id', 'parent_id']) + # In 062 the role ID foreign key was removed from the assignment table + if self.engine.name == "mysql": + self.assertFalse(self.does_fk_exist('assignment', 'role_id')) - def test_drop_assignment_role_fk(self): - self.upgrade(61) - self.assertTrue(self.does_fk_exist('assignment', 'role_id')) - self.upgrade(62) + # In 064 the domain ID FK was removed from the group and user tables if self.engine.name != 'sqlite': # sqlite does not support FK deletions (or enforcement) - self.assertFalse(self.does_fk_exist('assignment', 'role_id')) + self.assertFalse(self.does_fk_exist('group', 'domain_id')) + self.assertFalse(self.does_fk_exist('user', 'domain_id')) + + # In 067 the role ID index was removed from the assignment table + if self.engine.name == "mysql": + self.assertFalse(self._does_index_exist('assignment', + 'assignment_role_id_fkey')) def test_insert_assignment_inherited_pk(self): ASSIGNMENT_TABLE_NAME = 'assignment' @@ -502,7 +478,6 @@ class SqlUpgradeTests(SqlMigrateBase): def does_pk_exist(self, table, pk_column): """Checks whether a column is primary key on a table.""" - inspector = reflection.Inspector.from_engine(self.engine) pk_columns = inspector.get_pk_constraint(table)['constrained_columns'] @@ -515,119 +490,164 @@ class SqlUpgradeTests(SqlMigrateBase): return True return False - def test_drop_region_url_upgrade(self): - self.upgrade(63) - self.assertTableColumns('region', - ['id', 'description', 'parent_region_id', - 'extra']) - - def test_domain_fk(self): - self.upgrade(63) - self.assertTrue(self.does_fk_exist('group', 'domain_id')) - self.assertTrue(self.does_fk_exist('user', 'domain_id')) - self.upgrade(64) - if self.engine.name != 'sqlite': - # sqlite does not support FK deletions (or enforcement) - self.assertFalse(self.does_fk_exist('group', 'domain_id')) - self.assertFalse(self.does_fk_exist('user', 'domain_id')) - - def test_add_domain_config(self): - whitelisted_table = 'whitelisted_config' - sensitive_table = 'sensitive_config' - self.upgrade(64) - self.assertTableDoesNotExist(whitelisted_table) - self.assertTableDoesNotExist(sensitive_table) - self.upgrade(65) - self.assertTableColumns(whitelisted_table, - ['domain_id', 'group', 'option', 'value']) - self.assertTableColumns(sensitive_table, - ['domain_id', 'group', 'option', 'value']) - - def test_fixup_service_name_value_upgrade(self): - """Update service name data from `extra` to empty string.""" - def add_service(**extra_data): - service_id = uuid.uuid4().hex - - service = { - 'id': service_id, - 'type': uuid.uuid4().hex, - 'extra': json.dumps(extra_data), - } - - self.insert_dict(session, 'service', service) - - return service_id - - self.upgrade(65) - session = self.Session() - - # Services with extra values having a random attribute and - # different combinations of name - random_attr_name = uuid.uuid4().hex - random_attr_value = uuid.uuid4().hex - random_attr_str = "%s='%s'" % (random_attr_name, random_attr_value) - random_attr_no_name = {random_attr_name: random_attr_value} - random_attr_no_name_str = "%s='%s'" % (random_attr_name, - random_attr_value) - random_attr_name_value = {random_attr_name: random_attr_value, - 'name': 'myname'} - random_attr_name_value_str = 'name=myname,%s' % random_attr_str - random_attr_name_empty = {random_attr_name: random_attr_value, - 'name': ''} - random_attr_name_empty_str = 'name=,%s' % random_attr_str - random_attr_name_none = {random_attr_name: random_attr_value, - 'name': None} - random_attr_name_none_str = 'name=None,%s' % random_attr_str - - services = [ - (add_service(**random_attr_no_name), - random_attr_name_empty, random_attr_no_name_str), - (add_service(**random_attr_name_value), - random_attr_name_value, random_attr_name_value_str), - (add_service(**random_attr_name_empty), - random_attr_name_empty, random_attr_name_empty_str), - (add_service(**random_attr_name_none), - random_attr_name_empty, random_attr_name_none_str), - ] - - # NOTE(viktors): Add a service with empty extra field - self.insert_dict(session, 'service', - {'id': uuid.uuid4().hex, 'type': uuid.uuid4().hex}) - - session.close() - self.upgrade(66) - session = self.Session() - - # Verify that the services have the expected values. - self.metadata.clear() - service_table = sqlalchemy.Table('service', self.metadata, - autoload=True) - - def fetch_service_extra(service_id): - cols = [service_table.c.extra] - f = service_table.c.id == service_id - s = sqlalchemy.select(cols).where(f) - service = session.execute(s).fetchone() - return json.loads(service.extra) - - for service_id, exp_extra, msg in services: - extra = fetch_service_extra(service_id) - self.assertDictEqual(exp_extra, extra, msg) - - def _does_index_exist(self, table_name, index_name): + def does_index_exist(self, table_name, index_name): meta = sqlalchemy.MetaData(bind=self.engine) - table = sqlalchemy.Table('assignment', meta, autoload=True) + table = sqlalchemy.Table(table_name, meta, autoload=True) return index_name in [idx.name for idx in table.indexes] - def test_drop_assignment_role_id_index_mysql(self): - self.upgrade(66) - if self.engine.name == "mysql": - self.assertTrue(self._does_index_exist('assignment', - 'assignment_role_id_fkey')) - self.upgrade(67) - if self.engine.name == "mysql": - self.assertFalse(self._does_index_exist('assignment', - 'assignment_role_id_fkey')) + def does_constraint_exist(self, table_name, constraint_name): + meta = sqlalchemy.MetaData(bind=self.engine) + table = sqlalchemy.Table(table_name, meta, autoload=True) + return constraint_name in [con.name for con in table.constraints] + + def test_endpoint_policy_upgrade(self): + self.assertTableDoesNotExist('policy_association') + self.upgrade(81) + self.assertTableColumns('policy_association', + ['id', 'policy_id', 'endpoint_id', + 'service_id', 'region_id']) + + @mock.patch.object(migration_helpers, 'get_db_version', return_value=1) + def test_endpoint_policy_already_migrated(self, mock_ep): + + # By setting the return value to 1, the migration has already been + # run, and there's no need to create the table again + + self.upgrade(81) + + mock_ep.assert_called_once_with(extension='endpoint_policy', + engine=mock.ANY) + + # It won't exist because we are mocking it, but we can verify + # that 081 did not create the table + self.assertTableDoesNotExist('policy_association') + + def test_create_federation_tables(self): + self.identity_provider = 'identity_provider' + self.federation_protocol = 'federation_protocol' + self.service_provider = 'service_provider' + self.mapping = 'mapping' + self.remote_ids = 'idp_remote_ids' + + self.assertTableDoesNotExist(self.identity_provider) + self.assertTableDoesNotExist(self.federation_protocol) + self.assertTableDoesNotExist(self.service_provider) + self.assertTableDoesNotExist(self.mapping) + self.assertTableDoesNotExist(self.remote_ids) + + self.upgrade(82) + self.assertTableColumns(self.identity_provider, + ['id', 'description', 'enabled']) + + self.assertTableColumns(self.federation_protocol, + ['id', 'idp_id', 'mapping_id']) + + self.assertTableColumns(self.mapping, + ['id', 'rules']) + + self.assertTableColumns(self.service_provider, + ['id', 'description', 'enabled', 'auth_url', + 'relay_state_prefix', 'sp_url']) + + self.assertTableColumns(self.remote_ids, ['idp_id', 'remote_id']) + + federation_protocol = sqlalchemy.Table(self.federation_protocol, + self.metadata, + autoload=True) + self.assertFalse(federation_protocol.c.mapping_id.nullable) + + sp_table = sqlalchemy.Table(self.service_provider, + self.metadata, + autoload=True) + self.assertFalse(sp_table.c.auth_url.nullable) + self.assertFalse(sp_table.c.sp_url.nullable) + + @mock.patch.object(migration_helpers, 'get_db_version', return_value=8) + def test_federation_already_migrated(self, mock_federation): + + # By setting the return value to 8, the migration has already been + # run, and there's no need to create the table again. + self.upgrade(82) + + mock_federation.assert_any_call(extension='federation', + engine=mock.ANY) + + # It won't exist because we are mocking it, but we can verify + # that 082 did not create the table. + self.assertTableDoesNotExist('identity_provider') + self.assertTableDoesNotExist('federation_protocol') + self.assertTableDoesNotExist('mapping') + self.assertTableDoesNotExist('service_provider') + self.assertTableDoesNotExist('idp_remote_ids') + + def test_create_oauth_tables(self): + consumer = 'consumer' + request_token = 'request_token' + access_token = 'access_token' + self.assertTableDoesNotExist(consumer) + self.assertTableDoesNotExist(request_token) + self.assertTableDoesNotExist(access_token) + self.upgrade(83) + self.assertTableColumns(consumer, + ['id', + 'description', + 'secret', + 'extra']) + self.assertTableColumns(request_token, + ['id', + 'request_secret', + 'verifier', + 'authorizing_user_id', + 'requested_project_id', + 'role_ids', + 'consumer_id', + 'expires_at']) + self.assertTableColumns(access_token, + ['id', + 'access_secret', + 'authorizing_user_id', + 'project_id', + 'role_ids', + 'consumer_id', + 'expires_at']) + + @mock.patch.object(migration_helpers, 'get_db_version', return_value=5) + def test_oauth1_already_migrated(self, mock_oauth1): + + # By setting the return value to 5, the migration has already been + # run, and there's no need to create the table again. + self.upgrade(83) + + mock_oauth1.assert_any_call(extension='oauth1', engine=mock.ANY) + + # It won't exist because we are mocking it, but we can verify + # that 083 did not create the table. + self.assertTableDoesNotExist('consumer') + self.assertTableDoesNotExist('request_token') + self.assertTableDoesNotExist('access_token') + + def test_create_revoke_table(self): + self.assertTableDoesNotExist('revocation_event') + self.upgrade(84) + self.assertTableColumns('revocation_event', + ['id', 'domain_id', 'project_id', 'user_id', + 'role_id', 'trust_id', 'consumer_id', + 'access_token_id', 'issued_before', + 'expires_at', 'revoked_at', + 'audit_chain_id', 'audit_id']) + + @mock.patch.object(migration_helpers, 'get_db_version', return_value=2) + def test_revoke_already_migrated(self, mock_revoke): + + # By setting the return value to 2, the migration has already been + # run, and there's no need to create the table again. + self.upgrade(84) + + mock_revoke.assert_any_call(extension='revoke', engine=mock.ANY) + + # It won't exist because we are mocking it, but we can verify + # that 084 did not create the table. + self.assertTableDoesNotExist('revocation_event') def test_project_is_domain_upgrade(self): self.upgrade(74) @@ -636,6 +656,13 @@ class SqlUpgradeTests(SqlMigrateBase): 'enabled', 'domain_id', 'parent_id', 'is_domain']) + def test_implied_roles_upgrade(self): + self.upgrade(87) + self.assertTableColumns('implied_role', + ['prior_role_id', 'implied_role_id']) + self.assertTrue(self.does_fk_exist('implied_role', 'prior_role_id')) + self.assertTrue(self.does_fk_exist('implied_role', 'implied_role_id')) + def test_add_config_registration(self): config_registration = 'config_register' self.upgrade(74) @@ -643,136 +670,497 @@ class SqlUpgradeTests(SqlMigrateBase): self.upgrade(75) self.assertTableColumns(config_registration, ['type', 'domain_id']) - def populate_user_table(self, with_pass_enab=False, - with_pass_enab_domain=False): - # Populate the appropriate fields in the user - # table, depending on the parameters: - # - # Default: id, name, extra - # pass_enab: Add password, enabled as well - # pass_enab_domain: Add password, enabled and domain as well - # - this_table = sqlalchemy.Table("user", - self.metadata, - autoload=True) - for user in default_fixtures.USERS: - extra = copy.deepcopy(user) - extra.pop('id') - extra.pop('name') - - if with_pass_enab: - password = extra.pop('password', None) - enabled = extra.pop('enabled', True) - ins = this_table.insert().values( + def test_endpoint_filter_upgrade(self): + def assert_tables_columns_exist(): + self.assertTableColumns('project_endpoint', + ['endpoint_id', 'project_id']) + self.assertTableColumns('endpoint_group', + ['id', 'name', 'description', 'filters']) + self.assertTableColumns('project_endpoint_group', + ['endpoint_group_id', 'project_id']) + + self.assertTableDoesNotExist('project_endpoint') + self.upgrade(85) + assert_tables_columns_exist() + + @mock.patch.object(migration_helpers, 'get_db_version', return_value=2) + def test_endpoint_filter_already_migrated(self, mock_endpoint_filter): + + # By setting the return value to 2, the migration has already been + # run, and there's no need to create the table again. + self.upgrade(85) + + mock_endpoint_filter.assert_any_call(extension='endpoint_filter', + engine=mock.ANY) + + # It won't exist because we are mocking it, but we can verify + # that 085 did not create the table. + self.assertTableDoesNotExist('project_endpoint') + self.assertTableDoesNotExist('endpoint_group') + self.assertTableDoesNotExist('project_endpoint_group') + + def test_add_trust_unique_constraint_upgrade(self): + self.upgrade(86) + inspector = reflection.Inspector.from_engine(self.engine) + constraints = inspector.get_unique_constraints('trust') + constraint_names = [constraint['name'] for constraint in constraints] + self.assertIn('duplicate_trust_constraint', constraint_names) + + def test_add_domain_specific_roles(self): + """Check database upgraded successfully for domain specific roles. + + The following items need to be checked: + + - The domain_id column has been added + - That it has been added to the uniqueness constraints + - Existing roles have their domain_id columns set to the specific + string of '<<null>>' + + """ + NULL_DOMAIN_ID = '<<null>>' + + self.upgrade(87) + session = self.Session() + role_table = sqlalchemy.Table('role', self.metadata, autoload=True) + # Add a role before we upgrade, so we can check that its new domain_id + # attribute is handled correctly + role_id = uuid.uuid4().hex + self.insert_dict(session, 'role', + {'id': role_id, 'name': uuid.uuid4().hex}) + session.close() + + self.upgrade(88) + + session = self.Session() + self.metadata.clear() + self.assertTableColumns('role', ['id', 'name', 'domain_id', 'extra']) + # Check the domain_id has been added to the uniqueness constraint + inspector = reflection.Inspector.from_engine(self.engine) + constraints = inspector.get_unique_constraints('role') + constraint_columns = [ + constraint['column_names'] for constraint in constraints + if constraint['name'] == 'ixu_role_name_domain_id'] + self.assertIn('domain_id', constraint_columns[0]) + + # Now check our role has its domain_id attribute set correctly + role_table = sqlalchemy.Table('role', self.metadata, autoload=True) + cols = [role_table.c.domain_id] + filter = role_table.c.id == role_id + statement = sqlalchemy.select(cols).where(filter) + role_entry = session.execute(statement).fetchone() + self.assertEqual(NULL_DOMAIN_ID, role_entry[0]) + + def test_add_root_of_all_domains(self): + NULL_DOMAIN_ID = '<<keystone.domain.root>>' + self.upgrade(89) + session = self.Session() + + domain_table = sqlalchemy.Table( + 'domain', self.metadata, autoload=True) + query = session.query(domain_table).filter_by(id=NULL_DOMAIN_ID) + domain_from_db = query.one() + self.assertIn(NULL_DOMAIN_ID, domain_from_db) + + project_table = sqlalchemy.Table( + 'project', self.metadata, autoload=True) + query = session.query(project_table).filter_by(id=NULL_DOMAIN_ID) + project_from_db = query.one() + self.assertIn(NULL_DOMAIN_ID, project_from_db) + + session.close() + + def test_add_local_user_and_password_tables(self): + local_user_table = 'local_user' + password_table = 'password' + self.upgrade(89) + self.assertTableDoesNotExist(local_user_table) + self.assertTableDoesNotExist(password_table) + self.upgrade(90) + self.assertTableColumns(local_user_table, + ['id', + 'user_id', + 'domain_id', + 'name']) + self.assertTableColumns(password_table, + ['id', + 'local_user_id', + 'password']) + + def test_migrate_data_to_local_user_and_password_tables(self): + def get_expected_users(): + expected_users = [] + for test_user in default_fixtures.USERS: + user = {} + user['id'] = uuid.uuid4().hex + user['name'] = test_user['name'] + user['domain_id'] = test_user['domain_id'] + user['password'] = test_user['password'] + user['enabled'] = True + user['extra'] = json.dumps(uuid.uuid4().hex) + user['default_project_id'] = uuid.uuid4().hex + expected_users.append(user) + return expected_users + + def add_users_to_db(expected_users, user_table): + for user in expected_users: + ins = user_table.insert().values( {'id': user['id'], 'name': user['name'], - 'password': password, - 'enabled': bool(enabled), - 'extra': json.dumps(extra)}) - else: - if with_pass_enab_domain: - password = extra.pop('password', None) - enabled = extra.pop('enabled', True) - extra.pop('domain_id') - ins = this_table.insert().values( - {'id': user['id'], - 'name': user['name'], - 'domain_id': user['domain_id'], - 'password': password, - 'enabled': bool(enabled), - 'extra': json.dumps(extra)}) - else: - ins = this_table.insert().values( - {'id': user['id'], - 'name': user['name'], - 'extra': json.dumps(extra)}) - self.engine.execute(ins) - - def populate_tenant_table(self, with_desc_enab=False, - with_desc_enab_domain=False): - # Populate the appropriate fields in the tenant or - # project table, depending on the parameters - # - # Default: id, name, extra - # desc_enab: Add description, enabled as well - # desc_enab_domain: Add description, enabled and domain as well, - # plus use project instead of tenant - # - if with_desc_enab_domain: - # By this time tenants are now projects - this_table = sqlalchemy.Table("project", - self.metadata, + 'domain_id': user['domain_id'], + 'password': user['password'], + 'enabled': user['enabled'], + 'extra': user['extra'], + 'default_project_id': user['default_project_id']}) + ins.execute() + + def get_users_from_db(user_table, local_user_table, password_table): + sel = ( + sqlalchemy.select([user_table.c.id, + user_table.c.enabled, + user_table.c.extra, + user_table.c.default_project_id, + local_user_table.c.name, + local_user_table.c.domain_id, + password_table.c.password]) + .select_from(user_table.join(local_user_table, + user_table.c.id == + local_user_table.c.user_id) + .join(password_table, + local_user_table.c.id == + password_table.c.local_user_id)) + ) + user_rows = sel.execute() + users = [] + for row in user_rows: + users.append( + {'id': row['id'], + 'name': row['name'], + 'domain_id': row['domain_id'], + 'password': row['password'], + 'enabled': row['enabled'], + 'extra': row['extra'], + 'default_project_id': row['default_project_id']}) + return users + + meta = sqlalchemy.MetaData() + meta.bind = self.engine + + user_table_name = 'user' + local_user_table_name = 'local_user' + password_table_name = 'password' + + # populate current user table + self.upgrade(90) + user_table = sqlalchemy.Table(user_table_name, meta, autoload=True) + expected_users = get_expected_users() + add_users_to_db(expected_users, user_table) + + # upgrade to migration and test + self.upgrade(91) + self.assertTableCountsMatch(user_table_name, local_user_table_name) + self.assertTableCountsMatch(local_user_table_name, password_table_name) + meta.clear() + user_table = sqlalchemy.Table(user_table_name, meta, autoload=True) + local_user_table = sqlalchemy.Table(local_user_table_name, meta, + autoload=True) + password_table = sqlalchemy.Table(password_table_name, meta, autoload=True) + actual_users = get_users_from_db(user_table, local_user_table, + password_table) + self.assertListEqual(expected_users, actual_users) + + def test_migrate_user_with_null_password_to_password_tables(self): + USER_TABLE_NAME = 'user' + LOCAL_USER_TABLE_NAME = 'local_user' + PASSWORD_TABLE_NAME = 'password' + self.upgrade(90) + user_ref = unit.new_user_ref(uuid.uuid4().hex) + user_ref.pop('password') + # pop extra attribute which doesn't recognized by SQL expression + # layer. + user_ref.pop('email') + session = self.Session() + self.insert_dict(session, USER_TABLE_NAME, user_ref) + self.metadata.clear() + self.upgrade(91) + # migration should be successful. + self.assertTableCountsMatch(USER_TABLE_NAME, LOCAL_USER_TABLE_NAME) + # no new entry was added to the password table because the + # user doesn't have a password. + password_table = self.select_table(PASSWORD_TABLE_NAME) + rows = session.execute(password_table.count()).scalar() + self.assertEqual(0, rows) + + def test_migrate_user_skip_user_already_exist_in_local_user(self): + USER_TABLE_NAME = 'user' + LOCAL_USER_TABLE_NAME = 'local_user' + self.upgrade(90) + user1_ref = unit.new_user_ref(uuid.uuid4().hex) + # pop extra attribute which doesn't recognized by SQL expression + # layer. + user1_ref.pop('email') + user2_ref = unit.new_user_ref(uuid.uuid4().hex) + user2_ref.pop('email') + session = self.Session() + self.insert_dict(session, USER_TABLE_NAME, user1_ref) + self.insert_dict(session, USER_TABLE_NAME, user2_ref) + user_id = user1_ref.pop('id') + user_name = user1_ref.pop('name') + domain_id = user1_ref.pop('domain_id') + local_user_ref = {'user_id': user_id, 'name': user_name, + 'domain_id': domain_id} + self.insert_dict(session, LOCAL_USER_TABLE_NAME, local_user_ref) + self.metadata.clear() + self.upgrade(91) + # migration should be successful and user2_ref has been migrated to + # `local_user` table. + self.assertTableCountsMatch(USER_TABLE_NAME, LOCAL_USER_TABLE_NAME) + + def test_implied_roles_fk_on_delete_cascade(self): + if self.engine.name == 'sqlite': + self.skipTest('sqlite backend does not support foreign keys') + + self.upgrade(92) + + def _create_three_roles(): + id_list = [] + for _ in range(3): + role = unit.new_role_ref() + self.role_api.create_role(role['id'], role) + id_list.append(role['id']) + return id_list + + role_id_list = _create_three_roles() + self.role_api.create_implied_role(role_id_list[0], role_id_list[1]) + self.role_api.create_implied_role(role_id_list[0], role_id_list[2]) + + # assert that there are two roles implied by role 0. + implied_roles = self.role_api.list_implied_roles(role_id_list[0]) + self.assertThat(implied_roles, matchers.HasLength(2)) + + self.role_api.delete_role(role_id_list[0]) + # assert the cascade deletion is effective. + implied_roles = self.role_api.list_implied_roles(role_id_list[0]) + self.assertThat(implied_roles, matchers.HasLength(0)) + + def test_domain_as_project_upgrade(self): + + def _populate_domain_and_project_tables(session): + # Three domains, with various different attributes + self.domains = [{'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'enabled': True, + 'extra': {'description': uuid.uuid4().hex, + 'another_attribute': True}}, + {'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'enabled': True, + 'extra': {'description': uuid.uuid4().hex}}, + {'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'enabled': False}] + # Four projects, two top level, two children + self.projects = [] + self.projects.append(unit.new_project_ref( + domain_id=self.domains[0]['id'], + parent_id=None)) + self.projects.append(unit.new_project_ref( + domain_id=self.domains[0]['id'], + parent_id=self.projects[0]['id'])) + self.projects.append(unit.new_project_ref( + domain_id=self.domains[1]['id'], + parent_id=None)) + self.projects.append(unit.new_project_ref( + domain_id=self.domains[1]['id'], + parent_id=self.projects[2]['id'])) + + for domain in self.domains: + this_domain = domain.copy() + if 'extra' in this_domain: + this_domain['extra'] = json.dumps(this_domain['extra']) + self.insert_dict(session, 'domain', this_domain) + for project in self.projects: + self.insert_dict(session, 'project', project) + + def _check_projects(projects): + + def _assert_domain_matches_project(project): + for domain in self.domains: + if project.id == domain['id']: + self.assertEqual(domain['name'], project.name) + self.assertEqual(domain['enabled'], project.enabled) + if domain['id'] == self.domains[0]['id']: + self.assertEqual(domain['extra']['description'], + project.description) + self.assertEqual({'another_attribute': True}, + json.loads(project.extra)) + elif domain['id'] == self.domains[1]['id']: + self.assertEqual(domain['extra']['description'], + project.description) + self.assertEqual({}, json.loads(project.extra)) + + # We had domains 3 we created, which should now be projects acting + # as domains, To this we add the 4 original projects, plus the root + # of all domains row. + self.assertEqual(8, projects.count()) + + project_ids = [] + for project in projects: + if project.is_domain: + self.assertEqual(NULL_DOMAIN_ID, project.domain_id) + self.assertIsNone(project.parent_id) + else: + self.assertIsNotNone(project.domain_id) + self.assertIsNotNone(project.parent_id) + project_ids.append(project.id) + + for domain in self.domains: + self.assertIn(domain['id'], project_ids) + for project in self.projects: + self.assertIn(project['id'], project_ids) + + # Now check the attributes of the domains came across OK + for project in projects: + _assert_domain_matches_project(project) + + NULL_DOMAIN_ID = '<<keystone.domain.root>>' + self.upgrade(92) + + session = self.Session() + + _populate_domain_and_project_tables(session) + + self.upgrade(93) + proj_table = sqlalchemy.Table('project', self.metadata, autoload=True) + + projects = session.query(proj_table) + _check_projects(projects) + + def test_add_federated_user_table(self): + federated_user_table = 'federated_user' + self.upgrade(93) + self.assertTableDoesNotExist(federated_user_table) + self.upgrade(94) + self.assertTableColumns(federated_user_table, + ['id', + 'user_id', + 'idp_id', + 'protocol_id', + 'unique_id', + 'display_name']) + + def test_add_int_pkey_to_revocation_event_table(self): + meta = sqlalchemy.MetaData() + meta.bind = self.engine + REVOCATION_EVENT_TABLE_NAME = 'revocation_event' + self.upgrade(94) + revocation_event_table = sqlalchemy.Table(REVOCATION_EVENT_TABLE_NAME, + meta, autoload=True) + # assert id column is a string (before) + self.assertEqual('VARCHAR(64)', str(revocation_event_table.c.id.type)) + self.upgrade(95) + meta.clear() + revocation_event_table = sqlalchemy.Table(REVOCATION_EVENT_TABLE_NAME, + meta, autoload=True) + # assert id column is an integer (after) + self.assertEqual('INTEGER', str(revocation_event_table.c.id.type)) + + def _add_unique_constraint_to_role_name(self, + constraint_name='ixu_role_name'): + meta = sqlalchemy.MetaData() + meta.bind = self.engine + role_table = sqlalchemy.Table('role', meta, autoload=True) + migrate.UniqueConstraint(role_table.c.name, + name=constraint_name).create() + + def _drop_unique_constraint_to_role_name(self, + constraint_name='ixu_role_name'): + role_table = sqlalchemy.Table('role', self.metadata, autoload=True) + migrate.UniqueConstraint(role_table.c.name, + name=constraint_name).drop() + + def test_migration_88_drops_unique_constraint(self): + self.upgrade(87) + if self.engine.name == 'mysql': + self.assertTrue(self.does_index_exist('role', 'ixu_role_name')) else: - this_table = sqlalchemy.Table("tenant", - self.metadata, - autoload=True) + self.assertTrue(self.does_constraint_exist('role', + 'ixu_role_name')) + self.upgrade(88) + if self.engine.name == 'mysql': + self.assertFalse(self.does_index_exist('role', 'ixu_role_name')) + else: + self.assertFalse(self.does_constraint_exist('role', + 'ixu_role_name')) - for tenant in default_fixtures.TENANTS: - extra = copy.deepcopy(tenant) - extra.pop('id') - extra.pop('name') - - if with_desc_enab: - desc = extra.pop('description', None) - enabled = extra.pop('enabled', True) - ins = this_table.insert().values( - {'id': tenant['id'], - 'name': tenant['name'], - 'description': desc, - 'enabled': bool(enabled), - 'extra': json.dumps(extra)}) - else: - if with_desc_enab_domain: - desc = extra.pop('description', None) - enabled = extra.pop('enabled', True) - extra.pop('domain_id') - ins = this_table.insert().values( - {'id': tenant['id'], - 'name': tenant['name'], - 'domain_id': tenant['domain_id'], - 'description': desc, - 'enabled': bool(enabled), - 'extra': json.dumps(extra)}) - else: - ins = this_table.insert().values( - {'id': tenant['id'], - 'name': tenant['name'], - 'extra': json.dumps(extra)}) - self.engine.execute(ins) - - def _mysql_check_all_tables_innodb(self): - database = self.engine.url.database - - connection = self.engine.connect() - # sanity check - total = connection.execute("SELECT count(*) " - "from information_schema.TABLES " - "where TABLE_SCHEMA='%(database)s'" % - dict(database=database)) - self.assertTrue(total.scalar() > 0, "No tables found. Wrong schema?") - - noninnodb = connection.execute("SELECT table_name " - "from information_schema.TABLES " - "where TABLE_SCHEMA='%(database)s' " - "and ENGINE!='InnoDB' " - "and TABLE_NAME!='migrate_version'" % - dict(database=database)) - names = [x[0] for x in noninnodb] - self.assertEqual([], names, - "Non-InnoDB tables exist") - - connection.close() + def test_migration_88_inconsistent_constraint_name(self): + self.upgrade(87) + self._drop_unique_constraint_to_role_name() + + constraint_name = uuid.uuid4().hex + self._add_unique_constraint_to_role_name( + constraint_name=constraint_name) + + if self.engine.name == 'mysql': + self.assertTrue(self.does_index_exist('role', constraint_name)) + self.assertFalse(self.does_index_exist('role', 'ixu_role_name')) + else: + self.assertTrue(self.does_constraint_exist('role', + constraint_name)) + self.assertFalse(self.does_constraint_exist('role', + 'ixu_role_name')) + + self.upgrade(88) + if self.engine.name == 'mysql': + self.assertFalse(self.does_index_exist('role', constraint_name)) + self.assertFalse(self.does_index_exist('role', 'ixu_role_name')) + else: + self.assertFalse(self.does_constraint_exist('role', + constraint_name)) + self.assertFalse(self.does_constraint_exist('role', + 'ixu_role_name')) + + def test_migration_96(self): + self.upgrade(95) + if self.engine.name == 'mysql': + self.assertFalse(self.does_index_exist('role', 'ixu_role_name')) + else: + self.assertFalse(self.does_constraint_exist('role', + 'ixu_role_name')) + + self.upgrade(96) + if self.engine.name == 'mysql': + self.assertFalse(self.does_index_exist('role', 'ixu_role_name')) + else: + self.assertFalse(self.does_constraint_exist('role', + 'ixu_role_name')) + + def test_migration_96_constraint_exists(self): + self.upgrade(95) + self._add_unique_constraint_to_role_name() + + if self.engine.name == 'mysql': + self.assertTrue(self.does_index_exist('role', 'ixu_role_name')) + else: + self.assertTrue(self.does_constraint_exist('role', + 'ixu_role_name')) + + self.upgrade(96) + if self.engine.name == 'mysql': + self.assertFalse(self.does_index_exist('role', 'ixu_role_name')) + else: + self.assertFalse(self.does_constraint_exist('role', + 'ixu_role_name')) class VersionTests(SqlMigrateBase): - _initial_db_version = migrate_repo.DB_INIT_VERSION + _initial_db_version = migration_helpers.get_init_version() def test_core_initial(self): """Get the version before migrated, it's the initial DB version.""" version = migration_helpers.get_db_version() - self.assertEqual(migrate_repo.DB_INIT_VERSION, version) + self.assertEqual(self._initial_db_version, version) def test_core_max(self): """When get the version after upgrading, it's the new version.""" @@ -793,97 +1181,15 @@ class VersionTests(SqlMigrateBase): migration_helpers.get_db_version, extension='federation') - def test_extension_initial(self): - """When get the initial version of an extension, it's 0.""" - for name, extension in EXTENSIONS.items(): - abs_path = migration_helpers.find_migrate_repo(extension) - migration.db_version_control(sql.get_engine(), abs_path) - version = migration_helpers.get_db_version(extension=name) - self.assertEqual(0, version, - 'Migrate version for %s is not 0' % name) - - def test_extension_migrated(self): - """When get the version after migrating an extension, it's not 0.""" - for name, extension in EXTENSIONS.items(): - abs_path = migration_helpers.find_migrate_repo(extension) - migration.db_version_control(sql.get_engine(), abs_path) - migration.db_sync(sql.get_engine(), abs_path) - version = migration_helpers.get_db_version(extension=name) - self.assertTrue( - version > 0, - "Version for %s didn't change after migrated?" % name) - # Verify downgrades cannot occur - self.assertRaises( - db_exception.DbMigrationError, - migration_helpers._sync_extension_repo, - extension=name, - version=0) - - def test_extension_federation_upgraded_values(self): - abs_path = migration_helpers.find_migrate_repo(federation) - migration.db_version_control(sql.get_engine(), abs_path) - migration.db_sync(sql.get_engine(), abs_path, version=6) - idp_table = sqlalchemy.Table("identity_provider", - self.metadata, - autoload=True) - idps = [{'id': uuid.uuid4().hex, - 'enabled': True, - 'description': uuid.uuid4().hex, - 'remote_id': uuid.uuid4().hex}, - {'id': uuid.uuid4().hex, - 'enabled': True, - 'description': uuid.uuid4().hex, - 'remote_id': uuid.uuid4().hex}] - for idp in idps: - ins = idp_table.insert().values({'id': idp['id'], - 'enabled': idp['enabled'], - 'description': idp['description'], - 'remote_id': idp['remote_id']}) - self.engine.execute(ins) - migration.db_sync(sql.get_engine(), abs_path) - idp_remote_ids_table = sqlalchemy.Table("idp_remote_ids", - self.metadata, - autoload=True) - for idp in idps: - s = idp_remote_ids_table.select().where( - idp_remote_ids_table.c.idp_id == idp['id']) - remote = self.engine.execute(s).fetchone() - self.assertEqual(idp['remote_id'], - remote['remote_id'], - 'remote_ids must be preserved during the ' - 'migration from identity_provider table to ' - 'idp_remote_ids table') - def test_unexpected_extension(self): - """The version for an extension that doesn't exist raises ImportError. - - """ - + """The version for a non-existent extension raises ImportError.""" extension_name = uuid.uuid4().hex self.assertRaises(ImportError, migration_helpers.get_db_version, extension=extension_name) def test_unversioned_extension(self): - """The version for extensions without migrations raise an exception. - - """ - + """The version for extensions without migrations raise an exception.""" self.assertRaises(exception.MigrationNotProvided, migration_helpers.get_db_version, extension='admin_crud') - - def test_initial_with_extension_version_None(self): - """When performing a default migration, also migrate extensions.""" - migration_helpers.sync_database_to_version(extension=None, - version=None) - for table in INITIAL_EXTENSION_TABLE_STRUCTURE: - self.assertTableColumns(table, - INITIAL_EXTENSION_TABLE_STRUCTURE[table]) - - def test_initial_with_extension_version_max(self): - """When migrating to max version, do not migrate extensions.""" - migration_helpers.sync_database_to_version(extension=None, - version=self.max_version) - for table in INITIAL_EXTENSION_TABLE_STRUCTURE: - self.assertTableDoesNotExist(table) |