1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
##############################################################################
# Copyright (c) 2017 Huawei Technologies Co.,Ltd and others.
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
import time
import mock
import subprocess
from yardstick.benchmark.runners import base as runner_base
from yardstick.benchmark.runners import iteration
from yardstick.tests.unit import base as ut_base
class ActionTestCase(ut_base.BaseUnitTestCase):
    """Exercise the shell-action helpers of ``runner_base`` on failure.

    Each test swaps ``subprocess.check_output`` for a mock that raises
    ``subprocess.CalledProcessError``.  ``runner_base.log.error`` is patched
    for the duration of every test and restored through ``addCleanup``.
    """

    def setUp(self):
        """Start the ``runner_base.log.error`` patch and schedule its removal."""
        log_patcher = mock.patch.object(runner_base.log, 'error')
        self._mock_log = log_patcher
        self.mock_log = log_patcher.start()
        self.addCleanup(self._stop_mocks)

    def _stop_mocks(self):
        """Stop the patcher that ``setUp`` started."""
        self._mock_log.stop()

    @mock.patch.object(subprocess, 'check_output')
    def test__execute_shell_command(self, mock_subprocess):
        """A failing command must yield -1 as the first returned element."""
        failure = subprocess.CalledProcessError(-1, '')
        mock_subprocess.side_effect = failure

        outcome = runner_base._execute_shell_command("")

        self.assertEqual(outcome[0], -1)

    @mock.patch.object(subprocess, 'check_output')
    def test__single_action(self, mock_subprocess):
        """Call ``_single_action`` while ``check_output`` raises.

        There is no explicit assertion: the test passes as long as the call
        returns without propagating an exception.
        """
        failure = subprocess.CalledProcessError(-1, '')
        mock_subprocess.side_effect = failure

        third_argument = mock.Mock()
        runner_base._single_action(0, 'echo', third_argument)

    @mock.patch.object(subprocess, 'check_output')
    def test__periodic_action(self, mock_subprocess):
        """Call ``_periodic_action`` while ``check_output`` raises.

        There is no explicit assertion: the test passes as long as the call
        returns without propagating an exception.
        """
        failure = subprocess.CalledProcessError(-1, '')
        mock_subprocess.side_effect = failure

        third_argument = mock.Mock()
        runner_base._periodic_action(0, 'echo', third_argument)
class ScenarioOutputTestCase(ut_base.BaseUnitTestCase):
    """Check what ``runner_base.ScenarioOutput.push`` hands to its queue."""

    def setUp(self):
        """Build a ``ScenarioOutput`` with sequence 1 around a mocked queue."""
        queue_double = mock.Mock()
        self.output_queue = queue_double
        self.scenario_output = runner_base.ScenarioOutput(queue_double,
                                                          sequence=1)

    @mock.patch.object(time, 'time')
    def test_push(self, mock_time):
        """Pushing data enqueues a timestamp/sequence/data record.

        ``time.time`` is pinned to 2 so the expected timestamp is known.
        """
        mock_time.return_value = 2
        data = {"value1": 1}
        expected_record = {'timestamp': 2,
                           'sequence': 1,
                           'data': data}

        self.scenario_output.push(data)

        # The trailing ``True, 10`` are the extra positional arguments that
        # push() is expected to forward to ``put`` (presumably block and
        # timeout -- confirm against ScenarioOutput.push).
        self.output_queue.put.assert_called_once_with(expected_record,
                                                      True, 10)

    def test_push_no_timestamp(self):
        """Pushing ``None`` with the second argument ``False``.

        The expected record carries the sequence plus the item stored on the
        output object itself, and no ``'timestamp'`` key.
        """
        self.scenario_output["value1"] = 1
        expected_record = {'sequence': 1,
                           'value1': 1}

        self.scenario_output.push(None, False)

        self.output_queue.put.assert_called_once_with(expected_record,
                                                      True, 10)
class RunnerTestCase(ut_base.BaseUnitTestCase):
    """Tests for the output/result queues and the abstract benchmark hook."""

    def setUp(self):
        """Create an ``IterationRunner`` configured with the file dispatcher."""
        dispatcher_section = {'dispatcher': 'file'}
        config = {'output_config': {'DEFAULT': dispatcher_section}}
        self.runner = iteration.IterationRunner(config)

    @staticmethod
    def _wait_until_not_empty(queue):
        """Sleep 0.01s, then poll ``queue.empty()``, at most 1000 times.

        Returns as soon as the queue reports that it is not empty; if it
        never does, returns silently after the last attempt.
        """
        for _ in range(1000):
            time.sleep(0.01)
            if not queue.empty():
                return

    @mock.patch("yardstick.benchmark.runners.iteration.multiprocessing")
    def test_get_output(self, *args):
        """``get_output`` is expected to merge the queued dicts into one."""
        expected = {
            'case': 'opnfv_yardstick_tc002',
            'criteria': 'PASS'
        }
        self.runner.output_queue.put({'case': 'opnfv_yardstick_tc002'})
        self.runner.output_queue.put({'criteria': 'PASS'})

        # Give the queue time to report the items before reading it back.
        self._wait_until_not_empty(self.runner.output_queue)

        actual = self.runner.get_output()
        self.assertEqual(expected, actual)

    @mock.patch("yardstick.benchmark.runners.iteration.multiprocessing")
    def test_get_result(self, *args):
        """``get_result`` is expected to return the queued dicts as a list."""
        expected = [
            {'case': 'opnfv_yardstick_tc002'},
            {'criteria': 'PASS'}
        ]
        self.runner.result_queue.put({'case': 'opnfv_yardstick_tc002'})
        self.runner.result_queue.put({'criteria': 'PASS'})

        # Give the queue time to report the items before reading it back.
        self._wait_until_not_empty(self.runner.result_queue)

        actual = self.runner.get_result()
        self.assertEqual(expected, actual)

    def test__run_benchmark(self):
        """The base ``Runner._run_benchmark`` must raise NotImplementedError."""
        runner = runner_base.Runner(mock.Mock())
        benchmark_args = [mock.Mock() for _ in range(4)]
        with self.assertRaises(NotImplementedError):
            runner._run_benchmark(*benchmark_args)
|