summaryrefslogtreecommitdiffstats
path: root/app/test/scan/test_scan_metadata_parser.py
blob: 91c11efaf0a1d826d480c91b426b68af117a0bb4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
###############################################################################
# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems)   #
# and others                                                                  #
#                                                                             #
# All rights reserved. This program and the accompanying materials            #
# are made available under the terms of the Apache License, Version 2.0       #
# which accompanies this distribution, and is available at                    #
# http://www.apache.org/licenses/LICENSE-2.0                                  #
###############################################################################
from discover.fetchers.db.db_access import DbAccess
from discover.scan_metadata_parser import ScanMetadataParser
from test.scan.test_scan import TestScan
from test.scan.test_data.metadata import *
from unittest import mock
from utils.mongo_access import MongoAccess


SCANNERS_FILE = 'scanners.json'

JSON_REQUIRED_FIELDS_ERROR = 'Metadata json should contain all the ' + \
                             'following fields: scanners_package, scanners'

JSON_NO_SCANNERS = 'no scanners found in scanners list'
JSON_ERRORS_FOUND = 'Errors encountered during metadata file parsing:\n'


class TestScanMetadataParser(TestScan):
    def setUp(self):
        super().setUp()
        DbAccess.conn = mock.MagicMock()
        self.prepare_constants()
        self.parser = ScanMetadataParser(self.inv)

        self.parser.check_metadata_file_ok = mock.MagicMock()

    def prepare_metadata(self, content):
        self.parser._load_json_file = mock.MagicMock(return_value=content)

    def prepare_constants(self):
        MongoAccess.db = mock.MagicMock()
        MongoAccess.db["constants"].find_one = mock.MagicMock(side_effect=
                                                              lambda input:
                                                              CONSTANTS[input["name"]]
                                                              if CONSTANTS.get(input["name"])
                                                              else []
                                                              )

    def handle_error_scenario(self, input_content, expected_error,
                              add_errors_encountered_pretext=True):
        self.prepare_metadata(input_content)
        found_exception = False
        expected_message = expected_error
        metadata = None
        try:
            metadata = self.parser.parse_metadata_file(SCANNERS_FILE)
        except ValueError as e:
            found_exception = True
            expected_message = expected_error \
                if not add_errors_encountered_pretext \
                else JSON_ERRORS_FOUND + expected_error
            self.assertEqual(str(e), expected_message)
        self.assertTrue(found_exception,
                        'failed to throw exception, expected_message: {}'
                        .format(expected_message))
        self.assertIsNone(metadata)

    def handle_json_missing_field(self, json_content):
        self.handle_error_scenario(json_content, JSON_REQUIRED_FIELDS_ERROR,
                                   add_errors_encountered_pretext=False)

    def test_missing_field(self):
        for content in [METADATA_EMPTY, METADATA_NO_PACKAGE,
                        METADATA_NO_SCANNERS]:
            self.handle_json_missing_field(content)

    def test_json_no_scanners(self):
        self.handle_error_scenario(METADATA_ZERO_SCANNERS, JSON_NO_SCANNERS)

    def test_json_scanner_errors(self):
        errors_scenarios = [
            {
                'input': METADATA_ZERO_SCANNERS,
                'msg': JSON_NO_SCANNERS
            },
            {
                'input': METADATA_SCANNER_UNKNOWN_ATTRIBUTE,
                'msg': 'unknown attribute xyz in scanner ScanAggregate, type #1'
            },
            {
                'input': METADATA_SCANNER_NO_TYPE,
                'msg': 'scanner ScanAggregate, type #1: ' +
                       'missing attribute "type"'
            },
            {
                'input': METADATA_SCANNER_NO_FETCHER,
                'msg': 'scanner ScanAggregate, type #1: ' +
                       'missing attribute "fetcher"'
            },
            {
                'input': METADATA_SCANNER_INCORRECT_TYPE,
                'msg': 'scanner ScanAggregate: value not in types: t1'
            },
            {
                'input': METADATA_SCANNER_INCORRECT_FETCHER,
                'msg': 'failed to find fetcher class f1 '
                       'in scanner ScanAggregate type #1'
            },
            {
                'input': METADATA_SCANNER_WITH_INCORRECT_CHILD,
                'msg': 'scanner ScanAggregatesRoot type #1: '
                       'children_scanner must be a string'
            },
            {
                'input': METADATA_SCANNER_WITH_MISSING_CHILD,
                'msg': 'scanner ScanAggregatesRoot type #1: '
                       'children_scanner ScanAggregate not found '
            },
            {
                'input': METADATA_SCANNER_FETCHER_INVALID_DICT,
                'msg': 'scanner ScanEnvironment type #1: '
                       'only folder dict accepted in fetcher'
            },
            {
                'input': METADATA_SCANNER_WITH_INVALID_CONDITION,
                'msg': 'scanner ScanHost type #1: condition must be dict'
            }
        ]
        for scenario in errors_scenarios:
            self.handle_error_scenario(scenario['input'], scenario['msg'])

    def check_json_is_ok(self, json_content):
        self.prepare_metadata(json_content)
        found_exception = False
        metadata = None
        msg = None
        try:
            metadata = self.parser.parse_metadata_file(SCANNERS_FILE)
        except ValueError as e:
            found_exception = True
            msg = str(e)
        self.assertFalse(found_exception, 'Exception: {}'.format(msg))
        self.assertIsNotNone(metadata)

    def test_json_valid_content(self):
        valid_content = [
            METADATA_SIMPLE_SCANNER,
            METADATA_SCANNER_WITH_CHILD,
            METADATA_SCANNER_WITH_FOLDER,
            METADATA_SCANNER_WITH_CONDITION
        ]
        for content in valid_content:
            self.check_json_is_ok(content)