aboutsummaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests/unit/common/test_notifications.py
diff options
context:
space:
mode:
authorasteroide <thomas.duval@orange.com>2015-12-02 09:49:33 +0100
committerasteroide <thomas.duval@orange.com>2015-12-02 10:25:15 +0100
commit7a5a0e4df646d46476ec7a9fcdedd638e8781f6e (patch)
tree54eecd1210e4fb5db2b14edeac1df601da7698e2 /keystone-moon/keystone/tests/unit/common/test_notifications.py
parent8d7b0ffa8e7a7bb09686d8f25176c364d5b6aa0e (diff)
Update keystone to the branch stable/liberty.
Change-Id: I7cce62ae4b4cbca525a7b9499285455bdd04993e
Diffstat (limited to 'keystone-moon/keystone/tests/unit/common/test_notifications.py')
-rw-r--r--keystone-moon/keystone/tests/unit/common/test_notifications.py163
1 files changed, 158 insertions, 5 deletions
diff --git a/keystone-moon/keystone/tests/unit/common/test_notifications.py b/keystone-moon/keystone/tests/unit/common/test_notifications.py
index ec087c41..1ad8d50d 100644
--- a/keystone-moon/keystone/tests/unit/common/test_notifications.py
+++ b/keystone-moon/keystone/tests/unit/common/test_notifications.py
@@ -279,6 +279,16 @@ class BaseNotificationTest(test_v3.RestfulTestCase):
self.assertEqual(event_type, audit['event_type'])
self.assertTrue(audit['send_notification_called'])
+ def _assert_initiator_data_is_set(self, operation, resource_type, typeURI):
+ self.assertTrue(len(self._audits) > 0)
+ audit = self._audits[-1]
+ payload = audit['payload']
+ self.assertEqual(self.user_id, payload['initiator']['id'])
+ self.assertEqual(self.project_id, payload['initiator']['project_id'])
+ self.assertEqual(typeURI, payload['target']['typeURI'])
+ action = '%s.%s' % (operation, resource_type)
+ self.assertEqual(action, payload['action'])
+
def _assert_notify_not_sent(self, resource_id, operation, resource_type,
public=True):
unexpected = {
@@ -633,11 +643,154 @@ class CADFNotificationsForEntities(NotificationsForEntities):
resource_id = resp.result.get('domain').get('id')
self._assert_last_audit(resource_id, CREATED_OPERATION, 'domain',
cadftaxonomy.SECURITY_DOMAIN)
- self.assertTrue(len(self._audits) > 0)
- audit = self._audits[-1]
- payload = audit['payload']
- self.assertEqual(self.user_id, payload['initiator']['id'])
- self.assertEqual(self.project_id, payload['initiator']['project_id'])
+ self._assert_initiator_data_is_set(CREATED_OPERATION,
+ 'domain',
+ cadftaxonomy.SECURITY_DOMAIN)
+
+
+class V2Notifications(BaseNotificationTest):
+
+ def setUp(self):
+ super(V2Notifications, self).setUp()
+ self.config_fixture.config(notification_format='cadf')
+
+ def test_user(self):
+ token = self.get_scoped_token()
+ resp = self.admin_request(
+ method='POST',
+ path='/v2.0/users',
+ body={
+ 'user': {
+ 'name': uuid.uuid4().hex,
+ 'password': uuid.uuid4().hex,
+ 'enabled': True,
+ },
+ },
+ token=token,
+ )
+ user_id = resp.result.get('user').get('id')
+ self._assert_initiator_data_is_set(CREATED_OPERATION,
+ 'user',
+ cadftaxonomy.SECURITY_ACCOUNT_USER)
+ # test for delete user
+ self.admin_request(
+ method='DELETE',
+ path='/v2.0/users/%s' % user_id,
+ token=token,
+ )
+ self._assert_initiator_data_is_set(DELETED_OPERATION,
+ 'user',
+ cadftaxonomy.SECURITY_ACCOUNT_USER)
+
+ def test_role(self):
+ token = self.get_scoped_token()
+ resp = self.admin_request(
+ method='POST',
+ path='/v2.0/OS-KSADM/roles',
+ body={
+ 'role': {
+ 'name': uuid.uuid4().hex,
+ 'description': uuid.uuid4().hex,
+ },
+ },
+ token=token,
+ )
+ role_id = resp.result.get('role').get('id')
+ self._assert_initiator_data_is_set(CREATED_OPERATION,
+ 'role',
+ cadftaxonomy.SECURITY_ROLE)
+ # test for delete role
+ self.admin_request(
+ method='DELETE',
+ path='/v2.0/OS-KSADM/roles/%s' % role_id,
+ token=token,
+ )
+ self._assert_initiator_data_is_set(DELETED_OPERATION,
+ 'role',
+ cadftaxonomy.SECURITY_ROLE)
+
+ def test_service_and_endpoint(self):
+ token = self.get_scoped_token()
+ resp = self.admin_request(
+ method='POST',
+ path='/v2.0/OS-KSADM/services',
+ body={
+ 'OS-KSADM:service': {
+ 'name': uuid.uuid4().hex,
+ 'type': uuid.uuid4().hex,
+ 'description': uuid.uuid4().hex,
+ },
+ },
+ token=token,
+ )
+ service_id = resp.result.get('OS-KSADM:service').get('id')
+ self._assert_initiator_data_is_set(CREATED_OPERATION,
+ 'service',
+ cadftaxonomy.SECURITY_SERVICE)
+ resp = self.admin_request(
+ method='POST',
+ path='/v2.0/endpoints',
+ body={
+ 'endpoint': {
+ 'region': uuid.uuid4().hex,
+ 'service_id': service_id,
+ 'publicurl': uuid.uuid4().hex,
+ 'adminurl': uuid.uuid4().hex,
+ 'internalurl': uuid.uuid4().hex,
+ },
+ },
+ token=token,
+ )
+ endpoint_id = resp.result.get('endpoint').get('id')
+ self._assert_initiator_data_is_set(CREATED_OPERATION,
+ 'endpoint',
+ cadftaxonomy.SECURITY_ENDPOINT)
+ # test for delete endpoint
+ self.admin_request(
+ method='DELETE',
+ path='/v2.0/endpoints/%s' % endpoint_id,
+ token=token,
+ )
+ self._assert_initiator_data_is_set(DELETED_OPERATION,
+ 'endpoint',
+ cadftaxonomy.SECURITY_ENDPOINT)
+ # test for delete service
+ self.admin_request(
+ method='DELETE',
+ path='/v2.0/OS-KSADM/services/%s' % service_id,
+ token=token,
+ )
+ self._assert_initiator_data_is_set(DELETED_OPERATION,
+ 'service',
+ cadftaxonomy.SECURITY_SERVICE)
+
+ def test_project(self):
+ token = self.get_scoped_token()
+ resp = self.admin_request(
+ method='POST',
+ path='/v2.0/tenants',
+ body={
+ 'tenant': {
+ 'name': uuid.uuid4().hex,
+ 'description': uuid.uuid4().hex,
+ 'enabled': True
+ },
+ },
+ token=token,
+ )
+ project_id = resp.result.get('tenant').get('id')
+ self._assert_initiator_data_is_set(CREATED_OPERATION,
+ 'project',
+ cadftaxonomy.SECURITY_PROJECT)
+ # test for delete project
+ self.admin_request(
+ method='DELETE',
+ path='/v2.0/tenants/%s' % project_id,
+ token=token,
+ )
+ self._assert_initiator_data_is_set(DELETED_OPERATION,
+ 'project',
+ cadftaxonomy.SECURITY_PROJECT)
class TestEventCallbacks(test_v3.RestfulTestCase):