summaryrefslogtreecommitdiffstats
path: root/compass-tasks/hdsdiscovery/base.py
blob: 77b3b0b63f9ee3b681b862818adb108235808925 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# Copyright 2014 Huawei Technologies Co. Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Base class extended by specific vendor in vendors directory.
A vendor needs to implement abstract methods of base class.
"""
import logging
import re

from abc import ABCMeta

from compass.hdsdiscovery.error import TimeoutError
from compass.hdsdiscovery import utils


class BaseVendor(object):
    """Basic Vendor object."""
    __metaclass__ = ABCMeta

    def is_this_vendor(self, sys_info, **kwargs):
        """Determine if the host is associated with this vendor.

        This function must be implemented by vendor itself
        """
        raise NotImplementedError


class BaseSnmpVendor(BaseVendor):
    """Base SNMP-based vendor plugin.

       .. note::
          It uses MIB-II sysDescr value to determine the vendor of the switch.
    """

    def __init__(self, matched_names):
        super(BaseSnmpVendor, self).__init__()
        self._matched_names = matched_names

    def is_this_vendor(self, sys_info, **kwargs):
        """Determine if the switch belongs to this vendor.

        Matching the system information retrieved from the switch.
        :param str sys_info: the system information retrieved from a switch
        Return True
        """
        if sys_info:
            for name in self._matched_names:
                if re.search(r"\b" + re.escape(name) + r"\b", sys_info,
                             re.IGNORECASE):
                    return True
        return False


class BasePlugin(object):
    """Extended by vendor's plugin.

    This plugin processes request and retrieve info directly from the switch.
    """
    __metaclass__ = ABCMeta

    def process_data(self, oper='SCAN', **kwargs):
        """Each vendors will have some plugins to do some operations.

        Plugin will process request data and return expected result.

        :param oper: operation function name.
        :param kwargs: key-value pairs of arguments
        """
        raise NotImplementedError

    # At least one of these three functions below must be implemented.
    def scan(self, **kwargs):
        """Get multiple records at once."""
        pass

    def set(self, **kwargs):
        """Set value to desired variable."""
        pass

    def get(self, **kwargs):
        """Get one record from a host."""
        pass


class BaseSnmpMacPlugin(BasePlugin):
    """Base snmp plugin."""

    def __init__(self, host, credential, oid='BRIDGE-MIB::dot1dTpFdbPort',
                 vlan_oid='Q-BRIDGE-MIB::dot1qPvid'):
        super(BaseSnmpMacPlugin, self).__init__()
        self.host = host
        self.credential = credential
        self.oid = oid
        self.port_oid = 'ifName'
        self.vlan_oid = vlan_oid

    def process_data(self, oper='SCAN', **kwargs):
        """progress data."""
        func_name = oper.lower()
        return getattr(self, func_name)(**kwargs)

    def scan(self, **kwargs):
        """scan."""
        results = None
        try:
            results = utils.snmpwalk_by_cl(self.host, self.credential,
                                           self.oid)
        except TimeoutError as error:
            logging.debug("PluginMac:scan snmpwalk_by_cl failed: %s",
                          error.message)
            return None

        mac_list = []
        for entity in results:
            if_index = entity['value']
            if entity and int(if_index):
                tmp = {}
                mac_numbers = entity['iid'].split('.')
                tmp['mac'] = self.get_mac_address(mac_numbers)
                tmp['port'] = self.get_port(if_index)
                tmp['vlan'] = self.get_vlan_id(if_index)
                mac_list.append(tmp)

        return mac_list

    def get_vlan_id(self, port):
        """Get vlan Id."""
        if not port:
            return None

        oid = '.'.join((self.vlan_oid, port))
        vlan_id = None
        result = None
        try:
            result = utils.snmpget_by_cl(self.host, self.credential, oid)
        except TimeoutError as error:
            logging.debug("[PluginMac:get_vlan_id snmpget_by_cl failed: %s]",
                          error.message)
            return None

        vlan_id = result.split()[-1]
        return vlan_id

    def get_port(self, if_index):
        """Get port number."""

        if_name = '.'.join((self.port_oid, if_index))
        result = None
        try:
            result = utils.snmpget_by_cl(self.host, self.credential, if_name)
        except TimeoutError as error:
            logging.debug("[PluginMac:get_port snmpget_by_cl failed: %s]",
                          error.message)
            return None

        # A result may be like "Value:  FasterEthernet1/2/34
        port = result.split()[-1].split('/')[-1]
        return port

    def convert_to_hex(self, value):
        """Convert the integer from decimal to hex."""

        return "%0.2x" % int(value)

    def get_mac_address(self, mac_numbers):
        """Assemble mac address from the list."""
        if len(mac_numbers) != 6:
            logging.error("[PluginMac:get_mac_address] MAC address must be "
                          "6 digitals")
            return None

        mac_in_hex = [self.convert_to_hex(num) for num in mac_numbers]
        return ":".join(mac_in_hex)