diff options
Diffstat (limited to 'keystone-moon/keystone/tests/unit/common/test_notifications.py')
-rw-r--r-- | keystone-moon/keystone/tests/unit/common/test_notifications.py | 974 |
1 files changed, 974 insertions, 0 deletions
diff --git a/keystone-moon/keystone/tests/unit/common/test_notifications.py b/keystone-moon/keystone/tests/unit/common/test_notifications.py new file mode 100644 index 00000000..55dd556d --- /dev/null +++ b/keystone-moon/keystone/tests/unit/common/test_notifications.py @@ -0,0 +1,974 @@ +# Copyright 2013 IBM Corp. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import uuid + +import mock +from oslo_config import cfg +from oslo_config import fixture as config_fixture +from oslotest import mockpatch +from pycadf import cadftaxonomy +from pycadf import cadftype +from pycadf import eventfactory +from pycadf import resource as cadfresource +import testtools + +from keystone.common import dependency +from keystone import notifications +from keystone.tests.unit import test_v3 + + +CONF = cfg.CONF + +EXP_RESOURCE_TYPE = uuid.uuid4().hex +CREATED_OPERATION = notifications.ACTIONS.created +UPDATED_OPERATION = notifications.ACTIONS.updated +DELETED_OPERATION = notifications.ACTIONS.deleted +DISABLED_OPERATION = notifications.ACTIONS.disabled + + +class ArbitraryException(Exception): + pass + + +def register_callback(operation, resource_type=EXP_RESOURCE_TYPE): + """Helper for creating and registering a mock callback. + + """ + callback = mock.Mock(__name__='callback', + im_class=mock.Mock(__name__='class')) + notifications.register_event_callback(operation, resource_type, callback) + return callback + + +class AuditNotificationsTestCase(testtools.TestCase): + def setUp(self): + super(AuditNotificationsTestCase, self).setUp() + self.config_fixture = self.useFixture(config_fixture.Config(CONF)) + self.addCleanup(notifications.clear_subscribers) + + def _test_notification_operation(self, notify_function, operation): + exp_resource_id = uuid.uuid4().hex + callback = register_callback(operation) + notify_function(EXP_RESOURCE_TYPE, exp_resource_id) + callback.assert_called_once_with('identity', EXP_RESOURCE_TYPE, + operation, + {'resource_info': exp_resource_id}) + self.config_fixture.config(notification_format='cadf') + with mock.patch( + 'keystone.notifications._create_cadf_payload') as cadf_notify: + notify_function(EXP_RESOURCE_TYPE, exp_resource_id) + initiator = None + cadf_notify.assert_called_once_with( + operation, EXP_RESOURCE_TYPE, exp_resource_id, + notifications.taxonomy.OUTCOME_SUCCESS, initiator) + notify_function(EXP_RESOURCE_TYPE, exp_resource_id, public=False) + cadf_notify.assert_called_once_with( + operation, EXP_RESOURCE_TYPE, exp_resource_id, + notifications.taxonomy.OUTCOME_SUCCESS, initiator) + + def test_resource_created_notification(self): + self._test_notification_operation(notifications.Audit.created, + CREATED_OPERATION) + + def test_resource_updated_notification(self): + self._test_notification_operation(notifications.Audit.updated, + UPDATED_OPERATION) + + def test_resource_deleted_notification(self): + self._test_notification_operation(notifications.Audit.deleted, + DELETED_OPERATION) + + def test_resource_disabled_notification(self): + self._test_notification_operation(notifications.Audit.disabled, + DISABLED_OPERATION) + + +class NotificationsWrapperTestCase(testtools.TestCase): + def create_fake_ref(self): + resource_id = uuid.uuid4().hex + return resource_id, { + 'id': resource_id, + 'key': uuid.uuid4().hex + } + + @notifications.created(EXP_RESOURCE_TYPE) + def create_resource(self, resource_id, data): + return data + + def test_resource_created_notification(self): + exp_resource_id, data = self.create_fake_ref() + callback = register_callback(CREATED_OPERATION) + + self.create_resource(exp_resource_id, data) + callback.assert_called_with('identity', EXP_RESOURCE_TYPE, + CREATED_OPERATION, + {'resource_info': exp_resource_id}) + + @notifications.updated(EXP_RESOURCE_TYPE) + def update_resource(self, resource_id, data): + return data + + def test_resource_updated_notification(self): + exp_resource_id, data = self.create_fake_ref() + callback = register_callback(UPDATED_OPERATION) + + self.update_resource(exp_resource_id, data) + callback.assert_called_with('identity', EXP_RESOURCE_TYPE, + UPDATED_OPERATION, + {'resource_info': exp_resource_id}) + + @notifications.deleted(EXP_RESOURCE_TYPE) + def delete_resource(self, resource_id): + pass + + def test_resource_deleted_notification(self): + exp_resource_id = uuid.uuid4().hex + callback = register_callback(DELETED_OPERATION) + + self.delete_resource(exp_resource_id) + callback.assert_called_with('identity', EXP_RESOURCE_TYPE, + DELETED_OPERATION, + {'resource_info': exp_resource_id}) + + @notifications.created(EXP_RESOURCE_TYPE) + def create_exception(self, resource_id): + raise ArbitraryException() + + def test_create_exception_without_notification(self): + callback = register_callback(CREATED_OPERATION) + self.assertRaises( + ArbitraryException, self.create_exception, uuid.uuid4().hex) + self.assertFalse(callback.called) + + @notifications.created(EXP_RESOURCE_TYPE) + def update_exception(self, resource_id): + raise ArbitraryException() + + def test_update_exception_without_notification(self): + callback = register_callback(UPDATED_OPERATION) + self.assertRaises( + ArbitraryException, self.update_exception, uuid.uuid4().hex) + self.assertFalse(callback.called) + + @notifications.deleted(EXP_RESOURCE_TYPE) + def delete_exception(self, resource_id): + raise ArbitraryException() + + def test_delete_exception_without_notification(self): + callback = register_callback(DELETED_OPERATION) + self.assertRaises( + ArbitraryException, self.delete_exception, uuid.uuid4().hex) + self.assertFalse(callback.called) + + +class NotificationsTestCase(testtools.TestCase): + def setUp(self): + super(NotificationsTestCase, self).setUp() + + # these should use self.config_fixture.config(), but they haven't + # been registered yet + CONF.rpc_backend = 'fake' + CONF.notification_driver = ['fake'] + + def test_send_notification(self): + """Test the private method _send_notification to ensure event_type, + payload, and context are built and passed properly. + """ + resource = uuid.uuid4().hex + resource_type = EXP_RESOURCE_TYPE + operation = CREATED_OPERATION + + # NOTE(ldbragst): Even though notifications._send_notification doesn't + # contain logic that creates cases, this is supposed to test that + # context is always empty and that we ensure the resource ID of the + # resource in the notification is contained in the payload. It was + # agreed that context should be empty in Keystone's case, which is + # also noted in the /keystone/notifications.py module. This test + # ensures and maintains these conditions. + expected_args = [ + {}, # empty context + 'identity.%s.created' % resource_type, # event_type + {'resource_info': resource}, # payload + 'INFO', # priority is always INFO... + ] + + with mock.patch.object(notifications._get_notifier(), + '_notify') as mocked: + notifications._send_notification(operation, resource_type, + resource) + mocked.assert_called_once_with(*expected_args) + + +class BaseNotificationTest(test_v3.RestfulTestCase): + + def setUp(self): + super(BaseNotificationTest, self).setUp() + + self._notifications = [] + self._audits = [] + + def fake_notify(operation, resource_type, resource_id, + public=True): + note = { + 'resource_id': resource_id, + 'operation': operation, + 'resource_type': resource_type, + 'send_notification_called': True, + 'public': public} + self._notifications.append(note) + + self.useFixture(mockpatch.PatchObject( + notifications, '_send_notification', fake_notify)) + + def fake_audit(action, initiator, outcome, target, + event_type, **kwargs): + service_security = cadftaxonomy.SERVICE_SECURITY + + event = eventfactory.EventFactory().new_event( + eventType=cadftype.EVENTTYPE_ACTIVITY, + outcome=outcome, + action=action, + initiator=initiator, + target=target, + observer=cadfresource.Resource(typeURI=service_security)) + + for key, value in kwargs.items(): + setattr(event, key, value) + + audit = { + 'payload': event.as_dict(), + 'event_type': event_type, + 'send_notification_called': True} + self._audits.append(audit) + + self.useFixture(mockpatch.PatchObject( + notifications, '_send_audit_notification', fake_audit)) + + def _assert_last_note(self, resource_id, operation, resource_type): + # NOTE(stevemar): If 'basic' format is not used, then simply + # return since this assertion is not valid. + if CONF.notification_format != 'basic': + return + self.assertTrue(len(self._notifications) > 0) + note = self._notifications[-1] + self.assertEqual(note['operation'], operation) + self.assertEqual(note['resource_id'], resource_id) + self.assertEqual(note['resource_type'], resource_type) + self.assertTrue(note['send_notification_called']) + + def _assert_last_audit(self, resource_id, operation, resource_type, + target_uri): + # NOTE(stevemar): If 'cadf' format is not used, then simply + # return since this assertion is not valid. + if CONF.notification_format != 'cadf': + return + self.assertTrue(len(self._audits) > 0) + audit = self._audits[-1] + payload = audit['payload'] + self.assertEqual(resource_id, payload['resource_info']) + action = '%s.%s' % (operation, resource_type) + self.assertEqual(action, payload['action']) + self.assertEqual(target_uri, payload['target']['typeURI']) + self.assertEqual(resource_id, payload['target']['id']) + event_type = '%s.%s.%s' % ('identity', resource_type, operation) + self.assertEqual(event_type, audit['event_type']) + self.assertTrue(audit['send_notification_called']) + + def _assert_notify_not_sent(self, resource_id, operation, resource_type, + public=True): + unexpected = { + 'resource_id': resource_id, + 'operation': operation, + 'resource_type': resource_type, + 'send_notification_called': True, + 'public': public} + for note in self._notifications: + self.assertNotEqual(unexpected, note) + + def _assert_notify_sent(self, resource_id, operation, resource_type, + public=True): + expected = { + 'resource_id': resource_id, + 'operation': operation, + 'resource_type': resource_type, + 'send_notification_called': True, + 'public': public} + for note in self._notifications: + if expected == note: + break + else: + self.fail("Notification not sent.") + + +class NotificationsForEntities(BaseNotificationTest): + + def test_create_group(self): + group_ref = self.new_group_ref(domain_id=self.domain_id) + group_ref = self.identity_api.create_group(group_ref) + self._assert_last_note(group_ref['id'], CREATED_OPERATION, 'group') + self._assert_last_audit(group_ref['id'], CREATED_OPERATION, 'group', + cadftaxonomy.SECURITY_GROUP) + + def test_create_project(self): + project_ref = self.new_project_ref(domain_id=self.domain_id) + self.assignment_api.create_project(project_ref['id'], project_ref) + self._assert_last_note( + project_ref['id'], CREATED_OPERATION, 'project') + self._assert_last_audit(project_ref['id'], CREATED_OPERATION, + 'project', cadftaxonomy.SECURITY_PROJECT) + + def test_create_role(self): + role_ref = self.new_role_ref() + self.role_api.create_role(role_ref['id'], role_ref) + self._assert_last_note(role_ref['id'], CREATED_OPERATION, 'role') + self._assert_last_audit(role_ref['id'], CREATED_OPERATION, 'role', + cadftaxonomy.SECURITY_ROLE) + + def test_create_user(self): + user_ref = self.new_user_ref(domain_id=self.domain_id) + user_ref = self.identity_api.create_user(user_ref) + self._assert_last_note(user_ref['id'], CREATED_OPERATION, 'user') + self._assert_last_audit(user_ref['id'], CREATED_OPERATION, 'user', + cadftaxonomy.SECURITY_ACCOUNT_USER) + + def test_create_trust(self): + trustor = self.new_user_ref(domain_id=self.domain_id) + trustor = self.identity_api.create_user(trustor) + trustee = self.new_user_ref(domain_id=self.domain_id) + trustee = self.identity_api.create_user(trustee) + role_ref = self.new_role_ref() + self.role_api.create_role(role_ref['id'], role_ref) + trust_ref = self.new_trust_ref(trustor['id'], + trustee['id']) + self.trust_api.create_trust(trust_ref['id'], + trust_ref, + [role_ref]) + self._assert_last_note( + trust_ref['id'], CREATED_OPERATION, 'OS-TRUST:trust') + self._assert_last_audit(trust_ref['id'], CREATED_OPERATION, + 'OS-TRUST:trust', cadftaxonomy.SECURITY_TRUST) + + def test_delete_group(self): + group_ref = self.new_group_ref(domain_id=self.domain_id) + group_ref = self.identity_api.create_group(group_ref) + self.identity_api.delete_group(group_ref['id']) + self._assert_last_note(group_ref['id'], DELETED_OPERATION, 'group') + self._assert_last_audit(group_ref['id'], DELETED_OPERATION, 'group', + cadftaxonomy.SECURITY_GROUP) + + def test_delete_project(self): + project_ref = self.new_project_ref(domain_id=self.domain_id) + self.assignment_api.create_project(project_ref['id'], project_ref) + self.assignment_api.delete_project(project_ref['id']) + self._assert_last_note( + project_ref['id'], DELETED_OPERATION, 'project') + self._assert_last_audit(project_ref['id'], DELETED_OPERATION, + 'project', cadftaxonomy.SECURITY_PROJECT) + + def test_delete_role(self): + role_ref = self.new_role_ref() + self.role_api.create_role(role_ref['id'], role_ref) + self.role_api.delete_role(role_ref['id']) + self._assert_last_note(role_ref['id'], DELETED_OPERATION, 'role') + self._assert_last_audit(role_ref['id'], DELETED_OPERATION, 'role', + cadftaxonomy.SECURITY_ROLE) + + def test_delete_user(self): + user_ref = self.new_user_ref(domain_id=self.domain_id) + user_ref = self.identity_api.create_user(user_ref) + self.identity_api.delete_user(user_ref['id']) + self._assert_last_note(user_ref['id'], DELETED_OPERATION, 'user') + self._assert_last_audit(user_ref['id'], DELETED_OPERATION, 'user', + cadftaxonomy.SECURITY_ACCOUNT_USER) + + def test_create_domain(self): + domain_ref = self.new_domain_ref() + self.resource_api.create_domain(domain_ref['id'], domain_ref) + self._assert_last_note(domain_ref['id'], CREATED_OPERATION, 'domain') + self._assert_last_audit(domain_ref['id'], CREATED_OPERATION, 'domain', + cadftaxonomy.SECURITY_DOMAIN) + + def test_update_domain(self): + domain_ref = self.new_domain_ref() + self.assignment_api.create_domain(domain_ref['id'], domain_ref) + domain_ref['description'] = uuid.uuid4().hex + self.assignment_api.update_domain(domain_ref['id'], domain_ref) + self._assert_last_note(domain_ref['id'], UPDATED_OPERATION, 'domain') + self._assert_last_audit(domain_ref['id'], UPDATED_OPERATION, 'domain', + cadftaxonomy.SECURITY_DOMAIN) + + def test_delete_domain(self): + domain_ref = self.new_domain_ref() + self.assignment_api.create_domain(domain_ref['id'], domain_ref) + domain_ref['enabled'] = False + self.assignment_api.update_domain(domain_ref['id'], domain_ref) + self.assignment_api.delete_domain(domain_ref['id']) + self._assert_last_note(domain_ref['id'], DELETED_OPERATION, 'domain') + self._assert_last_audit(domain_ref['id'], DELETED_OPERATION, 'domain', + cadftaxonomy.SECURITY_DOMAIN) + + def test_delete_trust(self): + trustor = self.new_user_ref(domain_id=self.domain_id) + trustor = self.identity_api.create_user(trustor) + trustee = self.new_user_ref(domain_id=self.domain_id) + trustee = self.identity_api.create_user(trustee) + role_ref = self.new_role_ref() + trust_ref = self.new_trust_ref(trustor['id'], trustee['id']) + self.trust_api.create_trust(trust_ref['id'], + trust_ref, + [role_ref]) + self.trust_api.delete_trust(trust_ref['id']) + self._assert_last_note( + trust_ref['id'], DELETED_OPERATION, 'OS-TRUST:trust') + self._assert_last_audit(trust_ref['id'], DELETED_OPERATION, + 'OS-TRUST:trust', cadftaxonomy.SECURITY_TRUST) + + def test_create_endpoint(self): + endpoint_ref = self.new_endpoint_ref(service_id=self.service_id) + self.catalog_api.create_endpoint(endpoint_ref['id'], endpoint_ref) + self._assert_notify_sent(endpoint_ref['id'], CREATED_OPERATION, + 'endpoint') + self._assert_last_audit(endpoint_ref['id'], CREATED_OPERATION, + 'endpoint', cadftaxonomy.SECURITY_ENDPOINT) + + def test_update_endpoint(self): + endpoint_ref = self.new_endpoint_ref(service_id=self.service_id) + self.catalog_api.create_endpoint(endpoint_ref['id'], endpoint_ref) + self.catalog_api.update_endpoint(endpoint_ref['id'], endpoint_ref) + self._assert_notify_sent(endpoint_ref['id'], UPDATED_OPERATION, + 'endpoint') + self._assert_last_audit(endpoint_ref['id'], UPDATED_OPERATION, + 'endpoint', cadftaxonomy.SECURITY_ENDPOINT) + + def test_delete_endpoint(self): + endpoint_ref = self.new_endpoint_ref(service_id=self.service_id) + self.catalog_api.create_endpoint(endpoint_ref['id'], endpoint_ref) + self.catalog_api.delete_endpoint(endpoint_ref['id']) + self._assert_notify_sent(endpoint_ref['id'], DELETED_OPERATION, + 'endpoint') + self._assert_last_audit(endpoint_ref['id'], DELETED_OPERATION, + 'endpoint', cadftaxonomy.SECURITY_ENDPOINT) + + def test_create_service(self): + service_ref = self.new_service_ref() + self.catalog_api.create_service(service_ref['id'], service_ref) + self._assert_notify_sent(service_ref['id'], CREATED_OPERATION, + 'service') + self._assert_last_audit(service_ref['id'], CREATED_OPERATION, + 'service', cadftaxonomy.SECURITY_SERVICE) + + def test_update_service(self): + service_ref = self.new_service_ref() + self.catalog_api.create_service(service_ref['id'], service_ref) + self.catalog_api.update_service(service_ref['id'], service_ref) + self._assert_notify_sent(service_ref['id'], UPDATED_OPERATION, + 'service') + self._assert_last_audit(service_ref['id'], UPDATED_OPERATION, + 'service', cadftaxonomy.SECURITY_SERVICE) + + def test_delete_service(self): + service_ref = self.new_service_ref() + self.catalog_api.create_service(service_ref['id'], service_ref) + self.catalog_api.delete_service(service_ref['id']) + self._assert_notify_sent(service_ref['id'], DELETED_OPERATION, + 'service') + self._assert_last_audit(service_ref['id'], DELETED_OPERATION, + 'service', cadftaxonomy.SECURITY_SERVICE) + + def test_create_region(self): + region_ref = self.new_region_ref() + self.catalog_api.create_region(region_ref) + self._assert_notify_sent(region_ref['id'], CREATED_OPERATION, + 'region') + self._assert_last_audit(region_ref['id'], CREATED_OPERATION, + 'region', cadftaxonomy.SECURITY_REGION) + + def test_update_region(self): + region_ref = self.new_region_ref() + self.catalog_api.create_region(region_ref) + self.catalog_api.update_region(region_ref['id'], region_ref) + self._assert_notify_sent(region_ref['id'], UPDATED_OPERATION, + 'region') + self._assert_last_audit(region_ref['id'], UPDATED_OPERATION, + 'region', cadftaxonomy.SECURITY_REGION) + + def test_delete_region(self): + region_ref = self.new_region_ref() + self.catalog_api.create_region(region_ref) + self.catalog_api.delete_region(region_ref['id']) + self._assert_notify_sent(region_ref['id'], DELETED_OPERATION, + 'region') + self._assert_last_audit(region_ref['id'], DELETED_OPERATION, + 'region', cadftaxonomy.SECURITY_REGION) + + def test_create_policy(self): + policy_ref = self.new_policy_ref() + self.policy_api.create_policy(policy_ref['id'], policy_ref) + self._assert_notify_sent(policy_ref['id'], CREATED_OPERATION, + 'policy') + self._assert_last_audit(policy_ref['id'], CREATED_OPERATION, + 'policy', cadftaxonomy.SECURITY_POLICY) + + def test_update_policy(self): + policy_ref = self.new_policy_ref() + self.policy_api.create_policy(policy_ref['id'], policy_ref) + self.policy_api.update_policy(policy_ref['id'], policy_ref) + self._assert_notify_sent(policy_ref['id'], UPDATED_OPERATION, + 'policy') + self._assert_last_audit(policy_ref['id'], UPDATED_OPERATION, + 'policy', cadftaxonomy.SECURITY_POLICY) + + def test_delete_policy(self): + policy_ref = self.new_policy_ref() + self.policy_api.create_policy(policy_ref['id'], policy_ref) + self.policy_api.delete_policy(policy_ref['id']) + self._assert_notify_sent(policy_ref['id'], DELETED_OPERATION, + 'policy') + self._assert_last_audit(policy_ref['id'], DELETED_OPERATION, + 'policy', cadftaxonomy.SECURITY_POLICY) + + def test_disable_domain(self): + domain_ref = self.new_domain_ref() + self.assignment_api.create_domain(domain_ref['id'], domain_ref) + domain_ref['enabled'] = False + self.assignment_api.update_domain(domain_ref['id'], domain_ref) + self._assert_notify_sent(domain_ref['id'], 'disabled', 'domain', + public=False) + + def test_disable_of_disabled_domain_does_not_notify(self): + domain_ref = self.new_domain_ref() + domain_ref['enabled'] = False + self.assignment_api.create_domain(domain_ref['id'], domain_ref) + # The domain_ref above is not changed during the create process. We + # can use the same ref to perform the update. + self.assignment_api.update_domain(domain_ref['id'], domain_ref) + self._assert_notify_not_sent(domain_ref['id'], 'disabled', 'domain', + public=False) + + def test_update_group(self): + group_ref = self.new_group_ref(domain_id=self.domain_id) + group_ref = self.identity_api.create_group(group_ref) + self.identity_api.update_group(group_ref['id'], group_ref) + self._assert_last_note(group_ref['id'], UPDATED_OPERATION, 'group') + self._assert_last_audit(group_ref['id'], UPDATED_OPERATION, 'group', + cadftaxonomy.SECURITY_GROUP) + + def test_update_project(self): + project_ref = self.new_project_ref(domain_id=self.domain_id) + self.assignment_api.create_project(project_ref['id'], project_ref) + self.assignment_api.update_project(project_ref['id'], project_ref) + self._assert_notify_sent( + project_ref['id'], UPDATED_OPERATION, 'project', public=True) + self._assert_last_audit(project_ref['id'], UPDATED_OPERATION, + 'project', cadftaxonomy.SECURITY_PROJECT) + + def test_disable_project(self): + project_ref = self.new_project_ref(domain_id=self.domain_id) + self.assignment_api.create_project(project_ref['id'], project_ref) + project_ref['enabled'] = False + self.assignment_api.update_project(project_ref['id'], project_ref) + self._assert_notify_sent(project_ref['id'], 'disabled', 'project', + public=False) + + def test_disable_of_disabled_project_does_not_notify(self): + project_ref = self.new_project_ref(domain_id=self.domain_id) + project_ref['enabled'] = False + self.assignment_api.create_project(project_ref['id'], project_ref) + # The project_ref above is not changed during the create process. We + # can use the same ref to perform the update. + self.assignment_api.update_project(project_ref['id'], project_ref) + self._assert_notify_not_sent(project_ref['id'], 'disabled', 'project', + public=False) + + def test_update_project_does_not_send_disable(self): + project_ref = self.new_project_ref(domain_id=self.domain_id) + self.assignment_api.create_project(project_ref['id'], project_ref) + project_ref['enabled'] = True + self.assignment_api.update_project(project_ref['id'], project_ref) + self._assert_last_note( + project_ref['id'], UPDATED_OPERATION, 'project') + self._assert_notify_not_sent(project_ref['id'], 'disabled', 'project') + + def test_update_role(self): + role_ref = self.new_role_ref() + self.role_api.create_role(role_ref['id'], role_ref) + self.role_api.update_role(role_ref['id'], role_ref) + self._assert_last_note(role_ref['id'], UPDATED_OPERATION, 'role') + self._assert_last_audit(role_ref['id'], UPDATED_OPERATION, 'role', + cadftaxonomy.SECURITY_ROLE) + + def test_update_user(self): + user_ref = self.new_user_ref(domain_id=self.domain_id) + user_ref = self.identity_api.create_user(user_ref) + self.identity_api.update_user(user_ref['id'], user_ref) + self._assert_last_note(user_ref['id'], UPDATED_OPERATION, 'user') + self._assert_last_audit(user_ref['id'], UPDATED_OPERATION, 'user', + cadftaxonomy.SECURITY_ACCOUNT_USER) + + def test_config_option_no_events(self): + self.config_fixture.config(notification_format='basic') + role_ref = self.new_role_ref() + self.role_api.create_role(role_ref['id'], role_ref) + # The regular notifications will still be emitted, since they are + # used for callback handling. + self._assert_last_note(role_ref['id'], CREATED_OPERATION, 'role') + # No audit event should have occurred + self.assertEqual(0, len(self._audits)) + + +class CADFNotificationsForEntities(NotificationsForEntities): + + def setUp(self): + super(CADFNotificationsForEntities, self).setUp() + self.config_fixture.config(notification_format='cadf') + + def test_initiator_data_is_set(self): + ref = self.new_domain_ref() + resp = self.post('/domains', body={'domain': ref}) + resource_id = resp.result.get('domain').get('id') + self._assert_last_audit(resource_id, CREATED_OPERATION, 'domain', + cadftaxonomy.SECURITY_DOMAIN) + self.assertTrue(len(self._audits) > 0) + audit = self._audits[-1] + payload = audit['payload'] + self.assertEqual(self.user_id, payload['initiator']['id']) + self.assertEqual(self.project_id, payload['initiator']['project_id']) + + +class TestEventCallbacks(test_v3.RestfulTestCase): + + def setUp(self): + super(TestEventCallbacks, self).setUp() + self.has_been_called = False + + def _project_deleted_callback(self, service, resource_type, operation, + payload): + self.has_been_called = True + + def _project_created_callback(self, service, resource_type, operation, + payload): + self.has_been_called = True + + def test_notification_received(self): + callback = register_callback(CREATED_OPERATION, 'project') + project_ref = self.new_project_ref(domain_id=self.domain_id) + self.assignment_api.create_project(project_ref['id'], project_ref) + self.assertTrue(callback.called) + + def test_notification_method_not_callable(self): + fake_method = None + self.assertRaises(TypeError, + notifications.register_event_callback, + UPDATED_OPERATION, + 'project', + [fake_method]) + + def test_notification_event_not_valid(self): + self.assertRaises(ValueError, + notifications.register_event_callback, + uuid.uuid4().hex, + 'project', + self._project_deleted_callback) + + def test_event_registration_for_unknown_resource_type(self): + # Registration for unknown resource types should succeed. If no event + # is issued for that resource type, the callback wont be triggered. + notifications.register_event_callback(DELETED_OPERATION, + uuid.uuid4().hex, + self._project_deleted_callback) + resource_type = uuid.uuid4().hex + notifications.register_event_callback(DELETED_OPERATION, + resource_type, + self._project_deleted_callback) + + def test_provider_event_callbacks_subscription(self): + callback_called = [] + + @dependency.provider('foo_api') + class Foo(object): + def __init__(self): + self.event_callbacks = { + CREATED_OPERATION: {'project': [self.foo_callback]}} + + def foo_callback(self, service, resource_type, operation, + payload): + # uses callback_called from the closure + callback_called.append(True) + + Foo() + project_ref = self.new_project_ref(domain_id=self.domain_id) + self.assignment_api.create_project(project_ref['id'], project_ref) + self.assertEqual([True], callback_called) + + def test_invalid_event_callbacks(self): + @dependency.provider('foo_api') + class Foo(object): + def __init__(self): + self.event_callbacks = 'bogus' + + self.assertRaises(ValueError, Foo) + + def test_invalid_event_callbacks_event(self): + @dependency.provider('foo_api') + class Foo(object): + def __init__(self): + self.event_callbacks = {CREATED_OPERATION: 'bogus'} + + self.assertRaises(ValueError, Foo) + + +class CadfNotificationsWrapperTestCase(test_v3.RestfulTestCase): + + LOCAL_HOST = 'localhost' + ACTION = 'authenticate' + ROLE_ASSIGNMENT = 'role_assignment' + + def setUp(self): + super(CadfNotificationsWrapperTestCase, self).setUp() + self._notifications = [] + + def fake_notify(action, initiator, outcome, target, + event_type, **kwargs): + service_security = cadftaxonomy.SERVICE_SECURITY + + event = eventfactory.EventFactory().new_event( + eventType=cadftype.EVENTTYPE_ACTIVITY, + outcome=outcome, + action=action, + initiator=initiator, + target=target, + observer=cadfresource.Resource(typeURI=service_security)) + + for key, value in kwargs.items(): + setattr(event, key, value) + + note = { + 'action': action, + 'initiator': initiator, + 'event': event, + 'send_notification_called': True} + self._notifications.append(note) + + self.useFixture(mockpatch.PatchObject( + notifications, '_send_audit_notification', fake_notify)) + + def _assert_last_note(self, action, user_id): + self.assertTrue(self._notifications) + note = self._notifications[-1] + self.assertEqual(note['action'], action) + initiator = note['initiator'] + self.assertEqual(initiator.id, user_id) + self.assertEqual(initiator.host.address, self.LOCAL_HOST) + self.assertTrue(note['send_notification_called']) + + def _assert_event(self, role_id, project=None, domain=None, + user=None, group=None, inherit=False): + """Assert that the CADF event is valid. + + In the case of role assignments, the event will have extra data, + specifically, the role, target, actor, and if the role is inherited. + + An example event, as a dictionary is seen below: + { + 'typeURI': 'http://schemas.dmtf.org/cloud/audit/1.0/event', + 'initiator': { + 'typeURI': 'service/security/account/user', + 'host': {'address': 'localhost'}, + 'id': 'openstack:0a90d95d-582c-4efb-9cbc-e2ca7ca9c341', + 'name': u'bccc2d9bfc2a46fd9e33bcf82f0b5c21' + }, + 'target': { + 'typeURI': 'service/security/account/user', + 'id': 'openstack:d48ea485-ef70-4f65-8d2b-01aa9d7ec12d' + }, + 'observer': { + 'typeURI': 'service/security', + 'id': 'openstack:d51dd870-d929-4aba-8d75-dcd7555a0c95' + }, + 'eventType': 'activity', + 'eventTime': '2014-08-21T21:04:56.204536+0000', + 'role': u'0e6b990380154a2599ce6b6e91548a68', + 'domain': u'24bdcff1aab8474895dbaac509793de1', + 'inherited_to_projects': False, + 'group': u'c1e22dc67cbd469ea0e33bf428fe597a', + 'action': 'created.role_assignment', + 'outcome': 'success', + 'id': 'openstack:782689dd-f428-4f13-99c7-5c70f94a5ac1' + } + """ + + note = self._notifications[-1] + event = note['event'] + if project: + self.assertEqual(project, event.project) + if domain: + self.assertEqual(domain, event.domain) + if user: + self.assertEqual(user, event.user) + if group: + self.assertEqual(group, event.group) + self.assertEqual(role_id, event.role) + self.assertEqual(inherit, event.inherited_to_projects) + + def test_v3_authenticate_user_name_and_domain_id(self): + user_id = self.user_id + user_name = self.user['name'] + password = self.user['password'] + domain_id = self.domain_id + data = self.build_authentication_request(username=user_name, + user_domain_id=domain_id, + password=password) + self.post('/auth/tokens', body=data) + self._assert_last_note(self.ACTION, user_id) + + def test_v3_authenticate_user_id(self): + user_id = self.user_id + password = self.user['password'] + data = self.build_authentication_request(user_id=user_id, + password=password) + self.post('/auth/tokens', body=data) + self._assert_last_note(self.ACTION, user_id) + + def test_v3_authenticate_user_name_and_domain_name(self): + user_id = self.user_id + user_name = self.user['name'] + password = self.user['password'] + domain_name = self.domain['name'] + data = self.build_authentication_request(username=user_name, + user_domain_name=domain_name, + password=password) + self.post('/auth/tokens', body=data) + self._assert_last_note(self.ACTION, user_id) + + def _test_role_assignment(self, url, role, project=None, domain=None, + user=None, group=None): + self.put(url) + action = "%s.%s" % (CREATED_OPERATION, self.ROLE_ASSIGNMENT) + self._assert_last_note(action, self.user_id) + self._assert_event(role, project, domain, user, group) + self.delete(url) + action = "%s.%s" % (DELETED_OPERATION, self.ROLE_ASSIGNMENT) + self._assert_last_note(action, self.user_id) + self._assert_event(role, project, domain, user, group) + + def test_user_project_grant(self): + url = ('/projects/%s/users/%s/roles/%s' % + (self.project_id, self.user_id, self.role_id)) + self._test_role_assignment(url, self.role_id, + project=self.project_id, + user=self.user_id) + + def test_group_domain_grant(self): + group_ref = self.new_group_ref(domain_id=self.domain_id) + group = self.identity_api.create_group(group_ref) + url = ('/domains/%s/groups/%s/roles/%s' % + (self.domain_id, group['id'], self.role_id)) + self._test_role_assignment(url, self.role_id, + domain=self.domain_id, + group=group['id']) + + +class TestCallbackRegistration(testtools.TestCase): + def setUp(self): + super(TestCallbackRegistration, self).setUp() + self.mock_log = mock.Mock() + # Force the callback logging to occur + self.mock_log.logger.getEffectiveLevel.return_value = logging.DEBUG + + def verify_log_message(self, data): + """Tests that use this are a little brittle because adding more + logging can break them. + + TODO(dstanek): remove the need for this in a future refactoring + + """ + log_fn = self.mock_log.debug + self.assertEqual(len(data), log_fn.call_count) + for datum in data: + log_fn.assert_any_call(mock.ANY, datum) + + def test_a_function_callback(self): + def callback(*args, **kwargs): + pass + + resource_type = 'thing' + with mock.patch('keystone.notifications.LOG', self.mock_log): + notifications.register_event_callback( + CREATED_OPERATION, resource_type, callback) + + callback = 'keystone.tests.unit.common.test_notifications.callback' + expected_log_data = { + 'callback': callback, + 'event': 'identity.%s.created' % resource_type + } + self.verify_log_message([expected_log_data]) + + def test_a_method_callback(self): + class C(object): + def callback(self, *args, **kwargs): + pass + + with mock.patch('keystone.notifications.LOG', self.mock_log): + notifications.register_event_callback( + CREATED_OPERATION, 'thing', C.callback) + + callback = 'keystone.tests.unit.common.test_notifications.C.callback' + expected_log_data = { + 'callback': callback, + 'event': 'identity.thing.created' + } + self.verify_log_message([expected_log_data]) + + def test_a_list_of_callbacks(self): + def callback(*args, **kwargs): + pass + + class C(object): + def callback(self, *args, **kwargs): + pass + + with mock.patch('keystone.notifications.LOG', self.mock_log): + notifications.register_event_callback( + CREATED_OPERATION, 'thing', [callback, C.callback]) + + callback_1 = 'keystone.tests.unit.common.test_notifications.callback' + callback_2 = 'keystone.tests.unit.common.test_notifications.C.callback' + expected_log_data = [ + { + 'callback': callback_1, + 'event': 'identity.thing.created' + }, + { + 'callback': callback_2, + 'event': 'identity.thing.created' + }, + ] + self.verify_log_message(expected_log_data) + + def test_an_invalid_callback(self): + self.assertRaises(TypeError, + notifications.register_event_callback, + (CREATED_OPERATION, 'thing', object())) + + def test_an_invalid_event(self): + def callback(*args, **kwargs): + pass + + self.assertRaises(ValueError, + notifications.register_event_callback, + uuid.uuid4().hex, + 'thing', + callback) |