diff options
author | RHE <rebirthmonkey@gmail.com> | 2017-11-24 13:54:26 +0100 |
---|---|---|
committer | RHE <rebirthmonkey@gmail.com> | 2017-11-24 13:54:26 +0100 |
commit | 920a49cfa055733d575282973e23558c33087a4a (patch) | |
tree | d371dab34efa5028600dad2e7ca58063626e7ba4 /keystone-moon/keystone/tests/unit/common/test_notifications.py | |
parent | ef3eefca70d8abb4a00dafb9419ad32738e934b2 (diff) |
remove keystone-moon
Change-Id: I80d7c9b669f19d5f6607e162de8e0e55c2f80fdd
Signed-off-by: RHE <rebirthmonkey@gmail.com>
Diffstat (limited to 'keystone-moon/keystone/tests/unit/common/test_notifications.py')
-rw-r--r-- | keystone-moon/keystone/tests/unit/common/test_notifications.py | 1248 |
1 files changed, 0 insertions, 1248 deletions
diff --git a/keystone-moon/keystone/tests/unit/common/test_notifications.py b/keystone-moon/keystone/tests/unit/common/test_notifications.py deleted file mode 100644 index aa2e6f72..00000000 --- a/keystone-moon/keystone/tests/unit/common/test_notifications.py +++ /dev/null @@ -1,1248 +0,0 @@ -# Copyright 2013 IBM Corp. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import logging -import uuid - -import mock -from oslo_config import cfg -from oslo_config import fixture as config_fixture -from oslotest import mockpatch -from pycadf import cadftaxonomy -from pycadf import cadftype -from pycadf import eventfactory -from pycadf import resource as cadfresource - -from keystone import notifications -from keystone.tests import unit -from keystone.tests.unit import test_v3 - - -CONF = cfg.CONF - -EXP_RESOURCE_TYPE = uuid.uuid4().hex -CREATED_OPERATION = notifications.ACTIONS.created -UPDATED_OPERATION = notifications.ACTIONS.updated -DELETED_OPERATION = notifications.ACTIONS.deleted -DISABLED_OPERATION = notifications.ACTIONS.disabled - - -class ArbitraryException(Exception): - pass - - -def register_callback(operation, resource_type=EXP_RESOURCE_TYPE): - """Helper for creating and registering a mock callback.""" - callback = mock.Mock(__name__='callback', - im_class=mock.Mock(__name__='class')) - notifications.register_event_callback(operation, resource_type, callback) - return callback - - -class AuditNotificationsTestCase(unit.BaseTestCase): - def setUp(self): - super(AuditNotificationsTestCase, self).setUp() - self.config_fixture = self.useFixture(config_fixture.Config(CONF)) - self.addCleanup(notifications.clear_subscribers) - - def _test_notification_operation(self, notify_function, operation): - exp_resource_id = uuid.uuid4().hex - callback = register_callback(operation) - notify_function(EXP_RESOURCE_TYPE, exp_resource_id) - callback.assert_called_once_with('identity', EXP_RESOURCE_TYPE, - operation, - {'resource_info': exp_resource_id}) - self.config_fixture.config(notification_format='cadf') - with mock.patch( - 'keystone.notifications._create_cadf_payload') as cadf_notify: - notify_function(EXP_RESOURCE_TYPE, exp_resource_id) - initiator = None - cadf_notify.assert_called_once_with( - operation, EXP_RESOURCE_TYPE, exp_resource_id, - notifications.taxonomy.OUTCOME_SUCCESS, initiator) - notify_function(EXP_RESOURCE_TYPE, exp_resource_id, public=False) - cadf_notify.assert_called_once_with( - operation, EXP_RESOURCE_TYPE, exp_resource_id, - notifications.taxonomy.OUTCOME_SUCCESS, initiator) - - def test_resource_created_notification(self): - self._test_notification_operation(notifications.Audit.created, - CREATED_OPERATION) - - def test_resource_updated_notification(self): - self._test_notification_operation(notifications.Audit.updated, - UPDATED_OPERATION) - - def test_resource_deleted_notification(self): - self._test_notification_operation(notifications.Audit.deleted, - DELETED_OPERATION) - - def test_resource_disabled_notification(self): - self._test_notification_operation(notifications.Audit.disabled, - DISABLED_OPERATION) - - -class NotificationsTestCase(unit.BaseTestCase): - - def test_send_notification(self): - """Test _send_notification. - - Test the private method _send_notification to ensure event_type, - payload, and context are built and passed properly. - - """ - resource = uuid.uuid4().hex - resource_type = EXP_RESOURCE_TYPE - operation = CREATED_OPERATION - - # NOTE(ldbragst): Even though notifications._send_notification doesn't - # contain logic that creates cases, this is supposed to test that - # context is always empty and that we ensure the resource ID of the - # resource in the notification is contained in the payload. It was - # agreed that context should be empty in Keystone's case, which is - # also noted in the /keystone/notifications.py module. This test - # ensures and maintains these conditions. - expected_args = [ - {}, # empty context - 'identity.%s.created' % resource_type, # event_type - {'resource_info': resource}, # payload - 'INFO', # priority is always INFO... - ] - - with mock.patch.object(notifications._get_notifier(), - '_notify') as mocked: - notifications._send_notification(operation, resource_type, - resource) - mocked.assert_called_once_with(*expected_args) - - def test_send_notification_with_opt_out(self): - """Test the private method _send_notification with opt-out. - - Test that _send_notification does not notify when a valid - notification_opt_out configuration is provided. - """ - resource = uuid.uuid4().hex - resource_type = EXP_RESOURCE_TYPE - operation = CREATED_OPERATION - event_type = 'identity.%s.created' % resource_type - - # NOTE(diazjf): Here we add notification_opt_out to the - # configuration so that we should return before _get_notifer is - # called. This is because we are opting out notifications for the - # passed resource_type and operation. - conf = self.useFixture(config_fixture.Config(CONF)) - conf.config(notification_opt_out=event_type) - - with mock.patch.object(notifications._get_notifier(), - '_notify') as mocked: - - notifications._send_notification(operation, resource_type, - resource) - mocked.assert_not_called() - - def test_send_audit_notification_with_opt_out(self): - """Test the private method _send_audit_notification with opt-out. - - Test that _send_audit_notification does not notify when a valid - notification_opt_out configuration is provided. - """ - resource_type = EXP_RESOURCE_TYPE - - action = CREATED_OPERATION + '.' + resource_type - initiator = mock - target = mock - outcome = 'success' - event_type = 'identity.%s.created' % resource_type - - conf = self.useFixture(config_fixture.Config(CONF)) - conf.config(notification_opt_out=event_type) - - with mock.patch.object(notifications._get_notifier(), - '_notify') as mocked: - - notifications._send_audit_notification(action, - initiator, - outcome, - target, - event_type) - mocked.assert_not_called() - - def test_opt_out_authenticate_event(self): - """Test that authenticate events are successfully opted out.""" - resource_type = EXP_RESOURCE_TYPE - - action = CREATED_OPERATION + '.' + resource_type - initiator = mock - target = mock - outcome = 'success' - event_type = 'identity.authenticate' - meter_name = '%s.%s' % (event_type, outcome) - - conf = self.useFixture(config_fixture.Config(CONF)) - conf.config(notification_opt_out=meter_name) - - with mock.patch.object(notifications._get_notifier(), - '_notify') as mocked: - - notifications._send_audit_notification(action, - initiator, - outcome, - target, - event_type) - mocked.assert_not_called() - - -class BaseNotificationTest(test_v3.RestfulTestCase): - - def setUp(self): - super(BaseNotificationTest, self).setUp() - - self._notifications = [] - self._audits = [] - - def fake_notify(operation, resource_type, resource_id, - actor_dict=None, public=True): - note = { - 'resource_id': resource_id, - 'operation': operation, - 'resource_type': resource_type, - 'send_notification_called': True, - 'public': public} - if actor_dict: - note['actor_id'] = actor_dict.get('id') - note['actor_type'] = actor_dict.get('type') - note['actor_operation'] = actor_dict.get('actor_operation') - self._notifications.append(note) - - self.useFixture(mockpatch.PatchObject( - notifications, '_send_notification', fake_notify)) - - def fake_audit(action, initiator, outcome, target, - event_type, **kwargs): - service_security = cadftaxonomy.SERVICE_SECURITY - - event = eventfactory.EventFactory().new_event( - eventType=cadftype.EVENTTYPE_ACTIVITY, - outcome=outcome, - action=action, - initiator=initiator, - target=target, - observer=cadfresource.Resource(typeURI=service_security)) - - for key, value in kwargs.items(): - setattr(event, key, value) - - audit = { - 'payload': event.as_dict(), - 'event_type': event_type, - 'send_notification_called': True} - self._audits.append(audit) - - self.useFixture(mockpatch.PatchObject( - notifications, '_send_audit_notification', fake_audit)) - - def _assert_last_note(self, resource_id, operation, resource_type, - actor_id=None, actor_type=None, - actor_operation=None): - # NOTE(stevemar): If 'basic' format is not used, then simply - # return since this assertion is not valid. - if CONF.notification_format != 'basic': - return - self.assertTrue(len(self._notifications) > 0) - note = self._notifications[-1] - self.assertEqual(operation, note['operation']) - self.assertEqual(resource_id, note['resource_id']) - self.assertEqual(resource_type, note['resource_type']) - self.assertTrue(note['send_notification_called']) - if actor_id: - self.assertEqual(actor_id, note['actor_id']) - self.assertEqual(actor_type, note['actor_type']) - self.assertEqual(actor_operation, note['actor_operation']) - - def _assert_last_audit(self, resource_id, operation, resource_type, - target_uri): - # NOTE(stevemar): If 'cadf' format is not used, then simply - # return since this assertion is not valid. - if CONF.notification_format != 'cadf': - return - self.assertTrue(len(self._audits) > 0) - audit = self._audits[-1] - payload = audit['payload'] - self.assertEqual(resource_id, payload['resource_info']) - action = '%s.%s' % (operation, resource_type) - self.assertEqual(action, payload['action']) - self.assertEqual(target_uri, payload['target']['typeURI']) - self.assertEqual(resource_id, payload['target']['id']) - event_type = '%s.%s.%s' % ('identity', resource_type, operation) - self.assertEqual(event_type, audit['event_type']) - self.assertTrue(audit['send_notification_called']) - - def _assert_initiator_data_is_set(self, operation, resource_type, typeURI): - self.assertTrue(len(self._audits) > 0) - audit = self._audits[-1] - payload = audit['payload'] - self.assertEqual(self.user_id, payload['initiator']['id']) - self.assertEqual(self.project_id, payload['initiator']['project_id']) - self.assertEqual(typeURI, payload['target']['typeURI']) - action = '%s.%s' % (operation, resource_type) - self.assertEqual(action, payload['action']) - - def _assert_notify_not_sent(self, resource_id, operation, resource_type, - public=True): - unexpected = { - 'resource_id': resource_id, - 'operation': operation, - 'resource_type': resource_type, - 'send_notification_called': True, - 'public': public} - for note in self._notifications: - self.assertNotEqual(unexpected, note) - - def _assert_notify_sent(self, resource_id, operation, resource_type, - public=True): - expected = { - 'resource_id': resource_id, - 'operation': operation, - 'resource_type': resource_type, - 'send_notification_called': True, - 'public': public} - for note in self._notifications: - if expected == note: - break - else: - self.fail("Notification not sent.") - - -class NotificationsForEntities(BaseNotificationTest): - - def test_create_group(self): - group_ref = unit.new_group_ref(domain_id=self.domain_id) - group_ref = self.identity_api.create_group(group_ref) - self._assert_last_note(group_ref['id'], CREATED_OPERATION, 'group') - self._assert_last_audit(group_ref['id'], CREATED_OPERATION, 'group', - cadftaxonomy.SECURITY_GROUP) - - def test_create_project(self): - project_ref = unit.new_project_ref(domain_id=self.domain_id) - self.resource_api.create_project(project_ref['id'], project_ref) - self._assert_last_note( - project_ref['id'], CREATED_OPERATION, 'project') - self._assert_last_audit(project_ref['id'], CREATED_OPERATION, - 'project', cadftaxonomy.SECURITY_PROJECT) - - def test_create_role(self): - role_ref = unit.new_role_ref() - self.role_api.create_role(role_ref['id'], role_ref) - self._assert_last_note(role_ref['id'], CREATED_OPERATION, 'role') - self._assert_last_audit(role_ref['id'], CREATED_OPERATION, 'role', - cadftaxonomy.SECURITY_ROLE) - - def test_create_user(self): - user_ref = unit.new_user_ref(domain_id=self.domain_id) - user_ref = self.identity_api.create_user(user_ref) - self._assert_last_note(user_ref['id'], CREATED_OPERATION, 'user') - self._assert_last_audit(user_ref['id'], CREATED_OPERATION, 'user', - cadftaxonomy.SECURITY_ACCOUNT_USER) - - def test_create_trust(self): - trustor = unit.new_user_ref(domain_id=self.domain_id) - trustor = self.identity_api.create_user(trustor) - trustee = unit.new_user_ref(domain_id=self.domain_id) - trustee = self.identity_api.create_user(trustee) - role_ref = unit.new_role_ref() - self.role_api.create_role(role_ref['id'], role_ref) - trust_ref = unit.new_trust_ref(trustor['id'], - trustee['id']) - self.trust_api.create_trust(trust_ref['id'], - trust_ref, - [role_ref]) - self._assert_last_note( - trust_ref['id'], CREATED_OPERATION, 'OS-TRUST:trust') - self._assert_last_audit(trust_ref['id'], CREATED_OPERATION, - 'OS-TRUST:trust', cadftaxonomy.SECURITY_TRUST) - - def test_delete_group(self): - group_ref = unit.new_group_ref(domain_id=self.domain_id) - group_ref = self.identity_api.create_group(group_ref) - self.identity_api.delete_group(group_ref['id']) - self._assert_last_note(group_ref['id'], DELETED_OPERATION, 'group') - self._assert_last_audit(group_ref['id'], DELETED_OPERATION, 'group', - cadftaxonomy.SECURITY_GROUP) - - def test_delete_project(self): - project_ref = unit.new_project_ref(domain_id=self.domain_id) - self.resource_api.create_project(project_ref['id'], project_ref) - self.resource_api.delete_project(project_ref['id']) - self._assert_last_note( - project_ref['id'], DELETED_OPERATION, 'project') - self._assert_last_audit(project_ref['id'], DELETED_OPERATION, - 'project', cadftaxonomy.SECURITY_PROJECT) - - def test_delete_role(self): - role_ref = unit.new_role_ref() - self.role_api.create_role(role_ref['id'], role_ref) - self.role_api.delete_role(role_ref['id']) - self._assert_last_note(role_ref['id'], DELETED_OPERATION, 'role') - self._assert_last_audit(role_ref['id'], DELETED_OPERATION, 'role', - cadftaxonomy.SECURITY_ROLE) - - def test_delete_user(self): - user_ref = unit.new_user_ref(domain_id=self.domain_id) - user_ref = self.identity_api.create_user(user_ref) - self.identity_api.delete_user(user_ref['id']) - self._assert_last_note(user_ref['id'], DELETED_OPERATION, 'user') - self._assert_last_audit(user_ref['id'], DELETED_OPERATION, 'user', - cadftaxonomy.SECURITY_ACCOUNT_USER) - - def test_create_domain(self): - domain_ref = unit.new_domain_ref() - self.resource_api.create_domain(domain_ref['id'], domain_ref) - self._assert_last_note(domain_ref['id'], CREATED_OPERATION, 'domain') - self._assert_last_audit(domain_ref['id'], CREATED_OPERATION, 'domain', - cadftaxonomy.SECURITY_DOMAIN) - - def test_update_domain(self): - domain_ref = unit.new_domain_ref() - self.resource_api.create_domain(domain_ref['id'], domain_ref) - domain_ref['description'] = uuid.uuid4().hex - self.resource_api.update_domain(domain_ref['id'], domain_ref) - self._assert_last_note(domain_ref['id'], UPDATED_OPERATION, 'domain') - self._assert_last_audit(domain_ref['id'], UPDATED_OPERATION, 'domain', - cadftaxonomy.SECURITY_DOMAIN) - - def test_delete_domain(self): - domain_ref = unit.new_domain_ref() - self.resource_api.create_domain(domain_ref['id'], domain_ref) - domain_ref['enabled'] = False - self.resource_api.update_domain(domain_ref['id'], domain_ref) - self.resource_api.delete_domain(domain_ref['id']) - self._assert_last_note(domain_ref['id'], DELETED_OPERATION, 'domain') - self._assert_last_audit(domain_ref['id'], DELETED_OPERATION, 'domain', - cadftaxonomy.SECURITY_DOMAIN) - - def test_delete_trust(self): - trustor = unit.new_user_ref(domain_id=self.domain_id) - trustor = self.identity_api.create_user(trustor) - trustee = unit.new_user_ref(domain_id=self.domain_id) - trustee = self.identity_api.create_user(trustee) - role_ref = unit.new_role_ref() - trust_ref = unit.new_trust_ref(trustor['id'], trustee['id']) - self.trust_api.create_trust(trust_ref['id'], - trust_ref, - [role_ref]) - self.trust_api.delete_trust(trust_ref['id']) - self._assert_last_note( - trust_ref['id'], DELETED_OPERATION, 'OS-TRUST:trust') - self._assert_last_audit(trust_ref['id'], DELETED_OPERATION, - 'OS-TRUST:trust', cadftaxonomy.SECURITY_TRUST) - - def test_create_endpoint(self): - endpoint_ref = unit.new_endpoint_ref(service_id=self.service_id, - interface='public', - region_id=self.region_id) - self.catalog_api.create_endpoint(endpoint_ref['id'], endpoint_ref) - self._assert_notify_sent(endpoint_ref['id'], CREATED_OPERATION, - 'endpoint') - self._assert_last_audit(endpoint_ref['id'], CREATED_OPERATION, - 'endpoint', cadftaxonomy.SECURITY_ENDPOINT) - - def test_update_endpoint(self): - endpoint_ref = unit.new_endpoint_ref(service_id=self.service_id, - interface='public', - region_id=self.region_id) - self.catalog_api.create_endpoint(endpoint_ref['id'], endpoint_ref) - self.catalog_api.update_endpoint(endpoint_ref['id'], endpoint_ref) - self._assert_notify_sent(endpoint_ref['id'], UPDATED_OPERATION, - 'endpoint') - self._assert_last_audit(endpoint_ref['id'], UPDATED_OPERATION, - 'endpoint', cadftaxonomy.SECURITY_ENDPOINT) - - def test_delete_endpoint(self): - endpoint_ref = unit.new_endpoint_ref(service_id=self.service_id, - interface='public', - region_id=self.region_id) - self.catalog_api.create_endpoint(endpoint_ref['id'], endpoint_ref) - self.catalog_api.delete_endpoint(endpoint_ref['id']) - self._assert_notify_sent(endpoint_ref['id'], DELETED_OPERATION, - 'endpoint') - self._assert_last_audit(endpoint_ref['id'], DELETED_OPERATION, - 'endpoint', cadftaxonomy.SECURITY_ENDPOINT) - - def test_create_service(self): - service_ref = unit.new_service_ref() - self.catalog_api.create_service(service_ref['id'], service_ref) - self._assert_notify_sent(service_ref['id'], CREATED_OPERATION, - 'service') - self._assert_last_audit(service_ref['id'], CREATED_OPERATION, - 'service', cadftaxonomy.SECURITY_SERVICE) - - def test_update_service(self): - service_ref = unit.new_service_ref() - self.catalog_api.create_service(service_ref['id'], service_ref) - self.catalog_api.update_service(service_ref['id'], service_ref) - self._assert_notify_sent(service_ref['id'], UPDATED_OPERATION, - 'service') - self._assert_last_audit(service_ref['id'], UPDATED_OPERATION, - 'service', cadftaxonomy.SECURITY_SERVICE) - - def test_delete_service(self): - service_ref = unit.new_service_ref() - self.catalog_api.create_service(service_ref['id'], service_ref) - self.catalog_api.delete_service(service_ref['id']) - self._assert_notify_sent(service_ref['id'], DELETED_OPERATION, - 'service') - self._assert_last_audit(service_ref['id'], DELETED_OPERATION, - 'service', cadftaxonomy.SECURITY_SERVICE) - - def test_create_region(self): - region_ref = unit.new_region_ref() - self.catalog_api.create_region(region_ref) - self._assert_notify_sent(region_ref['id'], CREATED_OPERATION, - 'region') - self._assert_last_audit(region_ref['id'], CREATED_OPERATION, - 'region', cadftaxonomy.SECURITY_REGION) - - def test_update_region(self): - region_ref = unit.new_region_ref() - self.catalog_api.create_region(region_ref) - self.catalog_api.update_region(region_ref['id'], region_ref) - self._assert_notify_sent(region_ref['id'], UPDATED_OPERATION, - 'region') - self._assert_last_audit(region_ref['id'], UPDATED_OPERATION, - 'region', cadftaxonomy.SECURITY_REGION) - - def test_delete_region(self): - region_ref = unit.new_region_ref() - self.catalog_api.create_region(region_ref) - self.catalog_api.delete_region(region_ref['id']) - self._assert_notify_sent(region_ref['id'], DELETED_OPERATION, - 'region') - self._assert_last_audit(region_ref['id'], DELETED_OPERATION, - 'region', cadftaxonomy.SECURITY_REGION) - - def test_create_policy(self): - policy_ref = unit.new_policy_ref() - self.policy_api.create_policy(policy_ref['id'], policy_ref) - self._assert_notify_sent(policy_ref['id'], CREATED_OPERATION, - 'policy') - self._assert_last_audit(policy_ref['id'], CREATED_OPERATION, - 'policy', cadftaxonomy.SECURITY_POLICY) - - def test_update_policy(self): - policy_ref = unit.new_policy_ref() - self.policy_api.create_policy(policy_ref['id'], policy_ref) - self.policy_api.update_policy(policy_ref['id'], policy_ref) - self._assert_notify_sent(policy_ref['id'], UPDATED_OPERATION, - 'policy') - self._assert_last_audit(policy_ref['id'], UPDATED_OPERATION, - 'policy', cadftaxonomy.SECURITY_POLICY) - - def test_delete_policy(self): - policy_ref = unit.new_policy_ref() - self.policy_api.create_policy(policy_ref['id'], policy_ref) - self.policy_api.delete_policy(policy_ref['id']) - self._assert_notify_sent(policy_ref['id'], DELETED_OPERATION, - 'policy') - self._assert_last_audit(policy_ref['id'], DELETED_OPERATION, - 'policy', cadftaxonomy.SECURITY_POLICY) - - def test_disable_domain(self): - domain_ref = unit.new_domain_ref() - self.resource_api.create_domain(domain_ref['id'], domain_ref) - domain_ref['enabled'] = False - self.resource_api.update_domain(domain_ref['id'], domain_ref) - self._assert_notify_sent(domain_ref['id'], 'disabled', 'domain', - public=False) - - def test_disable_of_disabled_domain_does_not_notify(self): - domain_ref = unit.new_domain_ref(enabled=False) - self.resource_api.create_domain(domain_ref['id'], domain_ref) - # The domain_ref above is not changed during the create process. We - # can use the same ref to perform the update. - self.resource_api.update_domain(domain_ref['id'], domain_ref) - self._assert_notify_not_sent(domain_ref['id'], 'disabled', 'domain', - public=False) - - def test_update_group(self): - group_ref = unit.new_group_ref(domain_id=self.domain_id) - group_ref = self.identity_api.create_group(group_ref) - self.identity_api.update_group(group_ref['id'], group_ref) - self._assert_last_note(group_ref['id'], UPDATED_OPERATION, 'group') - self._assert_last_audit(group_ref['id'], UPDATED_OPERATION, 'group', - cadftaxonomy.SECURITY_GROUP) - - def test_update_project(self): - project_ref = unit.new_project_ref(domain_id=self.domain_id) - self.resource_api.create_project(project_ref['id'], project_ref) - self.resource_api.update_project(project_ref['id'], project_ref) - self._assert_notify_sent( - project_ref['id'], UPDATED_OPERATION, 'project', public=True) - self._assert_last_audit(project_ref['id'], UPDATED_OPERATION, - 'project', cadftaxonomy.SECURITY_PROJECT) - - def test_disable_project(self): - project_ref = unit.new_project_ref(domain_id=self.domain_id) - self.resource_api.create_project(project_ref['id'], project_ref) - project_ref['enabled'] = False - self.resource_api.update_project(project_ref['id'], project_ref) - self._assert_notify_sent(project_ref['id'], 'disabled', 'project', - public=False) - - def test_disable_of_disabled_project_does_not_notify(self): - project_ref = unit.new_project_ref(domain_id=self.domain_id, - enabled=False) - self.resource_api.create_project(project_ref['id'], project_ref) - # The project_ref above is not changed during the create process. We - # can use the same ref to perform the update. - self.resource_api.update_project(project_ref['id'], project_ref) - self._assert_notify_not_sent(project_ref['id'], 'disabled', 'project', - public=False) - - def test_update_project_does_not_send_disable(self): - project_ref = unit.new_project_ref(domain_id=self.domain_id) - self.resource_api.create_project(project_ref['id'], project_ref) - project_ref['enabled'] = True - self.resource_api.update_project(project_ref['id'], project_ref) - self._assert_last_note( - project_ref['id'], UPDATED_OPERATION, 'project') - self._assert_notify_not_sent(project_ref['id'], 'disabled', 'project') - - def test_update_role(self): - role_ref = unit.new_role_ref() - self.role_api.create_role(role_ref['id'], role_ref) - self.role_api.update_role(role_ref['id'], role_ref) - self._assert_last_note(role_ref['id'], UPDATED_OPERATION, 'role') - self._assert_last_audit(role_ref['id'], UPDATED_OPERATION, 'role', - cadftaxonomy.SECURITY_ROLE) - - def test_update_user(self): - user_ref = unit.new_user_ref(domain_id=self.domain_id) - user_ref = self.identity_api.create_user(user_ref) - self.identity_api.update_user(user_ref['id'], user_ref) - self._assert_last_note(user_ref['id'], UPDATED_OPERATION, 'user') - self._assert_last_audit(user_ref['id'], UPDATED_OPERATION, 'user', - cadftaxonomy.SECURITY_ACCOUNT_USER) - - def test_config_option_no_events(self): - self.config_fixture.config(notification_format='basic') - role_ref = unit.new_role_ref() - self.role_api.create_role(role_ref['id'], role_ref) - # The regular notifications will still be emitted, since they are - # used for callback handling. - self._assert_last_note(role_ref['id'], CREATED_OPERATION, 'role') - # No audit event should have occurred - self.assertEqual(0, len(self._audits)) - - def test_add_user_to_group(self): - user_ref = unit.new_user_ref(domain_id=self.domain_id) - user_ref = self.identity_api.create_user(user_ref) - group_ref = unit.new_group_ref(domain_id=self.domain_id) - group_ref = self.identity_api.create_group(group_ref) - self.identity_api.add_user_to_group(user_ref['id'], group_ref['id']) - self._assert_last_note(group_ref['id'], UPDATED_OPERATION, 'group', - actor_id=user_ref['id'], actor_type='user', - actor_operation='added') - - def test_remove_user_from_group(self): - user_ref = unit.new_user_ref(domain_id=self.domain_id) - user_ref = self.identity_api.create_user(user_ref) - group_ref = unit.new_group_ref(domain_id=self.domain_id) - group_ref = self.identity_api.create_group(group_ref) - self.identity_api.add_user_to_group(user_ref['id'], group_ref['id']) - self.identity_api.remove_user_from_group(user_ref['id'], - group_ref['id']) - self._assert_last_note(group_ref['id'], UPDATED_OPERATION, 'group', - actor_id=user_ref['id'], actor_type='user', - actor_operation='removed') - - -class CADFNotificationsForEntities(NotificationsForEntities): - - def setUp(self): - super(CADFNotificationsForEntities, self).setUp() - self.config_fixture.config(notification_format='cadf') - - def test_initiator_data_is_set(self): - ref = unit.new_domain_ref() - resp = self.post('/domains', body={'domain': ref}) - resource_id = resp.result.get('domain').get('id') - self._assert_last_audit(resource_id, CREATED_OPERATION, 'domain', - cadftaxonomy.SECURITY_DOMAIN) - self._assert_initiator_data_is_set(CREATED_OPERATION, - 'domain', - cadftaxonomy.SECURITY_DOMAIN) - - -class V2Notifications(BaseNotificationTest): - - def setUp(self): - super(V2Notifications, self).setUp() - self.config_fixture.config(notification_format='cadf') - - def test_user(self): - token = self.get_scoped_token() - resp = self.admin_request( - method='POST', - path='/v2.0/users', - body={ - 'user': { - 'name': uuid.uuid4().hex, - 'password': uuid.uuid4().hex, - 'enabled': True, - }, - }, - token=token, - ) - user_id = resp.result.get('user').get('id') - self._assert_initiator_data_is_set(CREATED_OPERATION, - 'user', - cadftaxonomy.SECURITY_ACCOUNT_USER) - # test for delete user - self.admin_request( - method='DELETE', - path='/v2.0/users/%s' % user_id, - token=token, - ) - self._assert_initiator_data_is_set(DELETED_OPERATION, - 'user', - cadftaxonomy.SECURITY_ACCOUNT_USER) - - def test_role(self): - token = self.get_scoped_token() - resp = self.admin_request( - method='POST', - path='/v2.0/OS-KSADM/roles', - body={ - 'role': { - 'name': uuid.uuid4().hex, - 'description': uuid.uuid4().hex, - }, - }, - token=token, - ) - role_id = resp.result.get('role').get('id') - self._assert_initiator_data_is_set(CREATED_OPERATION, - 'role', - cadftaxonomy.SECURITY_ROLE) - # test for delete role - self.admin_request( - method='DELETE', - path='/v2.0/OS-KSADM/roles/%s' % role_id, - token=token, - ) - self._assert_initiator_data_is_set(DELETED_OPERATION, - 'role', - cadftaxonomy.SECURITY_ROLE) - - def test_service_and_endpoint(self): - token = self.get_scoped_token() - resp = self.admin_request( - method='POST', - path='/v2.0/OS-KSADM/services', - body={ - 'OS-KSADM:service': { - 'name': uuid.uuid4().hex, - 'type': uuid.uuid4().hex, - 'description': uuid.uuid4().hex, - }, - }, - token=token, - ) - service_id = resp.result.get('OS-KSADM:service').get('id') - self._assert_initiator_data_is_set(CREATED_OPERATION, - 'service', - cadftaxonomy.SECURITY_SERVICE) - resp = self.admin_request( - method='POST', - path='/v2.0/endpoints', - body={ - 'endpoint': { - 'region': uuid.uuid4().hex, - 'service_id': service_id, - 'publicurl': uuid.uuid4().hex, - 'adminurl': uuid.uuid4().hex, - 'internalurl': uuid.uuid4().hex, - }, - }, - token=token, - ) - endpoint_id = resp.result.get('endpoint').get('id') - self._assert_initiator_data_is_set(CREATED_OPERATION, - 'endpoint', - cadftaxonomy.SECURITY_ENDPOINT) - # test for delete endpoint - self.admin_request( - method='DELETE', - path='/v2.0/endpoints/%s' % endpoint_id, - token=token, - ) - self._assert_initiator_data_is_set(DELETED_OPERATION, - 'endpoint', - cadftaxonomy.SECURITY_ENDPOINT) - # test for delete service - self.admin_request( - method='DELETE', - path='/v2.0/OS-KSADM/services/%s' % service_id, - token=token, - ) - self._assert_initiator_data_is_set(DELETED_OPERATION, - 'service', - cadftaxonomy.SECURITY_SERVICE) - - def test_project(self): - token = self.get_scoped_token() - resp = self.admin_request( - method='POST', - path='/v2.0/tenants', - body={ - 'tenant': { - 'name': uuid.uuid4().hex, - 'description': uuid.uuid4().hex, - 'enabled': True - }, - }, - token=token, - ) - project_id = resp.result.get('tenant').get('id') - self._assert_initiator_data_is_set(CREATED_OPERATION, - 'project', - cadftaxonomy.SECURITY_PROJECT) - # test for delete project - self.admin_request( - method='DELETE', - path='/v2.0/tenants/%s' % project_id, - token=token, - ) - self._assert_initiator_data_is_set(DELETED_OPERATION, - 'project', - cadftaxonomy.SECURITY_PROJECT) - - -class TestEventCallbacks(test_v3.RestfulTestCase): - - def setUp(self): - super(TestEventCallbacks, self).setUp() - self.has_been_called = False - - def _project_deleted_callback(self, service, resource_type, operation, - payload): - self.has_been_called = True - - def _project_created_callback(self, service, resource_type, operation, - payload): - self.has_been_called = True - - def test_notification_received(self): - callback = register_callback(CREATED_OPERATION, 'project') - project_ref = unit.new_project_ref(domain_id=self.domain_id) - self.resource_api.create_project(project_ref['id'], project_ref) - self.assertTrue(callback.called) - - def test_notification_method_not_callable(self): - fake_method = None - self.assertRaises(TypeError, - notifications.register_event_callback, - UPDATED_OPERATION, - 'project', - [fake_method]) - - def test_notification_event_not_valid(self): - self.assertRaises(ValueError, - notifications.register_event_callback, - uuid.uuid4().hex, - 'project', - self._project_deleted_callback) - - def test_event_registration_for_unknown_resource_type(self): - # Registration for unknown resource types should succeed. If no event - # is issued for that resource type, the callback wont be triggered. - notifications.register_event_callback(DELETED_OPERATION, - uuid.uuid4().hex, - self._project_deleted_callback) - resource_type = uuid.uuid4().hex - notifications.register_event_callback(DELETED_OPERATION, - resource_type, - self._project_deleted_callback) - - def test_provider_event_callback_subscription(self): - callback_called = [] - - @notifications.listener - class Foo(object): - def __init__(self): - self.event_callbacks = { - CREATED_OPERATION: {'project': self.foo_callback}} - - def foo_callback(self, service, resource_type, operation, - payload): - # uses callback_called from the closure - callback_called.append(True) - - Foo() - project_ref = unit.new_project_ref(domain_id=self.domain_id) - self.resource_api.create_project(project_ref['id'], project_ref) - self.assertEqual([True], callback_called) - - def test_provider_event_callbacks_subscription(self): - callback_called = [] - - @notifications.listener - class Foo(object): - def __init__(self): - self.event_callbacks = { - CREATED_OPERATION: { - 'project': [self.callback_0, self.callback_1]}} - - def callback_0(self, service, resource_type, operation, payload): - # uses callback_called from the closure - callback_called.append('cb0') - - def callback_1(self, service, resource_type, operation, payload): - # uses callback_called from the closure - callback_called.append('cb1') - - Foo() - project_ref = unit.new_project_ref(domain_id=self.domain_id) - self.resource_api.create_project(project_ref['id'], project_ref) - self.assertItemsEqual(['cb1', 'cb0'], callback_called) - - def test_invalid_event_callbacks(self): - @notifications.listener - class Foo(object): - def __init__(self): - self.event_callbacks = 'bogus' - - self.assertRaises(AttributeError, Foo) - - def test_invalid_event_callbacks_event(self): - @notifications.listener - class Foo(object): - def __init__(self): - self.event_callbacks = {CREATED_OPERATION: 'bogus'} - - self.assertRaises(AttributeError, Foo) - - def test_using_an_unbound_method_as_a_callback_fails(self): - # NOTE(dstanek): An unbound method is when you reference a method - # from a class object. You'll get a method that isn't bound to a - # particular instance so there is no magic 'self'. You can call it, - # but you have to pass in the instance manually like: C.m(C()). - # If you reference the method from an instance then you get a method - # that effectively curries the self argument for you - # (think functools.partial). Obviously is we don't have an - # instance then we can't call the method. - @notifications.listener - class Foo(object): - def __init__(self): - self.event_callbacks = {CREATED_OPERATION: - {'project': Foo.callback}} - - def callback(self, *args): - pass - - # TODO(dstanek): it would probably be nice to fail early using - # something like: - # self.assertRaises(TypeError, Foo) - Foo() - project_ref = unit.new_project_ref(domain_id=self.domain_id) - self.assertRaises(TypeError, self.resource_api.create_project, - project_ref['id'], project_ref) - - -class CadfNotificationsWrapperTestCase(test_v3.RestfulTestCase): - - LOCAL_HOST = 'localhost' - ACTION = 'authenticate' - ROLE_ASSIGNMENT = 'role_assignment' - - def setUp(self): - super(CadfNotificationsWrapperTestCase, self).setUp() - self._notifications = [] - - def fake_notify(action, initiator, outcome, target, - event_type, **kwargs): - service_security = cadftaxonomy.SERVICE_SECURITY - - event = eventfactory.EventFactory().new_event( - eventType=cadftype.EVENTTYPE_ACTIVITY, - outcome=outcome, - action=action, - initiator=initiator, - target=target, - observer=cadfresource.Resource(typeURI=service_security)) - - for key, value in kwargs.items(): - setattr(event, key, value) - - note = { - 'action': action, - 'initiator': initiator, - 'event': event, - 'event_type': event_type, - 'send_notification_called': True} - self._notifications.append(note) - - self.useFixture(mockpatch.PatchObject( - notifications, '_send_audit_notification', fake_notify)) - - def _assert_last_note(self, action, user_id, event_type=None): - self.assertTrue(self._notifications) - note = self._notifications[-1] - self.assertEqual(action, note['action']) - initiator = note['initiator'] - self.assertEqual(user_id, initiator.id) - self.assertEqual(self.LOCAL_HOST, initiator.host.address) - self.assertTrue(note['send_notification_called']) - if event_type: - self.assertEqual(event_type, note['event_type']) - - def _assert_event(self, role_id, project=None, domain=None, - user=None, group=None, inherit=False): - """Assert that the CADF event is valid. - - In the case of role assignments, the event will have extra data, - specifically, the role, target, actor, and if the role is inherited. - - An example event, as a dictionary is seen below: - { - 'typeURI': 'http://schemas.dmtf.org/cloud/audit/1.0/event', - 'initiator': { - 'typeURI': 'service/security/account/user', - 'host': {'address': 'localhost'}, - 'id': 'openstack:0a90d95d-582c-4efb-9cbc-e2ca7ca9c341', - 'name': u'bccc2d9bfc2a46fd9e33bcf82f0b5c21' - }, - 'target': { - 'typeURI': 'service/security/account/user', - 'id': 'openstack:d48ea485-ef70-4f65-8d2b-01aa9d7ec12d' - }, - 'observer': { - 'typeURI': 'service/security', - 'id': 'openstack:d51dd870-d929-4aba-8d75-dcd7555a0c95' - }, - 'eventType': 'activity', - 'eventTime': '2014-08-21T21:04:56.204536+0000', - 'role': u'0e6b990380154a2599ce6b6e91548a68', - 'domain': u'24bdcff1aab8474895dbaac509793de1', - 'inherited_to_projects': False, - 'group': u'c1e22dc67cbd469ea0e33bf428fe597a', - 'action': 'created.role_assignment', - 'outcome': 'success', - 'id': 'openstack:782689dd-f428-4f13-99c7-5c70f94a5ac1' - } - """ - note = self._notifications[-1] - event = note['event'] - if project: - self.assertEqual(project, event.project) - if domain: - self.assertEqual(domain, event.domain) - if group: - self.assertEqual(group, event.group) - elif user: - self.assertEqual(user, event.user) - self.assertEqual(role_id, event.role) - self.assertEqual(inherit, event.inherited_to_projects) - - def test_v3_authenticate_user_name_and_domain_id(self): - user_id = self.user_id - user_name = self.user['name'] - password = self.user['password'] - domain_id = self.domain_id - data = self.build_authentication_request(username=user_name, - user_domain_id=domain_id, - password=password) - self.post('/auth/tokens', body=data) - self._assert_last_note(self.ACTION, user_id) - - def test_v3_authenticate_user_id(self): - user_id = self.user_id - password = self.user['password'] - data = self.build_authentication_request(user_id=user_id, - password=password) - self.post('/auth/tokens', body=data) - self._assert_last_note(self.ACTION, user_id) - - def test_v3_authenticate_user_name_and_domain_name(self): - user_id = self.user_id - user_name = self.user['name'] - password = self.user['password'] - domain_name = self.domain['name'] - data = self.build_authentication_request(username=user_name, - user_domain_name=domain_name, - password=password) - self.post('/auth/tokens', body=data) - self._assert_last_note(self.ACTION, user_id) - - def _test_role_assignment(self, url, role, project=None, domain=None, - user=None, group=None): - self.put(url) - action = "%s.%s" % (CREATED_OPERATION, self.ROLE_ASSIGNMENT) - event_type = '%s.%s.%s' % (notifications.SERVICE, - self.ROLE_ASSIGNMENT, CREATED_OPERATION) - self._assert_last_note(action, self.user_id, event_type) - self._assert_event(role, project, domain, user, group) - self.delete(url) - action = "%s.%s" % (DELETED_OPERATION, self.ROLE_ASSIGNMENT) - event_type = '%s.%s.%s' % (notifications.SERVICE, - self.ROLE_ASSIGNMENT, DELETED_OPERATION) - self._assert_last_note(action, self.user_id, event_type) - self._assert_event(role, project, domain, user, None) - - def test_user_project_grant(self): - url = ('/projects/%s/users/%s/roles/%s' % - (self.project_id, self.user_id, self.role_id)) - self._test_role_assignment(url, self.role_id, - project=self.project_id, - user=self.user_id) - - def test_group_domain_grant(self): - group_ref = unit.new_group_ref(domain_id=self.domain_id) - group = self.identity_api.create_group(group_ref) - self.identity_api.add_user_to_group(self.user_id, group['id']) - url = ('/domains/%s/groups/%s/roles/%s' % - (self.domain_id, group['id'], self.role_id)) - self._test_role_assignment(url, self.role_id, - domain=self.domain_id, - user=self.user_id, - group=group['id']) - - def test_add_role_to_user_and_project(self): - # A notification is sent when add_role_to_user_and_project is called on - # the assignment manager. - - project_ref = unit.new_project_ref(self.domain_id) - project = self.resource_api.create_project( - project_ref['id'], project_ref) - tenant_id = project['id'] - - self.assignment_api.add_role_to_user_and_project( - self.user_id, tenant_id, self.role_id) - - self.assertTrue(self._notifications) - note = self._notifications[-1] - self.assertEqual('created.role_assignment', note['action']) - self.assertTrue(note['send_notification_called']) - - self._assert_event(self.role_id, project=tenant_id, user=self.user_id) - - def test_remove_role_from_user_and_project(self): - # A notification is sent when remove_role_from_user_and_project is - # called on the assignment manager. - - self.assignment_api.remove_role_from_user_and_project( - self.user_id, self.project_id, self.role_id) - - self.assertTrue(self._notifications) - note = self._notifications[-1] - self.assertEqual('deleted.role_assignment', note['action']) - self.assertTrue(note['send_notification_called']) - - self._assert_event(self.role_id, project=self.project_id, - user=self.user_id) - - -class TestCallbackRegistration(unit.BaseTestCase): - def setUp(self): - super(TestCallbackRegistration, self).setUp() - self.mock_log = mock.Mock() - # Force the callback logging to occur - self.mock_log.logger.getEffectiveLevel.return_value = logging.DEBUG - - def verify_log_message(self, data): - """Verify log message. - - Tests that use this are a little brittle because adding more - logging can break them. - - TODO(dstanek): remove the need for this in a future refactoring - - """ - log_fn = self.mock_log.debug - self.assertEqual(len(data), log_fn.call_count) - for datum in data: - log_fn.assert_any_call(mock.ANY, datum) - - def test_a_function_callback(self): - def callback(*args, **kwargs): - pass - - resource_type = 'thing' - with mock.patch('keystone.notifications.LOG', self.mock_log): - notifications.register_event_callback( - CREATED_OPERATION, resource_type, callback) - - callback = 'keystone.tests.unit.common.test_notifications.callback' - expected_log_data = { - 'callback': callback, - 'event': 'identity.%s.created' % resource_type - } - self.verify_log_message([expected_log_data]) - - def test_a_method_callback(self): - class C(object): - def callback(self, *args, **kwargs): - pass - - with mock.patch('keystone.notifications.LOG', self.mock_log): - notifications.register_event_callback( - CREATED_OPERATION, 'thing', C().callback) - - callback = 'keystone.tests.unit.common.test_notifications.C.callback' - expected_log_data = { - 'callback': callback, - 'event': 'identity.thing.created' - } - self.verify_log_message([expected_log_data]) - - def test_a_list_of_callbacks(self): - def callback(*args, **kwargs): - pass - - class C(object): - def callback(self, *args, **kwargs): - pass - - with mock.patch('keystone.notifications.LOG', self.mock_log): - notifications.register_event_callback( - CREATED_OPERATION, 'thing', [callback, C().callback]) - - callback_1 = 'keystone.tests.unit.common.test_notifications.callback' - callback_2 = 'keystone.tests.unit.common.test_notifications.C.callback' - expected_log_data = [ - { - 'callback': callback_1, - 'event': 'identity.thing.created' - }, - { - 'callback': callback_2, - 'event': 'identity.thing.created' - }, - ] - self.verify_log_message(expected_log_data) - - def test_an_invalid_callback(self): - self.assertRaises(TypeError, - notifications.register_event_callback, - (CREATED_OPERATION, 'thing', object())) - - def test_an_invalid_event(self): - def callback(*args, **kwargs): - pass - - self.assertRaises(ValueError, - notifications.register_event_callback, - uuid.uuid4().hex, - 'thing', - callback) |