aboutsummaryrefslogtreecommitdiffstats
path: root/keystonemiddleware-moon/keystonemiddleware/tests/unit/test_audit_middleware.py
diff options
context:
space:
mode:
Diffstat (limited to 'keystonemiddleware-moon/keystonemiddleware/tests/unit/test_audit_middleware.py')
-rw-r--r--keystonemiddleware-moon/keystonemiddleware/tests/unit/test_audit_middleware.py485
1 files changed, 485 insertions, 0 deletions
diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/test_audit_middleware.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/test_audit_middleware.py
new file mode 100644
index 00000000..89e5aa44
--- /dev/null
+++ b/keystonemiddleware-moon/keystonemiddleware/tests/unit/test_audit_middleware.py
@@ -0,0 +1,485 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import os
+import tempfile
+import uuid
+
+import mock
+from oslo_config import cfg
+from pycadf import identifier
+import testtools
+from testtools import matchers
+import webob
+
+from keystonemiddleware import audit
+
+
+class FakeApp(object):
+ def __call__(self, env, start_response):
+ body = 'Some response'
+ start_response('200 OK', [
+ ('Content-Type', 'text/plain'),
+ ('Content-Length', str(sum(map(len, body))))
+ ])
+ return [body]
+
+
+class FakeFailingApp(object):
+ def __call__(self, env, start_response):
+ raise Exception('It happens!')
+
+
+class BaseAuditMiddlewareTest(testtools.TestCase):
+ def setUp(self):
+ super(BaseAuditMiddlewareTest, self).setUp()
+ self.fd, self.audit_map = tempfile.mkstemp()
+
+ with open(self.audit_map, "w") as f:
+ f.write("[custom_actions]\n")
+ f.write("reboot = start/reboot\n")
+ f.write("os-migrations/get = read\n\n")
+ f.write("[path_keywords]\n")
+ f.write("action = None\n")
+ f.write("os-hosts = host\n")
+ f.write("os-migrations = None\n")
+ f.write("reboot = None\n")
+ f.write("servers = server\n\n")
+ f.write("[service_endpoints]\n")
+ f.write("compute = service/compute")
+
+ cfg.CONF([], project='keystonemiddleware')
+
+ self.middleware = audit.AuditMiddleware(
+ FakeApp(), audit_map_file=self.audit_map,
+ service_name='pycadf')
+
+ self.addCleanup(lambda: os.close(self.fd))
+ self.addCleanup(cfg.CONF.reset)
+
+ @staticmethod
+ def get_environ_header(req_type):
+ env_headers = {'HTTP_X_SERVICE_CATALOG':
+ '''[{"endpoints_links": [],
+ "endpoints": [{"adminURL":
+ "http://admin_host:8774",
+ "region": "RegionOne",
+ "publicURL":
+ "http://public_host:8774",
+ "internalURL":
+ "http://internal_host:8774",
+ "id": "resource_id"}],
+ "type": "compute",
+ "name": "nova"},]''',
+ 'HTTP_X_USER_ID': 'user_id',
+ 'HTTP_X_USER_NAME': 'user_name',
+ 'HTTP_X_AUTH_TOKEN': 'token',
+ 'HTTP_X_PROJECT_ID': 'tenant_id',
+ 'HTTP_X_IDENTITY_STATUS': 'Confirmed'}
+ env_headers['REQUEST_METHOD'] = req_type
+ return env_headers
+
+
+@mock.patch('oslo.messaging.get_transport', mock.MagicMock())
+class AuditMiddlewareTest(BaseAuditMiddlewareTest):
+
+ def test_api_request(self):
+ req = webob.Request.blank('/foo/bar',
+ environ=self.get_environ_header('GET'))
+ with mock.patch('oslo.messaging.Notifier.info') as notify:
+ self.middleware(req)
+ # Check first notification with only 'request'
+ call_args = notify.call_args_list[0][0]
+ self.assertEqual('audit.http.request', call_args[1])
+ self.assertEqual('/foo/bar', call_args[2]['requestPath'])
+ self.assertEqual('pending', call_args[2]['outcome'])
+ self.assertNotIn('reason', call_args[2])
+ self.assertNotIn('reporterchain', call_args[2])
+
+ # Check second notification with request + response
+ call_args = notify.call_args_list[1][0]
+ self.assertEqual('audit.http.response', call_args[1])
+ self.assertEqual('/foo/bar', call_args[2]['requestPath'])
+ self.assertEqual('success', call_args[2]['outcome'])
+ self.assertIn('reason', call_args[2])
+ self.assertIn('reporterchain', call_args[2])
+
+ def test_api_request_failure(self):
+ self.middleware = audit.AuditMiddleware(
+ FakeFailingApp(),
+ audit_map_file=self.audit_map,
+ service_name='pycadf')
+ req = webob.Request.blank('/foo/bar',
+ environ=self.get_environ_header('GET'))
+ with mock.patch('oslo.messaging.Notifier.info') as notify:
+ try:
+ self.middleware(req)
+ self.fail('Application exception has not been re-raised')
+ except Exception:
+ pass
+ # Check first notification with only 'request'
+ call_args = notify.call_args_list[0][0]
+ self.assertEqual('audit.http.request', call_args[1])
+ self.assertEqual('/foo/bar', call_args[2]['requestPath'])
+ self.assertEqual('pending', call_args[2]['outcome'])
+ self.assertNotIn('reporterchain', call_args[2])
+
+ # Check second notification with request + response
+ call_args = notify.call_args_list[1][0]
+ self.assertEqual('audit.http.response', call_args[1])
+ self.assertEqual('/foo/bar', call_args[2]['requestPath'])
+ self.assertEqual('unknown', call_args[2]['outcome'])
+ self.assertIn('reporterchain', call_args[2])
+
+ def test_process_request_fail(self):
+ req = webob.Request.blank('/foo/bar',
+ environ=self.get_environ_header('GET'))
+ with mock.patch('oslo.messaging.Notifier.info',
+ side_effect=Exception('error')) as notify:
+ self.middleware._process_request(req)
+ self.assertTrue(notify.called)
+
+ def test_process_response_fail(self):
+ req = webob.Request.blank('/foo/bar',
+ environ=self.get_environ_header('GET'))
+ with mock.patch('oslo.messaging.Notifier.info',
+ side_effect=Exception('error')) as notify:
+ self.middleware._process_response(req, webob.response.Response())
+ self.assertTrue(notify.called)
+
+ def test_ignore_req_opt(self):
+ self.middleware = audit.AuditMiddleware(FakeApp(),
+ audit_map_file=self.audit_map,
+ ignore_req_list='get, PUT')
+ req = webob.Request.blank('/skip/foo',
+ environ=self.get_environ_header('GET'))
+ req1 = webob.Request.blank('/skip/foo',
+ environ=self.get_environ_header('PUT'))
+ req2 = webob.Request.blank('/accept/foo',
+ environ=self.get_environ_header('POST'))
+ with mock.patch('oslo.messaging.Notifier.info') as notify:
+ # Check GET/PUT request does not send notification
+ self.middleware(req)
+ self.middleware(req1)
+ self.assertEqual([], notify.call_args_list)
+
+ # Check non-GET/PUT request does send notification
+ self.middleware(req2)
+ self.assertThat(notify.call_args_list, matchers.HasLength(2))
+ call_args = notify.call_args_list[0][0]
+ self.assertEqual('audit.http.request', call_args[1])
+ self.assertEqual('/accept/foo', call_args[2]['requestPath'])
+
+ call_args = notify.call_args_list[1][0]
+ self.assertEqual('audit.http.response', call_args[1])
+ self.assertEqual('/accept/foo', call_args[2]['requestPath'])
+
+ def test_api_request_no_messaging(self):
+ req = webob.Request.blank('/foo/bar',
+ environ=self.get_environ_header('GET'))
+ with mock.patch('keystonemiddleware.audit.messaging', None):
+ with mock.patch('keystonemiddleware.audit._LOG.info') as log:
+ self.middleware(req)
+ # Check first notification with only 'request'
+ call_args = log.call_args_list[0][0]
+ self.assertEqual('audit.http.request',
+ call_args[1]['event_type'])
+
+ # Check second notification with request + response
+ call_args = log.call_args_list[1][0]
+ self.assertEqual('audit.http.response',
+ call_args[1]['event_type'])
+
+ def test_cadf_event_scoped_to_request(self):
+ middleware = audit.AuditMiddleware(
+ FakeApp(),
+ audit_map_file=self.audit_map,
+ service_name='pycadf')
+ req = webob.Request.blank('/foo/bar',
+ environ=self.get_environ_header('GET'))
+ with mock.patch('oslo.messaging.Notifier.info') as notify:
+ middleware(req)
+ self.assertIsNotNone(req.environ.get('cadf_event'))
+
+ # ensure exact same event is used between request and response
+ self.assertEqual(notify.call_args_list[0][0][2]['id'],
+ notify.call_args_list[1][0][2]['id'])
+
+ def test_cadf_event_scoped_to_request_on_error(self):
+ middleware = audit.AuditMiddleware(
+ FakeApp(),
+ audit_map_file=self.audit_map,
+ service_name='pycadf')
+ req = webob.Request.blank('/foo/bar',
+ environ=self.get_environ_header('GET'))
+ with mock.patch('oslo.messaging.Notifier.info',
+ side_effect=Exception('error')) as notify:
+ middleware._process_request(req)
+ self.assertTrue(notify.called)
+ req2 = webob.Request.blank('/foo/bar',
+ environ=self.get_environ_header('GET'))
+ with mock.patch('oslo.messaging.Notifier.info') as notify:
+ middleware._process_response(req2, webob.response.Response())
+ self.assertTrue(notify.called)
+ # ensure event is not the same across requests
+ self.assertNotEqual(req.environ['cadf_event'].id,
+ notify.call_args_list[0][0][2]['id'])
+
+
+@mock.patch('oslo.messaging', mock.MagicMock())
+class AuditApiLogicTest(BaseAuditMiddlewareTest):
+
+ def api_request(self, method, url):
+ req = webob.Request.blank(url, environ=self.get_environ_header(method),
+ remote_addr='192.168.0.1')
+ self.middleware._process_request(req)
+ return req
+
+ def test_get_list(self):
+ req = self.api_request('GET', 'http://admin_host:8774/v2/'
+ + str(uuid.uuid4()) + '/servers')
+ payload = req.environ['cadf_event'].as_dict()
+ self.assertEqual(payload['action'], 'read/list')
+ self.assertEqual(payload['typeURI'],
+ 'http://schemas.dmtf.org/cloud/audit/1.0/event')
+ self.assertEqual(payload['outcome'], 'pending')
+ self.assertEqual(payload['eventType'], 'activity')
+ self.assertEqual(payload['target']['name'], 'nova')
+ self.assertEqual(payload['target']['id'], 'openstack:resource_id')
+ self.assertEqual(payload['target']['typeURI'],
+ 'service/compute/servers')
+ self.assertEqual(len(payload['target']['addresses']), 3)
+ self.assertEqual(payload['target']['addresses'][0]['name'], 'admin')
+ self.assertEqual(payload['target']['addresses'][0]['url'],
+ 'http://admin_host:8774')
+ self.assertEqual(payload['initiator']['id'], 'openstack:user_id')
+ self.assertEqual(payload['initiator']['name'], 'user_name')
+ self.assertEqual(payload['initiator']['project_id'],
+ 'openstack:tenant_id')
+ self.assertEqual(payload['initiator']['host']['address'],
+ '192.168.0.1')
+ self.assertEqual(payload['initiator']['typeURI'],
+ 'service/security/account/user')
+ self.assertNotEqual(payload['initiator']['credential']['token'],
+ 'token')
+ self.assertEqual(payload['initiator']['credential']['identity_status'],
+ 'Confirmed')
+ self.assertNotIn('reason', payload)
+ self.assertNotIn('reporterchain', payload)
+ self.assertEqual(payload['observer']['id'], 'target')
+ self.assertEqual(req.path, payload['requestPath'])
+
+ def test_get_read(self):
+ req = self.api_request('GET', 'http://admin_host:8774/v2/'
+ + str(uuid.uuid4()) + '/servers/'
+ + str(uuid.uuid4()))
+ payload = req.environ['cadf_event'].as_dict()
+ self.assertEqual(payload['target']['typeURI'],
+ 'service/compute/servers/server')
+ self.assertEqual(payload['action'], 'read')
+ self.assertEqual(payload['outcome'], 'pending')
+
+ def test_get_unknown_endpoint(self):
+ req = self.api_request('GET', 'http://unknown:8774/v2/'
+ + str(uuid.uuid4()) + '/servers')
+ payload = req.environ['cadf_event'].as_dict()
+ self.assertEqual(payload['action'], 'read/list')
+ self.assertEqual(payload['outcome'], 'pending')
+ self.assertEqual(payload['target']['name'], 'unknown')
+ self.assertEqual(payload['target']['id'], 'unknown')
+ self.assertEqual(payload['target']['typeURI'], 'unknown')
+
+ def test_get_unknown_endpoint_default_set(self):
+ with open(self.audit_map, "w") as f:
+ f.write("[DEFAULT]\n")
+ f.write("target_endpoint_type = compute\n")
+ f.write("[path_keywords]\n")
+ f.write("servers = server\n\n")
+ f.write("[service_endpoints]\n")
+ f.write("compute = service/compute")
+
+ self.middleware = audit.AuditMiddleware(
+ FakeApp(), audit_map_file=self.audit_map,
+ service_name='pycadf')
+
+ req = self.api_request('GET', 'http://unknown:8774/v2/'
+ + str(uuid.uuid4()) + '/servers')
+ payload = req.environ['cadf_event'].as_dict()
+ self.assertEqual(payload['action'], 'read/list')
+ self.assertEqual(payload['outcome'], 'pending')
+ self.assertEqual(payload['target']['name'], 'nova')
+ self.assertEqual(payload['target']['id'], 'openstack:resource_id')
+ self.assertEqual(payload['target']['typeURI'],
+ 'service/compute/servers')
+
+ def test_put(self):
+ req = self.api_request('PUT', 'http://admin_host:8774/v2/'
+ + str(uuid.uuid4()) + '/servers')
+ payload = req.environ['cadf_event'].as_dict()
+ self.assertEqual(payload['target']['typeURI'],
+ 'service/compute/servers')
+ self.assertEqual(payload['action'], 'update')
+ self.assertEqual(payload['outcome'], 'pending')
+
+ def test_delete(self):
+ req = self.api_request('DELETE', 'http://admin_host:8774/v2/'
+ + str(uuid.uuid4()) + '/servers')
+ payload = req.environ['cadf_event'].as_dict()
+ self.assertEqual(payload['target']['typeURI'],
+ 'service/compute/servers')
+ self.assertEqual(payload['action'], 'delete')
+ self.assertEqual(payload['outcome'], 'pending')
+
+ def test_head(self):
+ req = self.api_request('HEAD', 'http://admin_host:8774/v2/'
+ + str(uuid.uuid4()) + '/servers')
+ payload = req.environ['cadf_event'].as_dict()
+ self.assertEqual(payload['target']['typeURI'],
+ 'service/compute/servers')
+ self.assertEqual(payload['action'], 'read')
+ self.assertEqual(payload['outcome'], 'pending')
+
+ def test_post_update(self):
+ req = self.api_request('POST',
+ 'http://admin_host:8774/v2/'
+ + str(uuid.uuid4()) + '/servers/'
+ + str(uuid.uuid4()))
+ payload = req.environ['cadf_event'].as_dict()
+ self.assertEqual(payload['target']['typeURI'],
+ 'service/compute/servers/server')
+ self.assertEqual(payload['action'], 'update')
+ self.assertEqual(payload['outcome'], 'pending')
+
+ def test_post_create(self):
+ req = self.api_request('POST', 'http://admin_host:8774/v2/'
+ + str(uuid.uuid4()) + '/servers')
+ payload = req.environ['cadf_event'].as_dict()
+ self.assertEqual(payload['target']['typeURI'],
+ 'service/compute/servers')
+ self.assertEqual(payload['action'], 'create')
+ self.assertEqual(payload['outcome'], 'pending')
+
+ def test_post_action(self):
+ req = webob.Request.blank('http://admin_host:8774/v2/'
+ + str(uuid.uuid4()) + '/servers/action',
+ environ=self.get_environ_header('POST'))
+ req.body = b'{"createImage" : {"name" : "new-image","metadata": ' \
+ b'{"ImageType": "Gold","ImageVersion": "2.0"}}}'
+ self.middleware._process_request(req)
+ payload = req.environ['cadf_event'].as_dict()
+ self.assertEqual(payload['target']['typeURI'],
+ 'service/compute/servers/action')
+ self.assertEqual(payload['action'], 'update/createImage')
+ self.assertEqual(payload['outcome'], 'pending')
+
+ def test_post_empty_body_action(self):
+ req = self.api_request('POST', 'http://admin_host:8774/v2/'
+ + str(uuid.uuid4()) + '/servers/action')
+ payload = req.environ['cadf_event'].as_dict()
+ self.assertEqual(payload['target']['typeURI'],
+ 'service/compute/servers/action')
+ self.assertEqual(payload['action'], 'create')
+ self.assertEqual(payload['outcome'], 'pending')
+
+ def test_custom_action(self):
+ req = self.api_request('GET', 'http://admin_host:8774/v2/'
+ + str(uuid.uuid4()) + '/os-hosts/'
+ + str(uuid.uuid4()) + '/reboot')
+ payload = req.environ['cadf_event'].as_dict()
+ self.assertEqual(payload['target']['typeURI'],
+ 'service/compute/os-hosts/host/reboot')
+ self.assertEqual(payload['action'], 'start/reboot')
+ self.assertEqual(payload['outcome'], 'pending')
+
+ def test_custom_action_complex(self):
+ req = self.api_request('GET', 'http://admin_host:8774/v2/'
+ + str(uuid.uuid4()) + '/os-migrations')
+ payload = req.environ['cadf_event'].as_dict()
+ self.assertEqual(payload['target']['typeURI'],
+ 'service/compute/os-migrations')
+ self.assertEqual(payload['action'], 'read')
+ req = self.api_request('POST', 'http://admin_host:8774/v2/'
+ + str(uuid.uuid4()) + '/os-migrations')
+ payload = req.environ['cadf_event'].as_dict()
+ self.assertEqual(payload['target']['typeURI'],
+ 'service/compute/os-migrations')
+ self.assertEqual(payload['action'], 'create')
+
+ def test_response_mod_msg(self):
+ req = self.api_request('GET', 'http://admin_host:8774/v2/'
+ + str(uuid.uuid4()) + '/servers')
+ payload = req.environ['cadf_event'].as_dict()
+ self.middleware._process_response(req, webob.Response())
+ payload2 = req.environ['cadf_event'].as_dict()
+ self.assertEqual(payload['id'], payload2['id'])
+ self.assertEqual(payload['tags'], payload2['tags'])
+ self.assertEqual(payload2['outcome'], 'success')
+ self.assertEqual(payload2['reason']['reasonType'], 'HTTP')
+ self.assertEqual(payload2['reason']['reasonCode'], '200')
+ self.assertEqual(len(payload2['reporterchain']), 1)
+ self.assertEqual(payload2['reporterchain'][0]['role'], 'modifier')
+ self.assertEqual(payload2['reporterchain'][0]['reporter']['id'],
+ 'target')
+
+ def test_no_response(self):
+ req = self.api_request('GET', 'http://admin_host:8774/v2/'
+ + str(uuid.uuid4()) + '/servers')
+ payload = req.environ['cadf_event'].as_dict()
+ self.middleware._process_response(req, None)
+ payload2 = req.environ['cadf_event'].as_dict()
+ self.assertEqual(payload['id'], payload2['id'])
+ self.assertEqual(payload['tags'], payload2['tags'])
+ self.assertEqual(payload2['outcome'], 'unknown')
+ self.assertNotIn('reason', payload2)
+ self.assertEqual(len(payload2['reporterchain']), 1)
+ self.assertEqual(payload2['reporterchain'][0]['role'], 'modifier')
+ self.assertEqual(payload2['reporterchain'][0]['reporter']['id'],
+ 'target')
+
+ def test_missing_req(self):
+ req = webob.Request.blank('http://admin_host:8774/v2/'
+ + str(uuid.uuid4()) + '/servers',
+ environ=self.get_environ_header('GET'))
+ self.assertNotIn('cadf_event', req.environ)
+ self.middleware._process_response(req, webob.Response())
+ self.assertIn('cadf_event', req.environ)
+ payload = req.environ['cadf_event'].as_dict()
+ self.assertEqual(payload['outcome'], 'success')
+ self.assertEqual(payload['reason']['reasonType'], 'HTTP')
+ self.assertEqual(payload['reason']['reasonCode'], '200')
+ self.assertEqual(payload['observer']['id'], 'target')
+
+ def test_missing_catalog_endpoint_id(self):
+ env_headers = {'HTTP_X_SERVICE_CATALOG':
+ '''[{"endpoints_links": [],
+ "endpoints": [{"adminURL":
+ "http://admin_host:8774",
+ "region": "RegionOne",
+ "publicURL":
+ "http://public_host:8774",
+ "internalURL":
+ "http://internal_host:8774"}],
+ "type": "compute",
+ "name": "nova"},]''',
+ 'HTTP_X_USER_ID': 'user_id',
+ 'HTTP_X_USER_NAME': 'user_name',
+ 'HTTP_X_AUTH_TOKEN': 'token',
+ 'HTTP_X_PROJECT_ID': 'tenant_id',
+ 'HTTP_X_IDENTITY_STATUS': 'Confirmed',
+ 'REQUEST_METHOD': 'GET'}
+ req = webob.Request.blank('http://admin_host:8774/v2/'
+ + str(uuid.uuid4()) + '/servers',
+ environ=env_headers)
+ self.middleware._process_request(req)
+ payload = req.environ['cadf_event'].as_dict()
+ self.assertEqual(payload['target']['id'], identifier.norm_ns('nova'))