diff options
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_backend_sql.py')
-rw-r--r-- | keystone-moon/keystone/tests/unit/test_backend_sql.py | 619 |
1 files changed, 361 insertions, 258 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_backend_sql.py b/keystone-moon/keystone/tests/unit/test_backend_sql.py index 69fac63a..2e703fff 100644 --- a/keystone-moon/keystone/tests/unit/test_backend_sql.py +++ b/keystone-moon/keystone/tests/unit/test_backend_sql.py @@ -29,22 +29,28 @@ from keystone.common import driver_hints from keystone.common import sql from keystone import exception from keystone.identity.backends import sql as identity_sql +from keystone import resource from keystone.tests import unit +from keystone.tests.unit.assignment import test_backends as assignment_tests +from keystone.tests.unit.catalog import test_backends as catalog_tests from keystone.tests.unit import default_fixtures +from keystone.tests.unit.identity import test_backends as identity_tests from keystone.tests.unit.ksfixtures import database -from keystone.tests.unit import test_backend +from keystone.tests.unit.policy import test_backends as policy_tests +from keystone.tests.unit.resource import test_backends as resource_tests +from keystone.tests.unit.token import test_backends as token_tests +from keystone.tests.unit.trust import test_backends as trust_tests from keystone.token.persistence.backends import sql as token_sql CONF = cfg.CONF -DEFAULT_DOMAIN_ID = CONF.identity.default_domain_id class SqlTests(unit.SQLDriverOverrides, unit.TestCase): def setUp(self): super(SqlTests, self).setUp() - self.useFixture(database.Database()) + self.useFixture(database.Database(self.sql_driver_version_overrides)) self.load_backends() # populate the engine with tables & fixtures @@ -124,14 +130,33 @@ class SqlModels(SqlTests): def test_user_model(self): cols = (('id', sql.String, 64), - ('name', sql.String, 255), - ('password', sql.String, 128), - ('domain_id', sql.String, 64), ('default_project_id', sql.String, 64), ('enabled', sql.Boolean, None), ('extra', sql.JsonBlob, None)) self.assertExpectedSchema('user', cols) + def test_local_user_model(self): + cols = (('id', sql.Integer, None), + ('user_id', sql.String, 64), + ('name', sql.String, 255), + ('domain_id', sql.String, 64)) + self.assertExpectedSchema('local_user', cols) + + def test_password_model(self): + cols = (('id', sql.Integer, None), + ('local_user_id', sql.Integer, None), + ('password', sql.String, 128)) + self.assertExpectedSchema('password', cols) + + def test_federated_user_model(self): + cols = (('id', sql.Integer, None), + ('user_id', sql.String, 64), + ('idp_id', sql.String, 64), + ('protocol_id', sql.String, 64), + ('unique_id', sql.String, 255), + ('display_name', sql.String, 255)) + self.assertExpectedSchema('federated_user', cols) + def test_group_model(self): cols = (('id', sql.String, 64), ('name', sql.String, 64), @@ -171,17 +196,58 @@ class SqlModels(SqlTests): ('user_id', sql.String, 64)) self.assertExpectedSchema('user_group_membership', cols) - -class SqlIdentity(SqlTests, test_backend.IdentityTests): + def test_revocation_event_model(self): + cols = (('id', sql.Integer, None), + ('domain_id', sql.String, 64), + ('project_id', sql.String, 64), + ('user_id', sql.String, 64), + ('role_id', sql.String, 64), + ('trust_id', sql.String, 64), + ('consumer_id', sql.String, 64), + ('access_token_id', sql.String, 64), + ('issued_before', sql.DateTime, None), + ('expires_at', sql.DateTime, None), + ('revoked_at', sql.DateTime, None), + ('audit_id', sql.String, 32), + ('audit_chain_id', sql.String, 32)) + self.assertExpectedSchema('revocation_event', cols) + + +class SqlIdentity(SqlTests, identity_tests.IdentityTests, + assignment_tests.AssignmentTests, + resource_tests.ResourceTests): def test_password_hashed(self): - session = sql.get_session() - user_ref = self.identity_api._get_user(session, self.user_foo['id']) - self.assertNotEqual(user_ref['password'], self.user_foo['password']) + with sql.session_for_read() as session: + user_ref = self.identity_api._get_user(session, + self.user_foo['id']) + self.assertNotEqual(self.user_foo['password'], + user_ref['password']) + + def test_create_user_with_null_password(self): + user_dict = unit.new_user_ref( + domain_id=CONF.identity.default_domain_id) + user_dict["password"] = None + new_user_dict = self.identity_api.create_user(user_dict) + with sql.session_for_read() as session: + new_user_ref = self.identity_api._get_user(session, + new_user_dict['id']) + self.assertFalse(new_user_ref.local_user.passwords) + + def test_update_user_with_null_password(self): + user_dict = unit.new_user_ref( + domain_id=CONF.identity.default_domain_id) + self.assertTrue(user_dict['password']) + new_user_dict = self.identity_api.create_user(user_dict) + new_user_dict["password"] = None + new_user_dict = self.identity_api.update_user(new_user_dict['id'], + new_user_dict) + with sql.session_for_read() as session: + new_user_ref = self.identity_api._get_user(session, + new_user_dict['id']) + self.assertFalse(new_user_ref.local_user.passwords) def test_delete_user_with_project_association(self): - user = {'name': uuid.uuid4().hex, - 'domain_id': DEFAULT_DOMAIN_ID, - 'password': uuid.uuid4().hex} + user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) user = self.identity_api.create_user(user) self.assignment_api.add_user_to_project(self.tenant_bar['id'], user['id']) @@ -191,16 +257,15 @@ class SqlIdentity(SqlTests, test_backend.IdentityTests): user['id']) def test_create_null_user_name(self): - user = {'name': None, - 'domain_id': DEFAULT_DOMAIN_ID, - 'password': uuid.uuid4().hex} + user = unit.new_user_ref(name=None, + domain_id=CONF.identity.default_domain_id) self.assertRaises(exception.ValidationError, self.identity_api.create_user, user) self.assertRaises(exception.UserNotFound, self.identity_api.get_user_by_name, user['name'], - DEFAULT_DOMAIN_ID) + CONF.identity.default_domain_id) def test_create_user_case_sensitivity(self): # user name case sensitivity is down to the fact that it is marked as @@ -208,25 +273,59 @@ class SqlIdentity(SqlTests, test_backend.IdentityTests): # LDAP. # create a ref with a lowercase name - ref = { - 'name': uuid.uuid4().hex.lower(), - 'domain_id': DEFAULT_DOMAIN_ID} + ref = unit.new_user_ref(name=uuid.uuid4().hex.lower(), + domain_id=CONF.identity.default_domain_id) ref = self.identity_api.create_user(ref) # assign a new ID with the same name, but this time in uppercase ref['name'] = ref['name'].upper() self.identity_api.create_user(ref) + def test_create_federated_user_unique_constraint(self): + federated_dict = unit.new_federated_user_ref() + user_dict = self.shadow_users_api.create_federated_user(federated_dict) + user_dict = self.identity_api.get_user(user_dict["id"]) + self.assertIsNotNone(user_dict["id"]) + self.assertRaises(exception.Conflict, + self.shadow_users_api.create_federated_user, + federated_dict) + + def test_get_federated_user(self): + federated_dict = unit.new_federated_user_ref() + user_dict_create = self.shadow_users_api.create_federated_user( + federated_dict) + user_dict_get = self.shadow_users_api.get_federated_user( + federated_dict["idp_id"], + federated_dict["protocol_id"], + federated_dict["unique_id"]) + self.assertItemsEqual(user_dict_create, user_dict_get) + self.assertEqual(user_dict_create["id"], user_dict_get["id"]) + + def test_update_federated_user_display_name(self): + federated_dict = unit.new_federated_user_ref() + user_dict_create = self.shadow_users_api.create_federated_user( + federated_dict) + new_display_name = uuid.uuid4().hex + self.shadow_users_api.update_federated_user_display_name( + federated_dict["idp_id"], + federated_dict["protocol_id"], + federated_dict["unique_id"], + new_display_name) + user_ref = self.shadow_users_api._get_federated_user( + federated_dict["idp_id"], + federated_dict["protocol_id"], + federated_dict["unique_id"]) + self.assertEqual(user_ref.federated_users[0].display_name, + new_display_name) + self.assertEqual(user_dict_create["id"], user_ref.id) + def test_create_project_case_sensitivity(self): # project name case sensitivity is down to the fact that it is marked # as an SQL UNIQUE column, which may not be valid for other backends, # like LDAP. # create a ref with a lowercase name - ref = { - 'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex.lower(), - 'domain_id': DEFAULT_DOMAIN_ID} + ref = unit.new_project_ref(domain_id=CONF.identity.default_domain_id) self.resource_api.create_project(ref['id'], ref) # assign a new ID with the same name, but this time in uppercase @@ -235,25 +334,22 @@ class SqlIdentity(SqlTests, test_backend.IdentityTests): self.resource_api.create_project(ref['id'], ref) def test_create_null_project_name(self): - tenant = {'id': uuid.uuid4().hex, - 'name': None, - 'domain_id': DEFAULT_DOMAIN_ID} + project = unit.new_project_ref( + name=None, domain_id=CONF.identity.default_domain_id) self.assertRaises(exception.ValidationError, self.resource_api.create_project, - tenant['id'], - tenant) + project['id'], + project) self.assertRaises(exception.ProjectNotFound, self.resource_api.get_project, - tenant['id']) + project['id']) self.assertRaises(exception.ProjectNotFound, self.resource_api.get_project_by_name, - tenant['name'], - DEFAULT_DOMAIN_ID) + project['name'], + CONF.identity.default_domain_id) def test_delete_project_with_user_association(self): - user = {'name': 'fakeuser', - 'domain_id': DEFAULT_DOMAIN_ID, - 'password': 'passwd'} + user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) user = self.identity_api.create_user(user) self.assignment_api.add_user_to_project(self.tenant_bar['id'], user['id']) @@ -261,52 +357,6 @@ class SqlIdentity(SqlTests, test_backend.IdentityTests): tenants = self.assignment_api.list_projects_for_user(user['id']) self.assertEqual([], tenants) - def test_metadata_removed_on_delete_user(self): - # A test to check that the internal representation - # or roles is correctly updated when a user is deleted - user = {'name': uuid.uuid4().hex, - 'domain_id': DEFAULT_DOMAIN_ID, - 'password': 'passwd'} - user = self.identity_api.create_user(user) - role = {'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex} - self.role_api.create_role(role['id'], role) - self.assignment_api.add_role_to_user_and_project( - user['id'], - self.tenant_bar['id'], - role['id']) - self.identity_api.delete_user(user['id']) - - # Now check whether the internal representation of roles - # has been deleted - self.assertRaises(exception.MetadataNotFound, - self.assignment_api._get_metadata, - user['id'], - self.tenant_bar['id']) - - def test_metadata_removed_on_delete_project(self): - # A test to check that the internal representation - # or roles is correctly updated when a project is deleted - user = {'name': uuid.uuid4().hex, - 'domain_id': DEFAULT_DOMAIN_ID, - 'password': 'passwd'} - user = self.identity_api.create_user(user) - role = {'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex} - self.role_api.create_role(role['id'], role) - self.assignment_api.add_role_to_user_and_project( - user['id'], - self.tenant_bar['id'], - role['id']) - self.resource_api.delete_project(self.tenant_bar['id']) - - # Now check whether the internal representation of roles - # has been deleted - self.assertRaises(exception.MetadataNotFound, - self.assignment_api._get_metadata, - user['id'], - self.tenant_bar['id']) - def test_update_project_returns_extra(self): """This tests for backwards-compatibility with an essex/folsom bug. @@ -317,20 +367,17 @@ class SqlIdentity(SqlTests, test_backend.IdentityTests): This behavior is specific to the SQL driver. """ - tenant_id = uuid.uuid4().hex arbitrary_key = uuid.uuid4().hex arbitrary_value = uuid.uuid4().hex - tenant = { - 'id': tenant_id, - 'name': uuid.uuid4().hex, - 'domain_id': DEFAULT_DOMAIN_ID, - arbitrary_key: arbitrary_value} - ref = self.resource_api.create_project(tenant_id, tenant) + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) + project[arbitrary_key] = arbitrary_value + ref = self.resource_api.create_project(project['id'], project) self.assertEqual(arbitrary_value, ref[arbitrary_key]) self.assertIsNone(ref.get('extra')) - tenant['name'] = uuid.uuid4().hex - ref = self.resource_api.update_project(tenant_id, tenant) + ref['name'] = uuid.uuid4().hex + ref = self.resource_api.update_project(ref['id'], ref) self.assertEqual(arbitrary_value, ref[arbitrary_key]) self.assertEqual(arbitrary_value, ref['extra'][arbitrary_key]) @@ -346,11 +393,9 @@ class SqlIdentity(SqlTests, test_backend.IdentityTests): """ arbitrary_key = uuid.uuid4().hex arbitrary_value = uuid.uuid4().hex - user = { - 'name': uuid.uuid4().hex, - 'domain_id': DEFAULT_DOMAIN_ID, - 'password': uuid.uuid4().hex, - arbitrary_key: arbitrary_value} + user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) + user[arbitrary_key] = arbitrary_value + del user["id"] ref = self.identity_api.create_user(user) self.assertEqual(arbitrary_value, ref[arbitrary_key]) self.assertIsNone(ref.get('password')) @@ -365,30 +410,25 @@ class SqlIdentity(SqlTests, test_backend.IdentityTests): self.assertEqual(arbitrary_value, ref['extra'][arbitrary_key]) def test_sql_user_to_dict_null_default_project_id(self): - user = { - 'name': uuid.uuid4().hex, - 'domain_id': DEFAULT_DOMAIN_ID, - 'password': uuid.uuid4().hex} - + user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) user = self.identity_api.create_user(user) - session = sql.get_session() - query = session.query(identity_sql.User) - query = query.filter_by(id=user['id']) - raw_user_ref = query.one() - self.assertIsNone(raw_user_ref.default_project_id) - user_ref = raw_user_ref.to_dict() - self.assertNotIn('default_project_id', user_ref) - session.close() + with sql.session_for_read() as session: + query = session.query(identity_sql.User) + query = query.filter_by(id=user['id']) + raw_user_ref = query.one() + self.assertIsNone(raw_user_ref.default_project_id) + user_ref = raw_user_ref.to_dict() + self.assertNotIn('default_project_id', user_ref) + session.close() def test_list_domains_for_user(self): - domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + domain = unit.new_domain_ref() self.resource_api.create_domain(domain['id'], domain) - user = {'name': uuid.uuid4().hex, 'password': uuid.uuid4().hex, - 'domain_id': domain['id'], 'enabled': True} + user = unit.new_user_ref(domain_id=domain['id']) - test_domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + test_domain1 = unit.new_domain_ref() self.resource_api.create_domain(test_domain1['id'], test_domain1) - test_domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + test_domain2 = unit.new_domain_ref() self.resource_api.create_domain(test_domain2['id'], test_domain2) user = self.identity_api.create_user(user) @@ -407,21 +447,20 @@ class SqlIdentity(SqlTests, test_backend.IdentityTests): # Create two groups each with a role on a different domain, and # make user1 a member of both groups. Both these new domains # should now be included, along with any direct user grants. - domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + domain = unit.new_domain_ref() self.resource_api.create_domain(domain['id'], domain) - user = {'name': uuid.uuid4().hex, 'password': uuid.uuid4().hex, - 'domain_id': domain['id'], 'enabled': True} + user = unit.new_user_ref(domain_id=domain['id']) user = self.identity_api.create_user(user) - group1 = {'name': uuid.uuid4().hex, 'domain_id': domain['id']} + group1 = unit.new_group_ref(domain_id=domain['id']) group1 = self.identity_api.create_group(group1) - group2 = {'name': uuid.uuid4().hex, 'domain_id': domain['id']} + group2 = unit.new_group_ref(domain_id=domain['id']) group2 = self.identity_api.create_group(group2) - test_domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + test_domain1 = unit.new_domain_ref() self.resource_api.create_domain(test_domain1['id'], test_domain1) - test_domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + test_domain2 = unit.new_domain_ref() self.resource_api.create_domain(test_domain2['id'], test_domain2) - test_domain3 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + test_domain3 = unit.new_domain_ref() self.resource_api.create_domain(test_domain3['id'], test_domain3) self.identity_api.add_user_to_group(user['id'], group1['id']) @@ -451,17 +490,16 @@ class SqlIdentity(SqlTests, test_backend.IdentityTests): - When listing domains for user, neither domain should be returned """ - domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + domain1 = unit.new_domain_ref() domain1 = self.resource_api.create_domain(domain1['id'], domain1) - domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + domain2 = unit.new_domain_ref() domain2 = self.resource_api.create_domain(domain2['id'], domain2) - user = {'name': uuid.uuid4().hex, 'password': uuid.uuid4().hex, - 'domain_id': domain1['id'], 'enabled': True} + user = unit.new_user_ref(domain_id=domain1['id']) user = self.identity_api.create_user(user) - group = {'name': uuid.uuid4().hex, 'domain_id': domain1['id']} + group = unit.new_group_ref(domain_id=domain1['id']) group = self.identity_api.create_group(group) self.identity_api.add_user_to_group(user['id'], group['id']) - role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + role = unit.new_role_ref() self.role_api.create_role(role['id'], role) # Create a grant on each domain, one user grant, one group grant, @@ -480,25 +518,143 @@ class SqlIdentity(SqlTests, test_backend.IdentityTests): # roles assignments. self.assertThat(user_domains, matchers.HasLength(0)) + def test_storing_null_domain_id_in_project_ref(self): + """Test the special storage of domain_id=None in sql resource driver. + + The resource driver uses a special value in place of None for domain_id + in the project record. This shouldn't escape the driver. Hence we test + the interface to ensure that you can store a domain_id of None, and + that any special value used inside the driver does not escape through + the interface. + + """ + spoiler_project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) + self.resource_api.create_project(spoiler_project['id'], + spoiler_project) + + # First let's create a project with a None domain_id and make sure we + # can read it back. + project = unit.new_project_ref(domain_id=None, is_domain=True) + project = self.resource_api.create_project(project['id'], project) + ref = self.resource_api.get_project(project['id']) + self.assertDictEqual(project, ref) + + # Can we get it by name? + ref = self.resource_api.get_project_by_name(project['name'], None) + self.assertDictEqual(project, ref) + + # Can we filter for them - create a second domain to ensure we are + # testing the receipt of more than one. + project2 = unit.new_project_ref(domain_id=None, is_domain=True) + project2 = self.resource_api.create_project(project2['id'], project2) + hints = driver_hints.Hints() + hints.add_filter('domain_id', None) + refs = self.resource_api.list_projects(hints) + self.assertThat(refs, matchers.HasLength(2 + self.domain_count)) + self.assertIn(project, refs) + self.assertIn(project2, refs) + + # Can we update it? + project['name'] = uuid.uuid4().hex + self.resource_api.update_project(project['id'], project) + ref = self.resource_api.get_project(project['id']) + self.assertDictEqual(project, ref) + + # Finally, make sure we can delete it + project['enabled'] = False + self.resource_api.update_project(project['id'], project) + self.resource_api.delete_project(project['id']) + self.assertRaises(exception.ProjectNotFound, + self.resource_api.get_project, + project['id']) + + def test_hidden_project_domain_root_is_really_hidden(self): + """Ensure we cannot access the hidden root of all project domains. + + Calling any of the driver methods should result in the same as + would be returned if we passed a project that does not exist. We don't + test create_project, since we do not allow a caller of our API to + specify their own ID for a new entity. + + """ + def _exercise_project_api(ref_id): + driver = self.resource_api.driver + self.assertRaises(exception.ProjectNotFound, + driver.get_project, + ref_id) + + self.assertRaises(exception.ProjectNotFound, + driver.get_project_by_name, + resource.NULL_DOMAIN_ID, + ref_id) + + project_ids = [x['id'] for x in + driver.list_projects(driver_hints.Hints())] + self.assertNotIn(ref_id, project_ids) + + projects = driver.list_projects_from_ids([ref_id]) + self.assertThat(projects, matchers.HasLength(0)) -class SqlTrust(SqlTests, test_backend.TrustTests): + project_ids = [x for x in + driver.list_project_ids_from_domain_ids([ref_id])] + self.assertNotIn(ref_id, project_ids) + + self.assertRaises(exception.DomainNotFound, + driver.list_projects_in_domain, + ref_id) + + project_ids = [ + x['id'] for x in + driver.list_projects_acting_as_domain(driver_hints.Hints())] + self.assertNotIn(ref_id, project_ids) + + projects = driver.list_projects_in_subtree(ref_id) + self.assertThat(projects, matchers.HasLength(0)) + + self.assertRaises(exception.ProjectNotFound, + driver.list_project_parents, + ref_id) + + # A non-existing project just returns True from the driver + self.assertTrue(driver.is_leaf_project(ref_id)) + + self.assertRaises(exception.ProjectNotFound, + driver.update_project, + ref_id, + {}) + + self.assertRaises(exception.ProjectNotFound, + driver.delete_project, + ref_id) + + # Deleting list of projects that includes a non-existing project + # should be silent + driver.delete_projects_from_ids([ref_id]) + + _exercise_project_api(uuid.uuid4().hex) + _exercise_project_api(resource.NULL_DOMAIN_ID) + + +class SqlTrust(SqlTests, trust_tests.TrustTests): pass -class SqlToken(SqlTests, test_backend.TokenTests): +class SqlToken(SqlTests, token_tests.TokenTests): def test_token_revocation_list_uses_right_columns(self): # This query used to be heavy with too many columns. We want # to make sure it is only running with the minimum columns # necessary. expected_query_args = (token_sql.TokenModel.id, - token_sql.TokenModel.expires) + token_sql.TokenModel.expires, + token_sql.TokenModel.extra,) with mock.patch.object(token_sql, 'sql') as mock_sql: tok = token_sql.Token() tok.list_revoked_tokens() - mock_query = mock_sql.get_session().query + mock_query = mock_sql.session_for_read().__enter__().query mock_query.assert_called_with(*expected_query_args) def test_flush_expired_tokens_batch(self): @@ -523,8 +679,12 @@ class SqlToken(SqlTests, test_backend.TokenTests): # other tests below test the differences between how they use the batch # strategy with mock.patch.object(token_sql, 'sql') as mock_sql: - mock_sql.get_session().query().filter().delete.return_value = 0 - mock_sql.get_session().bind.dialect.name = 'mysql' + mock_sql.session_for_write().__enter__( + ).query().filter().delete.return_value = 0 + + mock_sql.session_for_write().__enter__( + ).bind.dialect.name = 'mysql' + tok = token_sql.Token() expiry_mock = mock.Mock() ITERS = [1, 2, 3] @@ -535,7 +695,10 @@ class SqlToken(SqlTests, test_backend.TokenTests): # The expiry strategy is only invoked once, the other calls are via # the yield return. self.assertEqual(1, expiry_mock.call_count) - mock_delete = mock_sql.get_session().query().filter().delete + + mock_delete = mock_sql.session_for_write().__enter__( + ).query().filter().delete + self.assertThat(mock_delete.call_args_list, matchers.HasLength(len(ITERS))) @@ -550,12 +713,12 @@ class SqlToken(SqlTests, test_backend.TokenTests): if i == 0: # The first time the batch iterator returns, it should return # the first result that comes back from the database. - self.assertEqual(x, 'test') + self.assertEqual('test', x) elif i == 1: # The second time, the database range function should return # nothing, so the batch iterator returns the result of the # upper_bound function - self.assertEqual(x, "final value") + self.assertEqual("final value", x) else: self.fail("range batch function returned more than twice") @@ -568,39 +731,30 @@ class SqlToken(SqlTests, test_backend.TokenTests): tok = token_sql.Token() db2_strategy = tok._expiry_range_strategy('ibm_db_sa') self.assertIsInstance(db2_strategy, functools.partial) - self.assertEqual(db2_strategy.func, token_sql._expiry_range_batched) - self.assertEqual(db2_strategy.keywords, {'batch_size': 100}) + self.assertEqual(token_sql._expiry_range_batched, db2_strategy.func) + self.assertEqual({'batch_size': 100}, db2_strategy.keywords) def test_expiry_range_strategy_mysql(self): tok = token_sql.Token() mysql_strategy = tok._expiry_range_strategy('mysql') self.assertIsInstance(mysql_strategy, functools.partial) - self.assertEqual(mysql_strategy.func, token_sql._expiry_range_batched) - self.assertEqual(mysql_strategy.keywords, {'batch_size': 1000}) + self.assertEqual(token_sql._expiry_range_batched, mysql_strategy.func) + self.assertEqual({'batch_size': 1000}, mysql_strategy.keywords) -class SqlCatalog(SqlTests, test_backend.CatalogTests): +class SqlCatalog(SqlTests, catalog_tests.CatalogTests): _legacy_endpoint_id_in_endpoint = True _enabled_default_to_true_when_creating_endpoint = True def test_catalog_ignored_malformed_urls(self): - service = { - 'id': uuid.uuid4().hex, - 'type': uuid.uuid4().hex, - 'name': uuid.uuid4().hex, - 'description': uuid.uuid4().hex, - } - self.catalog_api.create_service(service['id'], service.copy()) + service = unit.new_service_ref() + self.catalog_api.create_service(service['id'], service) malformed_url = "http://192.168.1.104:8774/v2/$(tenant)s" - endpoint = { - 'id': uuid.uuid4().hex, - 'region_id': None, - 'service_id': service['id'], - 'interface': 'public', - 'url': malformed_url, - } + endpoint = unit.new_endpoint_ref(service_id=service['id'], + url=malformed_url, + region_id=None) self.catalog_api.create_endpoint(endpoint['id'], endpoint.copy()) # NOTE(dstanek): there are no valid URLs, so nothing is in the catalog @@ -608,21 +762,11 @@ class SqlCatalog(SqlTests, test_backend.CatalogTests): self.assertEqual({}, catalog) def test_get_catalog_with_empty_public_url(self): - service = { - 'id': uuid.uuid4().hex, - 'type': uuid.uuid4().hex, - 'name': uuid.uuid4().hex, - 'description': uuid.uuid4().hex, - } - self.catalog_api.create_service(service['id'], service.copy()) - - endpoint = { - 'id': uuid.uuid4().hex, - 'region_id': None, - 'interface': 'public', - 'url': '', - 'service_id': service['id'], - } + service = unit.new_service_ref() + self.catalog_api.create_service(service['id'], service) + + endpoint = unit.new_endpoint_ref(url='', service_id=service['id'], + region_id=None) self.catalog_api.create_endpoint(endpoint['id'], endpoint.copy()) catalog = self.catalog_api.get_catalog('user', 'tenant') @@ -633,22 +777,12 @@ class SqlCatalog(SqlTests, test_backend.CatalogTests): self.assertIsNone(catalog_endpoint.get('adminURL')) self.assertIsNone(catalog_endpoint.get('internalURL')) - def test_create_endpoint_region_404(self): - service = { - 'id': uuid.uuid4().hex, - 'type': uuid.uuid4().hex, - 'name': uuid.uuid4().hex, - 'description': uuid.uuid4().hex, - } - self.catalog_api.create_service(service['id'], service.copy()) - - endpoint = { - 'id': uuid.uuid4().hex, - 'region_id': uuid.uuid4().hex, - 'service_id': service['id'], - 'interface': 'public', - 'url': uuid.uuid4().hex, - } + def test_create_endpoint_region_returns_not_found(self): + service = unit.new_service_ref() + self.catalog_api.create_service(service['id'], service) + + endpoint = unit.new_endpoint_ref(region_id=uuid.uuid4().hex, + service_id=service['id']) self.assertRaises(exception.ValidationError, self.catalog_api.create_endpoint, @@ -656,21 +790,14 @@ class SqlCatalog(SqlTests, test_backend.CatalogTests): endpoint.copy()) def test_create_region_invalid_id(self): - region = { - 'id': '0' * 256, - 'description': '', - 'extra': {}, - } + region = unit.new_region_ref(id='0' * 256) self.assertRaises(exception.StringLengthExceeded, self.catalog_api.create_region, - region.copy()) + region) def test_create_region_invalid_parent_id(self): - region = { - 'id': uuid.uuid4().hex, - 'parent_region_id': '0' * 256, - } + region = unit.new_region_ref(parent_region_id='0' * 256) self.assertRaises(exception.RegionNotFound, self.catalog_api.create_region, @@ -678,77 +805,57 @@ class SqlCatalog(SqlTests, test_backend.CatalogTests): def test_delete_region_with_endpoint(self): # create a region - region = { - 'id': uuid.uuid4().hex, - 'description': uuid.uuid4().hex, - } + region = unit.new_region_ref() self.catalog_api.create_region(region) # create a child region - child_region = { - 'id': uuid.uuid4().hex, - 'description': uuid.uuid4().hex, - 'parent_id': region['id'] - } + child_region = unit.new_region_ref(parent_region_id=region['id']) self.catalog_api.create_region(child_region) # create a service - service = { - 'id': uuid.uuid4().hex, - 'type': uuid.uuid4().hex, - 'name': uuid.uuid4().hex, - 'description': uuid.uuid4().hex, - } + service = unit.new_service_ref() self.catalog_api.create_service(service['id'], service) # create an endpoint attached to the service and child region - child_endpoint = { - 'id': uuid.uuid4().hex, - 'region_id': child_region['id'], - 'interface': uuid.uuid4().hex[:8], - 'url': uuid.uuid4().hex, - 'service_id': service['id'], - } + child_endpoint = unit.new_endpoint_ref(region_id=child_region['id'], + service_id=service['id']) + self.catalog_api.create_endpoint(child_endpoint['id'], child_endpoint) self.assertRaises(exception.RegionDeletionError, self.catalog_api.delete_region, child_region['id']) # create an endpoint attached to the service and parent region - endpoint = { - 'id': uuid.uuid4().hex, - 'region_id': region['id'], - 'interface': uuid.uuid4().hex[:8], - 'url': uuid.uuid4().hex, - 'service_id': service['id'], - } + endpoint = unit.new_endpoint_ref(region_id=region['id'], + service_id=service['id']) + self.catalog_api.create_endpoint(endpoint['id'], endpoint) self.assertRaises(exception.RegionDeletionError, self.catalog_api.delete_region, region['id']) -class SqlPolicy(SqlTests, test_backend.PolicyTests): +class SqlPolicy(SqlTests, policy_tests.PolicyTests): pass -class SqlInheritance(SqlTests, test_backend.InheritanceTests): +class SqlInheritance(SqlTests, assignment_tests.InheritanceTests): pass -class SqlTokenCacheInvalidation(SqlTests, test_backend.TokenCacheInvalidation): +class SqlImpliedRoles(SqlTests, assignment_tests.ImpliedRoleTests): + pass + + +class SqlTokenCacheInvalidation(SqlTests, token_tests.TokenCacheInvalidation): def setUp(self): super(SqlTokenCacheInvalidation, self).setUp() self._create_test_data() -class SqlFilterTests(SqlTests, test_backend.FilterTests): - - def _get_user_name_field_size(self): - return identity_sql.User.name.type.length +class SqlFilterTests(SqlTests, identity_tests.FilterTests): def clean_up_entities(self): """Clean up entity test data from Filter Test Cases.""" - for entity in ['user', 'group', 'project']: self._delete_test_data(entity, self.entity_list[entity]) self._delete_test_data(entity, self.domain1_entity_list[entity]) @@ -760,11 +867,12 @@ class SqlFilterTests(SqlTests, test_backend.FilterTests): del self.domain1 def test_list_entities_filtered_by_domain(self): - # NOTE(henry-nash): This method is here rather than in test_backend - # since any domain filtering with LDAP is handled by the manager - # layer (and is already tested elsewhere) not at the driver level. + # NOTE(henry-nash): This method is here rather than in + # unit.identity.test_backends since any domain filtering with LDAP is + # handled by the manager layer (and is already tested elsewhere) not at + # the driver level. self.addCleanup(self.clean_up_entities) - self.domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + self.domain1 = unit.new_domain_ref() self.resource_api.create_domain(self.domain1['id'], self.domain1) self.entity_list = {} @@ -804,7 +912,7 @@ class SqlFilterTests(SqlTests, test_backend.FilterTests): # See if we can add a SQL command...use the group table instead of the # user table since 'user' is reserved word for SQLAlchemy. - group = {'name': uuid.uuid4().hex, 'domain_id': DEFAULT_DOMAIN_ID} + group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id) group = self.identity_api.create_group(group) hints = driver_hints.Hints() @@ -816,10 +924,10 @@ class SqlFilterTests(SqlTests, test_backend.FilterTests): self.assertTrue(len(groups) > 0) -class SqlLimitTests(SqlTests, test_backend.LimitTests): +class SqlLimitTests(SqlTests, identity_tests.LimitTests): def setUp(self): super(SqlLimitTests, self).setUp() - test_backend.LimitTests.setUp(self) + identity_tests.LimitTests.setUp(self) class FakeTable(sql.ModelBase): @@ -850,11 +958,6 @@ class SqlDecorators(unit.TestCase): tt = FakeTable(col='a') self.assertEqual('a', tt.col) - def test_non_ascii_init(self): - # NOTE(I159): Non ASCII characters must cause UnicodeDecodeError - # if encoding is not provided explicitly. - self.assertRaises(UnicodeDecodeError, FakeTable, col='Я') - def test_conflict_happend(self): self.assertRaises(exception.Conflict, FakeTable().insert) self.assertRaises(exception.UnexpectedError, FakeTable().update) @@ -876,21 +979,15 @@ class SqlModuleInitialization(unit.TestCase): class SqlCredential(SqlTests): def _create_credential_with_user_id(self, user_id=uuid.uuid4().hex): - credential_id = uuid.uuid4().hex - new_credential = { - 'id': credential_id, - 'user_id': user_id, - 'project_id': uuid.uuid4().hex, - 'blob': uuid.uuid4().hex, - 'type': uuid.uuid4().hex, - 'extra': uuid.uuid4().hex - } - self.credential_api.create_credential(credential_id, new_credential) - return new_credential + credential = unit.new_credential_ref(user_id=user_id, + extra=uuid.uuid4().hex, + type=uuid.uuid4().hex) + self.credential_api.create_credential(credential['id'], credential) + return credential def _validateCredentialList(self, retrieved_credentials, expected_credentials): - self.assertEqual(len(retrieved_credentials), len(expected_credentials)) + self.assertEqual(len(expected_credentials), len(retrieved_credentials)) retrived_ids = [c['id'] for c in retrieved_credentials] for cred in expected_credentials: self.assertIn(cred['id'], retrived_ids) @@ -920,3 +1017,9 @@ class SqlCredential(SqlTests): credentials = self.credential_api.list_credentials_for_user( self.user_foo['id']) self._validateCredentialList(credentials, self.user_credentials) + + def test_list_credentials_for_user_and_type(self): + cred = self.user_credentials[0] + credentials = self.credential_api.list_credentials_for_user( + self.user_foo['id'], type=cred['type']) + self._validateCredentialList(credentials, [cred]) |