diff options
Diffstat (limited to 'moon_engine/moon_engine/plugins/pyorchestrator.py')
-rw-r--r-- | moon_engine/moon_engine/plugins/pyorchestrator.py | 372 |
1 files changed, 372 insertions, 0 deletions
diff --git a/moon_engine/moon_engine/plugins/pyorchestrator.py b/moon_engine/moon_engine/plugins/pyorchestrator.py new file mode 100644 index 00000000..bf2d70f9 --- /dev/null +++ b/moon_engine/moon_engine/plugins/pyorchestrator.py @@ -0,0 +1,372 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +import logging +import os +import time +import requests +import subprocess # nosec +from uuid import uuid4 +import yaml +from moon_engine.orchestration_driver import PipelineDriver +from moon_engine.api import configuration +from moon_engine.api.configuration import get_configuration +from moon_engine import get_api_key +from moon_utilities.auth_functions import xor_decode +from moon_utilities import exceptions +from datetime import datetime + +LOGGER = logging.getLogger("moon.engine.orchestrator.driver.pyorchestrator") + +PLUGIN_TYPE = "orchestration" +pipelines = {} +ports = [] + + +def init(): + """Initialize the plugin by initializing wrappers + + :return: nothing + """ + + # FIXME: get pipelines from Manager + pass + + +def create_gunicorn_config(host, port, server_type, uuid): + """Create a Gunicorn config file in a temporary directory + + :return: filename + """ + config_dir = get_configuration("orchestration").get("config_dir", "/tmp") # nosec + # (/tmp is a fallback solution) + _log_config = get_configuration("logging") + _log_config["handlers"]["file"]["filename"] = os.path.join(config_dir, + "moon_{}.log".format(uuid)) + __manager_url = get_configuration("management")["url"] + filename = os.path.join(config_dir, "gunicorn_{}.cfg".format(uuid4().hex)) + fd = open(filename, "w") + fd.write("""bind = "{host}:{port}" +workers = {workers} +moon = "{moon_filename}" + """.format( + host=host, + port=port, + workers=1, + moon_filename=os.path.join(config_dir, "moon_{}.yaml".format(uuid)), + )) + fd.close() + return filename + + +def create_moon_config(uuid, manager_cnx=True, policy_file=None): + """Create a Moon config file in a temporary directory + + :return: filename + """ + LOGGER.info(f"create_moon_config({uuid})") + config_dir = get_configuration("orchestration").get("config_dir", "/tmp") # nosec + _log_config = get_configuration("logging") + _log_config["handlers"]["file"]["filename"] = os.path.join(config_dir, + "moon_{}.log".format(uuid)) + if manager_cnx: + __manager_url = get_configuration("management")["url"] + api_token = get_api_key(get_configuration("management")["url"], + get_configuration("management")["user"], + get_configuration("management")["password"]) + else: + __manager_url = "" + api_token = "" + config_dir = get_configuration("orchestration").get("config_dir", "/tmp") # nosec + # (/tmp is a fallback solution) + filename = os.path.join(config_dir, "moon_{}.yaml".format(uuid)) + config_dict = { + "type": "pipeline", + "uuid": uuid, + "management": { + "url": __manager_url, + "token_file": os.path.join(config_dir, "db_{}.json".format(uuid)) + }, + "incremental_updates": True, + "api_token": api_token, + "data": "", + "logging": _log_config, + "authorization": get_configuration("authorization", + {"driver": "moon_engine.plugins.authz"}), + "plugins": get_configuration("plugins"), + "debug": get_configuration(key='debug', default=False) + } + if policy_file: + config_dict['data'] = policy_file + if not manager_cnx: + config_dict['uuid'] = "" + config_dict['incremental_updates'] = False + LOGGER.info("Writing config file to {}".format(filename)) + yaml.dump(config_dict, open(filename, "w"), default_flow_style=False) + return filename + + +def kill_server(uuid): + """Kill the server given its UUID + + :param uuid: UUID of the server + :return: nothing + """ + LOGGER.info("pipelines={}".format(pipelines)) + if uuid in pipelines: + LOGGER.info("pipeline={}".format(pipelines[uuid])) + # Fixme: if the server has been restarted, the process attribute is empty + LOGGER.info("Killing server {} after {} of uptime".format( + uuid, + str(datetime.now() - datetime.fromtimestamp(pipelines[uuid]["starttime"])) + )) + with open(pipelines[uuid]["process"], 'r') as pid_file: + try: + pid = int(pid_file.read()) + except ValueError: + LOGGER.error("The pid found in {} is not valid".format(pipelines[uuid]["process"])) + return + + os.kill(pid, 15) + del_server_port(pipelines[uuid]["port"]) + pipelines.pop(uuid) + else: + LOGGER.warning("Cannot find UUID {} in wrappers or interfaces".format(uuid)) + + +def get_ports_range(): + ports_range = get_configuration("orchestration")["port"] + return int(ports_range.split(".")[0]), int(ports_range.split(".")[-1]) + + +def get_next_port(server_host="127.0.0.1"): + port_min, port_max = get_ports_range() + _port = port_min + _ports = [] + for _pipeline in pipelines: + _ports.append(pipelines[_pipeline]["port"]) + _ports.sort() + if not _ports: + _port = port_min + elif _ports[-1]+1 > port_max: + raise Exception( + "Cannot add a new slave because " + "the port range is bounded to {}".format(port_max)) + while True: + if _port in _ports: + _port += 1 + continue + try: + requests.get("http://{}:{}/status".format(server_host, _port), timeout=1) + except requests.exceptions.ConnectionError: + break + if _port > port_max: + raise Exception( + "Cannot add a new pipeline because " + "the port range is bounded to {}".format(port_max)) + _port += 1 + return _port + + +def add_server_port(port): + ports.append(port) + + +def del_server_port(port): + try: + ports.remove(port) + except ValueError: + LOGGER.warning("port {} is not in the known port".format(port)) + + +def get_server_url(uuid=None): + if not uuid: + return + url = "" + try: + if uuid in pipelines: + url = "http://{}:{}".format(pipelines[uuid]["server_ip"], + pipelines[uuid]["port"]) + if url: + response = requests.get(url + "/status") + if response.status_code == 200: + return url + except TimeoutError: + LOGGER.warning("A timeout occurred when connecting to {}".format(url)) + # if port has not be found in local data, try to get information from remote servers + port_min, port_max = get_ports_range() + host = "127.0.0.1" + for _port in range(port_min, port_max): + try: + req = requests.get("http://{}:{}/status".format(host, _port), timeout=1) + data = req.json() + if "status" in data and data["status"]["uuid"] == uuid: + return "http://{}:{}".format(host, _port) + except Exception as e: + LOGGER.warning("Error getting information from {} ({})".format(host, str(e))) + return + + +def start_new_server(uuid): + """Start a new server in a new process + + :param uuid: UUID of the server + :return: nothing + """ + _url = get_server_url(uuid) + config_dir = get_configuration("orchestration").get("config_dir", "/tmp") # nosec + server_ip = "127.0.0.1" + config_filename = os.path.join(config_dir, "moon_{}.yaml".format(uuid)) + LOGGER.info("Starting server {} {}".format(_url, uuid)) + if _url: + _port = int(_url.split(":")[-1]) + add_server_port(_port) + config = yaml.safe_load(open(config_filename)) + log_file = config["logging"]["handlers"]["file"]["filename"] + _out = { + "pipeline_id": uuid, + "starttime": time.time(), + "port": _port, + "host": server_ip, + "server_ip": server_ip, + "log_file": log_file + } + else: + _port = get_next_port() + create_moon_config(uuid=uuid) + pid_file = os.path.join(config_dir, uuid + ".pid") + # NOTE: we have actually no solution to get the actual IP address + # so we need to put 0.0.0.0 in the host address + gunicorn_config = create_gunicorn_config( + host="0.0.0.0", # nosec + port=_port, + server_type="pipeline", + uuid=uuid) + command = ["gunicorn", "moon_engine.server:__hug_wsgi__", "--threads", "10", + "-p", pid_file, "-D", "-c", gunicorn_config] + LOGGER.info("Executing {}".format(" ".join(command))) + subprocess.Popen(command, stdout=subprocess.PIPE, close_fds=True) # nosec + # (command attribute is safe) + _out = { + "pipeline_id": uuid, + "starttime": time.time(), + "port": _port, + "host": server_ip, + "server_ip": server_ip, + "process": pid_file, + } + time.sleep(1) + config = yaml.safe_load(open(config_filename)) + log_file = config["logging"]["handlers"]["file"]["filename"] + _out["log"] = log_file + for cpt in range(10): + try: + f_sock = open(log_file) + except FileNotFoundError: + time.sleep(1) + else: + break + else: + LOGGER.error("Cannot find log file ({})".format(log_file)) + return + p_sock = 0 + LOGGER.info("Process running") + while True: + f_sock.seek(p_sock) + latest_data = f_sock.read() + p_sock = f_sock.tell() + if latest_data and "APIKEY" in latest_data: + _index_start = latest_data.index("APIKEY=") + len("APIKEY=") + _index_stop = latest_data.index("\n", _index_start) + key = latest_data[_index_start:_index_stop].strip() + # api_key = get_api_key_for_user("admin") + api_key = configuration.get_configuration('api_token') + try: + engine_api_key = xor_decode(key, api_key) + except exceptions.DecryptError: + engine_api_key = False + _out["api_key"] = engine_api_key + break + time.sleep(1) + + return _out + + +class PipelineConnector(PipelineDriver): + + def __init__(self, driver_name, engine_name): + self.driver_name = driver_name + self.engine_name = engine_name + + def update_pipeline(self, pipeline_id, data): + _url = get_server_url(pipeline_id) + if not _url: + self.add_pipeline(pipeline_id, data) + if "security_pipeline" in data: + req = requests.post("{}/update".format(_url), json={"attributes": "pdp"}) + if req.status_code == 206: + LOGGER.warning("No pipeline available...") + elif req.status_code != 202: + LOGGER.warning("Error sending upgrade command to pipeline ({})".format(req)) + if "vim_project_id" in data and data['vim_project_id']: + LOGGER.warning("Cannot update vim_project_id for the moment") + # FIXME: manage vim_project_id + + def delete_pipeline(self, pipeline_id): + LOGGER.info("Deleting pipeline {}".format(pipeline_id)) + kill_server(pipeline_id) + + def add_pipeline(self, pipeline_id=None, data=None): + LOGGER.debug("Adding POD in Engine {} {}".format(pipeline_id, data)) + if not pipeline_id: + pipeline_id = uuid4().hex + if not data: + content = dict() + else: + content = dict(data) + content.update(start_new_server(pipeline_id)) + pipelines[pipeline_id] = content + return pipelines[pipeline_id] + + def get_pipelines(self, pipeline_id=None): + results = {} + for interface in pipelines: + results[interface] = { + "starttime": pipelines[interface]["starttime"], + "port": pipelines[interface]["port"], + "server_ip": pipelines[interface]["server_ip"], + "status": "down", + "log": pipelines[interface]["log"] + } + try: + req = requests.get("http://{}:{}/status".format( + pipelines[interface]["server_ip"], + pipelines[interface]["port"] + )) + if req.status_code == 200: + results[interface]["status"] = "up" + except TimeoutError: + LOGGER.warning("Timeout connecting {} on port {}".format( + pipelines[interface]["server_ip"], + pipelines[interface]["port"] + )) + return results + + def get_pipeline_api_key(self, pipeline_id): + return pipelines.get(pipeline_id, {}).get('api_key', "") + + +class Connector(PipelineConnector): + pass + + +init() |