-rw-r--r--  yardstick/tests/unit/benchmark/runner/test_proxduration.py  275
1 file changed, 133 insertions(+), 142 deletions(-)
diff --git a/yardstick/tests/unit/benchmark/runner/test_proxduration.py b/yardstick/tests/unit/benchmark/runner/test_proxduration.py
index be1715aad..3299c5b05 100644
--- a/yardstick/tests/unit/benchmark/runner/test_proxduration.py
+++ b/yardstick/tests/unit/benchmark/runner/test_proxduration.py
@@ -1,23 +1,29 @@
-##############################################################################
-# Copyright (c) 2018 Nokia and others.
+# Copyright (c) 2018 Intel Corporation
#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
import mock
import unittest
import multiprocessing
import os
-import time
from yardstick.benchmark.runners import proxduration
+from yardstick.common import constants
from yardstick.common import exceptions as y_exc
class ProxDurationRunnerTest(unittest.TestCase):
+
class MyMethod(object):
SLA_VALIDATION_ERROR_SIDE_EFFECT = 1
BROAD_EXCEPTION_SIDE_EFFECT = 2
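
Note on the import changes: time is dropped because the rewritten tests no longer sleep while a real multiprocessing.Queue drains; mock.Mock() queues record their calls synchronously. constants is added for the QUEUE_PUT_TIMEOUT that the worker passes to put(). A minimal sketch of the pattern, with a hypothetical _worker standing in for proxduration._worker_process:

    import mock
    from yardstick.common import constants

    def _worker(queue):
        # Hypothetical stand-in for the real worker, which pushes each
        # benchmark result onto the queue with block=True and a timeout.
        queue.put('my_result', True, constants.QUEUE_PUT_TIMEOUT)

    queue = mock.Mock()  # no real Queue, so no sleep or draining needed
    _worker(queue)
    queue.put.assert_has_calls(
        [mock.call('my_result', True, constants.QUEUE_PUT_TIMEOUT)])

Because the mock records calls synchronously, the assertion can run as soon as the worker returns.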
@@ -69,38 +75,37 @@ class ProxDurationRunnerTest(unittest.TestCase):
@mock.patch.object(os, 'getpid')
def test__worker_process_runner_id(self, mock_os_getpid):
mock_os_getpid.return_value = 101
- self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
- proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
- self.scenario_cfg, {},
- multiprocessing.Event(), mock.Mock())
+ self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1}
+ proxduration._worker_process(
+ mock.Mock(), self.benchmark_cls, 'my_method', self.scenario_cfg,
+ {}, multiprocessing.Event(), mock.Mock())
- self.assertEqual(self.scenario_cfg['runner']['runner_id'], 101)
+ self.assertEqual(101, self.scenario_cfg['runner']['runner_id'])
def test__worker_process_called_with_cfg(self):
- self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
- proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
- self.scenario_cfg, {},
- multiprocessing.Event(), mock.Mock())
+ self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1}
+ proxduration._worker_process(
+ mock.Mock(), self.benchmark_cls, 'my_method', self.scenario_cfg,
+ {}, multiprocessing.Event(), mock.Mock())
self._assert_defaults__worker_run_setup_and_teardown()
def test__worker_process_called_with_cfg_loop(self):
- self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.01}
- proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
- self.scenario_cfg, {},
- multiprocessing.Event(), mock.Mock())
+ self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1}
+ proxduration._worker_process(
+ mock.Mock(), self.benchmark_cls, 'my_method', self.scenario_cfg,
+ {}, multiprocessing.Event(), mock.Mock())
self._assert_defaults__worker_run_setup_and_teardown()
self.assertGreater(self.benchmark.my_method.call_count, 2)
def test__worker_process_called_without_cfg(self):
scenario_cfg = {'runner': {}}
-
aborted = multiprocessing.Event()
aborted.set()
-
- proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
- scenario_cfg, {}, aborted, mock.Mock())
+ proxduration._worker_process(
+ mock.Mock(), self.benchmark_cls, 'my_method', scenario_cfg, {},
+ aborted, mock.Mock())
self.benchmark_cls.assert_called_once_with(scenario_cfg, {})
self.benchmark.setup.assert_called_once()
@@ -108,188 +113,174 @@ class ProxDurationRunnerTest(unittest.TestCase):
def test__worker_process_output_queue(self):
self.benchmark.my_method = mock.Mock(return_value='my_result')
- self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
- output_queue = multiprocessing.Queue()
- proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
- self.scenario_cfg, {},
- multiprocessing.Event(), output_queue)
- time.sleep(0.1)
+ self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1}
+ output_queue = mock.Mock()
+ proxduration._worker_process(
+ mock.Mock(), self.benchmark_cls, 'my_method', self.scenario_cfg,
+ {}, multiprocessing.Event(), output_queue)
self._assert_defaults__worker_run_setup_and_teardown()
- self.assertEquals(output_queue.get(), 'my_result')
+ output_queue.put.assert_has_calls(
+ [mock.call('my_result', True, constants.QUEUE_PUT_TIMEOUT)])
def test__worker_process_output_queue_multiple_iterations(self):
- self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
+ self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1}
self.benchmark.my_method = self.MyMethod()
-
- output_queue = multiprocessing.Queue()
- proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
- self.scenario_cfg, {},
- multiprocessing.Event(), output_queue)
- time.sleep(0.1)
+ output_queue = mock.Mock()
+ proxduration._worker_process(
+ mock.Mock(), self.benchmark_cls, 'my_method', self.scenario_cfg,
+ {}, multiprocessing.Event(), output_queue)
self._assert_defaults__worker_run_setup_and_teardown()
- self.assertGreater(self.benchmark.my_method.count, 103)
-
- count = 101
- while not output_queue.empty():
- count += 1
- self.assertEquals(output_queue.get(), count)
+ for idx in range(102, 101 + len(output_queue.method_calls)):
+ output_queue.put.assert_has_calls(
+ [mock.call(idx, True, constants.QUEUE_PUT_TIMEOUT)])
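
The range-based loop above copes with a worker whose iteration count depends on timing: queue.method_calls records every call made on the mock, so its length bounds how many samples were pushed before the 0.1 s duration expired. A self-contained sketch of the same idea, with a literal 10 standing in for constants.QUEUE_PUT_TIMEOUT:

    import mock

    queue = mock.Mock()
    for sample in (102, 103, 104):   # pretend the worker looped three times
        queue.put(sample, True, 10)  # 10 stands in for QUEUE_PUT_TIMEOUT

    # len(queue.method_calls) == 3, so the loop replays exactly the
    # sequence values the worker had time to emit.
    for idx in range(102, 102 + len(queue.method_calls)):
        queue.put.assert_has_calls([mock.call(idx, True, 10)])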
def test__worker_process_queue(self):
self.benchmark.my_method = self.MyMethod()
- self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
- queue = multiprocessing.Queue()
- timestamp = time.time()
- proxduration._worker_process(queue, self.benchmark_cls, 'my_method',
- self.scenario_cfg, {},
- multiprocessing.Event(), mock.Mock())
- time.sleep(0.1)
+ self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1}
+ queue = mock.Mock()
+ proxduration._worker_process(
+ queue, self.benchmark_cls, 'my_method', self.scenario_cfg, {},
+ multiprocessing.Event(), mock.Mock())
self._assert_defaults__worker_run_setup_and_teardown()
-
- result = queue.get()
- self.assertGreater(result['timestamp'], timestamp)
- self.assertEqual(result['errors'], '')
- self.assertEqual(result['data'], {'my_key': 102})
- self.assertEqual(result['sequence'], 1)
+ benchmark_output = {'timestamp': mock.ANY,
+ 'sequence': 1,
+ 'data': {'my_key': 102},
+ 'errors': ''}
+ queue.put.assert_has_calls(
+ [mock.call(benchmark_output, True, constants.QUEUE_PUT_TIMEOUT)])
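
mock.ANY in the expected dict absorbs the one nondeterministic field: it compares equal to anything, so the timestamp no longer needs the old lower-bound comparison against a time.time() snapshot. A minimal illustration:

    import mock
    import time

    queue = mock.Mock()
    queue.put({'timestamp': time.time(), 'sequence': 1}, True, 10)

    # mock.ANY compares equal to any value, so the expected dict matches
    # even though the timestamp differs on every run.
    queue.put.assert_called_once_with(
        {'timestamp': mock.ANY, 'sequence': 1}, True, 10)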
def test__worker_process_queue_multiple_iterations(self):
- self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
+ self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1}
self.benchmark.my_method = self.MyMethod()
-
- queue = multiprocessing.Queue()
- timestamp = time.time()
- proxduration._worker_process(queue, self.benchmark_cls, 'my_method',
- self.scenario_cfg, {},
- multiprocessing.Event(), mock.Mock())
- time.sleep(0.1)
+ queue = mock.Mock()
+ proxduration._worker_process(
+ queue, self.benchmark_cls, 'my_method', self.scenario_cfg, {},
+ multiprocessing.Event(), mock.Mock())
self._assert_defaults__worker_run_setup_and_teardown()
- self.assertGreater(self.benchmark.my_method.count, 103)
-
- count = 0
- while not queue.empty():
- count += 1
- result = queue.get()
- self.assertGreater(result['timestamp'], timestamp)
- self.assertEqual(result['errors'], '')
- self.assertEqual(result['data'], {'my_key': count + 101})
- self.assertEqual(result['sequence'], count)
+ for idx in range(102, 101 + len(queue.method_calls)):
+ benchmark_output = {'timestamp': mock.ANY,
+ 'sequence': idx - 101,
+ 'data': {'my_key': idx},
+ 'errors': ''}
+ queue.put.assert_has_calls(
+ [mock.call(benchmark_output, True,
+ constants.QUEUE_PUT_TIMEOUT)])
def test__worker_process_except_sla_validation_error_no_sla_cfg(self):
self.benchmark.my_method = mock.Mock(
side_effect=y_exc.SLAValidationError)
- self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
- proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
- self.scenario_cfg, {},
- multiprocessing.Event(), mock.Mock())
+ self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1}
+ proxduration._worker_process(
+ mock.Mock(), self.benchmark_cls, 'my_method', self.scenario_cfg,
+ {}, multiprocessing.Event(), mock.Mock())
self._assert_defaults__worker_run_setup_and_teardown()
- def test__worker_process_except_sla_validation_error_sla_cfg_monitor(self):
+ @mock.patch.object(proxduration.LOG, 'warning')
+ def test__worker_process_except_sla_validation_error_sla_cfg_monitor(
+ self, *args):
self.scenario_cfg['sla'] = {'action': 'monitor'}
- self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
+ self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1}
self.benchmark.my_method = mock.Mock(
side_effect=y_exc.SLAValidationError)
-
- proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
- self.scenario_cfg, {},
- multiprocessing.Event(), mock.Mock())
+ proxduration._worker_process(
+ mock.Mock(), self.benchmark_cls, 'my_method', self.scenario_cfg,
+ {}, multiprocessing.Event(), mock.Mock())
self._assert_defaults__worker_run_setup_and_teardown()
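
The @mock.patch.object(proxduration.LOG, 'warning') decorators introduced in this hunk keep expected SLA warnings out of the test output; *args absorbs the mock object the decorator injects. The shape of the pattern, using a local logger purely for illustration:

    import logging
    import mock
    import unittest

    LOG = logging.getLogger(__name__)

    class ExampleTest(unittest.TestCase):
        @mock.patch.object(LOG, 'warning')
        def test_warns_quietly(self, *args):
            # The patched method swallows the call, so an expected
            # warning no longer pollutes the test run's output.
            LOG.warning('SLA validation failed')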
def test__worker_process_raise_sla_validation_error_sla_cfg_default(self):
self.scenario_cfg['sla'] = {}
- self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
+ self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1}
self.benchmark.my_method = mock.Mock(
side_effect=y_exc.SLAValidationError)
-
with self.assertRaises(y_exc.SLAValidationError):
- proxduration._worker_process(mock.Mock(), self.benchmark_cls,
- 'my_method', self.scenario_cfg, {},
- multiprocessing.Event(), mock.Mock())
+ proxduration._worker_process(
+ mock.Mock(), self.benchmark_cls, 'my_method',
+ self.scenario_cfg, {}, multiprocessing.Event(), mock.Mock())
self.benchmark_cls.assert_called_once_with(self.scenario_cfg, {})
self.benchmark.setup.assert_called_once()
self.benchmark.my_method.assert_called_once_with({})
def test__worker_process_raise_sla_validation_error_sla_cfg_assert(self):
- self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
+ self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1}
self.scenario_cfg['sla'] = {'action': 'assert'}
self.benchmark.my_method = mock.Mock(
side_effect=y_exc.SLAValidationError)
with self.assertRaises(y_exc.SLAValidationError):
- proxduration._worker_process(mock.Mock(), self.benchmark_cls,
- 'my_method', self.scenario_cfg, {},
- multiprocessing.Event(), mock.Mock())
+ proxduration._worker_process(
+ mock.Mock(), self.benchmark_cls, 'my_method',
+ self.scenario_cfg, {}, multiprocessing.Event(), mock.Mock())
self.benchmark_cls.assert_called_once_with(self.scenario_cfg, {})
self.benchmark.setup.assert_called_once()
self.benchmark.my_method.assert_called_once_with({})
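
The two SLA tests above share one shape: a Mock side_effect raises the error, assertRaises catches it, and follow-up assertions confirm setup still ran and the method was invoked exactly once. A compact sketch with ValueError standing in for y_exc.SLAValidationError:

    import mock
    import unittest

    class SLAShapeTest(unittest.TestCase):
        def test_raises_but_setup_ran(self):
            bench = mock.Mock()
            bench.my_method.side_effect = ValueError  # stand-in exception
            bench.setup()
            with self.assertRaises(ValueError):
                bench.my_method({})
            bench.setup.assert_called_once()
            bench.my_method.assert_called_once_with({})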
- def test__worker_process_queue_on_sla_validation_error_monitor(self):
+ @mock.patch.object(proxduration.LOG, 'warning')
+ def test__worker_process_queue_on_sla_validation_error_monitor(
+ self, *args):
self.scenario_cfg['sla'] = {'action': 'monitor'}
- self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
+ self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1}
self.benchmark.my_method = self.MyMethod(
side_effect=self.MyMethod.SLA_VALIDATION_ERROR_SIDE_EFFECT)
-
- queue = multiprocessing.Queue()
- timestamp = time.time()
- proxduration._worker_process(queue, self.benchmark_cls, 'my_method',
- self.scenario_cfg, {},
- multiprocessing.Event(), mock.Mock())
- time.sleep(0.1)
+ queue = mock.Mock()
+ proxduration._worker_process(
+ queue, self.benchmark_cls, 'my_method', self.scenario_cfg, {},
+ multiprocessing.Event(), mock.Mock())
self._assert_defaults__worker_run_setup_and_teardown()
-
- result = queue.get()
- self.assertGreater(result['timestamp'], timestamp)
- self.assertEqual(result['errors'], ('My Case SLA validation failed. '
- 'Error: my error message',))
- self.assertEqual(result['data'], {'my_key': 102})
- self.assertEqual(result['sequence'], 1)
-
- def test__worker_process_broad_exception(self):
+ benchmark_output = {'timestamp': mock.ANY,
+ 'sequence': 1,
+ 'data': {'my_key': 102},
+ 'errors': ('My Case SLA validation failed. '
+ 'Error: my error message', )}
+ queue.put.assert_has_calls(
+ [mock.call(benchmark_output, True, constants.QUEUE_PUT_TIMEOUT)])
+
+ @mock.patch.object(proxduration.LOG, 'exception')
+ def test__worker_process_broad_exception(self, *args):
self.benchmark.my_method = mock.Mock(
side_effect=y_exc.YardstickException)
- self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
- proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
- self.scenario_cfg, {},
- multiprocessing.Event(), mock.Mock())
+ self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1}
+ proxduration._worker_process(
+ mock.Mock(), self.benchmark_cls, 'my_method',
+ self.scenario_cfg, {}, multiprocessing.Event(), mock.Mock())
self._assert_defaults__worker_run_setup_and_teardown()
- def test__worker_process_queue_on_broad_exception(self):
+ @mock.patch.object(proxduration.LOG, 'exception')
+ def test__worker_process_queue_on_broad_exception(self, *args):
self.benchmark.my_method = self.MyMethod(
side_effect=self.MyMethod.BROAD_EXCEPTION_SIDE_EFFECT)
-
- self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
- queue = multiprocessing.Queue()
- timestamp = time.time()
- proxduration._worker_process(queue, self.benchmark_cls, 'my_method',
- self.scenario_cfg, {},
- multiprocessing.Event(), mock.Mock())
- time.sleep(0.1)
-
- self._assert_defaults__worker_run_setup_and_teardown()
-
- result = queue.get()
- self.assertGreater(result['timestamp'], timestamp)
- self.assertNotEqual(result['errors'], '')
- self.assertEqual(result['data'], {'my_key': 102})
- self.assertEqual(result['sequence'], 1)
-
- def test__worker_process_benchmark_teardown_on_broad_exception(self):
+ self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1}
+ queue = mock.Mock()
+ proxduration._worker_process(
+ queue, self.benchmark_cls, 'my_method', self.scenario_cfg, {},
+ multiprocessing.Event(), mock.Mock())
+
+ benchmark_output = {'timestamp': mock.ANY,
+ 'sequence': 1,
+ 'data': {'my_key': 102},
+ 'errors': mock.ANY}
+ queue.put.assert_has_calls(
+ [mock.call(benchmark_output, True, constants.QUEUE_PUT_TIMEOUT)])
+
+ @mock.patch.object(proxduration.LOG, 'exception')
+ def test__worker_process_benchmark_teardown_on_broad_exception(
+ self, *args):
self.benchmark.teardown = mock.Mock(
side_effect=y_exc.YardstickException)
-
- self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
+ self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1}
with self.assertRaises(SystemExit) as raised:
- proxduration._worker_process(mock.Mock(), self.benchmark_cls,
- 'my_method', self.scenario_cfg, {},
- multiprocessing.Event(), mock.Mock())
- self.assertEqual(raised.exception.code, 1)
+ proxduration._worker_process(
+ mock.Mock(), self.benchmark_cls, 'my_method',
+ self.scenario_cfg, {}, multiprocessing.Event(), mock.Mock())
+ self.assertEqual(1, raised.exception.code)
self._assert_defaults__worker_run_setup_and_teardown()
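
The SystemExit check at the end is the standard unittest idiom for verifying an exit code: assertRaises as a context manager captures the exception, and raised.exception.code carries the status the worker exited with. A self-contained version:

    import sys
    import unittest

    class ExitCodeTest(unittest.TestCase):
        def test_exit_code(self):
            with self.assertRaises(SystemExit) as raised:
                sys.exit(1)
            # The captured SystemExit exposes the exit status as .code.
            self.assertEqual(1, raised.exception.code)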