diff options
Diffstat (limited to 'app/connection_test')
-rw-r--r-- | app/connection_test/connection_test.py | 283 |
1 files changed, 283 insertions, 0 deletions
diff --git a/app/connection_test/connection_test.py b/app/connection_test/connection_test.py new file mode 100644 index 0000000..d9d6af7 --- /dev/null +++ b/app/connection_test/connection_test.py @@ -0,0 +1,283 @@ +############################################################################### +# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) # +# and others # +# # +# All rights reserved. This program and the accompanying materials # +# are made available under the terms of the Apache License, Version 2.0 # +# which accompanies this distribution, and is available at # +# http://www.apache.org/licenses/LICENSE-2.0 # +############################################################################### +import argparse +import datetime +from kombu import Connection + +import time + +import pymongo +from functools import partial + +from discover.fetchers.api.api_access import ApiAccess +from discover.fetchers.db.db_access import DbAccess +from discover.manager import Manager +from utils.constants import ConnectionTestStatus, ConnectionTestType +from utils.logging.file_logger import FileLogger +from utils.mongo_access import MongoAccess +from utils.ssh_connection import * + + +def test_openstack(config, test_request): + try: + api = ApiAccess(config) + ConnectionTest.report_success(test_request, + ConnectionTestType.OPENSTACK.value) + if api: + pass + except ValueError: + pass + + +def test_mysql(config, test_request): + db_access = DbAccess(config) + ConnectionTest.report_success(test_request, ConnectionTestType.MYSQL.value) + if db_access: + pass + + +def test_ssh_connect(config) -> bool: + ssh = SshConnection(config.get('host', ''), + config.get('user', ''), + _pwd=config.get('pwd'), + _key=config.get('key'), + _port=int(config.get('port', + SshConnection.DEFAULT_PORT))) + ret = ssh.connect() + return ret + + +def test_cli(config, test_request): + ret = test_ssh_connect(config) + ConnectionTest.set_test_result(test_request, + ConnectionTestType.CLI.value, + ret) + + +def test_amqp_connect(config): + connect_url = 'amqp://{user}:{pwd}@{host}:{port}//' \ + .format(user=config.get("user", ''), + pwd=config.get('pwd', ''), + host=config.get('host', ''), + port=int(config.get('port', 5671))) + conn = Connection(connect_url) + conn.connect() + + +def test_amqp(config, test_request): + test_amqp_connect(config) + ConnectionTest.report_success(test_request, ConnectionTestType.AMQP.value) + + +def test_monitoring(config, test_request): + # for monitoring configuration test, need to test: + # 1. SSH access + # 2. RabbitMQ access + ssh_config = { + 'host': config.get('server_ip'), + 'user': config.get('ssh_user'), + 'pwd': config.get('ssh_password'), + 'port': int(config.get('ssh_port', 0)) + } + if not test_ssh_connect(ssh_config): + return + amqp_connect_config = { + 'user': config.get('rabbitmq_user', ''), + 'pwd': config.get('rabbitmq_pass', ''), + 'host': config.get('server_ip'), + 'port': int(config.get('rabbitmq_port', 5672)), + } + test_amqp_connect(amqp_connect_config) + ConnectionTest.report_success(test_request, ConnectionTestType.AMQP.value) + + +def test_aci(config, test_request): + pass + + +TEST_HANDLERS = { + ConnectionTestType.OPENSTACK.value: test_openstack, + ConnectionTestType.MYSQL.value: test_mysql, + ConnectionTestType.CLI.value: test_cli, + ConnectionTestType.AMQP.value: test_amqp, + ConnectionTestType.ACI.value: test_aci, + ConnectionTestType.MONITORING.value: test_monitoring +} + + +class ConnectionTest(Manager): + + DEFAULTS = { + 'mongo_config': '', + 'connection_tests': 'connection_tests', + 'environments': 'environments_config', + 'interval': 1, + 'loglevel': 'WARNING' + } + + def __init__(self): + self.args = self.get_args() + super().__init__(log_directory=self.args.log_directory, + mongo_config_file=self.args.mongo_config) + self.db_client = None + self.connection_tests_collection = None + self.environments_collection = None + + @staticmethod + def get_args(): + parser = argparse.ArgumentParser() + parser.add_argument('-m', '--mongo_config', nargs='?', type=str, + default=ConnectionTest.DEFAULTS['mongo_config'], + help='Name of config file ' + + 'with MongoDB server access details') + parser.add_argument('-c', '--connection_tests_collection', nargs='?', + type=str, + default=ConnectionTest.DEFAULTS['connection_tests'], + help='connection_tests collection to read from') + parser.add_argument('-e', '--environments_collection', nargs='?', + type=str, + default=ConnectionTest.DEFAULTS['environments'], + help='Environments collection to update ' + 'after tests') + parser.add_argument('-i', '--interval', nargs='?', type=float, + default=ConnectionTest.DEFAULTS['interval'], + help='Interval between collection polls' + '(must be more than {} seconds)' + .format(ConnectionTest.MIN_INTERVAL)) + parser.add_argument('-l', '--loglevel', nargs='?', type=str, + default=ConnectionTest.DEFAULTS['loglevel'], + help='Logging level \n(default: {})' + .format(ConnectionTest.DEFAULTS['loglevel'])) + parser.add_argument('-d', '--log_directory', nargs='?', type=str, + default=FileLogger.LOG_DIRECTORY, + help='File logger directory \n(default: {})' + .format(FileLogger.LOG_DIRECTORY)) + args = parser.parse_args() + return args + + def configure(self): + self.db_client = MongoAccess() + self.connection_tests_collection = \ + self.db_client.db[self.args.connection_tests_collection] + self.environments_collection = \ + self.db_client.db[self.args.environments_collection] + self._update_document = \ + partial(MongoAccess.update_document, + self.connection_tests_collection) + self.interval = max(self.MIN_INTERVAL, self.args.interval) + self.log.set_loglevel(self.args.loglevel) + + self.log.info('Started ConnectionTest with following configuration:\n' + 'Mongo config file path: {0.args.mongo_config}\n' + 'connection_tests collection: ' + '{0.connection_tests_collection.name}\n' + 'Polling interval: {0.interval} second(s)' + .format(self)) + + def _build_test_args(self, test_request: dict): + args = { + 'mongo_config': self.args.mongo_config + } + + def set_arg(name_from: str, name_to: str = None): + if name_to is None: + name_to = name_from + val = test_request.get(name_from) + if val: + args[name_to] = val + + set_arg('object_id', 'id') + set_arg('log_level', 'loglevel') + set_arg('environment', 'env') + set_arg('scan_only_inventory', 'inventory_only') + set_arg('scan_only_links', 'links_only') + set_arg('scan_only_cliques', 'cliques_only') + set_arg('inventory') + set_arg('clear') + set_arg('clear_all') + + return args + + def _finalize_test(self, test_request: dict): + # update the status and timestamps. + self.log.info('Request {} has been tested.' + .format(test_request['_id'])) + start_time = test_request['submit_timestamp'] + end_time = datetime.datetime.utcnow() + test_request['response_timestamp'] = end_time + test_request['response_time'] = \ + str(end_time - start_time.replace(tzinfo=None)) + test_request['status'] = ConnectionTestStatus.RESPONSE.value + self._update_document(test_request) + + @staticmethod + def set_test_result(test_request, target, result): + test_request.get('test_results', {})[target] = result + + @staticmethod + def report_success(test_request, target): + ConnectionTest.set_test_result(test_request, target, True) + + @staticmethod + def handle_test_target(target, test_request): + targets_config = test_request.get('targets_configuration', []) + try: + config = next(t for t in targets_config if t['name'] == target) + except StopIteration: + raise ValueError('failed to find {} in targets_configuration' + .format(target)) + handler = TEST_HANDLERS.get(target) + if not handler: + raise ValueError('unknown test target: {}'.format(target)) + handler(config, test_request) + + def do_test(self, test_request): + targets = [t for t in test_request.get('test_targets', [])] + test_request['test_results'] = {t: False for t in targets} + for test_target in test_request.get('test_targets', []): + self.log.info('testing connection to: {}'.format(test_target)) + try: + self.handle_test_target(test_target, test_request) + except Exception as e: + self.log.exception(e) + if 'errors' not in test_request: + test_request['errors'] = {} + test_request['errors'][test_target] = str(e) + self.log.error('Test of target {} failed (id: {}):\n{}' + .format(test_target, + test_request['_id'], + str(e))) + self._finalize_test(test_request) + self._set_env_operational(test_request['environment']) + + # if environment_config document for this specific environment exists, + # update the value of the 'operational' field to 'running' + def _set_env_operational(self, env): + self.environments_collection. \ + update_one({'name': env}, {'$set': {'operational': 'running'}}) + + def do_action(self): + while True: + # Find a pending request that is waiting the longest time + results = self.connection_tests_collection \ + .find({'status': ConnectionTestStatus.REQUEST.value, + 'submit_timestamp': {'$ne': None}}) \ + .sort('submit_timestamp', pymongo.ASCENDING) \ + .limit(1) + + # If no connection tests are pending, sleep for some time + if results.count() == 0: + time.sleep(self.interval) + else: + self.do_test(results[0]) + + +if __name__ == '__main__': + ConnectionTest().run() |