aboutsummaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests/unit/test_backend_sql.py
diff options
context:
space:
mode:
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_backend_sql.py')
-rw-r--r--keystone-moon/keystone/tests/unit/test_backend_sql.py619
1 files changed, 361 insertions, 258 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_backend_sql.py b/keystone-moon/keystone/tests/unit/test_backend_sql.py
index 69fac63a..2e703fff 100644
--- a/keystone-moon/keystone/tests/unit/test_backend_sql.py
+++ b/keystone-moon/keystone/tests/unit/test_backend_sql.py
@@ -29,22 +29,28 @@ from keystone.common import driver_hints
from keystone.common import sql
from keystone import exception
from keystone.identity.backends import sql as identity_sql
+from keystone import resource
from keystone.tests import unit
+from keystone.tests.unit.assignment import test_backends as assignment_tests
+from keystone.tests.unit.catalog import test_backends as catalog_tests
from keystone.tests.unit import default_fixtures
+from keystone.tests.unit.identity import test_backends as identity_tests
from keystone.tests.unit.ksfixtures import database
-from keystone.tests.unit import test_backend
+from keystone.tests.unit.policy import test_backends as policy_tests
+from keystone.tests.unit.resource import test_backends as resource_tests
+from keystone.tests.unit.token import test_backends as token_tests
+from keystone.tests.unit.trust import test_backends as trust_tests
from keystone.token.persistence.backends import sql as token_sql
CONF = cfg.CONF
-DEFAULT_DOMAIN_ID = CONF.identity.default_domain_id
class SqlTests(unit.SQLDriverOverrides, unit.TestCase):
def setUp(self):
super(SqlTests, self).setUp()
- self.useFixture(database.Database())
+ self.useFixture(database.Database(self.sql_driver_version_overrides))
self.load_backends()
# populate the engine with tables & fixtures
@@ -124,14 +130,33 @@ class SqlModels(SqlTests):
def test_user_model(self):
cols = (('id', sql.String, 64),
- ('name', sql.String, 255),
- ('password', sql.String, 128),
- ('domain_id', sql.String, 64),
('default_project_id', sql.String, 64),
('enabled', sql.Boolean, None),
('extra', sql.JsonBlob, None))
self.assertExpectedSchema('user', cols)
+ def test_local_user_model(self):
+ cols = (('id', sql.Integer, None),
+ ('user_id', sql.String, 64),
+ ('name', sql.String, 255),
+ ('domain_id', sql.String, 64))
+ self.assertExpectedSchema('local_user', cols)
+
+ def test_password_model(self):
+ cols = (('id', sql.Integer, None),
+ ('local_user_id', sql.Integer, None),
+ ('password', sql.String, 128))
+ self.assertExpectedSchema('password', cols)
+
+ def test_federated_user_model(self):
+ cols = (('id', sql.Integer, None),
+ ('user_id', sql.String, 64),
+ ('idp_id', sql.String, 64),
+ ('protocol_id', sql.String, 64),
+ ('unique_id', sql.String, 255),
+ ('display_name', sql.String, 255))
+ self.assertExpectedSchema('federated_user', cols)
+
def test_group_model(self):
cols = (('id', sql.String, 64),
('name', sql.String, 64),
@@ -171,17 +196,58 @@ class SqlModels(SqlTests):
('user_id', sql.String, 64))
self.assertExpectedSchema('user_group_membership', cols)
-
-class SqlIdentity(SqlTests, test_backend.IdentityTests):
+ def test_revocation_event_model(self):
+ cols = (('id', sql.Integer, None),
+ ('domain_id', sql.String, 64),
+ ('project_id', sql.String, 64),
+ ('user_id', sql.String, 64),
+ ('role_id', sql.String, 64),
+ ('trust_id', sql.String, 64),
+ ('consumer_id', sql.String, 64),
+ ('access_token_id', sql.String, 64),
+ ('issued_before', sql.DateTime, None),
+ ('expires_at', sql.DateTime, None),
+ ('revoked_at', sql.DateTime, None),
+ ('audit_id', sql.String, 32),
+ ('audit_chain_id', sql.String, 32))
+ self.assertExpectedSchema('revocation_event', cols)
+
+
+class SqlIdentity(SqlTests, identity_tests.IdentityTests,
+ assignment_tests.AssignmentTests,
+ resource_tests.ResourceTests):
def test_password_hashed(self):
- session = sql.get_session()
- user_ref = self.identity_api._get_user(session, self.user_foo['id'])
- self.assertNotEqual(user_ref['password'], self.user_foo['password'])
+ with sql.session_for_read() as session:
+ user_ref = self.identity_api._get_user(session,
+ self.user_foo['id'])
+ self.assertNotEqual(self.user_foo['password'],
+ user_ref['password'])
+
+ def test_create_user_with_null_password(self):
+ user_dict = unit.new_user_ref(
+ domain_id=CONF.identity.default_domain_id)
+ user_dict["password"] = None
+ new_user_dict = self.identity_api.create_user(user_dict)
+ with sql.session_for_read() as session:
+ new_user_ref = self.identity_api._get_user(session,
+ new_user_dict['id'])
+ self.assertFalse(new_user_ref.local_user.passwords)
+
+ def test_update_user_with_null_password(self):
+ user_dict = unit.new_user_ref(
+ domain_id=CONF.identity.default_domain_id)
+ self.assertTrue(user_dict['password'])
+ new_user_dict = self.identity_api.create_user(user_dict)
+ new_user_dict["password"] = None
+ new_user_dict = self.identity_api.update_user(new_user_dict['id'],
+ new_user_dict)
+ with sql.session_for_read() as session:
+ new_user_ref = self.identity_api._get_user(session,
+ new_user_dict['id'])
+ self.assertFalse(new_user_ref.local_user.passwords)
def test_delete_user_with_project_association(self):
- user = {'name': uuid.uuid4().hex,
- 'domain_id': DEFAULT_DOMAIN_ID,
- 'password': uuid.uuid4().hex}
+ user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
user = self.identity_api.create_user(user)
self.assignment_api.add_user_to_project(self.tenant_bar['id'],
user['id'])
@@ -191,16 +257,15 @@ class SqlIdentity(SqlTests, test_backend.IdentityTests):
user['id'])
def test_create_null_user_name(self):
- user = {'name': None,
- 'domain_id': DEFAULT_DOMAIN_ID,
- 'password': uuid.uuid4().hex}
+ user = unit.new_user_ref(name=None,
+ domain_id=CONF.identity.default_domain_id)
self.assertRaises(exception.ValidationError,
self.identity_api.create_user,
user)
self.assertRaises(exception.UserNotFound,
self.identity_api.get_user_by_name,
user['name'],
- DEFAULT_DOMAIN_ID)
+ CONF.identity.default_domain_id)
def test_create_user_case_sensitivity(self):
# user name case sensitivity is down to the fact that it is marked as
@@ -208,25 +273,59 @@ class SqlIdentity(SqlTests, test_backend.IdentityTests):
# LDAP.
# create a ref with a lowercase name
- ref = {
- 'name': uuid.uuid4().hex.lower(),
- 'domain_id': DEFAULT_DOMAIN_ID}
+ ref = unit.new_user_ref(name=uuid.uuid4().hex.lower(),
+ domain_id=CONF.identity.default_domain_id)
ref = self.identity_api.create_user(ref)
# assign a new ID with the same name, but this time in uppercase
ref['name'] = ref['name'].upper()
self.identity_api.create_user(ref)
+ def test_create_federated_user_unique_constraint(self):
+ federated_dict = unit.new_federated_user_ref()
+ user_dict = self.shadow_users_api.create_federated_user(federated_dict)
+ user_dict = self.identity_api.get_user(user_dict["id"])
+ self.assertIsNotNone(user_dict["id"])
+ self.assertRaises(exception.Conflict,
+ self.shadow_users_api.create_federated_user,
+ federated_dict)
+
+ def test_get_federated_user(self):
+ federated_dict = unit.new_federated_user_ref()
+ user_dict_create = self.shadow_users_api.create_federated_user(
+ federated_dict)
+ user_dict_get = self.shadow_users_api.get_federated_user(
+ federated_dict["idp_id"],
+ federated_dict["protocol_id"],
+ federated_dict["unique_id"])
+ self.assertItemsEqual(user_dict_create, user_dict_get)
+ self.assertEqual(user_dict_create["id"], user_dict_get["id"])
+
+ def test_update_federated_user_display_name(self):
+ federated_dict = unit.new_federated_user_ref()
+ user_dict_create = self.shadow_users_api.create_federated_user(
+ federated_dict)
+ new_display_name = uuid.uuid4().hex
+ self.shadow_users_api.update_federated_user_display_name(
+ federated_dict["idp_id"],
+ federated_dict["protocol_id"],
+ federated_dict["unique_id"],
+ new_display_name)
+ user_ref = self.shadow_users_api._get_federated_user(
+ federated_dict["idp_id"],
+ federated_dict["protocol_id"],
+ federated_dict["unique_id"])
+ self.assertEqual(user_ref.federated_users[0].display_name,
+ new_display_name)
+ self.assertEqual(user_dict_create["id"], user_ref.id)
+
def test_create_project_case_sensitivity(self):
# project name case sensitivity is down to the fact that it is marked
# as an SQL UNIQUE column, which may not be valid for other backends,
# like LDAP.
# create a ref with a lowercase name
- ref = {
- 'id': uuid.uuid4().hex,
- 'name': uuid.uuid4().hex.lower(),
- 'domain_id': DEFAULT_DOMAIN_ID}
+ ref = unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
self.resource_api.create_project(ref['id'], ref)
# assign a new ID with the same name, but this time in uppercase
@@ -235,25 +334,22 @@ class SqlIdentity(SqlTests, test_backend.IdentityTests):
self.resource_api.create_project(ref['id'], ref)
def test_create_null_project_name(self):
- tenant = {'id': uuid.uuid4().hex,
- 'name': None,
- 'domain_id': DEFAULT_DOMAIN_ID}
+ project = unit.new_project_ref(
+ name=None, domain_id=CONF.identity.default_domain_id)
self.assertRaises(exception.ValidationError,
self.resource_api.create_project,
- tenant['id'],
- tenant)
+ project['id'],
+ project)
self.assertRaises(exception.ProjectNotFound,
self.resource_api.get_project,
- tenant['id'])
+ project['id'])
self.assertRaises(exception.ProjectNotFound,
self.resource_api.get_project_by_name,
- tenant['name'],
- DEFAULT_DOMAIN_ID)
+ project['name'],
+ CONF.identity.default_domain_id)
def test_delete_project_with_user_association(self):
- user = {'name': 'fakeuser',
- 'domain_id': DEFAULT_DOMAIN_ID,
- 'password': 'passwd'}
+ user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
user = self.identity_api.create_user(user)
self.assignment_api.add_user_to_project(self.tenant_bar['id'],
user['id'])
@@ -261,52 +357,6 @@ class SqlIdentity(SqlTests, test_backend.IdentityTests):
tenants = self.assignment_api.list_projects_for_user(user['id'])
self.assertEqual([], tenants)
- def test_metadata_removed_on_delete_user(self):
- # A test to check that the internal representation
- # or roles is correctly updated when a user is deleted
- user = {'name': uuid.uuid4().hex,
- 'domain_id': DEFAULT_DOMAIN_ID,
- 'password': 'passwd'}
- user = self.identity_api.create_user(user)
- role = {'id': uuid.uuid4().hex,
- 'name': uuid.uuid4().hex}
- self.role_api.create_role(role['id'], role)
- self.assignment_api.add_role_to_user_and_project(
- user['id'],
- self.tenant_bar['id'],
- role['id'])
- self.identity_api.delete_user(user['id'])
-
- # Now check whether the internal representation of roles
- # has been deleted
- self.assertRaises(exception.MetadataNotFound,
- self.assignment_api._get_metadata,
- user['id'],
- self.tenant_bar['id'])
-
- def test_metadata_removed_on_delete_project(self):
- # A test to check that the internal representation
- # or roles is correctly updated when a project is deleted
- user = {'name': uuid.uuid4().hex,
- 'domain_id': DEFAULT_DOMAIN_ID,
- 'password': 'passwd'}
- user = self.identity_api.create_user(user)
- role = {'id': uuid.uuid4().hex,
- 'name': uuid.uuid4().hex}
- self.role_api.create_role(role['id'], role)
- self.assignment_api.add_role_to_user_and_project(
- user['id'],
- self.tenant_bar['id'],
- role['id'])
- self.resource_api.delete_project(self.tenant_bar['id'])
-
- # Now check whether the internal representation of roles
- # has been deleted
- self.assertRaises(exception.MetadataNotFound,
- self.assignment_api._get_metadata,
- user['id'],
- self.tenant_bar['id'])
-
def test_update_project_returns_extra(self):
"""This tests for backwards-compatibility with an essex/folsom bug.
@@ -317,20 +367,17 @@ class SqlIdentity(SqlTests, test_backend.IdentityTests):
This behavior is specific to the SQL driver.
"""
- tenant_id = uuid.uuid4().hex
arbitrary_key = uuid.uuid4().hex
arbitrary_value = uuid.uuid4().hex
- tenant = {
- 'id': tenant_id,
- 'name': uuid.uuid4().hex,
- 'domain_id': DEFAULT_DOMAIN_ID,
- arbitrary_key: arbitrary_value}
- ref = self.resource_api.create_project(tenant_id, tenant)
+ project = unit.new_project_ref(
+ domain_id=CONF.identity.default_domain_id)
+ project[arbitrary_key] = arbitrary_value
+ ref = self.resource_api.create_project(project['id'], project)
self.assertEqual(arbitrary_value, ref[arbitrary_key])
self.assertIsNone(ref.get('extra'))
- tenant['name'] = uuid.uuid4().hex
- ref = self.resource_api.update_project(tenant_id, tenant)
+ ref['name'] = uuid.uuid4().hex
+ ref = self.resource_api.update_project(ref['id'], ref)
self.assertEqual(arbitrary_value, ref[arbitrary_key])
self.assertEqual(arbitrary_value, ref['extra'][arbitrary_key])
@@ -346,11 +393,9 @@ class SqlIdentity(SqlTests, test_backend.IdentityTests):
"""
arbitrary_key = uuid.uuid4().hex
arbitrary_value = uuid.uuid4().hex
- user = {
- 'name': uuid.uuid4().hex,
- 'domain_id': DEFAULT_DOMAIN_ID,
- 'password': uuid.uuid4().hex,
- arbitrary_key: arbitrary_value}
+ user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
+ user[arbitrary_key] = arbitrary_value
+ del user["id"]
ref = self.identity_api.create_user(user)
self.assertEqual(arbitrary_value, ref[arbitrary_key])
self.assertIsNone(ref.get('password'))
@@ -365,30 +410,25 @@ class SqlIdentity(SqlTests, test_backend.IdentityTests):
self.assertEqual(arbitrary_value, ref['extra'][arbitrary_key])
def test_sql_user_to_dict_null_default_project_id(self):
- user = {
- 'name': uuid.uuid4().hex,
- 'domain_id': DEFAULT_DOMAIN_ID,
- 'password': uuid.uuid4().hex}
-
+ user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
user = self.identity_api.create_user(user)
- session = sql.get_session()
- query = session.query(identity_sql.User)
- query = query.filter_by(id=user['id'])
- raw_user_ref = query.one()
- self.assertIsNone(raw_user_ref.default_project_id)
- user_ref = raw_user_ref.to_dict()
- self.assertNotIn('default_project_id', user_ref)
- session.close()
+ with sql.session_for_read() as session:
+ query = session.query(identity_sql.User)
+ query = query.filter_by(id=user['id'])
+ raw_user_ref = query.one()
+ self.assertIsNone(raw_user_ref.default_project_id)
+ user_ref = raw_user_ref.to_dict()
+ self.assertNotIn('default_project_id', user_ref)
+ session.close()
def test_list_domains_for_user(self):
- domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ domain = unit.new_domain_ref()
self.resource_api.create_domain(domain['id'], domain)
- user = {'name': uuid.uuid4().hex, 'password': uuid.uuid4().hex,
- 'domain_id': domain['id'], 'enabled': True}
+ user = unit.new_user_ref(domain_id=domain['id'])
- test_domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ test_domain1 = unit.new_domain_ref()
self.resource_api.create_domain(test_domain1['id'], test_domain1)
- test_domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ test_domain2 = unit.new_domain_ref()
self.resource_api.create_domain(test_domain2['id'], test_domain2)
user = self.identity_api.create_user(user)
@@ -407,21 +447,20 @@ class SqlIdentity(SqlTests, test_backend.IdentityTests):
# Create two groups each with a role on a different domain, and
# make user1 a member of both groups. Both these new domains
# should now be included, along with any direct user grants.
- domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ domain = unit.new_domain_ref()
self.resource_api.create_domain(domain['id'], domain)
- user = {'name': uuid.uuid4().hex, 'password': uuid.uuid4().hex,
- 'domain_id': domain['id'], 'enabled': True}
+ user = unit.new_user_ref(domain_id=domain['id'])
user = self.identity_api.create_user(user)
- group1 = {'name': uuid.uuid4().hex, 'domain_id': domain['id']}
+ group1 = unit.new_group_ref(domain_id=domain['id'])
group1 = self.identity_api.create_group(group1)
- group2 = {'name': uuid.uuid4().hex, 'domain_id': domain['id']}
+ group2 = unit.new_group_ref(domain_id=domain['id'])
group2 = self.identity_api.create_group(group2)
- test_domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ test_domain1 = unit.new_domain_ref()
self.resource_api.create_domain(test_domain1['id'], test_domain1)
- test_domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ test_domain2 = unit.new_domain_ref()
self.resource_api.create_domain(test_domain2['id'], test_domain2)
- test_domain3 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ test_domain3 = unit.new_domain_ref()
self.resource_api.create_domain(test_domain3['id'], test_domain3)
self.identity_api.add_user_to_group(user['id'], group1['id'])
@@ -451,17 +490,16 @@ class SqlIdentity(SqlTests, test_backend.IdentityTests):
- When listing domains for user, neither domain should be returned
"""
- domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ domain1 = unit.new_domain_ref()
domain1 = self.resource_api.create_domain(domain1['id'], domain1)
- domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ domain2 = unit.new_domain_ref()
domain2 = self.resource_api.create_domain(domain2['id'], domain2)
- user = {'name': uuid.uuid4().hex, 'password': uuid.uuid4().hex,
- 'domain_id': domain1['id'], 'enabled': True}
+ user = unit.new_user_ref(domain_id=domain1['id'])
user = self.identity_api.create_user(user)
- group = {'name': uuid.uuid4().hex, 'domain_id': domain1['id']}
+ group = unit.new_group_ref(domain_id=domain1['id'])
group = self.identity_api.create_group(group)
self.identity_api.add_user_to_group(user['id'], group['id'])
- role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ role = unit.new_role_ref()
self.role_api.create_role(role['id'], role)
# Create a grant on each domain, one user grant, one group grant,
@@ -480,25 +518,143 @@ class SqlIdentity(SqlTests, test_backend.IdentityTests):
# roles assignments.
self.assertThat(user_domains, matchers.HasLength(0))
+ def test_storing_null_domain_id_in_project_ref(self):
+ """Test the special storage of domain_id=None in sql resource driver.
+
+ The resource driver uses a special value in place of None for domain_id
+ in the project record. This shouldn't escape the driver. Hence we test
+ the interface to ensure that you can store a domain_id of None, and
+ that any special value used inside the driver does not escape through
+ the interface.
+
+ """
+ spoiler_project = unit.new_project_ref(
+ domain_id=CONF.identity.default_domain_id)
+ self.resource_api.create_project(spoiler_project['id'],
+ spoiler_project)
+
+ # First let's create a project with a None domain_id and make sure we
+ # can read it back.
+ project = unit.new_project_ref(domain_id=None, is_domain=True)
+ project = self.resource_api.create_project(project['id'], project)
+ ref = self.resource_api.get_project(project['id'])
+ self.assertDictEqual(project, ref)
+
+ # Can we get it by name?
+ ref = self.resource_api.get_project_by_name(project['name'], None)
+ self.assertDictEqual(project, ref)
+
+ # Can we filter for them - create a second domain to ensure we are
+ # testing the receipt of more than one.
+ project2 = unit.new_project_ref(domain_id=None, is_domain=True)
+ project2 = self.resource_api.create_project(project2['id'], project2)
+ hints = driver_hints.Hints()
+ hints.add_filter('domain_id', None)
+ refs = self.resource_api.list_projects(hints)
+ self.assertThat(refs, matchers.HasLength(2 + self.domain_count))
+ self.assertIn(project, refs)
+ self.assertIn(project2, refs)
+
+ # Can we update it?
+ project['name'] = uuid.uuid4().hex
+ self.resource_api.update_project(project['id'], project)
+ ref = self.resource_api.get_project(project['id'])
+ self.assertDictEqual(project, ref)
+
+ # Finally, make sure we can delete it
+ project['enabled'] = False
+ self.resource_api.update_project(project['id'], project)
+ self.resource_api.delete_project(project['id'])
+ self.assertRaises(exception.ProjectNotFound,
+ self.resource_api.get_project,
+ project['id'])
+
+ def test_hidden_project_domain_root_is_really_hidden(self):
+ """Ensure we cannot access the hidden root of all project domains.
+
+ Calling any of the driver methods should result in the same as
+ would be returned if we passed a project that does not exist. We don't
+ test create_project, since we do not allow a caller of our API to
+ specify their own ID for a new entity.
+
+ """
+ def _exercise_project_api(ref_id):
+ driver = self.resource_api.driver
+ self.assertRaises(exception.ProjectNotFound,
+ driver.get_project,
+ ref_id)
+
+ self.assertRaises(exception.ProjectNotFound,
+ driver.get_project_by_name,
+ resource.NULL_DOMAIN_ID,
+ ref_id)
+
+ project_ids = [x['id'] for x in
+ driver.list_projects(driver_hints.Hints())]
+ self.assertNotIn(ref_id, project_ids)
+
+ projects = driver.list_projects_from_ids([ref_id])
+ self.assertThat(projects, matchers.HasLength(0))
-class SqlTrust(SqlTests, test_backend.TrustTests):
+ project_ids = [x for x in
+ driver.list_project_ids_from_domain_ids([ref_id])]
+ self.assertNotIn(ref_id, project_ids)
+
+ self.assertRaises(exception.DomainNotFound,
+ driver.list_projects_in_domain,
+ ref_id)
+
+ project_ids = [
+ x['id'] for x in
+ driver.list_projects_acting_as_domain(driver_hints.Hints())]
+ self.assertNotIn(ref_id, project_ids)
+
+ projects = driver.list_projects_in_subtree(ref_id)
+ self.assertThat(projects, matchers.HasLength(0))
+
+ self.assertRaises(exception.ProjectNotFound,
+ driver.list_project_parents,
+ ref_id)
+
+ # A non-existing project just returns True from the driver
+ self.assertTrue(driver.is_leaf_project(ref_id))
+
+ self.assertRaises(exception.ProjectNotFound,
+ driver.update_project,
+ ref_id,
+ {})
+
+ self.assertRaises(exception.ProjectNotFound,
+ driver.delete_project,
+ ref_id)
+
+ # Deleting list of projects that includes a non-existing project
+ # should be silent
+ driver.delete_projects_from_ids([ref_id])
+
+ _exercise_project_api(uuid.uuid4().hex)
+ _exercise_project_api(resource.NULL_DOMAIN_ID)
+
+
+class SqlTrust(SqlTests, trust_tests.TrustTests):
pass
-class SqlToken(SqlTests, test_backend.TokenTests):
+class SqlToken(SqlTests, token_tests.TokenTests):
def test_token_revocation_list_uses_right_columns(self):
# This query used to be heavy with too many columns. We want
# to make sure it is only running with the minimum columns
# necessary.
expected_query_args = (token_sql.TokenModel.id,
- token_sql.TokenModel.expires)
+ token_sql.TokenModel.expires,
+ token_sql.TokenModel.extra,)
with mock.patch.object(token_sql, 'sql') as mock_sql:
tok = token_sql.Token()
tok.list_revoked_tokens()
- mock_query = mock_sql.get_session().query
+ mock_query = mock_sql.session_for_read().__enter__().query
mock_query.assert_called_with(*expected_query_args)
def test_flush_expired_tokens_batch(self):
@@ -523,8 +679,12 @@ class SqlToken(SqlTests, test_backend.TokenTests):
# other tests below test the differences between how they use the batch
# strategy
with mock.patch.object(token_sql, 'sql') as mock_sql:
- mock_sql.get_session().query().filter().delete.return_value = 0
- mock_sql.get_session().bind.dialect.name = 'mysql'
+ mock_sql.session_for_write().__enter__(
+ ).query().filter().delete.return_value = 0
+
+ mock_sql.session_for_write().__enter__(
+ ).bind.dialect.name = 'mysql'
+
tok = token_sql.Token()
expiry_mock = mock.Mock()
ITERS = [1, 2, 3]
@@ -535,7 +695,10 @@ class SqlToken(SqlTests, test_backend.TokenTests):
# The expiry strategy is only invoked once, the other calls are via
# the yield return.
self.assertEqual(1, expiry_mock.call_count)
- mock_delete = mock_sql.get_session().query().filter().delete
+
+ mock_delete = mock_sql.session_for_write().__enter__(
+ ).query().filter().delete
+
self.assertThat(mock_delete.call_args_list,
matchers.HasLength(len(ITERS)))
@@ -550,12 +713,12 @@ class SqlToken(SqlTests, test_backend.TokenTests):
if i == 0:
# The first time the batch iterator returns, it should return
# the first result that comes back from the database.
- self.assertEqual(x, 'test')
+ self.assertEqual('test', x)
elif i == 1:
# The second time, the database range function should return
# nothing, so the batch iterator returns the result of the
# upper_bound function
- self.assertEqual(x, "final value")
+ self.assertEqual("final value", x)
else:
self.fail("range batch function returned more than twice")
@@ -568,39 +731,30 @@ class SqlToken(SqlTests, test_backend.TokenTests):
tok = token_sql.Token()
db2_strategy = tok._expiry_range_strategy('ibm_db_sa')
self.assertIsInstance(db2_strategy, functools.partial)
- self.assertEqual(db2_strategy.func, token_sql._expiry_range_batched)
- self.assertEqual(db2_strategy.keywords, {'batch_size': 100})
+ self.assertEqual(token_sql._expiry_range_batched, db2_strategy.func)
+ self.assertEqual({'batch_size': 100}, db2_strategy.keywords)
def test_expiry_range_strategy_mysql(self):
tok = token_sql.Token()
mysql_strategy = tok._expiry_range_strategy('mysql')
self.assertIsInstance(mysql_strategy, functools.partial)
- self.assertEqual(mysql_strategy.func, token_sql._expiry_range_batched)
- self.assertEqual(mysql_strategy.keywords, {'batch_size': 1000})
+ self.assertEqual(token_sql._expiry_range_batched, mysql_strategy.func)
+ self.assertEqual({'batch_size': 1000}, mysql_strategy.keywords)
-class SqlCatalog(SqlTests, test_backend.CatalogTests):
+class SqlCatalog(SqlTests, catalog_tests.CatalogTests):
_legacy_endpoint_id_in_endpoint = True
_enabled_default_to_true_when_creating_endpoint = True
def test_catalog_ignored_malformed_urls(self):
- service = {
- 'id': uuid.uuid4().hex,
- 'type': uuid.uuid4().hex,
- 'name': uuid.uuid4().hex,
- 'description': uuid.uuid4().hex,
- }
- self.catalog_api.create_service(service['id'], service.copy())
+ service = unit.new_service_ref()
+ self.catalog_api.create_service(service['id'], service)
malformed_url = "http://192.168.1.104:8774/v2/$(tenant)s"
- endpoint = {
- 'id': uuid.uuid4().hex,
- 'region_id': None,
- 'service_id': service['id'],
- 'interface': 'public',
- 'url': malformed_url,
- }
+ endpoint = unit.new_endpoint_ref(service_id=service['id'],
+ url=malformed_url,
+ region_id=None)
self.catalog_api.create_endpoint(endpoint['id'], endpoint.copy())
# NOTE(dstanek): there are no valid URLs, so nothing is in the catalog
@@ -608,21 +762,11 @@ class SqlCatalog(SqlTests, test_backend.CatalogTests):
self.assertEqual({}, catalog)
def test_get_catalog_with_empty_public_url(self):
- service = {
- 'id': uuid.uuid4().hex,
- 'type': uuid.uuid4().hex,
- 'name': uuid.uuid4().hex,
- 'description': uuid.uuid4().hex,
- }
- self.catalog_api.create_service(service['id'], service.copy())
-
- endpoint = {
- 'id': uuid.uuid4().hex,
- 'region_id': None,
- 'interface': 'public',
- 'url': '',
- 'service_id': service['id'],
- }
+ service = unit.new_service_ref()
+ self.catalog_api.create_service(service['id'], service)
+
+ endpoint = unit.new_endpoint_ref(url='', service_id=service['id'],
+ region_id=None)
self.catalog_api.create_endpoint(endpoint['id'], endpoint.copy())
catalog = self.catalog_api.get_catalog('user', 'tenant')
@@ -633,22 +777,12 @@ class SqlCatalog(SqlTests, test_backend.CatalogTests):
self.assertIsNone(catalog_endpoint.get('adminURL'))
self.assertIsNone(catalog_endpoint.get('internalURL'))
- def test_create_endpoint_region_404(self):
- service = {
- 'id': uuid.uuid4().hex,
- 'type': uuid.uuid4().hex,
- 'name': uuid.uuid4().hex,
- 'description': uuid.uuid4().hex,
- }
- self.catalog_api.create_service(service['id'], service.copy())
-
- endpoint = {
- 'id': uuid.uuid4().hex,
- 'region_id': uuid.uuid4().hex,
- 'service_id': service['id'],
- 'interface': 'public',
- 'url': uuid.uuid4().hex,
- }
+ def test_create_endpoint_region_returns_not_found(self):
+ service = unit.new_service_ref()
+ self.catalog_api.create_service(service['id'], service)
+
+ endpoint = unit.new_endpoint_ref(region_id=uuid.uuid4().hex,
+ service_id=service['id'])
self.assertRaises(exception.ValidationError,
self.catalog_api.create_endpoint,
@@ -656,21 +790,14 @@ class SqlCatalog(SqlTests, test_backend.CatalogTests):
endpoint.copy())
def test_create_region_invalid_id(self):
- region = {
- 'id': '0' * 256,
- 'description': '',
- 'extra': {},
- }
+ region = unit.new_region_ref(id='0' * 256)
self.assertRaises(exception.StringLengthExceeded,
self.catalog_api.create_region,
- region.copy())
+ region)
def test_create_region_invalid_parent_id(self):
- region = {
- 'id': uuid.uuid4().hex,
- 'parent_region_id': '0' * 256,
- }
+ region = unit.new_region_ref(parent_region_id='0' * 256)
self.assertRaises(exception.RegionNotFound,
self.catalog_api.create_region,
@@ -678,77 +805,57 @@ class SqlCatalog(SqlTests, test_backend.CatalogTests):
def test_delete_region_with_endpoint(self):
# create a region
- region = {
- 'id': uuid.uuid4().hex,
- 'description': uuid.uuid4().hex,
- }
+ region = unit.new_region_ref()
self.catalog_api.create_region(region)
# create a child region
- child_region = {
- 'id': uuid.uuid4().hex,
- 'description': uuid.uuid4().hex,
- 'parent_id': region['id']
- }
+ child_region = unit.new_region_ref(parent_region_id=region['id'])
self.catalog_api.create_region(child_region)
# create a service
- service = {
- 'id': uuid.uuid4().hex,
- 'type': uuid.uuid4().hex,
- 'name': uuid.uuid4().hex,
- 'description': uuid.uuid4().hex,
- }
+ service = unit.new_service_ref()
self.catalog_api.create_service(service['id'], service)
# create an endpoint attached to the service and child region
- child_endpoint = {
- 'id': uuid.uuid4().hex,
- 'region_id': child_region['id'],
- 'interface': uuid.uuid4().hex[:8],
- 'url': uuid.uuid4().hex,
- 'service_id': service['id'],
- }
+ child_endpoint = unit.new_endpoint_ref(region_id=child_region['id'],
+ service_id=service['id'])
+
self.catalog_api.create_endpoint(child_endpoint['id'], child_endpoint)
self.assertRaises(exception.RegionDeletionError,
self.catalog_api.delete_region,
child_region['id'])
# create an endpoint attached to the service and parent region
- endpoint = {
- 'id': uuid.uuid4().hex,
- 'region_id': region['id'],
- 'interface': uuid.uuid4().hex[:8],
- 'url': uuid.uuid4().hex,
- 'service_id': service['id'],
- }
+ endpoint = unit.new_endpoint_ref(region_id=region['id'],
+ service_id=service['id'])
+
self.catalog_api.create_endpoint(endpoint['id'], endpoint)
self.assertRaises(exception.RegionDeletionError,
self.catalog_api.delete_region,
region['id'])
-class SqlPolicy(SqlTests, test_backend.PolicyTests):
+class SqlPolicy(SqlTests, policy_tests.PolicyTests):
pass
-class SqlInheritance(SqlTests, test_backend.InheritanceTests):
+class SqlInheritance(SqlTests, assignment_tests.InheritanceTests):
pass
-class SqlTokenCacheInvalidation(SqlTests, test_backend.TokenCacheInvalidation):
+class SqlImpliedRoles(SqlTests, assignment_tests.ImpliedRoleTests):
+ pass
+
+
+class SqlTokenCacheInvalidation(SqlTests, token_tests.TokenCacheInvalidation):
def setUp(self):
super(SqlTokenCacheInvalidation, self).setUp()
self._create_test_data()
-class SqlFilterTests(SqlTests, test_backend.FilterTests):
-
- def _get_user_name_field_size(self):
- return identity_sql.User.name.type.length
+class SqlFilterTests(SqlTests, identity_tests.FilterTests):
def clean_up_entities(self):
"""Clean up entity test data from Filter Test Cases."""
-
for entity in ['user', 'group', 'project']:
self._delete_test_data(entity, self.entity_list[entity])
self._delete_test_data(entity, self.domain1_entity_list[entity])
@@ -760,11 +867,12 @@ class SqlFilterTests(SqlTests, test_backend.FilterTests):
del self.domain1
def test_list_entities_filtered_by_domain(self):
- # NOTE(henry-nash): This method is here rather than in test_backend
- # since any domain filtering with LDAP is handled by the manager
- # layer (and is already tested elsewhere) not at the driver level.
+ # NOTE(henry-nash): This method is here rather than in
+ # unit.identity.test_backends since any domain filtering with LDAP is
+ # handled by the manager layer (and is already tested elsewhere) not at
+ # the driver level.
self.addCleanup(self.clean_up_entities)
- self.domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ self.domain1 = unit.new_domain_ref()
self.resource_api.create_domain(self.domain1['id'], self.domain1)
self.entity_list = {}
@@ -804,7 +912,7 @@ class SqlFilterTests(SqlTests, test_backend.FilterTests):
# See if we can add a SQL command...use the group table instead of the
# user table since 'user' is reserved word for SQLAlchemy.
- group = {'name': uuid.uuid4().hex, 'domain_id': DEFAULT_DOMAIN_ID}
+ group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
group = self.identity_api.create_group(group)
hints = driver_hints.Hints()
@@ -816,10 +924,10 @@ class SqlFilterTests(SqlTests, test_backend.FilterTests):
self.assertTrue(len(groups) > 0)
-class SqlLimitTests(SqlTests, test_backend.LimitTests):
+class SqlLimitTests(SqlTests, identity_tests.LimitTests):
def setUp(self):
super(SqlLimitTests, self).setUp()
- test_backend.LimitTests.setUp(self)
+ identity_tests.LimitTests.setUp(self)
class FakeTable(sql.ModelBase):
@@ -850,11 +958,6 @@ class SqlDecorators(unit.TestCase):
tt = FakeTable(col='a')
self.assertEqual('a', tt.col)
- def test_non_ascii_init(self):
- # NOTE(I159): Non ASCII characters must cause UnicodeDecodeError
- # if encoding is not provided explicitly.
- self.assertRaises(UnicodeDecodeError, FakeTable, col='Я')
-
def test_conflict_happend(self):
self.assertRaises(exception.Conflict, FakeTable().insert)
self.assertRaises(exception.UnexpectedError, FakeTable().update)
@@ -876,21 +979,15 @@ class SqlModuleInitialization(unit.TestCase):
class SqlCredential(SqlTests):
def _create_credential_with_user_id(self, user_id=uuid.uuid4().hex):
- credential_id = uuid.uuid4().hex
- new_credential = {
- 'id': credential_id,
- 'user_id': user_id,
- 'project_id': uuid.uuid4().hex,
- 'blob': uuid.uuid4().hex,
- 'type': uuid.uuid4().hex,
- 'extra': uuid.uuid4().hex
- }
- self.credential_api.create_credential(credential_id, new_credential)
- return new_credential
+ credential = unit.new_credential_ref(user_id=user_id,
+ extra=uuid.uuid4().hex,
+ type=uuid.uuid4().hex)
+ self.credential_api.create_credential(credential['id'], credential)
+ return credential
def _validateCredentialList(self, retrieved_credentials,
expected_credentials):
- self.assertEqual(len(retrieved_credentials), len(expected_credentials))
+ self.assertEqual(len(expected_credentials), len(retrieved_credentials))
retrived_ids = [c['id'] for c in retrieved_credentials]
for cred in expected_credentials:
self.assertIn(cred['id'], retrived_ids)
@@ -920,3 +1017,9 @@ class SqlCredential(SqlTests):
credentials = self.credential_api.list_credentials_for_user(
self.user_foo['id'])
self._validateCredentialList(credentials, self.user_credentials)
+
+ def test_list_credentials_for_user_and_type(self):
+ cred = self.user_credentials[0]
+ credentials = self.credential_api.list_credentials_for_user(
+ self.user_foo['id'], type=cred['type'])
+ self._validateCredentialList(credentials, [cred])