aboutsummaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests/unit/common
diff options
context:
space:
mode:
Diffstat (limited to 'keystone-moon/keystone/tests/unit/common')
-rw-r--r--keystone-moon/keystone/tests/unit/common/test_authorization.py161
-rw-r--r--keystone-moon/keystone/tests/unit/common/test_ldap.py36
-rw-r--r--keystone-moon/keystone/tests/unit/common/test_manager.py5
-rw-r--r--keystone-moon/keystone/tests/unit/common/test_notifications.py329
-rw-r--r--keystone-moon/keystone/tests/unit/common/test_sql_core.py10
-rw-r--r--keystone-moon/keystone/tests/unit/common/test_utils.py48
6 files changed, 428 insertions, 161 deletions
diff --git a/keystone-moon/keystone/tests/unit/common/test_authorization.py b/keystone-moon/keystone/tests/unit/common/test_authorization.py
new file mode 100644
index 00000000..73ddbc61
--- /dev/null
+++ b/keystone-moon/keystone/tests/unit/common/test_authorization.py
@@ -0,0 +1,161 @@
+# Copyright 2015 IBM Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+import copy
+import uuid
+
+from keystone.common import authorization
+from keystone import exception
+from keystone.federation import constants as federation_constants
+from keystone.models import token_model
+from keystone.tests import unit
+from keystone.tests.unit import test_token_provider
+
+
+class TestTokenToAuthContext(unit.BaseTestCase):
+ def test_token_is_project_scoped_with_trust(self):
+ # Check auth_context result when the token is project-scoped and has
+ # trust info.
+
+ # SAMPLE_V3_TOKEN has OS-TRUST:trust in it.
+ token_data = test_token_provider.SAMPLE_V3_TOKEN
+ token = token_model.KeystoneToken(token_id=uuid.uuid4().hex,
+ token_data=token_data)
+
+ auth_context = authorization.token_to_auth_context(token)
+
+ self.assertEqual(token, auth_context['token'])
+ self.assertTrue(auth_context['is_delegated_auth'])
+ self.assertEqual(token_data['token']['user']['id'],
+ auth_context['user_id'])
+ self.assertEqual(token_data['token']['user']['domain']['id'],
+ auth_context['user_domain_id'])
+ self.assertEqual(token_data['token']['project']['id'],
+ auth_context['project_id'])
+ self.assertEqual(token_data['token']['project']['domain']['id'],
+ auth_context['project_domain_id'])
+ self.assertNotIn('domain_id', auth_context)
+ self.assertNotIn('domain_name', auth_context)
+ self.assertEqual(token_data['token']['OS-TRUST:trust']['id'],
+ auth_context['trust_id'])
+ self.assertEqual(
+ token_data['token']['OS-TRUST:trust']['trustor_user_id'],
+ auth_context['trustor_id'])
+ self.assertEqual(
+ token_data['token']['OS-TRUST:trust']['trustee_user_id'],
+ auth_context['trustee_id'])
+ self.assertItemsEqual(
+ [r['name'] for r in token_data['token']['roles']],
+ auth_context['roles'])
+ self.assertIsNone(auth_context['consumer_id'])
+ self.assertIsNone(auth_context['access_token_id'])
+ self.assertNotIn('group_ids', auth_context)
+
+ def test_token_is_domain_scoped(self):
+ # Check contents of auth_context when token is domain-scoped.
+ token_data = copy.deepcopy(test_token_provider.SAMPLE_V3_TOKEN)
+ del token_data['token']['project']
+
+ domain_id = uuid.uuid4().hex
+ domain_name = uuid.uuid4().hex
+ token_data['token']['domain'] = {'id': domain_id, 'name': domain_name}
+
+ token = token_model.KeystoneToken(token_id=uuid.uuid4().hex,
+ token_data=token_data)
+
+ auth_context = authorization.token_to_auth_context(token)
+
+ self.assertNotIn('project_id', auth_context)
+ self.assertNotIn('project_domain_id', auth_context)
+
+ self.assertEqual(domain_id, auth_context['domain_id'])
+ self.assertEqual(domain_name, auth_context['domain_name'])
+
+ def test_token_is_unscoped(self):
+ # Check contents of auth_context when the token is unscoped.
+ token_data = copy.deepcopy(test_token_provider.SAMPLE_V3_TOKEN)
+ del token_data['token']['project']
+
+ token = token_model.KeystoneToken(token_id=uuid.uuid4().hex,
+ token_data=token_data)
+
+ auth_context = authorization.token_to_auth_context(token)
+
+ self.assertNotIn('project_id', auth_context)
+ self.assertNotIn('project_domain_id', auth_context)
+ self.assertNotIn('domain_id', auth_context)
+ self.assertNotIn('domain_name', auth_context)
+
+ def test_token_is_for_federated_user(self):
+ # When the token is for a federated user then group_ids is in
+ # auth_context.
+ token_data = copy.deepcopy(test_token_provider.SAMPLE_V3_TOKEN)
+
+ group_ids = [uuid.uuid4().hex for x in range(1, 5)]
+
+ federation_data = {'identity_provider': {'id': uuid.uuid4().hex},
+ 'protocol': {'id': 'saml2'},
+ 'groups': [{'id': gid} for gid in group_ids]}
+ token_data['token']['user'][federation_constants.FEDERATION] = (
+ federation_data)
+
+ token = token_model.KeystoneToken(token_id=uuid.uuid4().hex,
+ token_data=token_data)
+
+ auth_context = authorization.token_to_auth_context(token)
+
+ self.assertItemsEqual(group_ids, auth_context['group_ids'])
+
+ def test_oauth_variables_set_for_oauth_token(self):
+ token_data = copy.deepcopy(test_token_provider.SAMPLE_V3_TOKEN)
+ access_token_id = uuid.uuid4().hex
+ consumer_id = uuid.uuid4().hex
+ token_data['token']['OS-OAUTH1'] = {'access_token_id': access_token_id,
+ 'consumer_id': consumer_id}
+ token = token_model.KeystoneToken(token_id=uuid.uuid4().hex,
+ token_data=token_data)
+
+ auth_context = authorization.token_to_auth_context(token)
+
+ self.assertEqual(access_token_id, auth_context['access_token_id'])
+ self.assertEqual(consumer_id, auth_context['consumer_id'])
+
+ def test_oauth_variables_not_set(self):
+ token_data = copy.deepcopy(test_token_provider.SAMPLE_V3_TOKEN)
+ token = token_model.KeystoneToken(token_id=uuid.uuid4().hex,
+ token_data=token_data)
+
+ auth_context = authorization.token_to_auth_context(token)
+
+ self.assertIsNone(auth_context['access_token_id'])
+ self.assertIsNone(auth_context['consumer_id'])
+
+ def test_token_is_not_KeystoneToken_raises_exception(self):
+ # If the token isn't a KeystoneToken then an UnexpectedError exception
+ # is raised.
+ self.assertRaises(exception.UnexpectedError,
+ authorization.token_to_auth_context, {})
+
+ def test_user_id_missing_in_token_raises_exception(self):
+ # If there's no user ID in the token then an Unauthorized
+ # exception is raised.
+ token_data = copy.deepcopy(test_token_provider.SAMPLE_V3_TOKEN)
+ del token_data['token']['user']['id']
+
+ token = token_model.KeystoneToken(token_id=uuid.uuid4().hex,
+ token_data=token_data)
+
+ self.assertRaises(exception.Unauthorized,
+ authorization.token_to_auth_context, token)
diff --git a/keystone-moon/keystone/tests/unit/common/test_ldap.py b/keystone-moon/keystone/tests/unit/common/test_ldap.py
index e6e2c732..eed77286 100644
--- a/keystone-moon/keystone/tests/unit/common/test_ldap.py
+++ b/keystone-moon/keystone/tests/unit/common/test_ldap.py
@@ -27,6 +27,7 @@ from keystone.common.ldap import core as common_ldap_core
from keystone.tests import unit
from keystone.tests.unit import default_fixtures
from keystone.tests.unit import fakeldap
+from keystone.tests.unit.ksfixtures import database
CONF = cfg.CONF
@@ -195,8 +196,8 @@ class DnCompareTest(unit.BaseTestCase):
def test_startswith_unicode(self):
# dn_startswith accepts unicode.
- child = u'cn=cn=fäké,ou=OpenStäck'
- parent = 'ou=OpenStäck'
+ child = u'cn=fäké,ou=OpenStäck'
+ parent = u'ou=OpenStäck'
self.assertTrue(ks_ldap.dn_startswith(child, parent))
@@ -207,6 +208,8 @@ class LDAPDeleteTreeTest(unit.TestCase):
ks_ldap.register_handler('fake://',
fakeldap.FakeLdapNoSubtreeDelete)
+ self.useFixture(database.Database(self.sql_driver_version_overrides))
+
self.load_backends()
self.load_fixtures(default_fixtures)
@@ -226,11 +229,11 @@ class LDAPDeleteTreeTest(unit.TestCase):
config_files.append(unit.dirs.tests_conf('backend_ldap.conf'))
return config_files
- def test_deleteTree(self):
+ def test_delete_tree(self):
"""Test manually deleting a tree.
Few LDAP servers support CONTROL_DELETETREE. This test
- exercises the alternate code paths in BaseLdap.deleteTree.
+ exercises the alternate code paths in BaseLdap.delete_tree.
"""
conn = self.identity_api.user.get_connection()
@@ -251,7 +254,7 @@ class LDAPDeleteTreeTest(unit.TestCase):
# cn=base
# cn=child,cn=base
# cn=grandchild,cn=child,cn=base
- # then attempt to deleteTree(cn=base)
+ # then attempt to delete_tree(cn=base)
base_id = 'base'
base_dn = create_entry(base_id)
child_dn = create_entry('child', base_dn)
@@ -273,8 +276,8 @@ class LDAPDeleteTreeTest(unit.TestCase):
self.assertRaises(ldap.NOT_ALLOWED_ON_NONLEAF,
conn.delete_s, child_dn)
- # call our deleteTree implementation
- self.identity_api.user.deleteTree(base_id)
+ # call our delete_tree implementation
+ self.identity_api.user.delete_tree(base_id)
self.assertRaises(ldap.NO_SUCH_OBJECT,
conn.search_s, base_dn, ldap.SCOPE_BASE)
self.assertRaises(ldap.NO_SUCH_OBJECT,
@@ -283,6 +286,24 @@ class LDAPDeleteTreeTest(unit.TestCase):
conn.search_s, grandchild_dn, ldap.SCOPE_BASE)
+class MultiURLTests(unit.TestCase):
+ """Tests for setting multiple LDAP URLs."""
+
+ def test_multiple_urls_with_comma_no_conn_pool(self):
+ urls = 'ldap://localhost,ldap://backup.localhost'
+ self.config_fixture.config(group='ldap', url=urls, use_pool=False)
+ base_ldap = ks_ldap.BaseLdap(CONF)
+ ldap_connection = base_ldap.get_connection()
+ self.assertEqual(urls, ldap_connection.conn.conn._uri)
+
+ def test_multiple_urls_with_comma_with_conn_pool(self):
+ urls = 'ldap://localhost,ldap://backup.localhost'
+ self.config_fixture.config(group='ldap', url=urls, use_pool=True)
+ base_ldap = ks_ldap.BaseLdap(CONF)
+ ldap_connection = base_ldap.get_connection()
+ self.assertEqual(urls, ldap_connection.conn.conn_pool.uri)
+
+
class SslTlsTest(unit.TestCase):
"""Tests for the SSL/TLS functionality in keystone.common.ldap.core."""
@@ -359,6 +380,7 @@ class LDAPPagedResultsTest(unit.TestCase):
ks_ldap.register_handler('fake://', fakeldap.FakeLdap)
self.addCleanup(common_ldap_core._HANDLERS.clear)
+ self.useFixture(database.Database(self.sql_driver_version_overrides))
self.load_backends()
self.load_fixtures(default_fixtures)
diff --git a/keystone-moon/keystone/tests/unit/common/test_manager.py b/keystone-moon/keystone/tests/unit/common/test_manager.py
index 1bc19763..7ef91e15 100644
--- a/keystone-moon/keystone/tests/unit/common/test_manager.py
+++ b/keystone-moon/keystone/tests/unit/common/test_manager.py
@@ -24,7 +24,7 @@ class TestCreateLegacyDriver(unit.BaseTestCase):
Driver = manager.create_legacy_driver(catalog.CatalogDriverV8)
# NOTE(dstanek): I want to subvert the requirement for this
- # class to implement all of the abstractmethods.
+ # class to implement all of the abstract methods.
Driver.__abstractmethods__ = set()
impl = Driver()
@@ -32,8 +32,9 @@ class TestCreateLegacyDriver(unit.BaseTestCase):
'as_of': 'Liberty',
'what': 'keystone.catalog.core.Driver',
'in_favor_of': 'keystone.catalog.core.CatalogDriverV8',
- 'remove_in': 'N',
+ 'remove_in': mock.ANY,
}
mock_reporter.assert_called_with(mock.ANY, mock.ANY, details)
+ self.assertEqual('N', mock_reporter.call_args[0][2]['remove_in'][0])
self.assertIsInstance(impl, catalog.CatalogDriverV8)
diff --git a/keystone-moon/keystone/tests/unit/common/test_notifications.py b/keystone-moon/keystone/tests/unit/common/test_notifications.py
index 1ad8d50d..aa2e6f72 100644
--- a/keystone-moon/keystone/tests/unit/common/test_notifications.py
+++ b/keystone-moon/keystone/tests/unit/common/test_notifications.py
@@ -43,9 +43,7 @@ class ArbitraryException(Exception):
def register_callback(operation, resource_type=EXP_RESOURCE_TYPE):
- """Helper for creating and registering a mock callback.
-
- """
+ """Helper for creating and registering a mock callback."""
callback = mock.Mock(__name__='callback',
im_class=mock.Mock(__name__='class'))
notifications.register_event_callback(operation, resource_type, callback)
@@ -95,89 +93,14 @@ class AuditNotificationsTestCase(unit.BaseTestCase):
DISABLED_OPERATION)
-class NotificationsWrapperTestCase(unit.BaseTestCase):
- def create_fake_ref(self):
- resource_id = uuid.uuid4().hex
- return resource_id, {
- 'id': resource_id,
- 'key': uuid.uuid4().hex
- }
-
- @notifications.created(EXP_RESOURCE_TYPE)
- def create_resource(self, resource_id, data):
- return data
-
- def test_resource_created_notification(self):
- exp_resource_id, data = self.create_fake_ref()
- callback = register_callback(CREATED_OPERATION)
-
- self.create_resource(exp_resource_id, data)
- callback.assert_called_with('identity', EXP_RESOURCE_TYPE,
- CREATED_OPERATION,
- {'resource_info': exp_resource_id})
-
- @notifications.updated(EXP_RESOURCE_TYPE)
- def update_resource(self, resource_id, data):
- return data
-
- def test_resource_updated_notification(self):
- exp_resource_id, data = self.create_fake_ref()
- callback = register_callback(UPDATED_OPERATION)
-
- self.update_resource(exp_resource_id, data)
- callback.assert_called_with('identity', EXP_RESOURCE_TYPE,
- UPDATED_OPERATION,
- {'resource_info': exp_resource_id})
-
- @notifications.deleted(EXP_RESOURCE_TYPE)
- def delete_resource(self, resource_id):
- pass
-
- def test_resource_deleted_notification(self):
- exp_resource_id = uuid.uuid4().hex
- callback = register_callback(DELETED_OPERATION)
-
- self.delete_resource(exp_resource_id)
- callback.assert_called_with('identity', EXP_RESOURCE_TYPE,
- DELETED_OPERATION,
- {'resource_info': exp_resource_id})
-
- @notifications.created(EXP_RESOURCE_TYPE)
- def create_exception(self, resource_id):
- raise ArbitraryException()
-
- def test_create_exception_without_notification(self):
- callback = register_callback(CREATED_OPERATION)
- self.assertRaises(
- ArbitraryException, self.create_exception, uuid.uuid4().hex)
- self.assertFalse(callback.called)
-
- @notifications.created(EXP_RESOURCE_TYPE)
- def update_exception(self, resource_id):
- raise ArbitraryException()
-
- def test_update_exception_without_notification(self):
- callback = register_callback(UPDATED_OPERATION)
- self.assertRaises(
- ArbitraryException, self.update_exception, uuid.uuid4().hex)
- self.assertFalse(callback.called)
-
- @notifications.deleted(EXP_RESOURCE_TYPE)
- def delete_exception(self, resource_id):
- raise ArbitraryException()
-
- def test_delete_exception_without_notification(self):
- callback = register_callback(DELETED_OPERATION)
- self.assertRaises(
- ArbitraryException, self.delete_exception, uuid.uuid4().hex)
- self.assertFalse(callback.called)
-
-
class NotificationsTestCase(unit.BaseTestCase):
def test_send_notification(self):
- """Test the private method _send_notification to ensure event_type,
- payload, and context are built and passed properly.
+ """Test _send_notification.
+
+ Test the private method _send_notification to ensure event_type,
+ payload, and context are built and passed properly.
+
"""
resource = uuid.uuid4().hex
resource_type = EXP_RESOURCE_TYPE
@@ -203,6 +126,82 @@ class NotificationsTestCase(unit.BaseTestCase):
resource)
mocked.assert_called_once_with(*expected_args)
+ def test_send_notification_with_opt_out(self):
+ """Test the private method _send_notification with opt-out.
+
+ Test that _send_notification does not notify when a valid
+ notification_opt_out configuration is provided.
+ """
+ resource = uuid.uuid4().hex
+ resource_type = EXP_RESOURCE_TYPE
+ operation = CREATED_OPERATION
+ event_type = 'identity.%s.created' % resource_type
+
+ # NOTE(diazjf): Here we add notification_opt_out to the
+ # configuration so that we should return before _get_notifer is
+ # called. This is because we are opting out notifications for the
+ # passed resource_type and operation.
+ conf = self.useFixture(config_fixture.Config(CONF))
+ conf.config(notification_opt_out=event_type)
+
+ with mock.patch.object(notifications._get_notifier(),
+ '_notify') as mocked:
+
+ notifications._send_notification(operation, resource_type,
+ resource)
+ mocked.assert_not_called()
+
+ def test_send_audit_notification_with_opt_out(self):
+ """Test the private method _send_audit_notification with opt-out.
+
+ Test that _send_audit_notification does not notify when a valid
+ notification_opt_out configuration is provided.
+ """
+ resource_type = EXP_RESOURCE_TYPE
+
+ action = CREATED_OPERATION + '.' + resource_type
+ initiator = mock
+ target = mock
+ outcome = 'success'
+ event_type = 'identity.%s.created' % resource_type
+
+ conf = self.useFixture(config_fixture.Config(CONF))
+ conf.config(notification_opt_out=event_type)
+
+ with mock.patch.object(notifications._get_notifier(),
+ '_notify') as mocked:
+
+ notifications._send_audit_notification(action,
+ initiator,
+ outcome,
+ target,
+ event_type)
+ mocked.assert_not_called()
+
+ def test_opt_out_authenticate_event(self):
+ """Test that authenticate events are successfully opted out."""
+ resource_type = EXP_RESOURCE_TYPE
+
+ action = CREATED_OPERATION + '.' + resource_type
+ initiator = mock
+ target = mock
+ outcome = 'success'
+ event_type = 'identity.authenticate'
+ meter_name = '%s.%s' % (event_type, outcome)
+
+ conf = self.useFixture(config_fixture.Config(CONF))
+ conf.config(notification_opt_out=meter_name)
+
+ with mock.patch.object(notifications._get_notifier(),
+ '_notify') as mocked:
+
+ notifications._send_audit_notification(action,
+ initiator,
+ outcome,
+ target,
+ event_type)
+ mocked.assert_not_called()
+
class BaseNotificationTest(test_v3.RestfulTestCase):
@@ -213,13 +212,17 @@ class BaseNotificationTest(test_v3.RestfulTestCase):
self._audits = []
def fake_notify(operation, resource_type, resource_id,
- public=True):
+ actor_dict=None, public=True):
note = {
'resource_id': resource_id,
'operation': operation,
'resource_type': resource_type,
'send_notification_called': True,
'public': public}
+ if actor_dict:
+ note['actor_id'] = actor_dict.get('id')
+ note['actor_type'] = actor_dict.get('type')
+ note['actor_operation'] = actor_dict.get('actor_operation')
self._notifications.append(note)
self.useFixture(mockpatch.PatchObject(
@@ -249,17 +252,23 @@ class BaseNotificationTest(test_v3.RestfulTestCase):
self.useFixture(mockpatch.PatchObject(
notifications, '_send_audit_notification', fake_audit))
- def _assert_last_note(self, resource_id, operation, resource_type):
+ def _assert_last_note(self, resource_id, operation, resource_type,
+ actor_id=None, actor_type=None,
+ actor_operation=None):
# NOTE(stevemar): If 'basic' format is not used, then simply
# return since this assertion is not valid.
if CONF.notification_format != 'basic':
return
self.assertTrue(len(self._notifications) > 0)
note = self._notifications[-1]
- self.assertEqual(note['operation'], operation)
- self.assertEqual(note['resource_id'], resource_id)
- self.assertEqual(note['resource_type'], resource_type)
+ self.assertEqual(operation, note['operation'])
+ self.assertEqual(resource_id, note['resource_id'])
+ self.assertEqual(resource_type, note['resource_type'])
self.assertTrue(note['send_notification_called'])
+ if actor_id:
+ self.assertEqual(actor_id, note['actor_id'])
+ self.assertEqual(actor_type, note['actor_type'])
+ self.assertEqual(actor_operation, note['actor_operation'])
def _assert_last_audit(self, resource_id, operation, resource_type,
target_uri):
@@ -318,14 +327,14 @@ class BaseNotificationTest(test_v3.RestfulTestCase):
class NotificationsForEntities(BaseNotificationTest):
def test_create_group(self):
- group_ref = self.new_group_ref(domain_id=self.domain_id)
+ group_ref = unit.new_group_ref(domain_id=self.domain_id)
group_ref = self.identity_api.create_group(group_ref)
self._assert_last_note(group_ref['id'], CREATED_OPERATION, 'group')
self._assert_last_audit(group_ref['id'], CREATED_OPERATION, 'group',
cadftaxonomy.SECURITY_GROUP)
def test_create_project(self):
- project_ref = self.new_project_ref(domain_id=self.domain_id)
+ project_ref = unit.new_project_ref(domain_id=self.domain_id)
self.resource_api.create_project(project_ref['id'], project_ref)
self._assert_last_note(
project_ref['id'], CREATED_OPERATION, 'project')
@@ -333,27 +342,27 @@ class NotificationsForEntities(BaseNotificationTest):
'project', cadftaxonomy.SECURITY_PROJECT)
def test_create_role(self):
- role_ref = self.new_role_ref()
+ role_ref = unit.new_role_ref()
self.role_api.create_role(role_ref['id'], role_ref)
self._assert_last_note(role_ref['id'], CREATED_OPERATION, 'role')
self._assert_last_audit(role_ref['id'], CREATED_OPERATION, 'role',
cadftaxonomy.SECURITY_ROLE)
def test_create_user(self):
- user_ref = self.new_user_ref(domain_id=self.domain_id)
+ user_ref = unit.new_user_ref(domain_id=self.domain_id)
user_ref = self.identity_api.create_user(user_ref)
self._assert_last_note(user_ref['id'], CREATED_OPERATION, 'user')
self._assert_last_audit(user_ref['id'], CREATED_OPERATION, 'user',
cadftaxonomy.SECURITY_ACCOUNT_USER)
def test_create_trust(self):
- trustor = self.new_user_ref(domain_id=self.domain_id)
+ trustor = unit.new_user_ref(domain_id=self.domain_id)
trustor = self.identity_api.create_user(trustor)
- trustee = self.new_user_ref(domain_id=self.domain_id)
+ trustee = unit.new_user_ref(domain_id=self.domain_id)
trustee = self.identity_api.create_user(trustee)
- role_ref = self.new_role_ref()
+ role_ref = unit.new_role_ref()
self.role_api.create_role(role_ref['id'], role_ref)
- trust_ref = self.new_trust_ref(trustor['id'],
+ trust_ref = unit.new_trust_ref(trustor['id'],
trustee['id'])
self.trust_api.create_trust(trust_ref['id'],
trust_ref,
@@ -364,7 +373,7 @@ class NotificationsForEntities(BaseNotificationTest):
'OS-TRUST:trust', cadftaxonomy.SECURITY_TRUST)
def test_delete_group(self):
- group_ref = self.new_group_ref(domain_id=self.domain_id)
+ group_ref = unit.new_group_ref(domain_id=self.domain_id)
group_ref = self.identity_api.create_group(group_ref)
self.identity_api.delete_group(group_ref['id'])
self._assert_last_note(group_ref['id'], DELETED_OPERATION, 'group')
@@ -372,7 +381,7 @@ class NotificationsForEntities(BaseNotificationTest):
cadftaxonomy.SECURITY_GROUP)
def test_delete_project(self):
- project_ref = self.new_project_ref(domain_id=self.domain_id)
+ project_ref = unit.new_project_ref(domain_id=self.domain_id)
self.resource_api.create_project(project_ref['id'], project_ref)
self.resource_api.delete_project(project_ref['id'])
self._assert_last_note(
@@ -381,7 +390,7 @@ class NotificationsForEntities(BaseNotificationTest):
'project', cadftaxonomy.SECURITY_PROJECT)
def test_delete_role(self):
- role_ref = self.new_role_ref()
+ role_ref = unit.new_role_ref()
self.role_api.create_role(role_ref['id'], role_ref)
self.role_api.delete_role(role_ref['id'])
self._assert_last_note(role_ref['id'], DELETED_OPERATION, 'role')
@@ -389,7 +398,7 @@ class NotificationsForEntities(BaseNotificationTest):
cadftaxonomy.SECURITY_ROLE)
def test_delete_user(self):
- user_ref = self.new_user_ref(domain_id=self.domain_id)
+ user_ref = unit.new_user_ref(domain_id=self.domain_id)
user_ref = self.identity_api.create_user(user_ref)
self.identity_api.delete_user(user_ref['id'])
self._assert_last_note(user_ref['id'], DELETED_OPERATION, 'user')
@@ -397,14 +406,14 @@ class NotificationsForEntities(BaseNotificationTest):
cadftaxonomy.SECURITY_ACCOUNT_USER)
def test_create_domain(self):
- domain_ref = self.new_domain_ref()
+ domain_ref = unit.new_domain_ref()
self.resource_api.create_domain(domain_ref['id'], domain_ref)
self._assert_last_note(domain_ref['id'], CREATED_OPERATION, 'domain')
self._assert_last_audit(domain_ref['id'], CREATED_OPERATION, 'domain',
cadftaxonomy.SECURITY_DOMAIN)
def test_update_domain(self):
- domain_ref = self.new_domain_ref()
+ domain_ref = unit.new_domain_ref()
self.resource_api.create_domain(domain_ref['id'], domain_ref)
domain_ref['description'] = uuid.uuid4().hex
self.resource_api.update_domain(domain_ref['id'], domain_ref)
@@ -413,7 +422,7 @@ class NotificationsForEntities(BaseNotificationTest):
cadftaxonomy.SECURITY_DOMAIN)
def test_delete_domain(self):
- domain_ref = self.new_domain_ref()
+ domain_ref = unit.new_domain_ref()
self.resource_api.create_domain(domain_ref['id'], domain_ref)
domain_ref['enabled'] = False
self.resource_api.update_domain(domain_ref['id'], domain_ref)
@@ -423,12 +432,12 @@ class NotificationsForEntities(BaseNotificationTest):
cadftaxonomy.SECURITY_DOMAIN)
def test_delete_trust(self):
- trustor = self.new_user_ref(domain_id=self.domain_id)
+ trustor = unit.new_user_ref(domain_id=self.domain_id)
trustor = self.identity_api.create_user(trustor)
- trustee = self.new_user_ref(domain_id=self.domain_id)
+ trustee = unit.new_user_ref(domain_id=self.domain_id)
trustee = self.identity_api.create_user(trustee)
- role_ref = self.new_role_ref()
- trust_ref = self.new_trust_ref(trustor['id'], trustee['id'])
+ role_ref = unit.new_role_ref()
+ trust_ref = unit.new_trust_ref(trustor['id'], trustee['id'])
self.trust_api.create_trust(trust_ref['id'],
trust_ref,
[role_ref])
@@ -439,7 +448,9 @@ class NotificationsForEntities(BaseNotificationTest):
'OS-TRUST:trust', cadftaxonomy.SECURITY_TRUST)
def test_create_endpoint(self):
- endpoint_ref = self.new_endpoint_ref(service_id=self.service_id)
+ endpoint_ref = unit.new_endpoint_ref(service_id=self.service_id,
+ interface='public',
+ region_id=self.region_id)
self.catalog_api.create_endpoint(endpoint_ref['id'], endpoint_ref)
self._assert_notify_sent(endpoint_ref['id'], CREATED_OPERATION,
'endpoint')
@@ -447,7 +458,9 @@ class NotificationsForEntities(BaseNotificationTest):
'endpoint', cadftaxonomy.SECURITY_ENDPOINT)
def test_update_endpoint(self):
- endpoint_ref = self.new_endpoint_ref(service_id=self.service_id)
+ endpoint_ref = unit.new_endpoint_ref(service_id=self.service_id,
+ interface='public',
+ region_id=self.region_id)
self.catalog_api.create_endpoint(endpoint_ref['id'], endpoint_ref)
self.catalog_api.update_endpoint(endpoint_ref['id'], endpoint_ref)
self._assert_notify_sent(endpoint_ref['id'], UPDATED_OPERATION,
@@ -456,7 +469,9 @@ class NotificationsForEntities(BaseNotificationTest):
'endpoint', cadftaxonomy.SECURITY_ENDPOINT)
def test_delete_endpoint(self):
- endpoint_ref = self.new_endpoint_ref(service_id=self.service_id)
+ endpoint_ref = unit.new_endpoint_ref(service_id=self.service_id,
+ interface='public',
+ region_id=self.region_id)
self.catalog_api.create_endpoint(endpoint_ref['id'], endpoint_ref)
self.catalog_api.delete_endpoint(endpoint_ref['id'])
self._assert_notify_sent(endpoint_ref['id'], DELETED_OPERATION,
@@ -465,7 +480,7 @@ class NotificationsForEntities(BaseNotificationTest):
'endpoint', cadftaxonomy.SECURITY_ENDPOINT)
def test_create_service(self):
- service_ref = self.new_service_ref()
+ service_ref = unit.new_service_ref()
self.catalog_api.create_service(service_ref['id'], service_ref)
self._assert_notify_sent(service_ref['id'], CREATED_OPERATION,
'service')
@@ -473,7 +488,7 @@ class NotificationsForEntities(BaseNotificationTest):
'service', cadftaxonomy.SECURITY_SERVICE)
def test_update_service(self):
- service_ref = self.new_service_ref()
+ service_ref = unit.new_service_ref()
self.catalog_api.create_service(service_ref['id'], service_ref)
self.catalog_api.update_service(service_ref['id'], service_ref)
self._assert_notify_sent(service_ref['id'], UPDATED_OPERATION,
@@ -482,7 +497,7 @@ class NotificationsForEntities(BaseNotificationTest):
'service', cadftaxonomy.SECURITY_SERVICE)
def test_delete_service(self):
- service_ref = self.new_service_ref()
+ service_ref = unit.new_service_ref()
self.catalog_api.create_service(service_ref['id'], service_ref)
self.catalog_api.delete_service(service_ref['id'])
self._assert_notify_sent(service_ref['id'], DELETED_OPERATION,
@@ -491,7 +506,7 @@ class NotificationsForEntities(BaseNotificationTest):
'service', cadftaxonomy.SECURITY_SERVICE)
def test_create_region(self):
- region_ref = self.new_region_ref()
+ region_ref = unit.new_region_ref()
self.catalog_api.create_region(region_ref)
self._assert_notify_sent(region_ref['id'], CREATED_OPERATION,
'region')
@@ -499,7 +514,7 @@ class NotificationsForEntities(BaseNotificationTest):
'region', cadftaxonomy.SECURITY_REGION)
def test_update_region(self):
- region_ref = self.new_region_ref()
+ region_ref = unit.new_region_ref()
self.catalog_api.create_region(region_ref)
self.catalog_api.update_region(region_ref['id'], region_ref)
self._assert_notify_sent(region_ref['id'], UPDATED_OPERATION,
@@ -508,7 +523,7 @@ class NotificationsForEntities(BaseNotificationTest):
'region', cadftaxonomy.SECURITY_REGION)
def test_delete_region(self):
- region_ref = self.new_region_ref()
+ region_ref = unit.new_region_ref()
self.catalog_api.create_region(region_ref)
self.catalog_api.delete_region(region_ref['id'])
self._assert_notify_sent(region_ref['id'], DELETED_OPERATION,
@@ -517,7 +532,7 @@ class NotificationsForEntities(BaseNotificationTest):
'region', cadftaxonomy.SECURITY_REGION)
def test_create_policy(self):
- policy_ref = self.new_policy_ref()
+ policy_ref = unit.new_policy_ref()
self.policy_api.create_policy(policy_ref['id'], policy_ref)
self._assert_notify_sent(policy_ref['id'], CREATED_OPERATION,
'policy')
@@ -525,7 +540,7 @@ class NotificationsForEntities(BaseNotificationTest):
'policy', cadftaxonomy.SECURITY_POLICY)
def test_update_policy(self):
- policy_ref = self.new_policy_ref()
+ policy_ref = unit.new_policy_ref()
self.policy_api.create_policy(policy_ref['id'], policy_ref)
self.policy_api.update_policy(policy_ref['id'], policy_ref)
self._assert_notify_sent(policy_ref['id'], UPDATED_OPERATION,
@@ -534,7 +549,7 @@ class NotificationsForEntities(BaseNotificationTest):
'policy', cadftaxonomy.SECURITY_POLICY)
def test_delete_policy(self):
- policy_ref = self.new_policy_ref()
+ policy_ref = unit.new_policy_ref()
self.policy_api.create_policy(policy_ref['id'], policy_ref)
self.policy_api.delete_policy(policy_ref['id'])
self._assert_notify_sent(policy_ref['id'], DELETED_OPERATION,
@@ -543,7 +558,7 @@ class NotificationsForEntities(BaseNotificationTest):
'policy', cadftaxonomy.SECURITY_POLICY)
def test_disable_domain(self):
- domain_ref = self.new_domain_ref()
+ domain_ref = unit.new_domain_ref()
self.resource_api.create_domain(domain_ref['id'], domain_ref)
domain_ref['enabled'] = False
self.resource_api.update_domain(domain_ref['id'], domain_ref)
@@ -551,8 +566,7 @@ class NotificationsForEntities(BaseNotificationTest):
public=False)
def test_disable_of_disabled_domain_does_not_notify(self):
- domain_ref = self.new_domain_ref()
- domain_ref['enabled'] = False
+ domain_ref = unit.new_domain_ref(enabled=False)
self.resource_api.create_domain(domain_ref['id'], domain_ref)
# The domain_ref above is not changed during the create process. We
# can use the same ref to perform the update.
@@ -561,7 +575,7 @@ class NotificationsForEntities(BaseNotificationTest):
public=False)
def test_update_group(self):
- group_ref = self.new_group_ref(domain_id=self.domain_id)
+ group_ref = unit.new_group_ref(domain_id=self.domain_id)
group_ref = self.identity_api.create_group(group_ref)
self.identity_api.update_group(group_ref['id'], group_ref)
self._assert_last_note(group_ref['id'], UPDATED_OPERATION, 'group')
@@ -569,7 +583,7 @@ class NotificationsForEntities(BaseNotificationTest):
cadftaxonomy.SECURITY_GROUP)
def test_update_project(self):
- project_ref = self.new_project_ref(domain_id=self.domain_id)
+ project_ref = unit.new_project_ref(domain_id=self.domain_id)
self.resource_api.create_project(project_ref['id'], project_ref)
self.resource_api.update_project(project_ref['id'], project_ref)
self._assert_notify_sent(
@@ -578,7 +592,7 @@ class NotificationsForEntities(BaseNotificationTest):
'project', cadftaxonomy.SECURITY_PROJECT)
def test_disable_project(self):
- project_ref = self.new_project_ref(domain_id=self.domain_id)
+ project_ref = unit.new_project_ref(domain_id=self.domain_id)
self.resource_api.create_project(project_ref['id'], project_ref)
project_ref['enabled'] = False
self.resource_api.update_project(project_ref['id'], project_ref)
@@ -586,8 +600,8 @@ class NotificationsForEntities(BaseNotificationTest):
public=False)
def test_disable_of_disabled_project_does_not_notify(self):
- project_ref = self.new_project_ref(domain_id=self.domain_id)
- project_ref['enabled'] = False
+ project_ref = unit.new_project_ref(domain_id=self.domain_id,
+ enabled=False)
self.resource_api.create_project(project_ref['id'], project_ref)
# The project_ref above is not changed during the create process. We
# can use the same ref to perform the update.
@@ -596,7 +610,7 @@ class NotificationsForEntities(BaseNotificationTest):
public=False)
def test_update_project_does_not_send_disable(self):
- project_ref = self.new_project_ref(domain_id=self.domain_id)
+ project_ref = unit.new_project_ref(domain_id=self.domain_id)
self.resource_api.create_project(project_ref['id'], project_ref)
project_ref['enabled'] = True
self.resource_api.update_project(project_ref['id'], project_ref)
@@ -605,7 +619,7 @@ class NotificationsForEntities(BaseNotificationTest):
self._assert_notify_not_sent(project_ref['id'], 'disabled', 'project')
def test_update_role(self):
- role_ref = self.new_role_ref()
+ role_ref = unit.new_role_ref()
self.role_api.create_role(role_ref['id'], role_ref)
self.role_api.update_role(role_ref['id'], role_ref)
self._assert_last_note(role_ref['id'], UPDATED_OPERATION, 'role')
@@ -613,7 +627,7 @@ class NotificationsForEntities(BaseNotificationTest):
cadftaxonomy.SECURITY_ROLE)
def test_update_user(self):
- user_ref = self.new_user_ref(domain_id=self.domain_id)
+ user_ref = unit.new_user_ref(domain_id=self.domain_id)
user_ref = self.identity_api.create_user(user_ref)
self.identity_api.update_user(user_ref['id'], user_ref)
self._assert_last_note(user_ref['id'], UPDATED_OPERATION, 'user')
@@ -622,7 +636,7 @@ class NotificationsForEntities(BaseNotificationTest):
def test_config_option_no_events(self):
self.config_fixture.config(notification_format='basic')
- role_ref = self.new_role_ref()
+ role_ref = unit.new_role_ref()
self.role_api.create_role(role_ref['id'], role_ref)
# The regular notifications will still be emitted, since they are
# used for callback handling.
@@ -630,6 +644,28 @@ class NotificationsForEntities(BaseNotificationTest):
# No audit event should have occurred
self.assertEqual(0, len(self._audits))
+ def test_add_user_to_group(self):
+ user_ref = unit.new_user_ref(domain_id=self.domain_id)
+ user_ref = self.identity_api.create_user(user_ref)
+ group_ref = unit.new_group_ref(domain_id=self.domain_id)
+ group_ref = self.identity_api.create_group(group_ref)
+ self.identity_api.add_user_to_group(user_ref['id'], group_ref['id'])
+ self._assert_last_note(group_ref['id'], UPDATED_OPERATION, 'group',
+ actor_id=user_ref['id'], actor_type='user',
+ actor_operation='added')
+
+ def test_remove_user_from_group(self):
+ user_ref = unit.new_user_ref(domain_id=self.domain_id)
+ user_ref = self.identity_api.create_user(user_ref)
+ group_ref = unit.new_group_ref(domain_id=self.domain_id)
+ group_ref = self.identity_api.create_group(group_ref)
+ self.identity_api.add_user_to_group(user_ref['id'], group_ref['id'])
+ self.identity_api.remove_user_from_group(user_ref['id'],
+ group_ref['id'])
+ self._assert_last_note(group_ref['id'], UPDATED_OPERATION, 'group',
+ actor_id=user_ref['id'], actor_type='user',
+ actor_operation='removed')
+
class CADFNotificationsForEntities(NotificationsForEntities):
@@ -638,7 +674,7 @@ class CADFNotificationsForEntities(NotificationsForEntities):
self.config_fixture.config(notification_format='cadf')
def test_initiator_data_is_set(self):
- ref = self.new_domain_ref()
+ ref = unit.new_domain_ref()
resp = self.post('/domains', body={'domain': ref})
resource_id = resp.result.get('domain').get('id')
self._assert_last_audit(resource_id, CREATED_OPERATION, 'domain',
@@ -809,7 +845,7 @@ class TestEventCallbacks(test_v3.RestfulTestCase):
def test_notification_received(self):
callback = register_callback(CREATED_OPERATION, 'project')
- project_ref = self.new_project_ref(domain_id=self.domain_id)
+ project_ref = unit.new_project_ref(domain_id=self.domain_id)
self.resource_api.create_project(project_ref['id'], project_ref)
self.assertTrue(callback.called)
@@ -854,7 +890,7 @@ class TestEventCallbacks(test_v3.RestfulTestCase):
callback_called.append(True)
Foo()
- project_ref = self.new_project_ref(domain_id=self.domain_id)
+ project_ref = unit.new_project_ref(domain_id=self.domain_id)
self.resource_api.create_project(project_ref['id'], project_ref)
self.assertEqual([True], callback_called)
@@ -877,7 +913,7 @@ class TestEventCallbacks(test_v3.RestfulTestCase):
callback_called.append('cb1')
Foo()
- project_ref = self.new_project_ref(domain_id=self.domain_id)
+ project_ref = unit.new_project_ref(domain_id=self.domain_id)
self.resource_api.create_project(project_ref['id'], project_ref)
self.assertItemsEqual(['cb1', 'cb0'], callback_called)
@@ -919,7 +955,7 @@ class TestEventCallbacks(test_v3.RestfulTestCase):
# something like:
# self.assertRaises(TypeError, Foo)
Foo()
- project_ref = self.new_project_ref(domain_id=self.domain_id)
+ project_ref = unit.new_project_ref(domain_id=self.domain_id)
self.assertRaises(TypeError, self.resource_api.create_project,
project_ref['id'], project_ref)
@@ -963,13 +999,13 @@ class CadfNotificationsWrapperTestCase(test_v3.RestfulTestCase):
def _assert_last_note(self, action, user_id, event_type=None):
self.assertTrue(self._notifications)
note = self._notifications[-1]
- self.assertEqual(note['action'], action)
+ self.assertEqual(action, note['action'])
initiator = note['initiator']
- self.assertEqual(initiator.id, user_id)
- self.assertEqual(initiator.host.address, self.LOCAL_HOST)
+ self.assertEqual(user_id, initiator.id)
+ self.assertEqual(self.LOCAL_HOST, initiator.host.address)
self.assertTrue(note['send_notification_called'])
if event_type:
- self.assertEqual(note['event_type'], event_type)
+ self.assertEqual(event_type, note['event_type'])
def _assert_event(self, role_id, project=None, domain=None,
user=None, group=None, inherit=False):
@@ -1006,7 +1042,6 @@ class CadfNotificationsWrapperTestCase(test_v3.RestfulTestCase):
'id': 'openstack:782689dd-f428-4f13-99c7-5c70f94a5ac1'
}
"""
-
note = self._notifications[-1]
event = note['event']
if project:
@@ -1073,7 +1108,7 @@ class CadfNotificationsWrapperTestCase(test_v3.RestfulTestCase):
user=self.user_id)
def test_group_domain_grant(self):
- group_ref = self.new_group_ref(domain_id=self.domain_id)
+ group_ref = unit.new_group_ref(domain_id=self.domain_id)
group = self.identity_api.create_group(group_ref)
self.identity_api.add_user_to_group(self.user_id, group['id'])
url = ('/domains/%s/groups/%s/roles/%s' %
@@ -1087,7 +1122,7 @@ class CadfNotificationsWrapperTestCase(test_v3.RestfulTestCase):
# A notification is sent when add_role_to_user_and_project is called on
# the assignment manager.
- project_ref = self.new_project_ref(self.domain_id)
+ project_ref = unit.new_project_ref(self.domain_id)
project = self.resource_api.create_project(
project_ref['id'], project_ref)
tenant_id = project['id']
@@ -1097,7 +1132,7 @@ class CadfNotificationsWrapperTestCase(test_v3.RestfulTestCase):
self.assertTrue(self._notifications)
note = self._notifications[-1]
- self.assertEqual(note['action'], 'created.role_assignment')
+ self.assertEqual('created.role_assignment', note['action'])
self.assertTrue(note['send_notification_called'])
self._assert_event(self.role_id, project=tenant_id, user=self.user_id)
@@ -1111,7 +1146,7 @@ class CadfNotificationsWrapperTestCase(test_v3.RestfulTestCase):
self.assertTrue(self._notifications)
note = self._notifications[-1]
- self.assertEqual(note['action'], 'deleted.role_assignment')
+ self.assertEqual('deleted.role_assignment', note['action'])
self.assertTrue(note['send_notification_called'])
self._assert_event(self.role_id, project=self.project_id,
@@ -1126,7 +1161,9 @@ class TestCallbackRegistration(unit.BaseTestCase):
self.mock_log.logger.getEffectiveLevel.return_value = logging.DEBUG
def verify_log_message(self, data):
- """Tests that use this are a little brittle because adding more
+ """Verify log message.
+
+ Tests that use this are a little brittle because adding more
logging can break them.
TODO(dstanek): remove the need for this in a future refactoring
diff --git a/keystone-moon/keystone/tests/unit/common/test_sql_core.py b/keystone-moon/keystone/tests/unit/common/test_sql_core.py
index b110ed08..7d20eb03 100644
--- a/keystone-moon/keystone/tests/unit/common/test_sql_core.py
+++ b/keystone-moon/keystone/tests/unit/common/test_sql_core.py
@@ -32,14 +32,14 @@ class TestModelDictMixin(unit.BaseTestCase):
def test_creating_a_model_instance_from_a_dict(self):
d = {'id': utils.new_uuid(), 'text': utils.new_uuid()}
m = TestModel.from_dict(d)
- self.assertEqual(m.id, d['id'])
- self.assertEqual(m.text, d['text'])
+ self.assertEqual(d['id'], m.id)
+ self.assertEqual(d['text'], m.text)
def test_creating_a_dict_from_a_model_instance(self):
m = TestModel(id=utils.new_uuid(), text=utils.new_uuid())
d = m.to_dict()
- self.assertEqual(m.id, d['id'])
- self.assertEqual(m.text, d['text'])
+ self.assertEqual(d['id'], m.id)
+ self.assertEqual(d['text'], m.text)
def test_creating_a_model_instance_from_an_invalid_dict(self):
d = {'id': utils.new_uuid(), 'text': utils.new_uuid(), 'extra': None}
@@ -49,4 +49,4 @@ class TestModelDictMixin(unit.BaseTestCase):
expected = {'id': utils.new_uuid(), 'text': utils.new_uuid()}
m = TestModel(id=expected['id'], text=expected['text'])
m.extra = 'this should not be in the dictionary'
- self.assertEqual(m.to_dict(), expected)
+ self.assertEqual(expected, m.to_dict())
diff --git a/keystone-moon/keystone/tests/unit/common/test_utils.py b/keystone-moon/keystone/tests/unit/common/test_utils.py
index d52eb729..3641aacd 100644
--- a/keystone-moon/keystone/tests/unit/common/test_utils.py
+++ b/keystone-moon/keystone/tests/unit/common/test_utils.py
@@ -1,3 +1,4 @@
+# encoding: utf-8
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
@@ -16,12 +17,13 @@ import uuid
from oslo_config import cfg
from oslo_config import fixture as config_fixture
from oslo_serialization import jsonutils
+import six
from keystone.common import utils as common_utils
from keystone import exception
-from keystone import service
from keystone.tests import unit
from keystone.tests.unit import utils
+from keystone.version import service
CONF = cfg.CONF
@@ -36,6 +38,38 @@ class UtilsTestCase(unit.BaseTestCase):
super(UtilsTestCase, self).setUp()
self.config_fixture = self.useFixture(config_fixture.Config(CONF))
+ def test_resource_uuid(self):
+ uuid_str = '536e28c2017e405e89b25a1ed777b952'
+ self.assertEqual(uuid_str, common_utils.resource_uuid(uuid_str))
+
+ # Exact 64 length string.
+ uuid_str = ('536e28c2017e405e89b25a1ed777b952'
+ 'f13de678ac714bb1b7d1e9a007c10db5')
+ resource_id_namespace = common_utils.RESOURCE_ID_NAMESPACE
+ transformed_id = uuid.uuid5(resource_id_namespace, uuid_str).hex
+ self.assertEqual(transformed_id, common_utils.resource_uuid(uuid_str))
+
+ # Non-ASCII character test.
+ non_ascii_ = 'ß' * 32
+ transformed_id = uuid.uuid5(resource_id_namespace, non_ascii_).hex
+ self.assertEqual(transformed_id,
+ common_utils.resource_uuid(non_ascii_))
+
+ # This input is invalid because it's length is more than 64.
+ invalid_input = 'x' * 65
+ self.assertRaises(ValueError, common_utils.resource_uuid,
+ invalid_input)
+
+ # 64 length unicode string, to mimic what is returned from mapping_id
+ # backend.
+ uuid_str = six.text_type('536e28c2017e405e89b25a1ed777b952'
+ 'f13de678ac714bb1b7d1e9a007c10db5')
+ resource_id_namespace = common_utils.RESOURCE_ID_NAMESPACE
+ if six.PY2:
+ uuid_str = uuid_str.encode('utf-8')
+ transformed_id = uuid.uuid5(resource_id_namespace, uuid_str).hex
+ self.assertEqual(transformed_id, common_utils.resource_uuid(uuid_str))
+
def test_hash(self):
password = 'right'
wrong = 'wrongwrong' # Two wrongs don't make a right
@@ -153,6 +187,18 @@ class UtilsTestCase(unit.BaseTestCase):
expected_json = '{"field":"value"}'
self.assertEqual(expected_json, json)
+ def test_url_safe_check(self):
+ base_str = 'i am safe'
+ self.assertFalse(common_utils.is_not_url_safe(base_str))
+ for i in common_utils.URL_RESERVED_CHARS:
+ self.assertTrue(common_utils.is_not_url_safe(base_str + i))
+
+ def test_url_safe_with_unicode_check(self):
+ base_str = u'i am \xe7afe'
+ self.assertFalse(common_utils.is_not_url_safe(base_str))
+ for i in common_utils.URL_RESERVED_CHARS:
+ self.assertTrue(common_utils.is_not_url_safe(base_str + i))
+
class ServiceHelperTests(unit.BaseTestCase):