From 78b0f48b178325c74d5609bac5c764ac111ad808 Mon Sep 17 00:00:00 2001 From: Koren Lev Date: Tue, 24 Oct 2017 13:35:42 +0300 Subject: several bug fixes for release 1.1 Change-Id: I433d41245107a68959efdcd6b56ce3348c7bbeb5 Signed-off-by: Koren Lev (cherry picked from commit 7cba26258ea17f790028a7005cb9bbbdc3923389) --- app/connection_test/connection_test.py | 283 +++++++++++++++++++++++ app/discover/clique_finder.py | 74 ++++-- app/discover/event_manager.py | 4 +- app/discover/fetchers/api/api_access.py | 20 +- app/discover/fetchers/db/db_access.py | 19 +- app/discover/manager.py | 9 + app/discover/scan.py | 13 +- app/discover/scan_manager.py | 8 - app/monitoring/setup/monitoring_setup_manager.py | 15 +- app/utils/constants.py | 14 ++ 10 files changed, 405 insertions(+), 54 deletions(-) create mode 100644 app/connection_test/connection_test.py diff --git a/app/connection_test/connection_test.py b/app/connection_test/connection_test.py new file mode 100644 index 0000000..d9d6af7 --- /dev/null +++ b/app/connection_test/connection_test.py @@ -0,0 +1,283 @@ +############################################################################### +# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) # +# and others # +# # +# All rights reserved. This program and the accompanying materials # +# are made available under the terms of the Apache License, Version 2.0 # +# which accompanies this distribution, and is available at # +# http://www.apache.org/licenses/LICENSE-2.0 # +############################################################################### +import argparse +import datetime +from kombu import Connection + +import time + +import pymongo +from functools import partial + +from discover.fetchers.api.api_access import ApiAccess +from discover.fetchers.db.db_access import DbAccess +from discover.manager import Manager +from utils.constants import ConnectionTestStatus, ConnectionTestType +from utils.logging.file_logger import FileLogger +from utils.mongo_access import MongoAccess +from utils.ssh_connection import * + + +def test_openstack(config, test_request): + try: + api = ApiAccess(config) + ConnectionTest.report_success(test_request, + ConnectionTestType.OPENSTACK.value) + if api: + pass + except ValueError: + pass + + +def test_mysql(config, test_request): + db_access = DbAccess(config) + ConnectionTest.report_success(test_request, ConnectionTestType.MYSQL.value) + if db_access: + pass + + +def test_ssh_connect(config) -> bool: + ssh = SshConnection(config.get('host', ''), + config.get('user', ''), + _pwd=config.get('pwd'), + _key=config.get('key'), + _port=int(config.get('port', + SshConnection.DEFAULT_PORT))) + ret = ssh.connect() + return ret + + +def test_cli(config, test_request): + ret = test_ssh_connect(config) + ConnectionTest.set_test_result(test_request, + ConnectionTestType.CLI.value, + ret) + + +def test_amqp_connect(config): + connect_url = 'amqp://{user}:{pwd}@{host}:{port}//' \ + .format(user=config.get("user", ''), + pwd=config.get('pwd', ''), + host=config.get('host', ''), + port=int(config.get('port', 5671))) + conn = Connection(connect_url) + conn.connect() + + +def test_amqp(config, test_request): + test_amqp_connect(config) + ConnectionTest.report_success(test_request, ConnectionTestType.AMQP.value) + + +def test_monitoring(config, test_request): + # for monitoring configuration test, need to test: + # 1. SSH access + # 2. RabbitMQ access + ssh_config = { + 'host': config.get('server_ip'), + 'user': config.get('ssh_user'), + 'pwd': config.get('ssh_password'), + 'port': int(config.get('ssh_port', 0)) + } + if not test_ssh_connect(ssh_config): + return + amqp_connect_config = { + 'user': config.get('rabbitmq_user', ''), + 'pwd': config.get('rabbitmq_pass', ''), + 'host': config.get('server_ip'), + 'port': int(config.get('rabbitmq_port', 5672)), + } + test_amqp_connect(amqp_connect_config) + ConnectionTest.report_success(test_request, ConnectionTestType.AMQP.value) + + +def test_aci(config, test_request): + pass + + +TEST_HANDLERS = { + ConnectionTestType.OPENSTACK.value: test_openstack, + ConnectionTestType.MYSQL.value: test_mysql, + ConnectionTestType.CLI.value: test_cli, + ConnectionTestType.AMQP.value: test_amqp, + ConnectionTestType.ACI.value: test_aci, + ConnectionTestType.MONITORING.value: test_monitoring +} + + +class ConnectionTest(Manager): + + DEFAULTS = { + 'mongo_config': '', + 'connection_tests': 'connection_tests', + 'environments': 'environments_config', + 'interval': 1, + 'loglevel': 'WARNING' + } + + def __init__(self): + self.args = self.get_args() + super().__init__(log_directory=self.args.log_directory, + mongo_config_file=self.args.mongo_config) + self.db_client = None + self.connection_tests_collection = None + self.environments_collection = None + + @staticmethod + def get_args(): + parser = argparse.ArgumentParser() + parser.add_argument('-m', '--mongo_config', nargs='?', type=str, + default=ConnectionTest.DEFAULTS['mongo_config'], + help='Name of config file ' + + 'with MongoDB server access details') + parser.add_argument('-c', '--connection_tests_collection', nargs='?', + type=str, + default=ConnectionTest.DEFAULTS['connection_tests'], + help='connection_tests collection to read from') + parser.add_argument('-e', '--environments_collection', nargs='?', + type=str, + default=ConnectionTest.DEFAULTS['environments'], + help='Environments collection to update ' + 'after tests') + parser.add_argument('-i', '--interval', nargs='?', type=float, + default=ConnectionTest.DEFAULTS['interval'], + help='Interval between collection polls' + '(must be more than {} seconds)' + .format(ConnectionTest.MIN_INTERVAL)) + parser.add_argument('-l', '--loglevel', nargs='?', type=str, + default=ConnectionTest.DEFAULTS['loglevel'], + help='Logging level \n(default: {})' + .format(ConnectionTest.DEFAULTS['loglevel'])) + parser.add_argument('-d', '--log_directory', nargs='?', type=str, + default=FileLogger.LOG_DIRECTORY, + help='File logger directory \n(default: {})' + .format(FileLogger.LOG_DIRECTORY)) + args = parser.parse_args() + return args + + def configure(self): + self.db_client = MongoAccess() + self.connection_tests_collection = \ + self.db_client.db[self.args.connection_tests_collection] + self.environments_collection = \ + self.db_client.db[self.args.environments_collection] + self._update_document = \ + partial(MongoAccess.update_document, + self.connection_tests_collection) + self.interval = max(self.MIN_INTERVAL, self.args.interval) + self.log.set_loglevel(self.args.loglevel) + + self.log.info('Started ConnectionTest with following configuration:\n' + 'Mongo config file path: {0.args.mongo_config}\n' + 'connection_tests collection: ' + '{0.connection_tests_collection.name}\n' + 'Polling interval: {0.interval} second(s)' + .format(self)) + + def _build_test_args(self, test_request: dict): + args = { + 'mongo_config': self.args.mongo_config + } + + def set_arg(name_from: str, name_to: str = None): + if name_to is None: + name_to = name_from + val = test_request.get(name_from) + if val: + args[name_to] = val + + set_arg('object_id', 'id') + set_arg('log_level', 'loglevel') + set_arg('environment', 'env') + set_arg('scan_only_inventory', 'inventory_only') + set_arg('scan_only_links', 'links_only') + set_arg('scan_only_cliques', 'cliques_only') + set_arg('inventory') + set_arg('clear') + set_arg('clear_all') + + return args + + def _finalize_test(self, test_request: dict): + # update the status and timestamps. + self.log.info('Request {} has been tested.' + .format(test_request['_id'])) + start_time = test_request['submit_timestamp'] + end_time = datetime.datetime.utcnow() + test_request['response_timestamp'] = end_time + test_request['response_time'] = \ + str(end_time - start_time.replace(tzinfo=None)) + test_request['status'] = ConnectionTestStatus.RESPONSE.value + self._update_document(test_request) + + @staticmethod + def set_test_result(test_request, target, result): + test_request.get('test_results', {})[target] = result + + @staticmethod + def report_success(test_request, target): + ConnectionTest.set_test_result(test_request, target, True) + + @staticmethod + def handle_test_target(target, test_request): + targets_config = test_request.get('targets_configuration', []) + try: + config = next(t for t in targets_config if t['name'] == target) + except StopIteration: + raise ValueError('failed to find {} in targets_configuration' + .format(target)) + handler = TEST_HANDLERS.get(target) + if not handler: + raise ValueError('unknown test target: {}'.format(target)) + handler(config, test_request) + + def do_test(self, test_request): + targets = [t for t in test_request.get('test_targets', [])] + test_request['test_results'] = {t: False for t in targets} + for test_target in test_request.get('test_targets', []): + self.log.info('testing connection to: {}'.format(test_target)) + try: + self.handle_test_target(test_target, test_request) + except Exception as e: + self.log.exception(e) + if 'errors' not in test_request: + test_request['errors'] = {} + test_request['errors'][test_target] = str(e) + self.log.error('Test of target {} failed (id: {}):\n{}' + .format(test_target, + test_request['_id'], + str(e))) + self._finalize_test(test_request) + self._set_env_operational(test_request['environment']) + + # if environment_config document for this specific environment exists, + # update the value of the 'operational' field to 'running' + def _set_env_operational(self, env): + self.environments_collection. \ + update_one({'name': env}, {'$set': {'operational': 'running'}}) + + def do_action(self): + while True: + # Find a pending request that is waiting the longest time + results = self.connection_tests_collection \ + .find({'status': ConnectionTestStatus.REQUEST.value, + 'submit_timestamp': {'$ne': None}}) \ + .sort('submit_timestamp', pymongo.ASCENDING) \ + .limit(1) + + # If no connection tests are pending, sleep for some time + if results.count() == 0: + time.sleep(self.interval) + else: + self.do_test(results[0]) + + +if __name__ == '__main__': + ConnectionTest().run() diff --git a/app/discover/clique_finder.py b/app/discover/clique_finder.py index 47843e6..57b2e3b 100644 --- a/app/discover/clique_finder.py +++ b/app/discover/clique_finder.py @@ -19,6 +19,7 @@ class CliqueFinder(Fetcher): def __init__(self): super().__init__() + self.env_config = None self.inv = InventoryMgr() self.inventory = self.inv.inventory_collection self.links = self.inv.collections["links"] @@ -27,6 +28,10 @@ class CliqueFinder(Fetcher): self.clique_constraints = self.inv.collections["clique_constraints"] self.cliques = self.inv.collections["cliques"] + def set_env(self, env): + super().set_env(env) + self.env_config = self.configuration.environment + def find_cliques_by_link(self, links_list): return self.links.find({'links': {'$in': links_list}}) @@ -43,23 +48,62 @@ class CliqueFinder(Fetcher): self.find_cliques_for_type(clique_type) self.log.info("finished scanning for cliques") + # Calculate priority score + def _get_priority_score(self, clique_type): + if self.env == clique_type['environment']: + return 4 + if (self.env_config['distribution'] == clique_type.get('distribution') and + self.env_config['distribution_version'] == clique_type.get('distribution_version')): + return 3 + if clique_type.get('mechanism_drivers') in self.env_config['mechanism_drivers']: + return 2 + if self.env_config['type_drivers'] == clique_type.get('type_drivers'): + return 1 + else: + return 0 + + # Get clique type with max priority + # for given environment configuration and focal point type + def _get_clique_type(self, focal_point, clique_types): + # If there's no configuration match for the specified environment, + # we use the default clique type definition with environment='ANY' + fallback_type = next( + filter(lambda t: t['environment'] == 'ANY', clique_types), + None + ) + if not fallback_type: + raise ValueError("No fallback clique type (ANY) " + "defined for focal point type '{}'" + .format(focal_point)) + + clique_types.remove(fallback_type) + + priority_scores = [self._get_priority_score(clique_type) + for clique_type + in clique_types] + max_score = max(priority_scores) if priority_scores else 0 + + return (fallback_type + if max_score == 0 + else clique_types[priority_scores.index(max_score)]) + def get_clique_types(self): if not self.clique_types_by_type: - clique_types = self.clique_types \ - .find({"environment": self.get_env()}) - default_clique_types = \ - self.clique_types.find({'environment': 'ANY'}) - for clique_type in clique_types: - focal_point_type = clique_type['focal_point_type'] - self.clique_types_by_type[focal_point_type] = clique_type - # if some focal point type does not have an explicit definition in - # clique_types for this specific environment, use the default - # clique type definition with environment=ANY - for clique_type in default_clique_types: - focal_point_type = clique_type['focal_point_type'] - if focal_point_type not in self.clique_types_by_type: - self.clique_types_by_type[focal_point_type] = clique_type - return self.clique_types_by_type + clique_types_by_focal_point = self.clique_types.aggregate([{ + "$group": { + "_id": "$focal_point_type", + "types": {"$push": "$$ROOT"} + } + }]) + + self.clique_types_by_type = { + cliques['_id']: self._get_clique_type(cliques['_id'], + cliques['types']) + for cliques in + clique_types_by_focal_point + } + + return self.clique_types_by_type def find_cliques_for_type(self, clique_type): focal_point_type = clique_type["focal_point_type"] diff --git a/app/discover/event_manager.py b/app/discover/event_manager.py index e2f8282..4855acc 100644 --- a/app/discover/event_manager.py +++ b/app/discover/event_manager.py @@ -44,6 +44,7 @@ class EventManager(Manager): '6.0': DefaultListener, '7.0': DefaultListener, '8.0': DefaultListener, + '9.0': DefaultListener }, 'RDO': { 'Mitaka': DefaultListener, @@ -112,7 +113,8 @@ class EventManager(Manager): def get_listener(self, env: str): env_config = self.inv.get_env_config(env) return (self.LISTENERS.get(env_config.get('distribution'), {}) - .get(env_config.get('distribution_version'))) + .get(env_config.get('distribution_version', + DefaultListener))) def listen_to_events(self, listener: ListenerBase, env_name: str, process_vars: dict): listener.listen({ diff --git a/app/discover/fetchers/api/api_access.py b/app/discover/fetchers/api/api_access.py index 84c4de3..f685faf 100644 --- a/app/discover/fetchers/api/api_access.py +++ b/app/discover/fetchers/api/api_access.py @@ -36,24 +36,23 @@ class ApiAccess(Fetcher): "neutron": ["quantum"] } - # identitity API v2 version with admin token - def __init__(self): + # identity API v2 version with admin token + def __init__(self, config=None): super(ApiAccess, self).__init__() if ApiAccess.initialized: return - ApiAccess.config = Configuration() + ApiAccess.config = {'OpenStack': config} if config else Configuration() ApiAccess.api_config = ApiAccess.config.get("OpenStack") - host = ApiAccess.api_config["host"] + host = ApiAccess.api_config.get("host", "") ApiAccess.host = host - port = ApiAccess.api_config["port"] + port = ApiAccess.api_config.get("port", "") if not (host and port): raise ValueError('Missing definition of host or port ' + 'for OpenStack API access') ApiAccess.base_url = "http://" + host + ":" + port - ApiAccess.admin_token = ApiAccess.api_config["admin_token"] - ApiAccess.admin_project = ApiAccess.api_config["admin_project"] \ - if "admin_project" in ApiAccess.api_config \ - else 'admin' + ApiAccess.admin_token = ApiAccess.api_config.get("admin_token", "") + ApiAccess.admin_project = ApiAccess.api_config.get("admin_project", + "admin") ApiAccess.admin_endpoint = "http://" + host + ":" + "35357" token = self.v2_auth_pwd(ApiAccess.admin_project) @@ -97,7 +96,8 @@ class ApiAccess(Fetcher): if subject_token: return subject_token req_url = ApiAccess.base_url + "/v2.0/tokens" - response = requests.post(req_url, json=post_body, headers=headers) + response = requests.post(req_url, json=post_body, headers=headers, + timeout=5) response = response.json() ApiAccess.auth_response[project_id] = response if 'error' in response: diff --git a/app/discover/fetchers/db/db_access.py b/app/discover/fetchers/db/db_access.py index 64d7372..090ab84 100644 --- a/app/discover/fetchers/db/db_access.py +++ b/app/discover/fetchers/db/db_access.py @@ -40,11 +40,12 @@ class DbAccess(Fetcher): # connection timeout set to 30 seconds, # due to problems over long connections - TIMEOUT = 30 + TIMEOUT = 5 - def __init__(self): + def __init__(self, mysql_config=None): super().__init__() - self.config = Configuration() + self.config = {'mysql': mysql_config} if mysql_config \ + else Configuration() self.conf = self.config.get("mysql") self.connect_to_db() self.neutron_db = self.get_neutron_db_name() @@ -61,8 +62,9 @@ class DbAccess(Fetcher): database=_database, raise_on_warnings=True) DbAccess.conn.ping(True) # auto-reconnect if necessary - except: - self.log.critical("failed to connect to MySQL DB") + except Exception as e: + self.log.critical("failed to connect to MySQL DB: {}" + .format(str(e))) return DbAccess.query_count_per_con = 0 @@ -91,10 +93,9 @@ class DbAccess(Fetcher): DbAccess.conn = None self.conf = self.config.get("mysql") cnf = self.conf - cnf['schema'] = cnf['schema'] if 'schema' in cnf else 'nova' - self.db_connect(cnf["host"], cnf["port"], - cnf["user"], cnf["pwd"], - cnf["schema"]) + self.db_connect(cnf.get('host', ''), cnf.get('port', ''), + cnf.get('user', ''), cnf.get('pwd', ''), + cnf.get('schema', 'nova')) @with_cursor def get_objects_list_for_id(self, query, object_type, object_id, diff --git a/app/discover/manager.py b/app/discover/manager.py index e37bb31..503c8a8 100644 --- a/app/discover/manager.py +++ b/app/discover/manager.py @@ -9,6 +9,7 @@ # http://www.apache.org/licenses/LICENSE-2.0 # ############################################################################### from abc import ABC, abstractmethod +import datetime from utils.logging.file_logger import FileLogger from utils.logging.full_logger import FullLogger @@ -19,6 +20,14 @@ class Manager(ABC): MIN_INTERVAL = 0.1 # To prevent needlessly frequent scans + INTERVALS = { + 'YEARLY': datetime.timedelta(days=365.25), + 'MONTHLY': datetime.timedelta(days=365.25/12), + 'WEEKLY': datetime.timedelta(weeks=1), + 'DAILY': datetime.timedelta(days=1), + 'HOURLY': datetime.timedelta(hours=1) + } + def __init__(self, log_directory: str = None, mongo_config_file: str = None): super().__init__() diff --git a/app/discover/scan.py b/app/discover/scan.py index 6c40a7f..49f37ff 100755 --- a/app/discover/scan.py +++ b/app/discover/scan.py @@ -319,13 +319,20 @@ class ScanController(Fetcher): SshConnection.disconnect_all() status = 'ok' if not scanner.found_errors.get(env_name, False) \ else 'errors detected' + if status == 'ok' and scan_plan.object_type == "environment": + self.mark_env_scanned(scan_plan.env) self.log.info('Scan completed, status: {}'.format(status)) return True, status + def mark_env_scanned(self, env): + environments_collection = self.inv.collection['environments_config'] + environments_collection \ + .update_one(filter={'name': env}, + update={'$set': {'scanned': True}}) if __name__ == '__main__': - scan_manager = ScanController() - ret, msg = scan_manager.run() + scan_controller = ScanController() + ret, msg = scan_controller.run() if not ret: - scan_manager.log.error(msg) + scan_controller.log.error(msg) sys.exit(0 if ret else 1) diff --git a/app/discover/scan_manager.py b/app/discover/scan_manager.py index 12dbec0..6c46d47 100644 --- a/app/discover/scan_manager.py +++ b/app/discover/scan_manager.py @@ -170,14 +170,6 @@ class ScanManager(Manager): .update_many(filter={'name': {'$in': env_scans}}, update={'$set': {'scanned': False}}) - INTERVALS = { - 'YEARLY': datetime.timedelta(days=365.25), - 'MONTHLY': datetime.timedelta(days=365.25/12), - 'WEEKLY': datetime.timedelta(weeks=1), - 'DAILY': datetime.timedelta(days=1), - 'HOURLY': datetime.timedelta(hours=1) - } - def _submit_scan_request_for_schedule(self, scheduled_scan, interval, ts): scans = self.scans_collection new_scan = { diff --git a/app/monitoring/setup/monitoring_setup_manager.py b/app/monitoring/setup/monitoring_setup_manager.py index 9faf5b8..bc4fe01 100644 --- a/app/monitoring/setup/monitoring_setup_manager.py +++ b/app/monitoring/setup/monitoring_setup_manager.py @@ -40,14 +40,13 @@ class MonitoringSetupManager(MonitoringHandler): if self.provision == self.provision_levels['none']: self.log.debug('Monitoring config setup skipped') return - sensu_server_files = [ - 'transport.json', - 'client.json', - 'rabbitmq.json', - 'handlers.json', - 'redis.json', - 'api.json' - ] + sensu_server_files_templates = \ + self.inv.find({'side': 'server'}, + projection={'type': 1}, + collection='monitoring_config_templates') + sensu_server_files = [] + for f in sensu_server_files_templates: + sensu_server_files.append(f.get('type', '')) conf = self.env_monitoring_config is_container = bool(conf.get('ssh_user', '')) server_host = conf['server_ip'] diff --git a/app/utils/constants.py b/app/utils/constants.py index 01bf09f..5b53921 100644 --- a/app/utils/constants.py +++ b/app/utils/constants.py @@ -18,6 +18,20 @@ class StringEnum(Enum): return repr(self.value) +class ConnectionTestType(StringEnum): + AMQP = "AMQP" + CLI = "CLI" + ACI = "ACI" + MYSQL = "mysql" + OPENSTACK = "OpenStack" + MONITORING = "Monitoring" + + +class ConnectionTestStatus(StringEnum): + REQUEST = "request" + RESPONSE = "response" + + class ScanStatus(StringEnum): DRAFT = "draft" PENDING = "pending" -- cgit 1.2.3-korg