diff options
Diffstat (limited to 'keystone-moon/keystone/tests/unit/common')
6 files changed, 428 insertions, 161 deletions
diff --git a/keystone-moon/keystone/tests/unit/common/test_authorization.py b/keystone-moon/keystone/tests/unit/common/test_authorization.py new file mode 100644 index 00000000..73ddbc61 --- /dev/null +++ b/keystone-moon/keystone/tests/unit/common/test_authorization.py @@ -0,0 +1,161 @@ +# Copyright 2015 IBM Corp. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +import copy +import uuid + +from keystone.common import authorization +from keystone import exception +from keystone.federation import constants as federation_constants +from keystone.models import token_model +from keystone.tests import unit +from keystone.tests.unit import test_token_provider + + +class TestTokenToAuthContext(unit.BaseTestCase): + def test_token_is_project_scoped_with_trust(self): + # Check auth_context result when the token is project-scoped and has + # trust info. + + # SAMPLE_V3_TOKEN has OS-TRUST:trust in it. + token_data = test_token_provider.SAMPLE_V3_TOKEN + token = token_model.KeystoneToken(token_id=uuid.uuid4().hex, + token_data=token_data) + + auth_context = authorization.token_to_auth_context(token) + + self.assertEqual(token, auth_context['token']) + self.assertTrue(auth_context['is_delegated_auth']) + self.assertEqual(token_data['token']['user']['id'], + auth_context['user_id']) + self.assertEqual(token_data['token']['user']['domain']['id'], + auth_context['user_domain_id']) + self.assertEqual(token_data['token']['project']['id'], + auth_context['project_id']) + self.assertEqual(token_data['token']['project']['domain']['id'], + auth_context['project_domain_id']) + self.assertNotIn('domain_id', auth_context) + self.assertNotIn('domain_name', auth_context) + self.assertEqual(token_data['token']['OS-TRUST:trust']['id'], + auth_context['trust_id']) + self.assertEqual( + token_data['token']['OS-TRUST:trust']['trustor_user_id'], + auth_context['trustor_id']) + self.assertEqual( + token_data['token']['OS-TRUST:trust']['trustee_user_id'], + auth_context['trustee_id']) + self.assertItemsEqual( + [r['name'] for r in token_data['token']['roles']], + auth_context['roles']) + self.assertIsNone(auth_context['consumer_id']) + self.assertIsNone(auth_context['access_token_id']) + self.assertNotIn('group_ids', auth_context) + + def test_token_is_domain_scoped(self): + # Check contents of auth_context when token is domain-scoped. + token_data = copy.deepcopy(test_token_provider.SAMPLE_V3_TOKEN) + del token_data['token']['project'] + + domain_id = uuid.uuid4().hex + domain_name = uuid.uuid4().hex + token_data['token']['domain'] = {'id': domain_id, 'name': domain_name} + + token = token_model.KeystoneToken(token_id=uuid.uuid4().hex, + token_data=token_data) + + auth_context = authorization.token_to_auth_context(token) + + self.assertNotIn('project_id', auth_context) + self.assertNotIn('project_domain_id', auth_context) + + self.assertEqual(domain_id, auth_context['domain_id']) + self.assertEqual(domain_name, auth_context['domain_name']) + + def test_token_is_unscoped(self): + # Check contents of auth_context when the token is unscoped. + token_data = copy.deepcopy(test_token_provider.SAMPLE_V3_TOKEN) + del token_data['token']['project'] + + token = token_model.KeystoneToken(token_id=uuid.uuid4().hex, + token_data=token_data) + + auth_context = authorization.token_to_auth_context(token) + + self.assertNotIn('project_id', auth_context) + self.assertNotIn('project_domain_id', auth_context) + self.assertNotIn('domain_id', auth_context) + self.assertNotIn('domain_name', auth_context) + + def test_token_is_for_federated_user(self): + # When the token is for a federated user then group_ids is in + # auth_context. + token_data = copy.deepcopy(test_token_provider.SAMPLE_V3_TOKEN) + + group_ids = [uuid.uuid4().hex for x in range(1, 5)] + + federation_data = {'identity_provider': {'id': uuid.uuid4().hex}, + 'protocol': {'id': 'saml2'}, + 'groups': [{'id': gid} for gid in group_ids]} + token_data['token']['user'][federation_constants.FEDERATION] = ( + federation_data) + + token = token_model.KeystoneToken(token_id=uuid.uuid4().hex, + token_data=token_data) + + auth_context = authorization.token_to_auth_context(token) + + self.assertItemsEqual(group_ids, auth_context['group_ids']) + + def test_oauth_variables_set_for_oauth_token(self): + token_data = copy.deepcopy(test_token_provider.SAMPLE_V3_TOKEN) + access_token_id = uuid.uuid4().hex + consumer_id = uuid.uuid4().hex + token_data['token']['OS-OAUTH1'] = {'access_token_id': access_token_id, + 'consumer_id': consumer_id} + token = token_model.KeystoneToken(token_id=uuid.uuid4().hex, + token_data=token_data) + + auth_context = authorization.token_to_auth_context(token) + + self.assertEqual(access_token_id, auth_context['access_token_id']) + self.assertEqual(consumer_id, auth_context['consumer_id']) + + def test_oauth_variables_not_set(self): + token_data = copy.deepcopy(test_token_provider.SAMPLE_V3_TOKEN) + token = token_model.KeystoneToken(token_id=uuid.uuid4().hex, + token_data=token_data) + + auth_context = authorization.token_to_auth_context(token) + + self.assertIsNone(auth_context['access_token_id']) + self.assertIsNone(auth_context['consumer_id']) + + def test_token_is_not_KeystoneToken_raises_exception(self): + # If the token isn't a KeystoneToken then an UnexpectedError exception + # is raised. + self.assertRaises(exception.UnexpectedError, + authorization.token_to_auth_context, {}) + + def test_user_id_missing_in_token_raises_exception(self): + # If there's no user ID in the token then an Unauthorized + # exception is raised. + token_data = copy.deepcopy(test_token_provider.SAMPLE_V3_TOKEN) + del token_data['token']['user']['id'] + + token = token_model.KeystoneToken(token_id=uuid.uuid4().hex, + token_data=token_data) + + self.assertRaises(exception.Unauthorized, + authorization.token_to_auth_context, token) diff --git a/keystone-moon/keystone/tests/unit/common/test_ldap.py b/keystone-moon/keystone/tests/unit/common/test_ldap.py index e6e2c732..eed77286 100644 --- a/keystone-moon/keystone/tests/unit/common/test_ldap.py +++ b/keystone-moon/keystone/tests/unit/common/test_ldap.py @@ -27,6 +27,7 @@ from keystone.common.ldap import core as common_ldap_core from keystone.tests import unit from keystone.tests.unit import default_fixtures from keystone.tests.unit import fakeldap +from keystone.tests.unit.ksfixtures import database CONF = cfg.CONF @@ -195,8 +196,8 @@ class DnCompareTest(unit.BaseTestCase): def test_startswith_unicode(self): # dn_startswith accepts unicode. - child = u'cn=cn=fäké,ou=OpenStäck' - parent = 'ou=OpenStäck' + child = u'cn=fäké,ou=OpenStäck' + parent = u'ou=OpenStäck' self.assertTrue(ks_ldap.dn_startswith(child, parent)) @@ -207,6 +208,8 @@ class LDAPDeleteTreeTest(unit.TestCase): ks_ldap.register_handler('fake://', fakeldap.FakeLdapNoSubtreeDelete) + self.useFixture(database.Database(self.sql_driver_version_overrides)) + self.load_backends() self.load_fixtures(default_fixtures) @@ -226,11 +229,11 @@ class LDAPDeleteTreeTest(unit.TestCase): config_files.append(unit.dirs.tests_conf('backend_ldap.conf')) return config_files - def test_deleteTree(self): + def test_delete_tree(self): """Test manually deleting a tree. Few LDAP servers support CONTROL_DELETETREE. This test - exercises the alternate code paths in BaseLdap.deleteTree. + exercises the alternate code paths in BaseLdap.delete_tree. """ conn = self.identity_api.user.get_connection() @@ -251,7 +254,7 @@ class LDAPDeleteTreeTest(unit.TestCase): # cn=base # cn=child,cn=base # cn=grandchild,cn=child,cn=base - # then attempt to deleteTree(cn=base) + # then attempt to delete_tree(cn=base) base_id = 'base' base_dn = create_entry(base_id) child_dn = create_entry('child', base_dn) @@ -273,8 +276,8 @@ class LDAPDeleteTreeTest(unit.TestCase): self.assertRaises(ldap.NOT_ALLOWED_ON_NONLEAF, conn.delete_s, child_dn) - # call our deleteTree implementation - self.identity_api.user.deleteTree(base_id) + # call our delete_tree implementation + self.identity_api.user.delete_tree(base_id) self.assertRaises(ldap.NO_SUCH_OBJECT, conn.search_s, base_dn, ldap.SCOPE_BASE) self.assertRaises(ldap.NO_SUCH_OBJECT, @@ -283,6 +286,24 @@ class LDAPDeleteTreeTest(unit.TestCase): conn.search_s, grandchild_dn, ldap.SCOPE_BASE) +class MultiURLTests(unit.TestCase): + """Tests for setting multiple LDAP URLs.""" + + def test_multiple_urls_with_comma_no_conn_pool(self): + urls = 'ldap://localhost,ldap://backup.localhost' + self.config_fixture.config(group='ldap', url=urls, use_pool=False) + base_ldap = ks_ldap.BaseLdap(CONF) + ldap_connection = base_ldap.get_connection() + self.assertEqual(urls, ldap_connection.conn.conn._uri) + + def test_multiple_urls_with_comma_with_conn_pool(self): + urls = 'ldap://localhost,ldap://backup.localhost' + self.config_fixture.config(group='ldap', url=urls, use_pool=True) + base_ldap = ks_ldap.BaseLdap(CONF) + ldap_connection = base_ldap.get_connection() + self.assertEqual(urls, ldap_connection.conn.conn_pool.uri) + + class SslTlsTest(unit.TestCase): """Tests for the SSL/TLS functionality in keystone.common.ldap.core.""" @@ -359,6 +380,7 @@ class LDAPPagedResultsTest(unit.TestCase): ks_ldap.register_handler('fake://', fakeldap.FakeLdap) self.addCleanup(common_ldap_core._HANDLERS.clear) + self.useFixture(database.Database(self.sql_driver_version_overrides)) self.load_backends() self.load_fixtures(default_fixtures) diff --git a/keystone-moon/keystone/tests/unit/common/test_manager.py b/keystone-moon/keystone/tests/unit/common/test_manager.py index 1bc19763..7ef91e15 100644 --- a/keystone-moon/keystone/tests/unit/common/test_manager.py +++ b/keystone-moon/keystone/tests/unit/common/test_manager.py @@ -24,7 +24,7 @@ class TestCreateLegacyDriver(unit.BaseTestCase): Driver = manager.create_legacy_driver(catalog.CatalogDriverV8) # NOTE(dstanek): I want to subvert the requirement for this - # class to implement all of the abstractmethods. + # class to implement all of the abstract methods. Driver.__abstractmethods__ = set() impl = Driver() @@ -32,8 +32,9 @@ class TestCreateLegacyDriver(unit.BaseTestCase): 'as_of': 'Liberty', 'what': 'keystone.catalog.core.Driver', 'in_favor_of': 'keystone.catalog.core.CatalogDriverV8', - 'remove_in': 'N', + 'remove_in': mock.ANY, } mock_reporter.assert_called_with(mock.ANY, mock.ANY, details) + self.assertEqual('N', mock_reporter.call_args[0][2]['remove_in'][0]) self.assertIsInstance(impl, catalog.CatalogDriverV8) diff --git a/keystone-moon/keystone/tests/unit/common/test_notifications.py b/keystone-moon/keystone/tests/unit/common/test_notifications.py index 1ad8d50d..aa2e6f72 100644 --- a/keystone-moon/keystone/tests/unit/common/test_notifications.py +++ b/keystone-moon/keystone/tests/unit/common/test_notifications.py @@ -43,9 +43,7 @@ class ArbitraryException(Exception): def register_callback(operation, resource_type=EXP_RESOURCE_TYPE): - """Helper for creating and registering a mock callback. - - """ + """Helper for creating and registering a mock callback.""" callback = mock.Mock(__name__='callback', im_class=mock.Mock(__name__='class')) notifications.register_event_callback(operation, resource_type, callback) @@ -95,89 +93,14 @@ class AuditNotificationsTestCase(unit.BaseTestCase): DISABLED_OPERATION) -class NotificationsWrapperTestCase(unit.BaseTestCase): - def create_fake_ref(self): - resource_id = uuid.uuid4().hex - return resource_id, { - 'id': resource_id, - 'key': uuid.uuid4().hex - } - - @notifications.created(EXP_RESOURCE_TYPE) - def create_resource(self, resource_id, data): - return data - - def test_resource_created_notification(self): - exp_resource_id, data = self.create_fake_ref() - callback = register_callback(CREATED_OPERATION) - - self.create_resource(exp_resource_id, data) - callback.assert_called_with('identity', EXP_RESOURCE_TYPE, - CREATED_OPERATION, - {'resource_info': exp_resource_id}) - - @notifications.updated(EXP_RESOURCE_TYPE) - def update_resource(self, resource_id, data): - return data - - def test_resource_updated_notification(self): - exp_resource_id, data = self.create_fake_ref() - callback = register_callback(UPDATED_OPERATION) - - self.update_resource(exp_resource_id, data) - callback.assert_called_with('identity', EXP_RESOURCE_TYPE, - UPDATED_OPERATION, - {'resource_info': exp_resource_id}) - - @notifications.deleted(EXP_RESOURCE_TYPE) - def delete_resource(self, resource_id): - pass - - def test_resource_deleted_notification(self): - exp_resource_id = uuid.uuid4().hex - callback = register_callback(DELETED_OPERATION) - - self.delete_resource(exp_resource_id) - callback.assert_called_with('identity', EXP_RESOURCE_TYPE, - DELETED_OPERATION, - {'resource_info': exp_resource_id}) - - @notifications.created(EXP_RESOURCE_TYPE) - def create_exception(self, resource_id): - raise ArbitraryException() - - def test_create_exception_without_notification(self): - callback = register_callback(CREATED_OPERATION) - self.assertRaises( - ArbitraryException, self.create_exception, uuid.uuid4().hex) - self.assertFalse(callback.called) - - @notifications.created(EXP_RESOURCE_TYPE) - def update_exception(self, resource_id): - raise ArbitraryException() - - def test_update_exception_without_notification(self): - callback = register_callback(UPDATED_OPERATION) - self.assertRaises( - ArbitraryException, self.update_exception, uuid.uuid4().hex) - self.assertFalse(callback.called) - - @notifications.deleted(EXP_RESOURCE_TYPE) - def delete_exception(self, resource_id): - raise ArbitraryException() - - def test_delete_exception_without_notification(self): - callback = register_callback(DELETED_OPERATION) - self.assertRaises( - ArbitraryException, self.delete_exception, uuid.uuid4().hex) - self.assertFalse(callback.called) - - class NotificationsTestCase(unit.BaseTestCase): def test_send_notification(self): - """Test the private method _send_notification to ensure event_type, - payload, and context are built and passed properly. + """Test _send_notification. + + Test the private method _send_notification to ensure event_type, + payload, and context are built and passed properly. + """ resource = uuid.uuid4().hex resource_type = EXP_RESOURCE_TYPE @@ -203,6 +126,82 @@ class NotificationsTestCase(unit.BaseTestCase): resource) mocked.assert_called_once_with(*expected_args) + def test_send_notification_with_opt_out(self): + """Test the private method _send_notification with opt-out. + + Test that _send_notification does not notify when a valid + notification_opt_out configuration is provided. + """ + resource = uuid.uuid4().hex + resource_type = EXP_RESOURCE_TYPE + operation = CREATED_OPERATION + event_type = 'identity.%s.created' % resource_type + + # NOTE(diazjf): Here we add notification_opt_out to the + # configuration so that we should return before _get_notifer is + # called. This is because we are opting out notifications for the + # passed resource_type and operation. + conf = self.useFixture(config_fixture.Config(CONF)) + conf.config(notification_opt_out=event_type) + + with mock.patch.object(notifications._get_notifier(), + '_notify') as mocked: + + notifications._send_notification(operation, resource_type, + resource) + mocked.assert_not_called() + + def test_send_audit_notification_with_opt_out(self): + """Test the private method _send_audit_notification with opt-out. + + Test that _send_audit_notification does not notify when a valid + notification_opt_out configuration is provided. + """ + resource_type = EXP_RESOURCE_TYPE + + action = CREATED_OPERATION + '.' + resource_type + initiator = mock + target = mock + outcome = 'success' + event_type = 'identity.%s.created' % resource_type + + conf = self.useFixture(config_fixture.Config(CONF)) + conf.config(notification_opt_out=event_type) + + with mock.patch.object(notifications._get_notifier(), + '_notify') as mocked: + + notifications._send_audit_notification(action, + initiator, + outcome, + target, + event_type) + mocked.assert_not_called() + + def test_opt_out_authenticate_event(self): + """Test that authenticate events are successfully opted out.""" + resource_type = EXP_RESOURCE_TYPE + + action = CREATED_OPERATION + '.' + resource_type + initiator = mock + target = mock + outcome = 'success' + event_type = 'identity.authenticate' + meter_name = '%s.%s' % (event_type, outcome) + + conf = self.useFixture(config_fixture.Config(CONF)) + conf.config(notification_opt_out=meter_name) + + with mock.patch.object(notifications._get_notifier(), + '_notify') as mocked: + + notifications._send_audit_notification(action, + initiator, + outcome, + target, + event_type) + mocked.assert_not_called() + class BaseNotificationTest(test_v3.RestfulTestCase): @@ -213,13 +212,17 @@ class BaseNotificationTest(test_v3.RestfulTestCase): self._audits = [] def fake_notify(operation, resource_type, resource_id, - public=True): + actor_dict=None, public=True): note = { 'resource_id': resource_id, 'operation': operation, 'resource_type': resource_type, 'send_notification_called': True, 'public': public} + if actor_dict: + note['actor_id'] = actor_dict.get('id') + note['actor_type'] = actor_dict.get('type') + note['actor_operation'] = actor_dict.get('actor_operation') self._notifications.append(note) self.useFixture(mockpatch.PatchObject( @@ -249,17 +252,23 @@ class BaseNotificationTest(test_v3.RestfulTestCase): self.useFixture(mockpatch.PatchObject( notifications, '_send_audit_notification', fake_audit)) - def _assert_last_note(self, resource_id, operation, resource_type): + def _assert_last_note(self, resource_id, operation, resource_type, + actor_id=None, actor_type=None, + actor_operation=None): # NOTE(stevemar): If 'basic' format is not used, then simply # return since this assertion is not valid. if CONF.notification_format != 'basic': return self.assertTrue(len(self._notifications) > 0) note = self._notifications[-1] - self.assertEqual(note['operation'], operation) - self.assertEqual(note['resource_id'], resource_id) - self.assertEqual(note['resource_type'], resource_type) + self.assertEqual(operation, note['operation']) + self.assertEqual(resource_id, note['resource_id']) + self.assertEqual(resource_type, note['resource_type']) self.assertTrue(note['send_notification_called']) + if actor_id: + self.assertEqual(actor_id, note['actor_id']) + self.assertEqual(actor_type, note['actor_type']) + self.assertEqual(actor_operation, note['actor_operation']) def _assert_last_audit(self, resource_id, operation, resource_type, target_uri): @@ -318,14 +327,14 @@ class BaseNotificationTest(test_v3.RestfulTestCase): class NotificationsForEntities(BaseNotificationTest): def test_create_group(self): - group_ref = self.new_group_ref(domain_id=self.domain_id) + group_ref = unit.new_group_ref(domain_id=self.domain_id) group_ref = self.identity_api.create_group(group_ref) self._assert_last_note(group_ref['id'], CREATED_OPERATION, 'group') self._assert_last_audit(group_ref['id'], CREATED_OPERATION, 'group', cadftaxonomy.SECURITY_GROUP) def test_create_project(self): - project_ref = self.new_project_ref(domain_id=self.domain_id) + project_ref = unit.new_project_ref(domain_id=self.domain_id) self.resource_api.create_project(project_ref['id'], project_ref) self._assert_last_note( project_ref['id'], CREATED_OPERATION, 'project') @@ -333,27 +342,27 @@ class NotificationsForEntities(BaseNotificationTest): 'project', cadftaxonomy.SECURITY_PROJECT) def test_create_role(self): - role_ref = self.new_role_ref() + role_ref = unit.new_role_ref() self.role_api.create_role(role_ref['id'], role_ref) self._assert_last_note(role_ref['id'], CREATED_OPERATION, 'role') self._assert_last_audit(role_ref['id'], CREATED_OPERATION, 'role', cadftaxonomy.SECURITY_ROLE) def test_create_user(self): - user_ref = self.new_user_ref(domain_id=self.domain_id) + user_ref = unit.new_user_ref(domain_id=self.domain_id) user_ref = self.identity_api.create_user(user_ref) self._assert_last_note(user_ref['id'], CREATED_OPERATION, 'user') self._assert_last_audit(user_ref['id'], CREATED_OPERATION, 'user', cadftaxonomy.SECURITY_ACCOUNT_USER) def test_create_trust(self): - trustor = self.new_user_ref(domain_id=self.domain_id) + trustor = unit.new_user_ref(domain_id=self.domain_id) trustor = self.identity_api.create_user(trustor) - trustee = self.new_user_ref(domain_id=self.domain_id) + trustee = unit.new_user_ref(domain_id=self.domain_id) trustee = self.identity_api.create_user(trustee) - role_ref = self.new_role_ref() + role_ref = unit.new_role_ref() self.role_api.create_role(role_ref['id'], role_ref) - trust_ref = self.new_trust_ref(trustor['id'], + trust_ref = unit.new_trust_ref(trustor['id'], trustee['id']) self.trust_api.create_trust(trust_ref['id'], trust_ref, @@ -364,7 +373,7 @@ class NotificationsForEntities(BaseNotificationTest): 'OS-TRUST:trust', cadftaxonomy.SECURITY_TRUST) def test_delete_group(self): - group_ref = self.new_group_ref(domain_id=self.domain_id) + group_ref = unit.new_group_ref(domain_id=self.domain_id) group_ref = self.identity_api.create_group(group_ref) self.identity_api.delete_group(group_ref['id']) self._assert_last_note(group_ref['id'], DELETED_OPERATION, 'group') @@ -372,7 +381,7 @@ class NotificationsForEntities(BaseNotificationTest): cadftaxonomy.SECURITY_GROUP) def test_delete_project(self): - project_ref = self.new_project_ref(domain_id=self.domain_id) + project_ref = unit.new_project_ref(domain_id=self.domain_id) self.resource_api.create_project(project_ref['id'], project_ref) self.resource_api.delete_project(project_ref['id']) self._assert_last_note( @@ -381,7 +390,7 @@ class NotificationsForEntities(BaseNotificationTest): 'project', cadftaxonomy.SECURITY_PROJECT) def test_delete_role(self): - role_ref = self.new_role_ref() + role_ref = unit.new_role_ref() self.role_api.create_role(role_ref['id'], role_ref) self.role_api.delete_role(role_ref['id']) self._assert_last_note(role_ref['id'], DELETED_OPERATION, 'role') @@ -389,7 +398,7 @@ class NotificationsForEntities(BaseNotificationTest): cadftaxonomy.SECURITY_ROLE) def test_delete_user(self): - user_ref = self.new_user_ref(domain_id=self.domain_id) + user_ref = unit.new_user_ref(domain_id=self.domain_id) user_ref = self.identity_api.create_user(user_ref) self.identity_api.delete_user(user_ref['id']) self._assert_last_note(user_ref['id'], DELETED_OPERATION, 'user') @@ -397,14 +406,14 @@ class NotificationsForEntities(BaseNotificationTest): cadftaxonomy.SECURITY_ACCOUNT_USER) def test_create_domain(self): - domain_ref = self.new_domain_ref() + domain_ref = unit.new_domain_ref() self.resource_api.create_domain(domain_ref['id'], domain_ref) self._assert_last_note(domain_ref['id'], CREATED_OPERATION, 'domain') self._assert_last_audit(domain_ref['id'], CREATED_OPERATION, 'domain', cadftaxonomy.SECURITY_DOMAIN) def test_update_domain(self): - domain_ref = self.new_domain_ref() + domain_ref = unit.new_domain_ref() self.resource_api.create_domain(domain_ref['id'], domain_ref) domain_ref['description'] = uuid.uuid4().hex self.resource_api.update_domain(domain_ref['id'], domain_ref) @@ -413,7 +422,7 @@ class NotificationsForEntities(BaseNotificationTest): cadftaxonomy.SECURITY_DOMAIN) def test_delete_domain(self): - domain_ref = self.new_domain_ref() + domain_ref = unit.new_domain_ref() self.resource_api.create_domain(domain_ref['id'], domain_ref) domain_ref['enabled'] = False self.resource_api.update_domain(domain_ref['id'], domain_ref) @@ -423,12 +432,12 @@ class NotificationsForEntities(BaseNotificationTest): cadftaxonomy.SECURITY_DOMAIN) def test_delete_trust(self): - trustor = self.new_user_ref(domain_id=self.domain_id) + trustor = unit.new_user_ref(domain_id=self.domain_id) trustor = self.identity_api.create_user(trustor) - trustee = self.new_user_ref(domain_id=self.domain_id) + trustee = unit.new_user_ref(domain_id=self.domain_id) trustee = self.identity_api.create_user(trustee) - role_ref = self.new_role_ref() - trust_ref = self.new_trust_ref(trustor['id'], trustee['id']) + role_ref = unit.new_role_ref() + trust_ref = unit.new_trust_ref(trustor['id'], trustee['id']) self.trust_api.create_trust(trust_ref['id'], trust_ref, [role_ref]) @@ -439,7 +448,9 @@ class NotificationsForEntities(BaseNotificationTest): 'OS-TRUST:trust', cadftaxonomy.SECURITY_TRUST) def test_create_endpoint(self): - endpoint_ref = self.new_endpoint_ref(service_id=self.service_id) + endpoint_ref = unit.new_endpoint_ref(service_id=self.service_id, + interface='public', + region_id=self.region_id) self.catalog_api.create_endpoint(endpoint_ref['id'], endpoint_ref) self._assert_notify_sent(endpoint_ref['id'], CREATED_OPERATION, 'endpoint') @@ -447,7 +458,9 @@ class NotificationsForEntities(BaseNotificationTest): 'endpoint', cadftaxonomy.SECURITY_ENDPOINT) def test_update_endpoint(self): - endpoint_ref = self.new_endpoint_ref(service_id=self.service_id) + endpoint_ref = unit.new_endpoint_ref(service_id=self.service_id, + interface='public', + region_id=self.region_id) self.catalog_api.create_endpoint(endpoint_ref['id'], endpoint_ref) self.catalog_api.update_endpoint(endpoint_ref['id'], endpoint_ref) self._assert_notify_sent(endpoint_ref['id'], UPDATED_OPERATION, @@ -456,7 +469,9 @@ class NotificationsForEntities(BaseNotificationTest): 'endpoint', cadftaxonomy.SECURITY_ENDPOINT) def test_delete_endpoint(self): - endpoint_ref = self.new_endpoint_ref(service_id=self.service_id) + endpoint_ref = unit.new_endpoint_ref(service_id=self.service_id, + interface='public', + region_id=self.region_id) self.catalog_api.create_endpoint(endpoint_ref['id'], endpoint_ref) self.catalog_api.delete_endpoint(endpoint_ref['id']) self._assert_notify_sent(endpoint_ref['id'], DELETED_OPERATION, @@ -465,7 +480,7 @@ class NotificationsForEntities(BaseNotificationTest): 'endpoint', cadftaxonomy.SECURITY_ENDPOINT) def test_create_service(self): - service_ref = self.new_service_ref() + service_ref = unit.new_service_ref() self.catalog_api.create_service(service_ref['id'], service_ref) self._assert_notify_sent(service_ref['id'], CREATED_OPERATION, 'service') @@ -473,7 +488,7 @@ class NotificationsForEntities(BaseNotificationTest): 'service', cadftaxonomy.SECURITY_SERVICE) def test_update_service(self): - service_ref = self.new_service_ref() + service_ref = unit.new_service_ref() self.catalog_api.create_service(service_ref['id'], service_ref) self.catalog_api.update_service(service_ref['id'], service_ref) self._assert_notify_sent(service_ref['id'], UPDATED_OPERATION, @@ -482,7 +497,7 @@ class NotificationsForEntities(BaseNotificationTest): 'service', cadftaxonomy.SECURITY_SERVICE) def test_delete_service(self): - service_ref = self.new_service_ref() + service_ref = unit.new_service_ref() self.catalog_api.create_service(service_ref['id'], service_ref) self.catalog_api.delete_service(service_ref['id']) self._assert_notify_sent(service_ref['id'], DELETED_OPERATION, @@ -491,7 +506,7 @@ class NotificationsForEntities(BaseNotificationTest): 'service', cadftaxonomy.SECURITY_SERVICE) def test_create_region(self): - region_ref = self.new_region_ref() + region_ref = unit.new_region_ref() self.catalog_api.create_region(region_ref) self._assert_notify_sent(region_ref['id'], CREATED_OPERATION, 'region') @@ -499,7 +514,7 @@ class NotificationsForEntities(BaseNotificationTest): 'region', cadftaxonomy.SECURITY_REGION) def test_update_region(self): - region_ref = self.new_region_ref() + region_ref = unit.new_region_ref() self.catalog_api.create_region(region_ref) self.catalog_api.update_region(region_ref['id'], region_ref) self._assert_notify_sent(region_ref['id'], UPDATED_OPERATION, @@ -508,7 +523,7 @@ class NotificationsForEntities(BaseNotificationTest): 'region', cadftaxonomy.SECURITY_REGION) def test_delete_region(self): - region_ref = self.new_region_ref() + region_ref = unit.new_region_ref() self.catalog_api.create_region(region_ref) self.catalog_api.delete_region(region_ref['id']) self._assert_notify_sent(region_ref['id'], DELETED_OPERATION, @@ -517,7 +532,7 @@ class NotificationsForEntities(BaseNotificationTest): 'region', cadftaxonomy.SECURITY_REGION) def test_create_policy(self): - policy_ref = self.new_policy_ref() + policy_ref = unit.new_policy_ref() self.policy_api.create_policy(policy_ref['id'], policy_ref) self._assert_notify_sent(policy_ref['id'], CREATED_OPERATION, 'policy') @@ -525,7 +540,7 @@ class NotificationsForEntities(BaseNotificationTest): 'policy', cadftaxonomy.SECURITY_POLICY) def test_update_policy(self): - policy_ref = self.new_policy_ref() + policy_ref = unit.new_policy_ref() self.policy_api.create_policy(policy_ref['id'], policy_ref) self.policy_api.update_policy(policy_ref['id'], policy_ref) self._assert_notify_sent(policy_ref['id'], UPDATED_OPERATION, @@ -534,7 +549,7 @@ class NotificationsForEntities(BaseNotificationTest): 'policy', cadftaxonomy.SECURITY_POLICY) def test_delete_policy(self): - policy_ref = self.new_policy_ref() + policy_ref = unit.new_policy_ref() self.policy_api.create_policy(policy_ref['id'], policy_ref) self.policy_api.delete_policy(policy_ref['id']) self._assert_notify_sent(policy_ref['id'], DELETED_OPERATION, @@ -543,7 +558,7 @@ class NotificationsForEntities(BaseNotificationTest): 'policy', cadftaxonomy.SECURITY_POLICY) def test_disable_domain(self): - domain_ref = self.new_domain_ref() + domain_ref = unit.new_domain_ref() self.resource_api.create_domain(domain_ref['id'], domain_ref) domain_ref['enabled'] = False self.resource_api.update_domain(domain_ref['id'], domain_ref) @@ -551,8 +566,7 @@ class NotificationsForEntities(BaseNotificationTest): public=False) def test_disable_of_disabled_domain_does_not_notify(self): - domain_ref = self.new_domain_ref() - domain_ref['enabled'] = False + domain_ref = unit.new_domain_ref(enabled=False) self.resource_api.create_domain(domain_ref['id'], domain_ref) # The domain_ref above is not changed during the create process. We # can use the same ref to perform the update. @@ -561,7 +575,7 @@ class NotificationsForEntities(BaseNotificationTest): public=False) def test_update_group(self): - group_ref = self.new_group_ref(domain_id=self.domain_id) + group_ref = unit.new_group_ref(domain_id=self.domain_id) group_ref = self.identity_api.create_group(group_ref) self.identity_api.update_group(group_ref['id'], group_ref) self._assert_last_note(group_ref['id'], UPDATED_OPERATION, 'group') @@ -569,7 +583,7 @@ class NotificationsForEntities(BaseNotificationTest): cadftaxonomy.SECURITY_GROUP) def test_update_project(self): - project_ref = self.new_project_ref(domain_id=self.domain_id) + project_ref = unit.new_project_ref(domain_id=self.domain_id) self.resource_api.create_project(project_ref['id'], project_ref) self.resource_api.update_project(project_ref['id'], project_ref) self._assert_notify_sent( @@ -578,7 +592,7 @@ class NotificationsForEntities(BaseNotificationTest): 'project', cadftaxonomy.SECURITY_PROJECT) def test_disable_project(self): - project_ref = self.new_project_ref(domain_id=self.domain_id) + project_ref = unit.new_project_ref(domain_id=self.domain_id) self.resource_api.create_project(project_ref['id'], project_ref) project_ref['enabled'] = False self.resource_api.update_project(project_ref['id'], project_ref) @@ -586,8 +600,8 @@ class NotificationsForEntities(BaseNotificationTest): public=False) def test_disable_of_disabled_project_does_not_notify(self): - project_ref = self.new_project_ref(domain_id=self.domain_id) - project_ref['enabled'] = False + project_ref = unit.new_project_ref(domain_id=self.domain_id, + enabled=False) self.resource_api.create_project(project_ref['id'], project_ref) # The project_ref above is not changed during the create process. We # can use the same ref to perform the update. @@ -596,7 +610,7 @@ class NotificationsForEntities(BaseNotificationTest): public=False) def test_update_project_does_not_send_disable(self): - project_ref = self.new_project_ref(domain_id=self.domain_id) + project_ref = unit.new_project_ref(domain_id=self.domain_id) self.resource_api.create_project(project_ref['id'], project_ref) project_ref['enabled'] = True self.resource_api.update_project(project_ref['id'], project_ref) @@ -605,7 +619,7 @@ class NotificationsForEntities(BaseNotificationTest): self._assert_notify_not_sent(project_ref['id'], 'disabled', 'project') def test_update_role(self): - role_ref = self.new_role_ref() + role_ref = unit.new_role_ref() self.role_api.create_role(role_ref['id'], role_ref) self.role_api.update_role(role_ref['id'], role_ref) self._assert_last_note(role_ref['id'], UPDATED_OPERATION, 'role') @@ -613,7 +627,7 @@ class NotificationsForEntities(BaseNotificationTest): cadftaxonomy.SECURITY_ROLE) def test_update_user(self): - user_ref = self.new_user_ref(domain_id=self.domain_id) + user_ref = unit.new_user_ref(domain_id=self.domain_id) user_ref = self.identity_api.create_user(user_ref) self.identity_api.update_user(user_ref['id'], user_ref) self._assert_last_note(user_ref['id'], UPDATED_OPERATION, 'user') @@ -622,7 +636,7 @@ class NotificationsForEntities(BaseNotificationTest): def test_config_option_no_events(self): self.config_fixture.config(notification_format='basic') - role_ref = self.new_role_ref() + role_ref = unit.new_role_ref() self.role_api.create_role(role_ref['id'], role_ref) # The regular notifications will still be emitted, since they are # used for callback handling. @@ -630,6 +644,28 @@ class NotificationsForEntities(BaseNotificationTest): # No audit event should have occurred self.assertEqual(0, len(self._audits)) + def test_add_user_to_group(self): + user_ref = unit.new_user_ref(domain_id=self.domain_id) + user_ref = self.identity_api.create_user(user_ref) + group_ref = unit.new_group_ref(domain_id=self.domain_id) + group_ref = self.identity_api.create_group(group_ref) + self.identity_api.add_user_to_group(user_ref['id'], group_ref['id']) + self._assert_last_note(group_ref['id'], UPDATED_OPERATION, 'group', + actor_id=user_ref['id'], actor_type='user', + actor_operation='added') + + def test_remove_user_from_group(self): + user_ref = unit.new_user_ref(domain_id=self.domain_id) + user_ref = self.identity_api.create_user(user_ref) + group_ref = unit.new_group_ref(domain_id=self.domain_id) + group_ref = self.identity_api.create_group(group_ref) + self.identity_api.add_user_to_group(user_ref['id'], group_ref['id']) + self.identity_api.remove_user_from_group(user_ref['id'], + group_ref['id']) + self._assert_last_note(group_ref['id'], UPDATED_OPERATION, 'group', + actor_id=user_ref['id'], actor_type='user', + actor_operation='removed') + class CADFNotificationsForEntities(NotificationsForEntities): @@ -638,7 +674,7 @@ class CADFNotificationsForEntities(NotificationsForEntities): self.config_fixture.config(notification_format='cadf') def test_initiator_data_is_set(self): - ref = self.new_domain_ref() + ref = unit.new_domain_ref() resp = self.post('/domains', body={'domain': ref}) resource_id = resp.result.get('domain').get('id') self._assert_last_audit(resource_id, CREATED_OPERATION, 'domain', @@ -809,7 +845,7 @@ class TestEventCallbacks(test_v3.RestfulTestCase): def test_notification_received(self): callback = register_callback(CREATED_OPERATION, 'project') - project_ref = self.new_project_ref(domain_id=self.domain_id) + project_ref = unit.new_project_ref(domain_id=self.domain_id) self.resource_api.create_project(project_ref['id'], project_ref) self.assertTrue(callback.called) @@ -854,7 +890,7 @@ class TestEventCallbacks(test_v3.RestfulTestCase): callback_called.append(True) Foo() - project_ref = self.new_project_ref(domain_id=self.domain_id) + project_ref = unit.new_project_ref(domain_id=self.domain_id) self.resource_api.create_project(project_ref['id'], project_ref) self.assertEqual([True], callback_called) @@ -877,7 +913,7 @@ class TestEventCallbacks(test_v3.RestfulTestCase): callback_called.append('cb1') Foo() - project_ref = self.new_project_ref(domain_id=self.domain_id) + project_ref = unit.new_project_ref(domain_id=self.domain_id) self.resource_api.create_project(project_ref['id'], project_ref) self.assertItemsEqual(['cb1', 'cb0'], callback_called) @@ -919,7 +955,7 @@ class TestEventCallbacks(test_v3.RestfulTestCase): # something like: # self.assertRaises(TypeError, Foo) Foo() - project_ref = self.new_project_ref(domain_id=self.domain_id) + project_ref = unit.new_project_ref(domain_id=self.domain_id) self.assertRaises(TypeError, self.resource_api.create_project, project_ref['id'], project_ref) @@ -963,13 +999,13 @@ class CadfNotificationsWrapperTestCase(test_v3.RestfulTestCase): def _assert_last_note(self, action, user_id, event_type=None): self.assertTrue(self._notifications) note = self._notifications[-1] - self.assertEqual(note['action'], action) + self.assertEqual(action, note['action']) initiator = note['initiator'] - self.assertEqual(initiator.id, user_id) - self.assertEqual(initiator.host.address, self.LOCAL_HOST) + self.assertEqual(user_id, initiator.id) + self.assertEqual(self.LOCAL_HOST, initiator.host.address) self.assertTrue(note['send_notification_called']) if event_type: - self.assertEqual(note['event_type'], event_type) + self.assertEqual(event_type, note['event_type']) def _assert_event(self, role_id, project=None, domain=None, user=None, group=None, inherit=False): @@ -1006,7 +1042,6 @@ class CadfNotificationsWrapperTestCase(test_v3.RestfulTestCase): 'id': 'openstack:782689dd-f428-4f13-99c7-5c70f94a5ac1' } """ - note = self._notifications[-1] event = note['event'] if project: @@ -1073,7 +1108,7 @@ class CadfNotificationsWrapperTestCase(test_v3.RestfulTestCase): user=self.user_id) def test_group_domain_grant(self): - group_ref = self.new_group_ref(domain_id=self.domain_id) + group_ref = unit.new_group_ref(domain_id=self.domain_id) group = self.identity_api.create_group(group_ref) self.identity_api.add_user_to_group(self.user_id, group['id']) url = ('/domains/%s/groups/%s/roles/%s' % @@ -1087,7 +1122,7 @@ class CadfNotificationsWrapperTestCase(test_v3.RestfulTestCase): # A notification is sent when add_role_to_user_and_project is called on # the assignment manager. - project_ref = self.new_project_ref(self.domain_id) + project_ref = unit.new_project_ref(self.domain_id) project = self.resource_api.create_project( project_ref['id'], project_ref) tenant_id = project['id'] @@ -1097,7 +1132,7 @@ class CadfNotificationsWrapperTestCase(test_v3.RestfulTestCase): self.assertTrue(self._notifications) note = self._notifications[-1] - self.assertEqual(note['action'], 'created.role_assignment') + self.assertEqual('created.role_assignment', note['action']) self.assertTrue(note['send_notification_called']) self._assert_event(self.role_id, project=tenant_id, user=self.user_id) @@ -1111,7 +1146,7 @@ class CadfNotificationsWrapperTestCase(test_v3.RestfulTestCase): self.assertTrue(self._notifications) note = self._notifications[-1] - self.assertEqual(note['action'], 'deleted.role_assignment') + self.assertEqual('deleted.role_assignment', note['action']) self.assertTrue(note['send_notification_called']) self._assert_event(self.role_id, project=self.project_id, @@ -1126,7 +1161,9 @@ class TestCallbackRegistration(unit.BaseTestCase): self.mock_log.logger.getEffectiveLevel.return_value = logging.DEBUG def verify_log_message(self, data): - """Tests that use this are a little brittle because adding more + """Verify log message. + + Tests that use this are a little brittle because adding more logging can break them. TODO(dstanek): remove the need for this in a future refactoring diff --git a/keystone-moon/keystone/tests/unit/common/test_sql_core.py b/keystone-moon/keystone/tests/unit/common/test_sql_core.py index b110ed08..7d20eb03 100644 --- a/keystone-moon/keystone/tests/unit/common/test_sql_core.py +++ b/keystone-moon/keystone/tests/unit/common/test_sql_core.py @@ -32,14 +32,14 @@ class TestModelDictMixin(unit.BaseTestCase): def test_creating_a_model_instance_from_a_dict(self): d = {'id': utils.new_uuid(), 'text': utils.new_uuid()} m = TestModel.from_dict(d) - self.assertEqual(m.id, d['id']) - self.assertEqual(m.text, d['text']) + self.assertEqual(d['id'], m.id) + self.assertEqual(d['text'], m.text) def test_creating_a_dict_from_a_model_instance(self): m = TestModel(id=utils.new_uuid(), text=utils.new_uuid()) d = m.to_dict() - self.assertEqual(m.id, d['id']) - self.assertEqual(m.text, d['text']) + self.assertEqual(d['id'], m.id) + self.assertEqual(d['text'], m.text) def test_creating_a_model_instance_from_an_invalid_dict(self): d = {'id': utils.new_uuid(), 'text': utils.new_uuid(), 'extra': None} @@ -49,4 +49,4 @@ class TestModelDictMixin(unit.BaseTestCase): expected = {'id': utils.new_uuid(), 'text': utils.new_uuid()} m = TestModel(id=expected['id'], text=expected['text']) m.extra = 'this should not be in the dictionary' - self.assertEqual(m.to_dict(), expected) + self.assertEqual(expected, m.to_dict()) diff --git a/keystone-moon/keystone/tests/unit/common/test_utils.py b/keystone-moon/keystone/tests/unit/common/test_utils.py index d52eb729..3641aacd 100644 --- a/keystone-moon/keystone/tests/unit/common/test_utils.py +++ b/keystone-moon/keystone/tests/unit/common/test_utils.py @@ -1,3 +1,4 @@ +# encoding: utf-8 # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at @@ -16,12 +17,13 @@ import uuid from oslo_config import cfg from oslo_config import fixture as config_fixture from oslo_serialization import jsonutils +import six from keystone.common import utils as common_utils from keystone import exception -from keystone import service from keystone.tests import unit from keystone.tests.unit import utils +from keystone.version import service CONF = cfg.CONF @@ -36,6 +38,38 @@ class UtilsTestCase(unit.BaseTestCase): super(UtilsTestCase, self).setUp() self.config_fixture = self.useFixture(config_fixture.Config(CONF)) + def test_resource_uuid(self): + uuid_str = '536e28c2017e405e89b25a1ed777b952' + self.assertEqual(uuid_str, common_utils.resource_uuid(uuid_str)) + + # Exact 64 length string. + uuid_str = ('536e28c2017e405e89b25a1ed777b952' + 'f13de678ac714bb1b7d1e9a007c10db5') + resource_id_namespace = common_utils.RESOURCE_ID_NAMESPACE + transformed_id = uuid.uuid5(resource_id_namespace, uuid_str).hex + self.assertEqual(transformed_id, common_utils.resource_uuid(uuid_str)) + + # Non-ASCII character test. + non_ascii_ = 'ß' * 32 + transformed_id = uuid.uuid5(resource_id_namespace, non_ascii_).hex + self.assertEqual(transformed_id, + common_utils.resource_uuid(non_ascii_)) + + # This input is invalid because it's length is more than 64. + invalid_input = 'x' * 65 + self.assertRaises(ValueError, common_utils.resource_uuid, + invalid_input) + + # 64 length unicode string, to mimic what is returned from mapping_id + # backend. + uuid_str = six.text_type('536e28c2017e405e89b25a1ed777b952' + 'f13de678ac714bb1b7d1e9a007c10db5') + resource_id_namespace = common_utils.RESOURCE_ID_NAMESPACE + if six.PY2: + uuid_str = uuid_str.encode('utf-8') + transformed_id = uuid.uuid5(resource_id_namespace, uuid_str).hex + self.assertEqual(transformed_id, common_utils.resource_uuid(uuid_str)) + def test_hash(self): password = 'right' wrong = 'wrongwrong' # Two wrongs don't make a right @@ -153,6 +187,18 @@ class UtilsTestCase(unit.BaseTestCase): expected_json = '{"field":"value"}' self.assertEqual(expected_json, json) + def test_url_safe_check(self): + base_str = 'i am safe' + self.assertFalse(common_utils.is_not_url_safe(base_str)) + for i in common_utils.URL_RESERVED_CHARS: + self.assertTrue(common_utils.is_not_url_safe(base_str + i)) + + def test_url_safe_with_unicode_check(self): + base_str = u'i am \xe7afe' + self.assertFalse(common_utils.is_not_url_safe(base_str)) + for i in common_utils.URL_RESERVED_CHARS: + self.assertTrue(common_utils.is_not_url_safe(base_str + i)) + class ServiceHelperTests(unit.BaseTestCase): |