diff options
Diffstat (limited to 'app/discover/event_manager.py')
-rw-r--r-- | app/discover/event_manager.py | 265 |
1 files changed, 265 insertions, 0 deletions
diff --git a/app/discover/event_manager.py b/app/discover/event_manager.py new file mode 100644 index 0000000..ce40ce4 --- /dev/null +++ b/app/discover/event_manager.py @@ -0,0 +1,265 @@ +############################################################################### +# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) # +# and others # +# # +# All rights reserved. This program and the accompanying materials # +# are made available under the terms of the Apache License, Version 2.0 # +# which accompanies this distribution, and is available at # +# http://www.apache.org/licenses/LICENSE-2.0 # +############################################################################### +import argparse +import signal +import time +from multiprocessing import Process, Manager as SharedManager + +import os + +from discover.events.listeners.default_listener import DefaultListener +from discover.events.listeners.listener_base import ListenerBase +from discover.manager import Manager +from utils.constants import OperationalStatus, EnvironmentFeatures +from utils.inventory_mgr import InventoryMgr +from utils.logging.file_logger import FileLogger +from utils.mongo_access import MongoAccess + + +class EventManager(Manager): + + # After EventManager receives a SIGTERM, + # it will try to terminate all listeners. + # After this delay, a SIGKILL will be sent + # to each listener that is still alive. + SIGKILL_DELAY = 5 # in seconds + + DEFAULTS = { + "mongo_config": "", + "collection": "environments_config", + "inventory": "inventory", + "interval": 5, + "loglevel": "INFO" + } + + LISTENERS = { + 'Mirantis-6.0': DefaultListener, + 'Mirantis-7.0': DefaultListener, + 'Mirantis-8.0': DefaultListener, + 'RDO-Mitaka': DefaultListener, + 'RDO-Liberty': DefaultListener, + } + + def __init__(self): + self.args = self.get_args() + super().__init__(log_directory=self.args.log_directory, + mongo_config_file=self.args.mongo_config) + self.db_client = None + self.interval = None + self.processes = [] + + @staticmethod + def get_args(): + parser = argparse.ArgumentParser() + parser.add_argument("-m", "--mongo_config", nargs="?", type=str, + default=EventManager.DEFAULTS["mongo_config"], + help="Name of config file with MongoDB server access details") + parser.add_argument("-c", "--collection", nargs="?", type=str, + default=EventManager.DEFAULTS["collection"], + help="Environments collection to read from " + "(default: '{}')" + .format(EventManager.DEFAULTS["collection"])) + parser.add_argument("-y", "--inventory", nargs="?", type=str, + default=EventManager.DEFAULTS["inventory"], + help="name of inventory collection " + "(default: '{}')" + .format(EventManager.DEFAULTS["inventory"])) + parser.add_argument("-i", "--interval", nargs="?", type=float, + default=EventManager.DEFAULTS["interval"], + help="Interval between collection polls " + "(must be more than {} seconds. Default: {})" + .format(EventManager.MIN_INTERVAL, + EventManager.DEFAULTS["interval"])) + parser.add_argument("-l", "--loglevel", nargs="?", type=str, + default=EventManager.DEFAULTS["loglevel"], + help="Logging level \n(default: '{}')" + .format(EventManager.DEFAULTS["loglevel"])) + parser.add_argument("-d", "--log_directory", nargs="?", type=str, + default=FileLogger.LOG_DIRECTORY, + help="File logger directory \n(default: '{}')" + .format(FileLogger.LOG_DIRECTORY)) + args = parser.parse_args() + return args + + def configure(self): + self.db_client = MongoAccess() + self.inv = InventoryMgr() + self.inv.set_collections(self.args.inventory) + self.collection = self.db_client.db[self.args.collection] + self.interval = max(self.MIN_INTERVAL, self.args.interval) + self.log.set_loglevel(self.args.loglevel) + + self.log.info("Started EventManager with following configuration:\n" + "Mongo config file path: {0}\n" + "Collection: {1}\n" + "Polling interval: {2} second(s)" + .format(self.args.mongo_config, self.collection.name, self.interval)) + + def get_listener(self, env: str): + env_config = self.inv.get_env_config(env) + return self.LISTENERS.get(env_config.get('distribution')) + + def listen_to_events(self, listener: ListenerBase, env_name: str, process_vars: dict): + listener.listen({ + 'env': env_name, + 'mongo_config': self.args.mongo_config, + 'inventory': self.args.inventory, + 'loglevel': self.args.loglevel, + 'environments_collection': self.args.collection, + 'process_vars': process_vars + }) + + def _get_alive_processes(self): + return [p for p in self.processes + if p['process'].is_alive()] + + # Get all processes that should be terminated + def _get_stuck_processes(self, stopped_processes: list): + return [p for p in self._get_alive_processes() + if p.get("name") in map(lambda p: p.get("name"), stopped_processes)] + + # Give processes time to finish and kill them if they are stuck + def _kill_stuck_processes(self, process_list: list): + if self._get_stuck_processes(process_list): + time.sleep(self.SIGKILL_DELAY) + for process in self._get_stuck_processes(process_list): + self.log.info("Killing event listener '{0}'".format(process.get("name"))) + os.kill(process.get("process").pid, signal.SIGKILL) + + def _get_operational(self, process: dict) -> OperationalStatus: + try: + return process.get("vars", {})\ + .get("operational") + except: + self.log.error("Event listener '{0}' is unreachable".format(process.get("name"))) + return OperationalStatus.STOPPED + + def _update_operational_status(self, status: OperationalStatus): + self.collection.update_many( + {"name": {"$in": [process.get("name") + for process + in self.processes + if self._get_operational(process) == status]}}, + {"$set": {"operational": status.value}} + ) + + def update_operational_statuses(self): + self._update_operational_status(OperationalStatus.RUNNING) + self._update_operational_status(OperationalStatus.ERROR) + self._update_operational_status(OperationalStatus.STOPPED) + + def cleanup_processes(self): + # Query for envs that are no longer eligible for listening + # (scanned == false and/or listen == false) + dropped_envs = [env['name'] + for env + in self.collection + .find(filter={'$or': [{'scanned': False}, + {'listen': False}]}, + projection=['name'])] + + live_processes = [] + stopped_processes = [] + # Drop already terminated processes + # and for all others perform filtering + for process in self._get_alive_processes(): + # If env no longer qualifies for listening, + # stop the listener. + # Otherwise, keep the process + if process['name'] in dropped_envs: + self.log.info("Stopping event listener '{0}'".format(process.get("name"))) + process['process'].terminate() + stopped_processes.append(process) + else: + live_processes.append(process) + + self._kill_stuck_processes(stopped_processes) + + # Update all 'operational' statuses + # for processes stopped on the previous step + self.collection.update_many( + {"name": {"$in": [process.get("name") + for process + in stopped_processes]}}, + {"$set": {"operational": OperationalStatus.STOPPED.value}} + ) + + # Keep the living processes + self.processes = live_processes + + def do_action(self): + try: + while True: + # Update "operational" field in db before removing dead processes + # so that we keep last statuses of env listeners before they were terminated + self.update_operational_statuses() + + # Perform a cleanup that filters out all processes + # that are no longer eligible for listening + self.cleanup_processes() + + envs = self.collection.find({'scanned': True, 'listen': True}) + + # Iterate over environments that don't have an event listener attached + for env in filter(lambda e: e['name'] not in + map(lambda process: process["name"], self.processes), + envs): + env_name = env['name'] + + if not self.inv.is_feature_supported(env_name, EnvironmentFeatures.LISTENING): + self.log.error("Listening is not supported for env '{}'".format(env_name)) + self.collection.update({"name": env_name}, + {"$set": {"operational": OperationalStatus.ERROR.value}}) + continue + + listener = self.get_listener(env_name) + if not listener: + self.log.error("No listener is defined for env '{}'".format(env_name)) + self.collection.update({"name": env_name}, + {"$set": {"operational": OperationalStatus.ERROR.value}}) + continue + + # A dict that is shared between event manager and newly created env listener + process_vars = SharedManager().dict() + p = Process(target=self.listen_to_events, + args=(listener, env_name, process_vars,), + name=env_name) + self.processes.append({"process": p, "name": env_name, "vars": process_vars}) + self.log.info("Starting event listener '{0}'".format(env_name)) + p.start() + + # Make sure statuses are up-to-date before event manager goes to sleep + self.update_operational_statuses() + time.sleep(self.interval) + finally: + # Fetch operational statuses before terminating listeners. + # Shared variables won't be available after termination. + stopping_processes = [process.get("name") + for process + in self.processes + if self._get_operational(process) != OperationalStatus.ERROR] + self._update_operational_status(OperationalStatus.ERROR) + + # Gracefully stop processes + for process in self._get_alive_processes(): + self.log.info("Stopping event listener '{0}'".format(process.get("name"))) + process.get("process").terminate() + + # Kill all remaining processes + self._kill_stuck_processes(self.processes) + + # Updating operational statuses for stopped processes + self.collection.update_many( + {"name": {"$in": stopping_processes}}, + {"$set": {"operational": OperationalStatus.STOPPED.value}} + ) + +if __name__ == "__main__": + EventManager().run() |