aboutsummaryrefslogtreecommitdiffstats
path: root/yardstick/tests/unit/benchmark/runner/test_base.py
blob: 07d6f1843ef7d973e2ca17fc141ff1a79aca906d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
##############################################################################
# Copyright (c) 2017 Huawei Technologies Co.,Ltd and others.
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################

import time

import mock
import subprocess

from yardstick.benchmark.runners import base as runner_base
from yardstick.benchmark.runners import iteration
from yardstick.tests.unit import base as ut_base


class ActionTestCase(ut_base.BaseUnitTestCase):
    """Tests for the shell-action helpers in the runner base module.

    ``runner_base.log.error`` is patched for the duration of every test.
    Each test also replaces ``subprocess.check_output`` with a mock that
    raises ``subprocess.CalledProcessError``.
    """

    def setUp(self):
        """Start the ``log.error`` patch and register its cleanup."""
        log_patcher = mock.patch.object(runner_base.log, 'error')
        self._mock_log = log_patcher
        self.mock_log = log_patcher.start()
        self.addCleanup(self._stop_mocks)

    def _stop_mocks(self):
        """Stop the ``log.error`` patch started in ``setUp``."""
        self._mock_log.stop()

    @mock.patch.object(subprocess, 'check_output')
    def test__execute_shell_command(self, mock_subprocess):
        # When check_output raises, the first element of the returned value
        # is expected to be -1.
        failure = subprocess.CalledProcessError(-1, '')
        mock_subprocess.side_effect = failure
        result = runner_base._execute_shell_command("")
        self.assertEqual(result[0], -1)

    @mock.patch.object(subprocess, 'check_output')
    def test__single_action(self, mock_subprocess):
        # No explicit assertion: the test passes as long as the call
        # returns without raising.
        failure = subprocess.CalledProcessError(-1, '')
        mock_subprocess.side_effect = failure
        stub = mock.Mock()
        runner_base._single_action(0, 'echo', stub)

    @mock.patch.object(subprocess, 'check_output')
    def test__periodic_action(self, mock_subprocess):
        # No explicit assertion: the test passes as long as the call
        # returns without raising.
        failure = subprocess.CalledProcessError(-1, '')
        mock_subprocess.side_effect = failure
        stub = mock.Mock()
        runner_base._periodic_action(0, 'echo', stub)


class ScenarioOutputTestCase(ut_base.BaseUnitTestCase):
    """Tests for ``runner_base.ScenarioOutput.push``.

    The output queue is a ``mock.Mock`` so the arguments handed to its
    ``put`` method can be inspected.
    """

    def setUp(self):
        """Create a ``ScenarioOutput`` with sequence 1 over a mocked queue."""
        queue = mock.Mock()
        self.output_queue = queue
        self.scenario_output = runner_base.ScenarioOutput(queue, sequence=1)

    @mock.patch.object(time, 'time')
    def test_push(self, mock_time):
        # With time.time() pinned to 2, pushing a data dict is expected to
        # enqueue a record holding the timestamp, the sequence and the data.
        mock_time.return_value = 2
        data = {"value1": 1}
        expected_record = {
            'timestamp': 2,
            'sequence': 1,
            'data': data,
        }

        self.scenario_output.push(data)

        self.output_queue.put.assert_called_once_with(
            expected_record, True, 10)

    def test_push_no_timestamp(self):
        # Pushing None with the second argument set to False is expected to
        # enqueue the sequence plus the item stored on the output object,
        # without a timestamp key.
        expected_record = {'sequence': 1, 'value1': 1}

        self.scenario_output["value1"] = 1
        self.scenario_output.push(None, False)

        self.output_queue.put.assert_called_once_with(
            expected_record, True, 10)


class RunnerTestCase(ut_base.BaseUnitTestCase):
    """Tests for the queue accessors and abstract hook of the runners.

    ``setUp`` builds an ``iteration.IterationRunner`` whose queues are
    filled directly by the tests before being read back.
    """

    def setUp(self):
        """Create an ``IterationRunner`` configured with a file dispatcher."""
        default_section = {'dispatcher': 'file'}
        config = {'output_config': {'DEFAULT': default_section}}
        self.runner = iteration.IterationRunner(config)

    @staticmethod
    def _wait_until_not_empty(queue):
        """Poll ``queue`` until it reports items or the attempts run out.

        Sleeps 0.01 seconds before each check and gives up after 1000
        checks.
        """
        for _ in range(1000):
            time.sleep(0.01)
            if not queue.empty():
                return

    @mock.patch("yardstick.benchmark.runners.iteration.multiprocessing")
    def test_get_output(self, *args):
        # Two dicts put on the output queue are expected to come back from
        # get_output() as one dict containing the keys of both.
        queued_items = (
            {'case': 'opnfv_yardstick_tc002'},
            {'criteria': 'PASS'},
        )
        for item in queued_items:
            self.runner.output_queue.put(item)

        expected = {
            'case': 'opnfv_yardstick_tc002',
            'criteria': 'PASS'
        }

        self._wait_until_not_empty(self.runner.output_queue)
        actual_result = self.runner.get_output()
        self.assertEqual(expected, actual_result)

    @mock.patch("yardstick.benchmark.runners.iteration.multiprocessing")
    def test_get_result(self, *args):
        # Two dicts put on the result queue are expected to come back from
        # get_result() as a list holding them in insertion order.
        queued_items = (
            {'case': 'opnfv_yardstick_tc002'},
            {'criteria': 'PASS'},
        )
        for item in queued_items:
            self.runner.result_queue.put(item)

        expected = [
            {'case': 'opnfv_yardstick_tc002'},
            {'criteria': 'PASS'}
        ]

        self._wait_until_not_empty(self.runner.result_queue)
        actual_result = self.runner.get_result()
        self.assertEqual(expected, actual_result)

    def test__run_benchmark(self):
        # The base Runner is expected to raise NotImplementedError from
        # _run_benchmark().
        runner = runner_base.Runner(mock.Mock())
        call_args = [mock.Mock() for _ in range(4)]

        with self.assertRaises(NotImplementedError):
            runner._run_benchmark(*call_args)