aboutsummaryrefslogtreecommitdiffstats
path: root/app/test/scan/test_scan_controller.py
diff options
context:
space:
mode:
Diffstat (limited to 'app/test/scan/test_scan_controller.py')
-rw-r--r--app/test/scan/test_scan_controller.py215
1 files changed, 215 insertions, 0 deletions
diff --git a/app/test/scan/test_scan_controller.py b/app/test/scan/test_scan_controller.py
new file mode 100644
index 0000000..f3bcc9a
--- /dev/null
+++ b/app/test/scan/test_scan_controller.py
@@ -0,0 +1,215 @@
+###############################################################################
+# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) #
+# and others #
+# #
+# All rights reserved. This program and the accompanying materials #
+# are made available under the terms of the Apache License, Version 2.0 #
+# which accompanies this distribution, and is available at #
+# http://www.apache.org/licenses/LICENSE-2.0 #
+###############################################################################
+import sys
+from unittest.mock import MagicMock
+
+from discover.scan import ScanController, ScanPlan
+from discover.scanner import Scanner
+from test.scan.test_scan import TestScan
+from test.scan.test_data.scan import *
+from utils.inventory_mgr import InventoryMgr
+
+
+class TestScanController(TestScan):
+
+ def setUp(self):
+ super().setUp()
+ self.scan_controller = ScanController()
+
+ def arg_validate(self, args, expected, key, err=None):
+ if key not in expected:
+ return
+ err = err if err else 'The value of {} is wrong'.format(key)
+ self.assertEqual(args.get(key, None), expected[key.upper()], err)
+
+ def check_args_values(self, args, expected):
+ self.arg_validate(args, expected, 'env',
+ 'The value of environment is wrong')
+ keys = ['mongo_config', 'mongo_config', 'type', 'inventory',
+ 'scan_self', 'id', 'parent_id', 'parent_type', 'id_field',
+ 'loglevel', 'inventory_only', 'links_only', 'cliques_only',
+ 'clear']
+ for key in keys:
+ self.arg_validate(args, expected, key)
+
+ def test_get_args_with_default_arguments(self):
+ sys.argv = DEFAULT_COMMAND_ARGS
+ args = self.scan_controller.get_args()
+ # check the default value of each argument
+ self.check_args_values(args, DEFAULT_ARGUMENTS)
+
+ def test_get_args_with_short_command_args(self):
+ sys.argv = SHORT_COMMAND_ARGS
+ args = self.scan_controller.get_args()
+ # check the value parsed by short arguments
+ self.check_args_values(args, SHORT_FLAGS_ARGUMENTS)
+
+ def test_get_args_with_full_command_args(self):
+ sys.argv = LONG_COMMAND_ARGS
+ args = self.scan_controller.get_args()
+ # check the value parsed by long arguments
+ self.check_args_values(args, ARGUMENTS_FULL)
+
+ def test_get_args_with_full_command_args_clear_all(self):
+ sys.argv = LONG_COMMAND_ARGS_CLEAR_ALL
+ args = self.scan_controller.get_args()
+ # check the value parsed by long arguments
+ self.check_args_values(args, ARGUMENTS_FULL_CLEAR_ALL)
+
+ def test_get_args_with_full_command_args_inventory_only(self):
+ sys.argv = LONG_COMMAND_ARGS_INVENTORY_ONLY
+ args = self.scan_controller.get_args()
+ # check the value parsed by long arguments
+ self.check_args_values(args, ARGUMENTS_FULL_INVENTORY_ONLY)
+
+ def test_get_args_with_full_command_args_links_only(self):
+ sys.argv = LONG_COMMAND_ARGS_LINKS_ONLY
+ args = self.scan_controller.get_args()
+ # check the value parsed by long arguments
+ self.check_args_values(args, ARGUMENTS_FULL_LINKS_ONLY)
+
+ def test_get_args_with_full_command_args_cliques_only(self):
+ sys.argv = LONG_COMMAND_ARGS_CLIQUES_ONLY
+ args = self.scan_controller.get_args()
+ # check the value parsed by long arguments
+ self.check_args_values(args, ARGUMENTS_FULL_CLIQUES_ONLY)
+
+ def side_effect(self, key, default):
+ if key in FORM.keys():
+ return FORM[key]
+ else:
+ return default
+
+ def check_plan_values(self, plan, scanner_type, obj_id,
+ child_type, child_id):
+ self.assertEqual(scanner_type, plan.scanner_type,
+ 'The scanner class is wrong')
+ self.assertEqual(child_type, plan.child_type,
+ 'The child type is wrong')
+ self.assertEqual(child_id, plan.child_id,
+ 'The child id is wrong')
+ self.assertEqual(obj_id, plan.object_id, 'The object is wrong')
+
+ def test_prepare_scan_plan(self):
+ scan_plan = ScanPlan(SCAN_ENV_PLAN_TO_BE_PREPARED)
+ plan = self.scan_controller.prepare_scan_plan(scan_plan)
+ self.check_plan_values(plan, SCANNER_TYPE_FOR_ENV,
+ OBJ_ID_FOR_ENV, CHILD_TYPE_FOR_ENV,
+ CHILD_ID_FOR_ENV)
+
+ def test_prepare_scan_region_plan(self):
+ original_get_by_id = self.inv.get_by_id
+ self.inv.get_by_id = MagicMock(return_value=REGIONS_FOLDER)
+
+ self.scan_controller.inv = self.inv
+ scan_plan = ScanPlan(SCAN_REGION_PLAN_TO_BE_PREPARED)
+ plan = self.scan_controller.prepare_scan_plan(scan_plan)
+
+ self.check_plan_values(plan, SCANNER_TYPE_FOR_REGION,
+ OBJ_ID_FOR_REGION, CHILD_TYPE_FOR_REGION,
+ CHILD_ID_FOR_REGION)
+ self.inv.get_by_id = original_get_by_id
+
+ def test_prepare_scan_region_folder_plan(self):
+ scan_plan = ScanPlan(SCAN_REGION_FOLDER_PLAN_TO_BE_PREPARED)
+ plan = self.scan_controller.prepare_scan_plan(scan_plan)
+ self.check_plan_values(plan, SCANNER_CLASS_FOR_REGION_FOLDER,
+ OBJ_ID_FOR_REGION_FOLDER,
+ CHILD_TYPE_FOR_REGION_FOLDER,
+ CHILD_ID_FOR_REGION_FOLDER)
+
+ def check_scan_method_calls(self, mock, count):
+ if count:
+ self.assertTrue(mock.called)
+ else:
+ mock.assert_not_called()
+
+ def check_scan_counts(self, run_scan_count, scan_links_count,
+ scan_cliques_count, deploy_monitoring_setup_count):
+ self.check_scan_method_calls(Scanner.scan, run_scan_count)
+ self.check_scan_method_calls(Scanner.scan_links, scan_links_count)
+ self.check_scan_method_calls(Scanner.scan_cliques, scan_cliques_count)
+ self.check_scan_method_calls(Scanner.deploy_monitoring_setup,
+ deploy_monitoring_setup_count)
+
+ def prepare_scan_mocks(self):
+ self.load_metadata = Scanner.load_metadata
+ self.scan = Scanner.scan
+ self.scan_links = Scanner.scan_links
+ self.scan_cliques = Scanner.scan_cliques
+ self.deploy_monitoring_setup = Scanner.deploy_monitoring_setup
+
+ Scanner.load_metadata = MagicMock()
+ Scanner.scan = MagicMock()
+ Scanner.scan_links = MagicMock()
+ Scanner.scan_cliques = MagicMock()
+ Scanner.deploy_monitoring_setup = MagicMock()
+
+ def reset_methods(self):
+ Scanner.load_metadata = self.load_metadata
+ Scanner.scan = self.scan
+ Scanner.scan_links = self.scan_links
+ Scanner.scan_cliques = self.scan_cliques
+ Scanner.deploy_monitoring_setup = self.deploy_monitoring_setup
+
+ def test_scan(self):
+ self.scan_controller.get_args = MagicMock()
+ InventoryMgr.is_feature_supported = MagicMock(return_value=False)
+ plan = self.scan_controller.prepare_scan_plan(ScanPlan(SCAN_ENV_PLAN_TO_BE_PREPARED))
+ self.scan_controller.get_scan_plan = MagicMock(return_value=plan)
+ self.prepare_scan_mocks()
+
+ self.scan_controller.run()
+ self.check_scan_counts(1, 1, 1, 0)
+ self.reset_methods()
+
+ def test_scan_with_monitoring_setup(self):
+ self.scan_controller.get_args = MagicMock()
+ InventoryMgr.is_feature_supported = MagicMock(return_value=True)
+ plan = self.scan_controller.prepare_scan_plan(ScanPlan(SCAN_ENV_PLAN_TO_BE_PREPARED))
+ self.scan_controller.get_scan_plan = MagicMock(return_value=plan)
+ self.prepare_scan_mocks()
+
+ self.scan_controller.run()
+ self.check_scan_counts(1, 1, 1, 1)
+ self.reset_methods()
+
+ def test_scan_with_inventory_only(self):
+ self.scan_controller.get_args = MagicMock()
+ scan_plan = ScanPlan(SCAN_ENV_INVENTORY_ONLY_PLAN_TO_BE_PREPARED)
+ plan = self.scan_controller.prepare_scan_plan(scan_plan)
+ self.scan_controller.get_scan_plan = MagicMock(return_value=plan)
+ self.prepare_scan_mocks()
+
+ self.scan_controller.run()
+ self.check_scan_counts(1, 0, 0, 0)
+ self.reset_methods()
+
+ def test_scan_with_links_only(self):
+ self.scan_controller.get_args = MagicMock()
+ scan_plan = ScanPlan(SCAN_ENV_LINKS_ONLY_PLAN_TO_BE_PREPARED)
+ plan = self.scan_controller.prepare_scan_plan(scan_plan)
+ self.scan_controller.get_scan_plan = MagicMock(return_value=plan)
+ self.prepare_scan_mocks()
+
+ self.scan_controller.run()
+ self.check_scan_counts(0, 1, 0, 0)
+ self.reset_methods()
+
+ def test_scan_with_cliques_only(self):
+ self.scan_controller.get_args = MagicMock()
+ scan_plan = ScanPlan(SCAN_ENV_CLIQUES_ONLY_PLAN_TO_BE_PREPARED)
+ plan = self.scan_controller.prepare_scan_plan(scan_plan)
+ self.scan_controller.get_scan_plan = MagicMock(return_value=plan)
+ self.prepare_scan_mocks()
+
+ self.scan_controller.run()
+ self.check_scan_counts(0, 0, 1, 0)
+ self.reset_methods()