aboutsummaryrefslogtreecommitdiffstats
path: root/tests/unit/network_services/collector/test_subscriber.py
blob: d4b4ecf7a6d32398af20a2e3025bde77e3e8c73b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# Copyright (c) 2016-2017 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Unittest for yardstick.network_services.collector.subscriber

from __future__ import absolute_import
import unittest
import mock

from yardstick.network_services.collector import subscriber


class MockVnfAprrox(object):

    def __init__(self):
        self.result = {}
        self.name = "vnf__1"

    def collect_kpi(self):
        self.result = {
            'pkt_in_up_stream': 100,
            'pkt_drop_up_stream': 5,
            'pkt_in_down_stream': 50,
            'pkt_drop_down_stream': 40
        }
        return self.result


class CollectorTestCase(unittest.TestCase):

    NODES = {
        'node1': {},
        'node2': {
            'ip': '1.2.3.4',
            'collectd': {
                'plugins': {'abc': 12, 'def': 34},
                'interval': 987,
            },
        },
    }
    TRAFFIC_PROFILE = {
        'key1': 'value1',
    }

    def setUp(self):
        vnf = MockVnfAprrox()
        self.ssh_patch = mock.patch('yardstick.network_services.nfvi.resource.ssh', autospec=True)
        mock_ssh = self.ssh_patch.start()
        mock_instance = mock.Mock()
        mock_instance.execute.return_value = 0, '', ''
        mock_ssh.AutoConnectSSH.from_node.return_value = mock_instance
        self.collector = subscriber.Collector([vnf], self.NODES, self.TRAFFIC_PROFILE, 1800)

    def tearDown(self):
        self.ssh_patch.stop()

    def test___init__(self, *_):
        vnf = MockVnfAprrox()
        collector = subscriber.Collector([vnf], {}, {})
        self.assertEqual(len(collector.vnfs), 1)
        self.assertEqual(collector.traffic_profile, {})

    def test___init___with_data(self, *_):
        self.assertEqual(len(self.collector.vnfs), 1)
        self.assertDictEqual(self.collector.traffic_profile, self.TRAFFIC_PROFILE)
        self.assertEqual(len(self.collector.resource_profiles), 1)

    def test___init___negative(self, *_):
        pass

    def test_start(self, *_):
        self.assertIsNone(self.collector.start())

    def test_stop(self, *_):
        self.assertIsNone(self.collector.stop())

    def test_get_kpi(self, *_):
        result = self.collector.get_kpi()

        self.assertEqual(result["vnf__1"]["pkt_in_up_stream"], 100)
        self.assertEqual(result["vnf__1"]["pkt_drop_up_stream"], 5)
        self.assertEqual(result["vnf__1"]["pkt_in_down_stream"], 50)
        self.assertEqual(result["vnf__1"]["pkt_drop_down_stream"], 40)
        self.assertIn('node2', result)