aboutsummaryrefslogtreecommitdiffstats
path: root/app/discover/scan_metadata_parser.py
blob: df27e187dfb84868177b09a2b76939d96722436f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
###############################################################################
# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems)   #
# and others                                                                  #
#                                                                             #
# All rights reserved. This program and the accompanying materials            #
# are made available under the terms of the Apache License, Version 2.0       #
# which accompanies this distribution, and is available at                    #
# http://www.apache.org/licenses/LICENSE-2.0                                  #
###############################################################################
from discover.fetchers.folder_fetcher import FolderFetcher
from utils.metadata_parser import MetadataParser
from utils.mongo_access import MongoAccess
from utils.util import ClassResolver


class ScanMetadataParser(MetadataParser):
    """Validate the scanners definitions found in scan metadata.

    The metadata is expected to hold a scanners package name and a dict of
    scanners; each scanner is a sequence of "scan type" dicts.

    Problems are reported with ``add_error()`` and counted through
    ``self.errors``; neither is defined in this class, so both are presumably
    provided by ``MetadataParser`` (confirm against the base class).

    Validation modifies the metadata in place: comment attributes are removed
    from scan types and fetcher specifications are replaced by the fetcher
    instances they resolve to.
    """

    # top-level keys of the metadata
    SCANNERS_PACKAGE = 'scanners_package'
    SCANNERS_FILE = 'scanners.json'
    SCANNERS = 'scanners'

    # attributes of a single scan type
    TYPE = 'type'
    FETCHER = 'fetcher'
    CHILDREN_SCANNER = 'children_scanner'
    ENVIRONMENT_CONDITION = 'environment_condition'
    OBJECT_ID_TO_USE_IN_CHILD = 'object_id_to_use_in_child'

    # attribute that is dropped from a scan type before validation
    COMMENT = '_comment'

    REQUIRED_SCANNER_ATTRIBUTES = [TYPE, FETCHER]
    ALLOWED_SCANNER_ATTRIBUTES = [TYPE, FETCHER, CHILDREN_SCANNER,
                                  ENVIRONMENT_CONDITION,
                                  OBJECT_ID_TO_USE_IN_CHILD]

    # key inside an environment condition that lists mechanism drivers
    MECHANISM_DRIVER = 'mechanism_driver'

    def __init__(self, inventory_mgr):
        """Keep the inventory manager and start with an empty constants cache.

        :param inventory_mgr: stored as ``self.inv``; it is not used by any
            method of this class
        """
        super().__init__()
        self.inv = inventory_mgr
        # cache of constants documents read from the DB, keyed by their name
        self.constants = {}

    def get_required_fields(self):
        """Return the top-level metadata keys that must be present."""
        return [self.SCANNERS_PACKAGE, self.SCANNERS]

    def validate_fetcher(self, scanner_name: str, scan_type: dict,
                         type_index: int, package: str):
        """Validate the fetcher of a scan type and instantiate it.

        A string fetcher is treated as a class name and resolved through
        ``ClassResolver``; a dict fetcher must be a folder definition and is
        turned into a ``FolderFetcher``. On success the fetcher entry of
        ``scan_type`` is replaced by the instance.

        :param scanner_name: name of the scanner, used in error messages
        :param scan_type: scan type definition; modified in place
        :param type_index: 1-based position of the scan type in its scanner
        :param package: scanners package name, prepended to the fetcher
            package when not empty
        """
        fetcher = scan_type.get(self.FETCHER, '')
        if not fetcher:
            self.add_error('missing or empty fetcher in scanner {} type #{}'
                           .format(scanner_name, str(type_index)))
            return

        if isinstance(fetcher, str):
            try:
                module_name = \
                    ClassResolver.get_module_file_by_class_name(fetcher)
                # the fetcher sub-package is the first word of the module name
                fetcher_package = module_name.split("_")[0]
                if package:
                    fetcher_package = ".".join((package, fetcher_package))
                instance = ClassResolver.get_instance_of_class(
                    package_name=fetcher_package,
                    module_name=module_name,
                    class_name=fetcher)
            except ValueError:
                instance = None
            if not instance:
                self.add_error('failed to find fetcher class {} in scanner {}'
                               ' type #{}'
                               .format(fetcher, scanner_name, type_index))
            # the entry is overwritten even on failure (with a falsy value)
            scan_type[self.FETCHER] = instance
            return

        if isinstance(fetcher, dict):
            if not fetcher.get('folder', False):
                self.add_error('scanner {} type #{}: '
                               'only folder dict accepted in fetcher'
                               .format(scanner_name, type_index))
                return
            # report missing mandatory keys instead of raising KeyError
            missing = [key for key in ('types_name', 'parent_type')
                       if key not in fetcher]
            if missing:
                self.add_error('scanner {} type #{}: '
                               'folder fetcher is missing: {}'
                               .format(scanner_name, type_index,
                                       ', '.join(missing)))
                return
            scan_type[self.FETCHER] = FolderFetcher(fetcher['types_name'],
                                                    fetcher['parent_type'],
                                                    fetcher.get('text', ''))
            return

        self.add_error('incorrect type of fetcher for scanner {} type #{}'
                       .format(scanner_name, type_index))

    def validate_children_scanner(self, scanner_name: str, type_index: int,
                                  scanners: dict, scan_type: dict):
        """Check that the children scanner, if given, names a known scanner.

        :param scanner_name: name of the scanner, used in error messages
        :param type_index: 1-based position of the scan type in its scanner
        :param scanners: all scanner definitions, keyed by scanner name
        :param scan_type: scan type definition to check
        """
        if self.CHILDREN_SCANNER not in scan_type:
            return
        children_scanner = scan_type.get(self.CHILDREN_SCANNER)
        if not isinstance(children_scanner, str):
            self.add_error('scanner {} type #{}: '
                           'children_scanner must be a string'
                           .format(scanner_name, type_index))
        elif children_scanner not in scanners:
            self.add_error('scanner {} type #{}: '
                           'children_scanner {} not found '
                           .format(scanner_name, type_index,
                                   children_scanner))

    def validate_environment_condition(self, scanner_name: str, type_index: int,
                                       scanner: dict):
        """Validate the optional environment condition of a scan type.

        The condition must be a dict. If it lists mechanism drivers, they must
        be a list of strings and each one must be a known 'mechanism_drivers'
        constant.

        :param scanner_name: name of the scanner, used in error messages
        :param type_index: 1-based position of the scan type in its scanner
        :param scanner: the scan type definition holding the condition
        """
        if self.ENVIRONMENT_CONDITION not in scanner:
            return
        condition = scanner[self.ENVIRONMENT_CONDITION]
        if not isinstance(condition, dict):
            self.add_error('scanner {} type #{}: condition must be dict'
                           .format(scanner_name, str(type_index)))
            return
        if self.MECHANISM_DRIVER not in condition:
            return

        drivers = condition[self.MECHANISM_DRIVER]
        # check the container type first: a non-list value must not be
        # iterated (it may not be iterable, or may be a plain string)
        if (not isinstance(drivers, list) or
                not all(isinstance(driver, str) for driver in drivers)):
            self.add_error('scanner {} type #{}: '
                           '{} must be a list of strings'
                           .format(scanner_name, type_index,
                                   self.MECHANISM_DRIVER))
            return
        for driver in drivers:
            self.validate_constant(scanner_name,
                                   driver,
                                   'mechanism_drivers',
                                   'mechanism drivers')

    def validate_scanner(self, scanners: dict, name: str, package: str):
        """Validate every scan type of the scanner called ``name``.

        :param scanners: all scanner definitions, keyed by scanner name
        :param name: name of the scanner to validate
        :param package: scanners package name, passed on to fetcher validation
        """
        scanner = scanners.get(name)
        if not scanner:
            self.add_error('failed to find scanner: {}'.format(name))
            return

        # scan types are numbered from 1 in error messages
        for type_index, scan_type in enumerate(scanner, start=1):
            self.validate_scan_type(scanners, name, type_index, scan_type,
                                    package)

    def validate_scan_type(self, scanners: dict, scanner_name: str,
                           type_index: int, scan_type: dict, package: str):
        """Validate a single scan type definition of a scanner.

        Removes the comment attribute from ``scan_type``, checks that only
        allowed attributes are used and that required ones are present, and,
        if these checks pass, validates the type, fetcher, children scanner
        and environment condition.

        :param scanners: all scanner definitions, keyed by scanner name
        :param scanner_name: name of the scanner, used in error messages
        :param type_index: 1-based position of the scan type in its scanner
        :param scan_type: scan type definition; modified in place
        :param package: scanners package name, passed on to fetcher validation
        """
        if not isinstance(scan_type, dict):
            self.add_error('scanner {}, type #{}: scan type must be a dict'
                           .format(scanner_name, str(type_index)))
            return

        # keep previous error count to know if errors were detected here
        error_count = len(self.errors)
        # ignore comments
        scan_type.pop(self.COMMENT, '')

        # make sure only allowed attributes are supplied
        for attribute in scan_type:
            if attribute not in self.ALLOWED_SCANNER_ATTRIBUTES:
                self.add_error('unknown attribute {} '
                               'in scanner {}, type #{}'
                               .format(attribute, scanner_name,
                                       str(type_index)))

        # make sure required attributes are supplied
        for attribute in ScanMetadataParser.REQUIRED_SCANNER_ATTRIBUTES:
            if attribute not in scan_type:
                self.add_error('scanner {}, type #{}: '
                               'missing attribute "{}"'
                               .format(scanner_name, str(type_index),
                                       attribute))

        # the following checks depend on previous checks,
        # so return if previous checks found errors
        if len(self.errors) > error_count:
            return

        # type must be valid object type
        self.validate_constant(scanner_name, scan_type[self.TYPE],
                               'scan_object_types', 'types')
        self.validate_fetcher(scanner_name, scan_type, type_index, package)
        self.validate_children_scanner(scanner_name, type_index, scanners,
                                       scan_type)
        self.validate_environment_condition(scanner_name, type_index,
                                            scan_type)

    def get_constants(self, scanner_name, items_desc, constant_type):
        """Return the constants document named ``constant_type``.

        The document is read from the 'constants' collection of the DB on
        first use and cached in ``self.constants`` afterwards.

        :param scanner_name: name of the scanner, used in the error message
        :param items_desc: description of the constants, used in the error
            message
        :param constant_type: value of the 'name' field of the document
        :raises ValueError: if no such document is found in the DB
        """
        if not self.constants.get(constant_type):
            constants = MongoAccess.db['constants']
            values_list = constants.find_one({'name': constant_type})
            if not values_list:
                raise ValueError('scanner {}: '
                                 'could not find {} list in DB'
                                 .format(scanner_name, items_desc))
            self.constants[constant_type] = values_list
        return self.constants[constant_type]

    def validate_constant(self,
                          scanner_name: str,
                          value_to_check: str,
                          constant_type: str,
                          items_desc: str = None):
        """Add an error if the value is not one of the given constants.

        The allowed values are the 'value' fields of the entries in the 'data'
        list of the constants document.

        :param scanner_name: name of the scanner, used in error messages
        :param value_to_check: the value to look for
        :param constant_type: name of the constants document to check against
        :param items_desc: description of the constants, used in error
            messages
        :raises ValueError: if the constants document is not found in the DB
        """
        values_list = self.get_constants(scanner_name, items_desc,
                                         constant_type)
        values = [t['value'] for t in values_list['data']]
        if value_to_check not in values:
            self.add_error('scanner {}: value not in {}: {}'
                           .format(scanner_name, items_desc, value_to_check))

    def validate_metadata(self, metadata: dict) -> bool:
        """Validate the metadata, including every scanner defined in it.

        :param metadata: the parsed scan metadata
        :return: True if no errors are recorded, False otherwise
        """
        super().validate_metadata(metadata)
        scanners = metadata.get(self.SCANNERS, {})
        package = metadata.get(self.SCANNERS_PACKAGE)
        if not scanners:
            self.add_error('no scanners found in scanners list')
        else:
            for name in scanners:
                self.validate_scanner(scanners, name, package)
        return len(self.errors) == 0