From 03bf0c32a0c656d4b91bebedc87a005e6d7563bb Mon Sep 17 00:00:00 2001 From: WuKong Date: Wed, 1 Jul 2015 08:54:55 +0200 Subject: migrate openstack hook to opnfv Change-Id: I1e828dae38820fdff93966e57691b344af01140f Signed-off-by: WuKong --- .../tests/unit/test_audit_middleware.py | 485 +++++++++++++++++++++ 1 file changed, 485 insertions(+) create mode 100644 keystonemiddleware-moon/keystonemiddleware/tests/unit/test_audit_middleware.py (limited to 'keystonemiddleware-moon/keystonemiddleware/tests/unit/test_audit_middleware.py') diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/test_audit_middleware.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/test_audit_middleware.py new file mode 100644 index 00000000..89e5aa44 --- /dev/null +++ b/keystonemiddleware-moon/keystonemiddleware/tests/unit/test_audit_middleware.py @@ -0,0 +1,485 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os +import tempfile +import uuid + +import mock +from oslo_config import cfg +from pycadf import identifier +import testtools +from testtools import matchers +import webob + +from keystonemiddleware import audit + + +class FakeApp(object): + def __call__(self, env, start_response): + body = 'Some response' + start_response('200 OK', [ + ('Content-Type', 'text/plain'), + ('Content-Length', str(sum(map(len, body)))) + ]) + return [body] + + +class FakeFailingApp(object): + def __call__(self, env, start_response): + raise Exception('It happens!') + + +class BaseAuditMiddlewareTest(testtools.TestCase): + def setUp(self): + super(BaseAuditMiddlewareTest, self).setUp() + self.fd, self.audit_map = tempfile.mkstemp() + + with open(self.audit_map, "w") as f: + f.write("[custom_actions]\n") + f.write("reboot = start/reboot\n") + f.write("os-migrations/get = read\n\n") + f.write("[path_keywords]\n") + f.write("action = None\n") + f.write("os-hosts = host\n") + f.write("os-migrations = None\n") + f.write("reboot = None\n") + f.write("servers = server\n\n") + f.write("[service_endpoints]\n") + f.write("compute = service/compute") + + cfg.CONF([], project='keystonemiddleware') + + self.middleware = audit.AuditMiddleware( + FakeApp(), audit_map_file=self.audit_map, + service_name='pycadf') + + self.addCleanup(lambda: os.close(self.fd)) + self.addCleanup(cfg.CONF.reset) + + @staticmethod + def get_environ_header(req_type): + env_headers = {'HTTP_X_SERVICE_CATALOG': + '''[{"endpoints_links": [], + "endpoints": [{"adminURL": + "http://admin_host:8774", + "region": "RegionOne", + "publicURL": + "http://public_host:8774", + "internalURL": + "http://internal_host:8774", + "id": "resource_id"}], + "type": "compute", + "name": "nova"},]''', + 'HTTP_X_USER_ID': 'user_id', + 'HTTP_X_USER_NAME': 'user_name', + 'HTTP_X_AUTH_TOKEN': 'token', + 'HTTP_X_PROJECT_ID': 'tenant_id', + 'HTTP_X_IDENTITY_STATUS': 'Confirmed'} + env_headers['REQUEST_METHOD'] = req_type + return env_headers + + +@mock.patch('oslo.messaging.get_transport', mock.MagicMock()) +class AuditMiddlewareTest(BaseAuditMiddlewareTest): + + def test_api_request(self): + req = webob.Request.blank('/foo/bar', + environ=self.get_environ_header('GET')) + with mock.patch('oslo.messaging.Notifier.info') as notify: + self.middleware(req) + # Check first notification with only 'request' + call_args = notify.call_args_list[0][0] + self.assertEqual('audit.http.request', call_args[1]) + self.assertEqual('/foo/bar', call_args[2]['requestPath']) + self.assertEqual('pending', call_args[2]['outcome']) + self.assertNotIn('reason', call_args[2]) + self.assertNotIn('reporterchain', call_args[2]) + + # Check second notification with request + response + call_args = notify.call_args_list[1][0] + self.assertEqual('audit.http.response', call_args[1]) + self.assertEqual('/foo/bar', call_args[2]['requestPath']) + self.assertEqual('success', call_args[2]['outcome']) + self.assertIn('reason', call_args[2]) + self.assertIn('reporterchain', call_args[2]) + + def test_api_request_failure(self): + self.middleware = audit.AuditMiddleware( + FakeFailingApp(), + audit_map_file=self.audit_map, + service_name='pycadf') + req = webob.Request.blank('/foo/bar', + environ=self.get_environ_header('GET')) + with mock.patch('oslo.messaging.Notifier.info') as notify: + try: + self.middleware(req) + self.fail('Application exception has not been re-raised') + except Exception: + pass + # Check first notification with only 'request' + call_args = notify.call_args_list[0][0] + self.assertEqual('audit.http.request', call_args[1]) + self.assertEqual('/foo/bar', call_args[2]['requestPath']) + self.assertEqual('pending', call_args[2]['outcome']) + self.assertNotIn('reporterchain', call_args[2]) + + # Check second notification with request + response + call_args = notify.call_args_list[1][0] + self.assertEqual('audit.http.response', call_args[1]) + self.assertEqual('/foo/bar', call_args[2]['requestPath']) + self.assertEqual('unknown', call_args[2]['outcome']) + self.assertIn('reporterchain', call_args[2]) + + def test_process_request_fail(self): + req = webob.Request.blank('/foo/bar', + environ=self.get_environ_header('GET')) + with mock.patch('oslo.messaging.Notifier.info', + side_effect=Exception('error')) as notify: + self.middleware._process_request(req) + self.assertTrue(notify.called) + + def test_process_response_fail(self): + req = webob.Request.blank('/foo/bar', + environ=self.get_environ_header('GET')) + with mock.patch('oslo.messaging.Notifier.info', + side_effect=Exception('error')) as notify: + self.middleware._process_response(req, webob.response.Response()) + self.assertTrue(notify.called) + + def test_ignore_req_opt(self): + self.middleware = audit.AuditMiddleware(FakeApp(), + audit_map_file=self.audit_map, + ignore_req_list='get, PUT') + req = webob.Request.blank('/skip/foo', + environ=self.get_environ_header('GET')) + req1 = webob.Request.blank('/skip/foo', + environ=self.get_environ_header('PUT')) + req2 = webob.Request.blank('/accept/foo', + environ=self.get_environ_header('POST')) + with mock.patch('oslo.messaging.Notifier.info') as notify: + # Check GET/PUT request does not send notification + self.middleware(req) + self.middleware(req1) + self.assertEqual([], notify.call_args_list) + + # Check non-GET/PUT request does send notification + self.middleware(req2) + self.assertThat(notify.call_args_list, matchers.HasLength(2)) + call_args = notify.call_args_list[0][0] + self.assertEqual('audit.http.request', call_args[1]) + self.assertEqual('/accept/foo', call_args[2]['requestPath']) + + call_args = notify.call_args_list[1][0] + self.assertEqual('audit.http.response', call_args[1]) + self.assertEqual('/accept/foo', call_args[2]['requestPath']) + + def test_api_request_no_messaging(self): + req = webob.Request.blank('/foo/bar', + environ=self.get_environ_header('GET')) + with mock.patch('keystonemiddleware.audit.messaging', None): + with mock.patch('keystonemiddleware.audit._LOG.info') as log: + self.middleware(req) + # Check first notification with only 'request' + call_args = log.call_args_list[0][0] + self.assertEqual('audit.http.request', + call_args[1]['event_type']) + + # Check second notification with request + response + call_args = log.call_args_list[1][0] + self.assertEqual('audit.http.response', + call_args[1]['event_type']) + + def test_cadf_event_scoped_to_request(self): + middleware = audit.AuditMiddleware( + FakeApp(), + audit_map_file=self.audit_map, + service_name='pycadf') + req = webob.Request.blank('/foo/bar', + environ=self.get_environ_header('GET')) + with mock.patch('oslo.messaging.Notifier.info') as notify: + middleware(req) + self.assertIsNotNone(req.environ.get('cadf_event')) + + # ensure exact same event is used between request and response + self.assertEqual(notify.call_args_list[0][0][2]['id'], + notify.call_args_list[1][0][2]['id']) + + def test_cadf_event_scoped_to_request_on_error(self): + middleware = audit.AuditMiddleware( + FakeApp(), + audit_map_file=self.audit_map, + service_name='pycadf') + req = webob.Request.blank('/foo/bar', + environ=self.get_environ_header('GET')) + with mock.patch('oslo.messaging.Notifier.info', + side_effect=Exception('error')) as notify: + middleware._process_request(req) + self.assertTrue(notify.called) + req2 = webob.Request.blank('/foo/bar', + environ=self.get_environ_header('GET')) + with mock.patch('oslo.messaging.Notifier.info') as notify: + middleware._process_response(req2, webob.response.Response()) + self.assertTrue(notify.called) + # ensure event is not the same across requests + self.assertNotEqual(req.environ['cadf_event'].id, + notify.call_args_list[0][0][2]['id']) + + +@mock.patch('oslo.messaging', mock.MagicMock()) +class AuditApiLogicTest(BaseAuditMiddlewareTest): + + def api_request(self, method, url): + req = webob.Request.blank(url, environ=self.get_environ_header(method), + remote_addr='192.168.0.1') + self.middleware._process_request(req) + return req + + def test_get_list(self): + req = self.api_request('GET', 'http://admin_host:8774/v2/' + + str(uuid.uuid4()) + '/servers') + payload = req.environ['cadf_event'].as_dict() + self.assertEqual(payload['action'], 'read/list') + self.assertEqual(payload['typeURI'], + 'http://schemas.dmtf.org/cloud/audit/1.0/event') + self.assertEqual(payload['outcome'], 'pending') + self.assertEqual(payload['eventType'], 'activity') + self.assertEqual(payload['target']['name'], 'nova') + self.assertEqual(payload['target']['id'], 'openstack:resource_id') + self.assertEqual(payload['target']['typeURI'], + 'service/compute/servers') + self.assertEqual(len(payload['target']['addresses']), 3) + self.assertEqual(payload['target']['addresses'][0]['name'], 'admin') + self.assertEqual(payload['target']['addresses'][0]['url'], + 'http://admin_host:8774') + self.assertEqual(payload['initiator']['id'], 'openstack:user_id') + self.assertEqual(payload['initiator']['name'], 'user_name') + self.assertEqual(payload['initiator']['project_id'], + 'openstack:tenant_id') + self.assertEqual(payload['initiator']['host']['address'], + '192.168.0.1') + self.assertEqual(payload['initiator']['typeURI'], + 'service/security/account/user') + self.assertNotEqual(payload['initiator']['credential']['token'], + 'token') + self.assertEqual(payload['initiator']['credential']['identity_status'], + 'Confirmed') + self.assertNotIn('reason', payload) + self.assertNotIn('reporterchain', payload) + self.assertEqual(payload['observer']['id'], 'target') + self.assertEqual(req.path, payload['requestPath']) + + def test_get_read(self): + req = self.api_request('GET', 'http://admin_host:8774/v2/' + + str(uuid.uuid4()) + '/servers/' + + str(uuid.uuid4())) + payload = req.environ['cadf_event'].as_dict() + self.assertEqual(payload['target']['typeURI'], + 'service/compute/servers/server') + self.assertEqual(payload['action'], 'read') + self.assertEqual(payload['outcome'], 'pending') + + def test_get_unknown_endpoint(self): + req = self.api_request('GET', 'http://unknown:8774/v2/' + + str(uuid.uuid4()) + '/servers') + payload = req.environ['cadf_event'].as_dict() + self.assertEqual(payload['action'], 'read/list') + self.assertEqual(payload['outcome'], 'pending') + self.assertEqual(payload['target']['name'], 'unknown') + self.assertEqual(payload['target']['id'], 'unknown') + self.assertEqual(payload['target']['typeURI'], 'unknown') + + def test_get_unknown_endpoint_default_set(self): + with open(self.audit_map, "w") as f: + f.write("[DEFAULT]\n") + f.write("target_endpoint_type = compute\n") + f.write("[path_keywords]\n") + f.write("servers = server\n\n") + f.write("[service_endpoints]\n") + f.write("compute = service/compute") + + self.middleware = audit.AuditMiddleware( + FakeApp(), audit_map_file=self.audit_map, + service_name='pycadf') + + req = self.api_request('GET', 'http://unknown:8774/v2/' + + str(uuid.uuid4()) + '/servers') + payload = req.environ['cadf_event'].as_dict() + self.assertEqual(payload['action'], 'read/list') + self.assertEqual(payload['outcome'], 'pending') + self.assertEqual(payload['target']['name'], 'nova') + self.assertEqual(payload['target']['id'], 'openstack:resource_id') + self.assertEqual(payload['target']['typeURI'], + 'service/compute/servers') + + def test_put(self): + req = self.api_request('PUT', 'http://admin_host:8774/v2/' + + str(uuid.uuid4()) + '/servers') + payload = req.environ['cadf_event'].as_dict() + self.assertEqual(payload['target']['typeURI'], + 'service/compute/servers') + self.assertEqual(payload['action'], 'update') + self.assertEqual(payload['outcome'], 'pending') + + def test_delete(self): + req = self.api_request('DELETE', 'http://admin_host:8774/v2/' + + str(uuid.uuid4()) + '/servers') + payload = req.environ['cadf_event'].as_dict() + self.assertEqual(payload['target']['typeURI'], + 'service/compute/servers') + self.assertEqual(payload['action'], 'delete') + self.assertEqual(payload['outcome'], 'pending') + + def test_head(self): + req = self.api_request('HEAD', 'http://admin_host:8774/v2/' + + str(uuid.uuid4()) + '/servers') + payload = req.environ['cadf_event'].as_dict() + self.assertEqual(payload['target']['typeURI'], + 'service/compute/servers') + self.assertEqual(payload['action'], 'read') + self.assertEqual(payload['outcome'], 'pending') + + def test_post_update(self): + req = self.api_request('POST', + 'http://admin_host:8774/v2/' + + str(uuid.uuid4()) + '/servers/' + + str(uuid.uuid4())) + payload = req.environ['cadf_event'].as_dict() + self.assertEqual(payload['target']['typeURI'], + 'service/compute/servers/server') + self.assertEqual(payload['action'], 'update') + self.assertEqual(payload['outcome'], 'pending') + + def test_post_create(self): + req = self.api_request('POST', 'http://admin_host:8774/v2/' + + str(uuid.uuid4()) + '/servers') + payload = req.environ['cadf_event'].as_dict() + self.assertEqual(payload['target']['typeURI'], + 'service/compute/servers') + self.assertEqual(payload['action'], 'create') + self.assertEqual(payload['outcome'], 'pending') + + def test_post_action(self): + req = webob.Request.blank('http://admin_host:8774/v2/' + + str(uuid.uuid4()) + '/servers/action', + environ=self.get_environ_header('POST')) + req.body = b'{"createImage" : {"name" : "new-image","metadata": ' \ + b'{"ImageType": "Gold","ImageVersion": "2.0"}}}' + self.middleware._process_request(req) + payload = req.environ['cadf_event'].as_dict() + self.assertEqual(payload['target']['typeURI'], + 'service/compute/servers/action') + self.assertEqual(payload['action'], 'update/createImage') + self.assertEqual(payload['outcome'], 'pending') + + def test_post_empty_body_action(self): + req = self.api_request('POST', 'http://admin_host:8774/v2/' + + str(uuid.uuid4()) + '/servers/action') + payload = req.environ['cadf_event'].as_dict() + self.assertEqual(payload['target']['typeURI'], + 'service/compute/servers/action') + self.assertEqual(payload['action'], 'create') + self.assertEqual(payload['outcome'], 'pending') + + def test_custom_action(self): + req = self.api_request('GET', 'http://admin_host:8774/v2/' + + str(uuid.uuid4()) + '/os-hosts/' + + str(uuid.uuid4()) + '/reboot') + payload = req.environ['cadf_event'].as_dict() + self.assertEqual(payload['target']['typeURI'], + 'service/compute/os-hosts/host/reboot') + self.assertEqual(payload['action'], 'start/reboot') + self.assertEqual(payload['outcome'], 'pending') + + def test_custom_action_complex(self): + req = self.api_request('GET', 'http://admin_host:8774/v2/' + + str(uuid.uuid4()) + '/os-migrations') + payload = req.environ['cadf_event'].as_dict() + self.assertEqual(payload['target']['typeURI'], + 'service/compute/os-migrations') + self.assertEqual(payload['action'], 'read') + req = self.api_request('POST', 'http://admin_host:8774/v2/' + + str(uuid.uuid4()) + '/os-migrations') + payload = req.environ['cadf_event'].as_dict() + self.assertEqual(payload['target']['typeURI'], + 'service/compute/os-migrations') + self.assertEqual(payload['action'], 'create') + + def test_response_mod_msg(self): + req = self.api_request('GET', 'http://admin_host:8774/v2/' + + str(uuid.uuid4()) + '/servers') + payload = req.environ['cadf_event'].as_dict() + self.middleware._process_response(req, webob.Response()) + payload2 = req.environ['cadf_event'].as_dict() + self.assertEqual(payload['id'], payload2['id']) + self.assertEqual(payload['tags'], payload2['tags']) + self.assertEqual(payload2['outcome'], 'success') + self.assertEqual(payload2['reason']['reasonType'], 'HTTP') + self.assertEqual(payload2['reason']['reasonCode'], '200') + self.assertEqual(len(payload2['reporterchain']), 1) + self.assertEqual(payload2['reporterchain'][0]['role'], 'modifier') + self.assertEqual(payload2['reporterchain'][0]['reporter']['id'], + 'target') + + def test_no_response(self): + req = self.api_request('GET', 'http://admin_host:8774/v2/' + + str(uuid.uuid4()) + '/servers') + payload = req.environ['cadf_event'].as_dict() + self.middleware._process_response(req, None) + payload2 = req.environ['cadf_event'].as_dict() + self.assertEqual(payload['id'], payload2['id']) + self.assertEqual(payload['tags'], payload2['tags']) + self.assertEqual(payload2['outcome'], 'unknown') + self.assertNotIn('reason', payload2) + self.assertEqual(len(payload2['reporterchain']), 1) + self.assertEqual(payload2['reporterchain'][0]['role'], 'modifier') + self.assertEqual(payload2['reporterchain'][0]['reporter']['id'], + 'target') + + def test_missing_req(self): + req = webob.Request.blank('http://admin_host:8774/v2/' + + str(uuid.uuid4()) + '/servers', + environ=self.get_environ_header('GET')) + self.assertNotIn('cadf_event', req.environ) + self.middleware._process_response(req, webob.Response()) + self.assertIn('cadf_event', req.environ) + payload = req.environ['cadf_event'].as_dict() + self.assertEqual(payload['outcome'], 'success') + self.assertEqual(payload['reason']['reasonType'], 'HTTP') + self.assertEqual(payload['reason']['reasonCode'], '200') + self.assertEqual(payload['observer']['id'], 'target') + + def test_missing_catalog_endpoint_id(self): + env_headers = {'HTTP_X_SERVICE_CATALOG': + '''[{"endpoints_links": [], + "endpoints": [{"adminURL": + "http://admin_host:8774", + "region": "RegionOne", + "publicURL": + "http://public_host:8774", + "internalURL": + "http://internal_host:8774"}], + "type": "compute", + "name": "nova"},]''', + 'HTTP_X_USER_ID': 'user_id', + 'HTTP_X_USER_NAME': 'user_name', + 'HTTP_X_AUTH_TOKEN': 'token', + 'HTTP_X_PROJECT_ID': 'tenant_id', + 'HTTP_X_IDENTITY_STATUS': 'Confirmed', + 'REQUEST_METHOD': 'GET'} + req = webob.Request.blank('http://admin_host:8774/v2/' + + str(uuid.uuid4()) + '/servers', + environ=env_headers) + self.middleware._process_request(req) + payload = req.environ['cadf_event'].as_dict() + self.assertEqual(payload['target']['id'], identifier.norm_ns('nova')) -- cgit 1.2.3-korg