diff options
-rw-r--r-- | yardstick/tests/unit/benchmark/runner/test_proxduration.py | 275 |
1 files changed, 133 insertions, 142 deletions
diff --git a/yardstick/tests/unit/benchmark/runner/test_proxduration.py b/yardstick/tests/unit/benchmark/runner/test_proxduration.py index be1715aad..3299c5b05 100644 --- a/yardstick/tests/unit/benchmark/runner/test_proxduration.py +++ b/yardstick/tests/unit/benchmark/runner/test_proxduration.py @@ -1,23 +1,29 @@ -############################################################################## -# Copyright (c) 2018 Nokia and others. +# Copyright (c) 2018 Intel Corporation # -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. import mock import unittest import multiprocessing import os -import time from yardstick.benchmark.runners import proxduration +from yardstick.common import constants from yardstick.common import exceptions as y_exc class ProxDurationRunnerTest(unittest.TestCase): + class MyMethod(object): SLA_VALIDATION_ERROR_SIDE_EFFECT = 1 BROAD_EXCEPTION_SIDE_EFFECT = 2 @@ -69,38 +75,37 @@ class ProxDurationRunnerTest(unittest.TestCase): @mock.patch.object(os, 'getpid') def test__worker_process_runner_id(self, mock_os_getpid): mock_os_getpid.return_value = 101 - self.scenario_cfg["runner"] = {"sampled": True, "duration": 1} - proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method', - self.scenario_cfg, {}, - multiprocessing.Event(), mock.Mock()) + self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1} + proxduration._worker_process( + mock.Mock(), self.benchmark_cls, 'my_method', self.scenario_cfg, + {}, multiprocessing.Event(), mock.Mock()) - self.assertEqual(self.scenario_cfg['runner']['runner_id'], 101) + self.assertEqual(101, self.scenario_cfg['runner']['runner_id']) def test__worker_process_called_with_cfg(self): - self.scenario_cfg["runner"] = {"sampled": True, "duration": 1} - proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method', - self.scenario_cfg, {}, - multiprocessing.Event(), mock.Mock()) + self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1} + proxduration._worker_process( + mock.Mock(), self.benchmark_cls, 'my_method', self.scenario_cfg, + {}, multiprocessing.Event(), mock.Mock()) self._assert_defaults__worker_run_setup_and_teardown() def test__worker_process_called_with_cfg_loop(self): - self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.01} - proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method', - self.scenario_cfg, {}, - multiprocessing.Event(), mock.Mock()) + self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1} + proxduration._worker_process( + mock.Mock(), self.benchmark_cls, 'my_method', self.scenario_cfg, + {}, multiprocessing.Event(), mock.Mock()) self._assert_defaults__worker_run_setup_and_teardown() self.assertGreater(self.benchmark.my_method.call_count, 2) def test__worker_process_called_without_cfg(self): scenario_cfg = {'runner': {}} - aborted = multiprocessing.Event() aborted.set() - - proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method', - scenario_cfg, {}, aborted, mock.Mock()) + proxduration._worker_process( + mock.Mock(), self.benchmark_cls, 'my_method', scenario_cfg, {}, + aborted, mock.Mock()) self.benchmark_cls.assert_called_once_with(scenario_cfg, {}) self.benchmark.setup.assert_called_once() @@ -108,188 +113,174 @@ class ProxDurationRunnerTest(unittest.TestCase): def test__worker_process_output_queue(self): self.benchmark.my_method = mock.Mock(return_value='my_result') - self.scenario_cfg["runner"] = {"sampled": True, "duration": 1} - output_queue = multiprocessing.Queue() - proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method', - self.scenario_cfg, {}, - multiprocessing.Event(), output_queue) - time.sleep(0.1) + self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1} + output_queue = mock.Mock() + proxduration._worker_process( + mock.Mock(), self.benchmark_cls, 'my_method', self.scenario_cfg, + {}, multiprocessing.Event(), output_queue) self._assert_defaults__worker_run_setup_and_teardown() - self.assertEquals(output_queue.get(), 'my_result') + output_queue.put.assert_has_calls( + [mock.call('my_result', True, constants.QUEUE_PUT_TIMEOUT)]) def test__worker_process_output_queue_multiple_iterations(self): - self.scenario_cfg["runner"] = {"sampled": True, "duration": 1} + self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1} self.benchmark.my_method = self.MyMethod() - - output_queue = multiprocessing.Queue() - proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method', - self.scenario_cfg, {}, - multiprocessing.Event(), output_queue) - time.sleep(0.1) + output_queue = mock.Mock() + proxduration._worker_process( + mock.Mock(), self.benchmark_cls, 'my_method', self.scenario_cfg, + {}, multiprocessing.Event(), output_queue) self._assert_defaults__worker_run_setup_and_teardown() - self.assertGreater(self.benchmark.my_method.count, 103) - - count = 101 - while not output_queue.empty(): - count += 1 - self.assertEquals(output_queue.get(), count) + for idx in range(102, 101 + len(output_queue.method_calls)): + output_queue.put.assert_has_calls( + [mock.call(idx, True, constants.QUEUE_PUT_TIMEOUT)]) def test__worker_process_queue(self): self.benchmark.my_method = self.MyMethod() - self.scenario_cfg["runner"] = {"sampled": True, "duration": 1} - queue = multiprocessing.Queue() - timestamp = time.time() - proxduration._worker_process(queue, self.benchmark_cls, 'my_method', - self.scenario_cfg, {}, - multiprocessing.Event(), mock.Mock()) - time.sleep(0.1) + self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1} + queue = mock.Mock() + proxduration._worker_process( + queue, self.benchmark_cls, 'my_method', self.scenario_cfg, {}, + multiprocessing.Event(), mock.Mock()) self._assert_defaults__worker_run_setup_and_teardown() - - result = queue.get() - self.assertGreater(result['timestamp'], timestamp) - self.assertEqual(result['errors'], '') - self.assertEqual(result['data'], {'my_key': 102}) - self.assertEqual(result['sequence'], 1) + benchmark_output = {'timestamp': mock.ANY, + 'sequence': 1, + 'data': {'my_key': 102}, + 'errors': ''} + queue.put.assert_has_calls( + [mock.call(benchmark_output, True, constants.QUEUE_PUT_TIMEOUT)]) def test__worker_process_queue_multiple_iterations(self): - self.scenario_cfg["runner"] = {"sampled": True, "duration": 1} + self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1} self.benchmark.my_method = self.MyMethod() - - queue = multiprocessing.Queue() - timestamp = time.time() - proxduration._worker_process(queue, self.benchmark_cls, 'my_method', - self.scenario_cfg, {}, - multiprocessing.Event(), mock.Mock()) - time.sleep(0.1) + queue = mock.Mock() + proxduration._worker_process( + queue, self.benchmark_cls, 'my_method', self.scenario_cfg, {}, + multiprocessing.Event(), mock.Mock()) self._assert_defaults__worker_run_setup_and_teardown() - self.assertGreater(self.benchmark.my_method.count, 103) - - count = 0 - while not queue.empty(): - count += 1 - result = queue.get() - self.assertGreater(result['timestamp'], timestamp) - self.assertEqual(result['errors'], '') - self.assertEqual(result['data'], {'my_key': count + 101}) - self.assertEqual(result['sequence'], count) + for idx in range(102, 101 + len(queue.method_calls)): + benchmark_output = {'timestamp': mock.ANY, + 'sequence': idx - 101, + 'data': {'my_key': idx}, + 'errors': ''} + queue.put.assert_has_calls( + [mock.call(benchmark_output, True, + constants.QUEUE_PUT_TIMEOUT)]) def test__worker_process_except_sla_validation_error_no_sla_cfg(self): self.benchmark.my_method = mock.Mock( side_effect=y_exc.SLAValidationError) - self.scenario_cfg["runner"] = {"sampled": True, "duration": 1} - proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method', - self.scenario_cfg, {}, - multiprocessing.Event(), mock.Mock()) + self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1} + proxduration._worker_process( + mock.Mock(), self.benchmark_cls, 'my_method', self.scenario_cfg, + {}, multiprocessing.Event(), mock.Mock()) self._assert_defaults__worker_run_setup_and_teardown() - def test__worker_process_except_sla_validation_error_sla_cfg_monitor(self): + @mock.patch.object(proxduration.LOG, 'warning') + def test__worker_process_except_sla_validation_error_sla_cfg_monitor( + self, *args): self.scenario_cfg['sla'] = {'action': 'monitor'} - self.scenario_cfg["runner"] = {"sampled": True, "duration": 1} + self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1} self.benchmark.my_method = mock.Mock( side_effect=y_exc.SLAValidationError) - - proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method', - self.scenario_cfg, {}, - multiprocessing.Event(), mock.Mock()) + proxduration._worker_process( + mock.Mock(), self.benchmark_cls, 'my_method', self.scenario_cfg, + {}, multiprocessing.Event(), mock.Mock()) self._assert_defaults__worker_run_setup_and_teardown() def test__worker_process_raise_sla_validation_error_sla_cfg_default(self): self.scenario_cfg['sla'] = {} - self.scenario_cfg["runner"] = {"sampled": True, "duration": 1} + self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1} self.benchmark.my_method = mock.Mock( side_effect=y_exc.SLAValidationError) - with self.assertRaises(y_exc.SLAValidationError): - proxduration._worker_process(mock.Mock(), self.benchmark_cls, - 'my_method', self.scenario_cfg, {}, - multiprocessing.Event(), mock.Mock()) + proxduration._worker_process( + mock.Mock(), self.benchmark_cls, 'my_method', + self.scenario_cfg, {}, multiprocessing.Event(), mock.Mock()) self.benchmark_cls.assert_called_once_with(self.scenario_cfg, {}) self.benchmark.setup.assert_called_once() self.benchmark.my_method.assert_called_once_with({}) def test__worker_process_raise_sla_validation_error_sla_cfg_assert(self): - self.scenario_cfg["runner"] = {"sampled": True, "duration": 1} + self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1} self.scenario_cfg['sla'] = {'action': 'assert'} self.benchmark.my_method = mock.Mock( side_effect=y_exc.SLAValidationError) with self.assertRaises(y_exc.SLAValidationError): - proxduration._worker_process(mock.Mock(), self.benchmark_cls, - 'my_method', self.scenario_cfg, {}, - multiprocessing.Event(), mock.Mock()) + proxduration._worker_process( + mock.Mock(), self.benchmark_cls, 'my_method', + self.scenario_cfg, {}, multiprocessing.Event(), mock.Mock()) self.benchmark_cls.assert_called_once_with(self.scenario_cfg, {}) self.benchmark.setup.assert_called_once() self.benchmark.my_method.assert_called_once_with({}) - def test__worker_process_queue_on_sla_validation_error_monitor(self): + @mock.patch.object(proxduration.LOG, 'warning') + def test__worker_process_queue_on_sla_validation_error_monitor( + self, *args): self.scenario_cfg['sla'] = {'action': 'monitor'} - self.scenario_cfg["runner"] = {"sampled": True, "duration": 1} + self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1} self.benchmark.my_method = self.MyMethod( side_effect=self.MyMethod.SLA_VALIDATION_ERROR_SIDE_EFFECT) - - queue = multiprocessing.Queue() - timestamp = time.time() - proxduration._worker_process(queue, self.benchmark_cls, 'my_method', - self.scenario_cfg, {}, - multiprocessing.Event(), mock.Mock()) - time.sleep(0.1) + queue = mock.Mock() + proxduration._worker_process( + queue, self.benchmark_cls, 'my_method', self.scenario_cfg, {}, + multiprocessing.Event(), mock.Mock()) self._assert_defaults__worker_run_setup_and_teardown() - - result = queue.get() - self.assertGreater(result['timestamp'], timestamp) - self.assertEqual(result['errors'], ('My Case SLA validation failed. ' - 'Error: my error message',)) - self.assertEqual(result['data'], {'my_key': 102}) - self.assertEqual(result['sequence'], 1) - - def test__worker_process_broad_exception(self): + benchmark_output = {'timestamp': mock.ANY, + 'sequence': 1, + 'data': {'my_key': 102}, + 'errors': ('My Case SLA validation failed. ' + 'Error: my error message', )} + queue.put.assert_has_calls( + [mock.call(benchmark_output, True, constants.QUEUE_PUT_TIMEOUT)]) + + @mock.patch.object(proxduration.LOG, 'exception') + def test__worker_process_broad_exception(self, *args): self.benchmark.my_method = mock.Mock( side_effect=y_exc.YardstickException) - self.scenario_cfg["runner"] = {"sampled": True, "duration": 1} - proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method', - self.scenario_cfg, {}, - multiprocessing.Event(), mock.Mock()) + self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1} + proxduration._worker_process( + mock.Mock(), self.benchmark_cls, 'my_method', + self.scenario_cfg, {}, multiprocessing.Event(), mock.Mock()) self._assert_defaults__worker_run_setup_and_teardown() - def test__worker_process_queue_on_broad_exception(self): + @mock.patch.object(proxduration.LOG, 'exception') + def test__worker_process_queue_on_broad_exception(self, *args): self.benchmark.my_method = self.MyMethod( side_effect=self.MyMethod.BROAD_EXCEPTION_SIDE_EFFECT) - - self.scenario_cfg["runner"] = {"sampled": True, "duration": 1} - queue = multiprocessing.Queue() - timestamp = time.time() - proxduration._worker_process(queue, self.benchmark_cls, 'my_method', - self.scenario_cfg, {}, - multiprocessing.Event(), mock.Mock()) - time.sleep(0.1) - - self._assert_defaults__worker_run_setup_and_teardown() - - result = queue.get() - self.assertGreater(result['timestamp'], timestamp) - self.assertNotEqual(result['errors'], '') - self.assertEqual(result['data'], {'my_key': 102}) - self.assertEqual(result['sequence'], 1) - - def test__worker_process_benchmark_teardown_on_broad_exception(self): + self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1} + queue = mock.Mock() + proxduration._worker_process( + queue, self.benchmark_cls, 'my_method', self.scenario_cfg, {}, + multiprocessing.Event(), mock.Mock()) + + benchmark_output = {'timestamp': mock.ANY, + 'sequence': 1, + 'data': {'my_key': 102}, + 'errors': mock.ANY} + queue.put.assert_has_calls( + [mock.call(benchmark_output, True, constants.QUEUE_PUT_TIMEOUT)]) + + @mock.patch.object(proxduration.LOG, 'exception') + def test__worker_process_benchmark_teardown_on_broad_exception( + self, *args): self.benchmark.teardown = mock.Mock( side_effect=y_exc.YardstickException) - - self.scenario_cfg["runner"] = {"sampled": True, "duration": 1} + self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1} with self.assertRaises(SystemExit) as raised: - proxduration._worker_process(mock.Mock(), self.benchmark_cls, - 'my_method', self.scenario_cfg, {}, - multiprocessing.Event(), mock.Mock()) - self.assertEqual(raised.exception.code, 1) + proxduration._worker_process( + mock.Mock(), self.benchmark_cls, 'my_method', + self.scenario_cfg, {}, multiprocessing.Event(), mock.Mock()) + self.assertEqual(1, raised.exception.code) self._assert_defaults__worker_run_setup_and_teardown() |