aboutsummaryrefslogtreecommitdiffstats
path: root/moonv4/moon_orchestrator
diff options
context:
space:
mode:
authorThomas Duval <thomas.duval@orange.com>2017-10-29 21:02:25 +0100
committerThomas Duval <thomas.duval@orange.com>2017-10-29 21:02:25 +0100
commit0e46fc8f2c6a42c144f05c4e2a416fd2fb8339e1 (patch)
tree87e26e65513f417f578cd56a08763897f3af0e36 /moonv4/moon_orchestrator
parent1bee8fef0c764cda4e547d11bd3567e5ac8820a7 (diff)
Update Orchestrator to use Kubernetes
Change-Id: Ib1f1fb54544e4ac985ef2063ff8114d804e83d0e
Diffstat (limited to 'moonv4/moon_orchestrator')
-rw-r--r--moonv4/moon_orchestrator/Dockerfile3
-rw-r--r--moonv4/moon_orchestrator/bootstrap.py203
-rw-r--r--moonv4/moon_orchestrator/moon_orchestrator/api/containers.py8
-rw-r--r--moonv4/moon_orchestrator/moon_orchestrator/api/generic.py124
-rw-r--r--moonv4/moon_orchestrator/moon_orchestrator/drivers.py189
-rw-r--r--moonv4/moon_orchestrator/moon_orchestrator/http_server.py262
-rw-r--r--moonv4/moon_orchestrator/moon_orchestrator/messenger.py63
-rw-r--r--moonv4/moon_orchestrator/moon_orchestrator/server.py176
-rw-r--r--moonv4/moon_orchestrator/requirements.txt15
9 files changed, 592 insertions, 451 deletions
diff --git a/moonv4/moon_orchestrator/Dockerfile b/moonv4/moon_orchestrator/Dockerfile
index b68c130f..70eef9a8 100644
--- a/moonv4/moon_orchestrator/Dockerfile
+++ b/moonv4/moon_orchestrator/Dockerfile
@@ -9,6 +9,7 @@ RUN pip3 install pip --upgrade
ADD . /root
WORKDIR /root/
RUN pip3 install -r requirements.txt --upgrade
+RUN pip3 install /root/dist/* --upgrade
RUN pip3 install . --upgrade
-CMD ["python3", "bootstrap.py"] \ No newline at end of file
+CMD ["python3", "-m", "moon_orchestrator"] \ No newline at end of file
diff --git a/moonv4/moon_orchestrator/bootstrap.py b/moonv4/moon_orchestrator/bootstrap.py
deleted file mode 100644
index dab78f25..00000000
--- a/moonv4/moon_orchestrator/bootstrap.py
+++ /dev/null
@@ -1,203 +0,0 @@
-import sys
-import time
-import requests
-import yaml
-import logging
-import json
-import base64
-import mysql.connector
-import re
-import subprocess
-import pika
-import pika.credentials
-import pika.exceptions
-
-logging.basicConfig(level=logging.INFO)
-log = logging.getLogger("moon.bootstrap")
-requests_log = logging.getLogger("requests.packages.urllib3")
-requests_log.setLevel(logging.WARNING)
-requests_log.propagate = True
-pika_log = logging.getLogger("pika")
-pika_log.setLevel(logging.ERROR)
-pika_log.propagate = True
-
-CONSUL_HOST = sys.argv[1] if len(sys.argv) > 1 else "consul"
-CONSUL_PORT = sys.argv[2] if len(sys.argv) > 2 else 8500
-HEADERS = {"content-type": "application/json"}
-
-
-def search_config_file():
- data_config = None
- for _file in (
- "moon.conf",
- "conf/moon.conf",
- "../moon.conf",
- "/etc/moon/moon.conf",
- ):
- try:
- data_config = yaml.safe_load(open(_file))
- except FileNotFoundError:
- data_config = None
- continue
- else:
- break
- if not data_config:
- raise Exception("Configuration file not found...")
- return data_config
-
-
-def put(key, value):
- url = "http://{host}:{port}/v1/kv/{key}".format(host=CONSUL_HOST, port=CONSUL_PORT, key=key)
- log.info(url)
- req = requests.put(
- url,
- headers=HEADERS,
- json=value
- )
- if req.status_code != 200:
- raise Exception("Error connecting to Consul ({}, {})".format(req.status_code, req.text))
-
-
-def get(key):
- url = "http://{host}:{port}/v1/kv/{key}".format(host=CONSUL_HOST, port=CONSUL_PORT, key=key)
- req = requests.get(url)
- data = req.json()
- for item in data:
- log.info("{} {} -> {}".format(
- req.status_code,
- item["Key"],
- json.loads(base64.b64decode(item["Value"]).decode("utf-8"))
- ))
- yield json.loads(base64.b64decode(item["Value"]).decode("utf-8"))
-
-
-def populate_consul(data_config):
- while True:
- try:
- req = requests.get("http://{}:{}/ui".format(CONSUL_HOST, CONSUL_PORT))
- except requests.exceptions.ConnectionError:
- log.info("Waiting for Consul ({}:{})".format(CONSUL_HOST, CONSUL_PORT))
- time.sleep(1)
- continue
- else:
- break
- # if req.status_code in (302, 200):
- # break
- # log.info("Waiting for Consul ({}:{})".format(CONSUL_HOST, CONSUL_PORT))
- # time.sleep(1)
- log.info("Consul is up")
-
- req = requests.get("http://{}:{}/v1/kv/database".format(CONSUL_HOST, CONSUL_PORT))
- if req.status_code == 200:
- log.info("Consul is already populated")
- return
-
- put("database", data_config["database"])
- put("messenger", data_config["messenger"])
- put("slave", data_config["slave"])
- put("docker", data_config["docker"])
- put("logging", data_config["logging"])
- put("components_port_start", data_config["components"]["port_start"])
-
- for _key, _value in data_config["components"].items():
- if type(_value) is dict:
- put("components/{}".format(_key), data_config["components"][_key])
-
- for _key, _value in data_config["plugins"].items():
- put("plugins/{}".format(_key), data_config["plugins"][_key])
-
- for _key, _value in data_config["openstack"].items():
- put("openstack/{}".format(_key), data_config["openstack"][_key])
-
-
-def wait_for_database():
- log.info(get("database"))
- for database in get("database"):
- database_url = database['url']
- match = re.search("(?P<proto>^[\\w+]+):\/\/(?P<user>\\w+):(?P<password>.+)@(?P<host>\\w+):*(?P<port>\\d*)",
- database_url)
- config = match.groupdict()
- while True:
- try:
- conn = mysql.connector.connect(
- host=config["host"],
- user=config["user"],
- password=config["password"],
- database="moon"
- )
- conn.close()
- except mysql.connector.errors.InterfaceError:
- log.info("Waiting for Database ({})".format(config["host"]))
- time.sleep(1)
- continue
- else:
- log.info("Database i up, populating it...")
- output = subprocess.run(["moon_db_manager", "upgrade"])
- if output.returncode != 0:
- raise Exception("Error populating the database!")
- break
-
-
-def wait_for_message_queue():
- for messenger in get("messenger"):
- url = messenger['url']
- match = re.search("(?P<proto>^[\\w+]+):\/\/(?P<user>\\w+):(?P<password>.+)@(?P<host>\\w+):?(?P<port>\\d*)/?(?P<virtual_host>\\w+)",
- url)
- config = match.groupdict()
- while True:
- try:
- connection = pika.BlockingConnection(
- pika.ConnectionParameters(
- host=config['host'],
- port=int(config['port']),
- virtual_host=config['virtual_host'],
- credentials=pika.credentials.PlainCredentials(
- config['user'],
- config['password']
- )
- )
- )
- connection.close()
- except (
- pika.exceptions.ProbableAuthenticationError,
- pika.exceptions.ConnectionClosed,
- ConnectionResetError,
- pika.exceptions.IncompatibleProtocolError
- ):
- log.info("Waiting for MessageQueue ({})".format(config["host"]))
- time.sleep(1)
- continue
- else:
- log.info("MessageQueue is up")
- break
-
-
-def wait_for_keystone():
- # TODO: Keystone answers request too quickly
- # even if it is not fully loaded
- # we must test if a token retrieval is possible or not
- # to see if Keystone is truly up and running
- for config in get("openstack/keystone"):
- while True:
- try:
- req = requests.get(config["url"])
- except requests.exceptions.ConnectionError:
- log.info("Waiting for Keystone ({})".format(config["url"]))
- time.sleep(1)
- continue
- else:
- log.info("Keystone is up")
- break
-
-
-def main():
- data_config = search_config_file()
- populate_consul(data_config)
- wait_for_database()
- wait_for_message_queue()
- wait_for_keystone()
- import moon_orchestrator.server
- moon_orchestrator.server.main()
-
-main()
-
diff --git a/moonv4/moon_orchestrator/moon_orchestrator/api/containers.py b/moonv4/moon_orchestrator/moon_orchestrator/api/containers.py
index 23acea5f..cb365414 100644
--- a/moonv4/moon_orchestrator/moon_orchestrator/api/containers.py
+++ b/moonv4/moon_orchestrator/moon_orchestrator/api/containers.py
@@ -8,7 +8,7 @@ from oslo_config import cfg
from oslo_log import log as logging
# from moon_db.core import IntraExtensionRootManager
# from moon_db.core import ConfigurationManager
-from moon_utilities.security_functions import call
+# from moon_utilities.security_functions import call
LOG = logging.getLogger("moon.orchestrator.api.containers")
CONF = cfg.CONF
@@ -24,9 +24,9 @@ class Containers(object):
def __init__(self, docker_manager):
self.docker_manager = docker_manager
self.components = dict()
- for pdp_key, pdp_value in call("moon_manager", method="get_pdp",
- ctx={"user_id": "admin", "id": None})["pdps"].items():
- self.add_container(ctx={"id": pdp_key, "pipeline": pdp_value["security_pipeline"]})
+ # for pdp_key, pdp_value in call("moon_manager", method="get_pdp",
+ # ctx={"user_id": "admin", "id": None})["pdps"].items():
+ # self.add_container(ctx={"id": pdp_key, "pipeline": pdp_value["security_pipeline"]})
def get_container(self, ctx, args=None):
"""Get containers linked to an intra-extension
diff --git a/moonv4/moon_orchestrator/moon_orchestrator/api/generic.py b/moonv4/moon_orchestrator/moon_orchestrator/api/generic.py
index cadd98d3..fd43b1c5 100644
--- a/moonv4/moon_orchestrator/moon_orchestrator/api/generic.py
+++ b/moonv4/moon_orchestrator/moon_orchestrator/api/generic.py
@@ -2,28 +2,130 @@
# This software is distributed under the terms and conditions of the 'Apache-2.0'
# license which can be found in the file 'LICENSE' in this package distribution
# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
+"""
+Those API are helping API used to manage the Moon platform.
+"""
+from flask_restful import Resource, request
+import logging
+import moon_orchestrator.api
+from moon_utilities.security_functions import check_auth
-class Status(object):
+__version__ = "0.1.0"
+
+LOG = logging.getLogger("moon.orchestrator.api." + __name__)
+
+
+class Status(Resource):
"""
- Retrieve the current status of all components.
+ Endpoint for status requests
"""
- __version__ = "0.1.0"
+ __urls__ = ("/status", "/status/", "/status/<string:component_id>")
- def get_status(self, ctx, args):
- """Retrieve the current status of all components."""
- return {"status": "Running"}
+ def get(self, component_id=None):
+ """Retrieve status of all components
+ :return: {
+ "orchestrator": {
+ "status": "Running"
+ },
+ "security_router": {
+ "status": "Running"
+ }
+ }
+ """
+ raise NotImplemented
-class Logs(object):
+
+class Logs(Resource):
"""
- Retrieve the current status of all components.
+ Endpoint for logs requests
"""
- __version__ = "0.1.0"
+ __urls__ = ("/logs", "/logs/", "/logs/<string:component_id>")
+
+ def get(self, component_id=None):
+ """Get logs from the Moon platform
+
+ :param component_id: the ID of the component your are looking for (optional)
+ :return: [
+ "2015-04-15-13:45:20
+ "2015-04-15-13:45:21
+ "2015-04-15-13:45:22
+ "2015-04-15-13:45:23
+ ]
+ """
+ filter_str = request.args.get('filter', '')
+ from_str = request.args.get('from', '')
+ to_str = request.args.get('to', '')
+ event_number = request.args.get('event_number', '')
+ try:
+ event_number = int(event_number)
+ except ValueError:
+ event_number = None
+ args = dict()
+ args["filter"] = filter_str
+ args["from"] = from_str
+ args["to"] = to_str
+ args["event_number"] = event_number
+
+ raise NotImplemented
+
+
+class API(Resource):
+ """
+ Endpoint for API requests
+ """
- def get_logs(self, ctx, args):
- return {"error": "NotImplemented", "ctx": ctx, "args": args}
+ __urls__ = (
+ "/api",
+ "/api/",
+ "/api/<string:group_id>",
+ "/api/<string:group_id>/",
+ "/api/<string:group_id>/<string:endpoint_id>")
+ @check_auth
+ def get(self, group_id="", endpoint_id="", user_id=""):
+ """Retrieve all API endpoints or a specific endpoint if endpoint_id is given
+ :param group_id: the name of one existing group (ie generic, ...)
+ :param endpoint_id: the name of one existing component (ie Logs, Status, ...)
+ :return: {
+ "group_name": {
+ "endpoint_name": {
+ "description": "a description",
+ "methods": {
+ "get": "description of the HTTP method"
+ },
+ "urls": ('/api', '/api/', '/api/<string:endpoint_id>')
+ }
+ }
+ """
+ __methods = ("get", "post", "put", "delete", "options", "patch")
+ api_list = filter(lambda x: "__" not in x, dir(moon_orchestrator.api))
+ api_desc = dict()
+ for api_name in api_list:
+ api_desc[api_name] = {}
+ group_api_obj = eval("moon_interface.api.{}".format(api_name))
+ api_desc[api_name]["description"] = group_api_obj.__doc__
+ if "__version__" in dir(group_api_obj):
+ api_desc[api_name]["version"] = group_api_obj.__version__
+ object_list = list(filter(lambda x: "__" not in x, dir(group_api_obj)))
+ for obj in map(lambda x: eval("moon_interface.api.{}.{}".format(api_name, x)), object_list):
+ if "__urls__" in dir(obj):
+ api_desc[api_name][obj.__name__] = dict()
+ api_desc[api_name][obj.__name__]["urls"] = obj.__urls__
+ api_desc[api_name][obj.__name__]["methods"] = dict()
+ for _method in filter(lambda x: x in __methods, dir(obj)):
+ docstring = eval("moon_interface.api.{}.{}.{}.__doc__".format(api_name, obj.__name__, _method))
+ api_desc[api_name][obj.__name__]["methods"][_method] = docstring
+ api_desc[api_name][obj.__name__]["description"] = str(obj.__doc__)
+ if group_id in api_desc:
+ if endpoint_id in api_desc[group_id]:
+ return {group_id: {endpoint_id: api_desc[group_id][endpoint_id]}}
+ elif len(endpoint_id) > 0:
+ LOG.error("Unknown endpoint_id {}".format(endpoint_id))
+ return {"error": "Unknown endpoint_id {}".format(endpoint_id)}
+ return {group_id: api_desc[group_id]}
+ return api_desc
diff --git a/moonv4/moon_orchestrator/moon_orchestrator/drivers.py b/moonv4/moon_orchestrator/moon_orchestrator/drivers.py
new file mode 100644
index 00000000..970914a2
--- /dev/null
+++ b/moonv4/moon_orchestrator/moon_orchestrator/drivers.py
@@ -0,0 +1,189 @@
+# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
+# This software is distributed under the terms and conditions of the 'Apache-2.0'
+# license which can be found in the file 'LICENSE' in this package distribution
+# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
+
+from kubernetes import client, config
+import logging
+import urllib3.exceptions
+import time
+from moon_utilities import configuration
+
+logger = logging.getLogger("moon.orchestrator.drivers")
+
+
+def get_driver():
+ try:
+ driver = K8S()
+ except urllib3.exceptions.MaxRetryError as e:
+ logger.exception(e)
+ return Docker()
+ else:
+ return K8S()
+
+
+class Driver:
+
+ def __init__(self):
+ self.cache = {}
+ # example of cache:
+ # {
+ # "uuid_of_pod": {
+ # "ip": "",
+ # "hostname": "",
+ # "port": 30001,
+ # "pdp": "",
+ # "keystone_project_id": "",
+ # "plugin_name": "",
+ # "namespace": ""
+ # }
+ # }
+
+ def get_pods(self, namespace=None):
+ raise NotImplementedError
+
+ def load_pod(self, data, api_client=None, ext_client=None):
+ raise NotImplementedError
+
+ def delete_pod(self, uuid=None, name=None):
+ raise NotImplementedError
+
+ def get_slaves(self):
+ raise NotImplementedError
+
+
+class K8S(Driver):
+
+ def __init__(self):
+ super(K8S, self).__init__()
+ config.load_kube_config()
+ self.client = client.CoreV1Api()
+
+ def get_pods(self, name=None):
+ # pods = self.client.list_pod_for_all_namespaces(watch=False)
+ # if not namespace:
+ # return pods
+ # # TODO: get pods with specific namespace
+ # for pod in pods:
+ # logger.info("%s\t%s\t%s" % (pod.status.pod_ip,
+ # pod.metadata.namespace,
+ # pod.metadata.name))
+ # return pods
+ if name:
+ pods = self.client.list_pod_for_all_namespaces(watch=False)
+ for pod in pods:
+ if pod.metadata.name == name:
+ return pod
+ else:
+ return None
+ return self.cache
+
+ def __create_pod(self, client, data):
+ pod_manifest = {
+ 'apiVersion': 'extensions/v1beta1',
+ 'kind': 'Deployment',
+ 'metadata': {
+ 'name': data[0].get('name')
+ },
+ 'spec': {
+ 'replicas': 1,
+ 'template': {
+ 'metadata': {'labels': {'app': data[0].get('name')}},
+ # 'hostname': data.get('name'),
+ 'spec': {
+ 'containers': []
+ }
+ },
+ }
+ }
+ for _data in data:
+ pod_manifest['spec']['template']['spec']['containers'].append(
+ {
+ 'image': _data.get('container', "busybox"),
+ 'name': _data.get('name'),
+ 'ports': [
+ {"containerPort": _data.get('port', 80)},
+ ],
+ 'env': [
+ {'name': "UUID", "value": _data.get('name', "None")},
+ {'name': "TYPE", "value": _data.get('genre', "None")},
+ {'name': "PORT", "value": str(_data.get('port', 80))},
+ {'name': "PDP_ID", "value": _data.get('pdp_id', "None")},
+ {'name': "META_RULE_ID", "value": "None"},
+ {'name': "KEYSTONE_PROJECT_ID",
+ "value": _data.get('keystone_project_id', "None")},
+ ]
+ }
+ )
+ resp = client.create_namespaced_deployment(body=pod_manifest,
+ namespace='moon')
+ logger.info("Pod {} created!".format(data[0].get('name')))
+ return resp
+
+ def __create_service(self, client, data, expose=False):
+ service_manifest = {
+ 'apiVersion': 'v1',
+ 'kind': 'Service',
+ 'metadata': {
+ 'name': data.get('name'),
+ 'namespace': 'moon'
+ },
+ 'spec': {
+ 'ports': [{
+ 'port': data.get('port', 80),
+ 'targetPort': data.get('port', 80)
+ }],
+ 'selector': {
+ 'app': data.get('name')
+ },
+ 'type': 'NodePort',
+ 'endpoints': [{
+ 'port': data.get('port', 80),
+ 'protocol': 'TCP',
+ }],
+ }
+ }
+ if expose:
+ service_manifest['spec']['ports'][0]['nodePort'] = \
+ configuration.increment_port()
+ service_manifest['spec']['type'] = "NodePort"
+ resp = client.create_namespaced_service(namespace="moon",
+ body=service_manifest)
+ logger.info("Service {} created!".format(data.get('name')))
+ return resp
+
+ def load_pod(self, data, api_client=None, ext_client=None, expose=False):
+ _client = api_client if api_client else self.client
+ logger.info("Creating pod/service {}".format(data[0].get('name')))
+ logger.info("Creating pod/service {}".format(data))
+ pod = self.__create_pod(client=ext_client, data=data)
+ service = self.__create_service(client=_client, data=data[0],
+ expose=expose)
+ # logger.info("data={}".format(data))
+ # logger.info("service={}".format(service))
+ self.cache[pod.metadata.uid] = data
+ # {
+ # "ip": "",
+ # "hostname": pod.metadata.name,
+ # "port": service.spec.ports[0].node_port,
+ # "pdp": "",
+ # "keystone_project_id": data[0].get('keystone_project_id'),
+ # "plugin_names": [d.get('genre') for d in data],
+ # "namespace": "moon"
+ # }
+
+ def delete_pod(self, uuid=None, name=None):
+ logger.info("Deleting pod {}".format(uuid))
+
+ def get_slaves(self):
+ contexts, active_context = config.list_kube_config_contexts()
+ return contexts, active_context
+
+
+class Docker(Driver):
+
+ def load_pod(self, data, api_client=None, ext_client=None):
+ logger.info("Creating pod {}".format(data[0].get('name')))
+
+ def delete_pod(self, uuid=None, name=None):
+ logger.info("Deleting pod {}".format(uuid))
diff --git a/moonv4/moon_orchestrator/moon_orchestrator/http_server.py b/moonv4/moon_orchestrator/moon_orchestrator/http_server.py
new file mode 100644
index 00000000..a0738f4d
--- /dev/null
+++ b/moonv4/moon_orchestrator/moon_orchestrator/http_server.py
@@ -0,0 +1,262 @@
+# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
+# This software is distributed under the terms and conditions of the 'Apache-2.0'
+# license which can be found in the file 'LICENSE' in this package distribution
+# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
+
+from flask import Flask, jsonify
+from flask_cors import CORS, cross_origin
+from flask_restful import Resource, Api
+import logging
+from kubernetes import client, config
+import random
+import requests
+from moon_orchestrator import __version__
+from moon_orchestrator.api.pods import Pods
+from moon_orchestrator.api.generic import Logs, Status
+from moon_utilities import configuration, exceptions
+from moon_utilities.misc import get_random_name
+from moon_orchestrator.drivers import get_driver
+
+logger = logging.getLogger("moon.orchestrator.http")
+
+
+class Server:
+ """Base class for HTTP server"""
+
+ def __init__(self, host="localhost", port=80, api=None, **kwargs):
+ """Run a server
+
+ :param host: hostname of the server
+ :param port: port for the running server
+ :param kwargs: optional parameters
+ :return: a running server
+ """
+ self._host = host
+ self._port = port
+ self._api = api
+ self._extra = kwargs
+
+ @property
+ def host(self):
+ return self._host
+
+ @host.setter
+ def host(self, name):
+ self._host = name
+
+ @host.deleter
+ def host(self):
+ self._host = ""
+
+ @property
+ def port(self):
+ return self._port
+
+ @port.setter
+ def port(self, number):
+ self._port = number
+
+ @port.deleter
+ def port(self):
+ self._port = 80
+
+ def run(self):
+ raise NotImplementedError()
+
+__API__ = (
+ Status, Logs
+ )
+
+
+class Root(Resource):
+ """
+ The root of the web service
+ """
+ __urls__ = ("/", )
+ __methods = ("get", "post", "put", "delete", "options")
+
+ def get(self):
+ tree = {"/": {"methods": ("get",), "description": "List all methods for that service."}}
+ for item in __API__:
+ tree[item.__name__] = {"urls": item.__urls__}
+ _methods = []
+ for _method in self.__methods:
+ if _method in dir(item):
+ _methods.append(_method)
+ tree[item.__name__]["methods"] = _methods
+ tree[item.__name__]["description"] = item.__doc__.strip()
+ return {
+ "version": __version__,
+ "tree": tree
+ }
+
+
+class HTTPServer(Server):
+
+ def __init__(self, host="localhost", port=80, **kwargs):
+ super(HTTPServer, self).__init__(host=host, port=port, **kwargs)
+ self.app = Flask(__name__)
+ conf = configuration.get_configuration("components/orchestrator")
+ self.orchestrator_hostname = conf["components/orchestrator"].get("hostname", "orchestrator")
+ self.orchestrator_port = conf["components/orchestrator"].get("port", 80)
+ conf = configuration.get_configuration("components/manager")
+ self.manager_hostname = conf["components/manager"].get("hostname", "manager")
+ self.manager_port = conf["components/manager"].get("port", 80)
+ # TODO : specify only few urls instead of *
+ # CORS(self.app)
+ self.api = Api(self.app)
+ self.driver = get_driver()
+ logger.info("Driver = {}".format(self.driver.__class__))
+ self.__set_route()
+ self.__hook_errors()
+ self.create_wrappers()
+ pdp = requests.get("http://{}:{}/pdp".format(self.manager_hostname,
+ self.manager_port))
+ logger.info("pdp={}".format(pdp))
+ for _pdp_key, _pdp_value in pdp.json()['pdps'].items():
+ if _pdp_value.get('keystone_project_id'):
+ # TODO: select context to add security function
+ self.create_security_function(
+ keystone_project_id=_pdp_value.get('keystone_project_id'),
+ pdp_id=_pdp_key,
+ policy_ids=_pdp_value.get('security_pipeline', []))
+
+ def __hook_errors(self):
+
+ def get_404_json(e):
+ return jsonify({"result": False, "code": 404, "description": str(e)}), 404
+ self.app.register_error_handler(404, get_404_json)
+
+ def get_400_json(e):
+ return jsonify({"result": False, "code": 400, "description": str(e)}), 400
+ self.app.register_error_handler(400, lambda e: get_400_json)
+ self.app.register_error_handler(403, exceptions.AuthException)
+
+ def __set_route(self):
+ self.api.add_resource(Root, '/')
+
+ for api in __API__:
+ self.api.add_resource(api, *api.__urls__)
+ self.api.add_resource(Pods, *Pods.__urls__,
+ resource_class_kwargs={
+ "driver": self.driver,
+ })
+
+ def run(self):
+ self.app.run(host=self._host, port=self._port) # nosec
+
+ @staticmethod
+ def __filter_str(data):
+ return data.replace("@", "-")
+
+ def create_wrappers(self):
+ contexts, active_context = self.driver.get_slaves()
+ logger.info("contexts: {}".format(contexts))
+ logger.info("active_context: {}".format(active_context))
+ conf = configuration.get_configuration("components/wrapper")
+ hostname = conf["components/wrapper"].get(
+ "hostname", "wrapper")
+ port = conf["components/wrapper"].get("port", 80)
+ container = conf["components/wrapper"].get(
+ "container",
+ "wukongsun/moon_wrapper:v4.3")
+ for _ctx in contexts:
+ _config = config.new_client_from_config(context=_ctx['name'])
+ logger.info("_config={}".format(_config))
+ api_client = client.CoreV1Api(_config)
+ ext_client = client.ExtensionsV1beta1Api(_config)
+ # TODO: get data from consul
+ data = [{
+ "name": hostname + "-" + get_random_name(),
+ "container": container,
+ "port": port,
+ "namespace": "moon"
+ }, ]
+ pod = self.driver.load_pod(data, api_client, ext_client, expose=True)
+ logger.info('wrapper pod={}'.format(pod))
+
+ def create_security_function(self, keystone_project_id,
+ pdp_id, policy_ids, active_context=None,
+ active_context_name=None):
+ """ Create security functions
+
+ :param policy_id: the policy ID mapped to this security function
+ :param active_context: if present, add the security function in this
+ context
+ :param active_context_name: if present, add the security function in
+ this context name
+ if active_context_name and active_context are not present, add the
+ security function in all context (ie, in all slaves)
+ :return: None
+ """
+ logger.info(self.driver.get_pods())
+ for key, value in self.driver.get_pods().items():
+ for _pod in value:
+ if _pod.get('keystone_project_id') == keystone_project_id:
+ logger.warning("A pod for this Keystone project {} "
+ "already exists.".format(keystone_project_id))
+ return
+ plugins = configuration.get_plugins()
+ conf = configuration.get_configuration("components/interface")
+ i_hostname = conf["components/interface"].get("hostname", "interface")
+ i_port = conf["components/interface"].get("port", 80)
+ i_container = conf["components/interface"].get(
+ "container",
+ "wukongsun/moon_interface:v4.3")
+ data = [
+ {
+ "name": i_hostname + "-" + get_random_name(),
+ "container": i_container,
+ "port": i_port,
+ 'pdp_id': pdp_id,
+ 'genre': "interface",
+ 'keystone_project_id': keystone_project_id,
+ "namespace": "moon"
+ },
+ ]
+ policies = requests.get("http://{}:{}/policies".format(
+ self.manager_hostname, self.manager_port)).json().get(
+ "policies", dict())
+ models = requests.get("http://{}:{}/models".format(
+ self.manager_hostname, self.manager_port)).json().get(
+ "models", dict())
+
+ for policy_id in policy_ids:
+ if policy_id in policies:
+ genre = policies[policy_id].get("genre", "authz")
+ if genre in plugins:
+ for meta_rule in models[policies[policy_id]['model_id']]['meta_rules']:
+ data.append({
+ "name": genre + "-" + get_random_name(),
+ "container": plugins[genre]['container'],
+ 'pdp_id': pdp_id,
+ "port": plugins[genre].get('port', 8080),
+ 'genre': genre,
+ 'policy_id': policy_id,
+ 'meta_rule_id': meta_rule,
+ 'keystone_project_id': keystone_project_id,
+ "namespace": "moon"
+ })
+ contexts, _active_context = self.driver.get_slaves()
+ if active_context_name:
+ for _context in contexts:
+ if _context["name"] == active_context_name:
+ active_context = _context
+ break
+ if active_context:
+ active_context = _active_context
+ _config = config.new_client_from_config(
+ context=active_context['name'])
+ logger.info("_config={}".format(_config))
+ api_client = client.CoreV1Api(_config)
+ ext_client = client.ExtensionsV1beta1Api(_config)
+ self.driver.load_pod(data, api_client, ext_client)
+ return
+ for _ctx in contexts:
+ _config = config.new_client_from_config(context=_ctx['name'])
+ logger.info("_config={}".format(_config))
+ api_client = client.CoreV1Api(_config)
+ ext_client = client.ExtensionsV1beta1Api(_config)
+ self.driver.load_pod(data, api_client, ext_client)
+
+
diff --git a/moonv4/moon_orchestrator/moon_orchestrator/messenger.py b/moonv4/moon_orchestrator/moon_orchestrator/messenger.py
deleted file mode 100644
index 2b7b3866..00000000
--- a/moonv4/moon_orchestrator/moon_orchestrator/messenger.py
+++ /dev/null
@@ -1,63 +0,0 @@
-# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
-# This software is distributed under the terms and conditions of the 'Apache-2.0'
-# license which can be found in the file 'LICENSE' in this package distribution
-# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
-
-import oslo_messaging
-from oslo_log import log as logging
-import time
-from moon_utilities.api import APIList
-from moon_utilities import configuration
-
-from oslo_config import cfg
-from moon_orchestrator.api.generic import Status, Logs
-from moon_orchestrator.api.containers import Containers
-from moon_orchestrator.api.slaves import Slaves
-
-TOPIC = "orchestrator"
-LOG = logging.getLogger("moon.orchestrator.messenger")
-CONF = cfg.CONF
-
-
-class Server:
-
- def __init__(self, containers, docker_manager, slaves):
- cfg.CONF.transport_url = self.__get_transport_url()
- self.CONTAINERS = containers
- self.transport = oslo_messaging.get_transport(cfg.CONF)
- self.target = oslo_messaging.Target(topic=TOPIC, server='server1')
- LOG.info("Starting MQ server with topic: {}".format(TOPIC))
- self.docker_manager = docker_manager
- for _container in containers:
- Status._container = containers[_container]
- self.endpoints = [
- APIList((Status, Logs, Containers)),
- Status(),
- Logs(),
- Containers(self.docker_manager),
- Slaves(slaves)
- ]
- self.server = oslo_messaging.get_rpc_server(self.transport, self.target, self.endpoints,
- executor='threading',
- access_policy=oslo_messaging.DefaultRPCAccessPolicy)
-
- @staticmethod
- def __get_transport_url():
- messenger = configuration.get_configuration(configuration.MESSENGER)["messenger"]
- return messenger['url']
-
- def run(self):
- try:
- self.server.start()
- while True:
- time.sleep(1)
- except KeyboardInterrupt:
- LOG.warning("Stopping server by crtl+c (please be patient, closing connections...)")
- except SystemExit:
- LOG.warning("Stopping server (please be patient, closing connections...)")
- except Exception as e:
- LOG.error("Exception occurred: {}".format(e))
-
- self.server.stop()
- self.server.wait()
-
diff --git a/moonv4/moon_orchestrator/moon_orchestrator/server.py b/moonv4/moon_orchestrator/moon_orchestrator/server.py
index 8fd2740f..d8e312d0 100644
--- a/moonv4/moon_orchestrator/moon_orchestrator/server.py
+++ b/moonv4/moon_orchestrator/moon_orchestrator/server.py
@@ -3,174 +3,32 @@
# license which can be found in the file 'LICENSE' in this package distribution
# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
-import sys
import os
-import hashlib
-from oslo_log import log as logging
-from docker import Client
-import docker.errors as docker_errors
+import logging
from moon_utilities import configuration, exceptions
-from moon_orchestrator import messenger
-
+from moon_orchestrator.http_server import HTTPServer
LOG = logging.getLogger("moon.orchestrator")
+DOMAIN = "moon_orchestrator"
-CONTAINERS = {}
-SLAVES = {}
-docker_conf = configuration.get_configuration("docker")['docker']
-docker = Client(base_url=docker_conf['url'])
-LOG.info("docker_url={}".format(docker_conf['url']))
-docker_network = docker_conf['network']
-
-
-def kill_handler(signum, frame):
- _exit(0)
-
-
-class DockerManager:
-
- def load(self, component, uuid=None, container_data=None):
- """Load a new docker mapping the component given
-
- :param component: the name of the component (policy or function)
- :param uuid: the uuid of the intra_extension linked to that component
- :return: the created component
- """
- component_id = component+"_"+hashlib.sha224(uuid.encode("utf-8")).hexdigest()
- plugins = configuration.get_plugins()
- if component in plugins.keys():
- components = configuration.get_components()
- configuration.add_component(
- name=component_id,
- uuid=component_id,
- port=configuration.increment_port(),
- bind="0.0.0.0",
- extra=container_data,
- container=plugins[component]['container']
- )
- # _command = plugins[component]['command']
- # try:
- # _index = _command.index("<UUID>")
- # _command[_index] = component_id
- # except ValueError:
- # pass
- self.run(component_id, environment={"UUID": component_id})
- CONTAINERS[component_id] = components.get(component_id)
- CONTAINERS[component_id]["running"] = True
- return CONTAINERS[component_id]
-
- def load_all_containers(self):
- LOG.info("Try to load all containers...")
- current_containers = [item["Names"][0] for item in docker.containers()]
- components = configuration.get_components()
- containers_not_running = []
- for c_name in (
- '/keystone',
- '/consul',
- '/db',
- '/messenger'
- ):
- if c_name not in current_containers:
- containers_not_running.append(c_name)
- if containers_not_running:
- raise exceptions.ContainerMissing(
- "Following containers are missing: {}".format(", ".join(containers_not_running)))
- for c_name in (
- '/interface',
- '/manager',
- '/router'):
- if c_name not in current_containers:
- LOG.info("Starting container {}...".format(c_name))
- self.run(c_name.strip("/"))
- else:
- LOG.info("Container {} already running...".format(c_name))
- CONTAINERS[c_name] = components.get(c_name.strip("/"))
- CONTAINERS[c_name]["running"] = True
-
- def run(self, name, environment=None):
- components = configuration.get_components()
- if name in components:
- image = components[name]['container']
- params = {
- 'image': image,
- 'name': name,
- 'hostname': name,
- 'detach': True,
- 'host_config': docker.create_host_config(network_mode=docker_network)
- }
- if 'port' in components[name] and components[name]['port']:
- params["ports"] = [components[name]['port'], ]
- params["host_config"] = docker.create_host_config(
- network_mode=docker_network,
- port_bindings={components[name]['port']: components[name]['port']}
- )
- if environment:
- params["environment"] = environment
- container = docker.create_container(**params)
- docker.start(container=container.get('Id'))
-
- @staticmethod
- def get_component(uuid=None):
- if uuid:
- return CONTAINERS.get(uuid, None)
- return CONTAINERS
-
- @staticmethod
- def kill(component_id, delete=True):
- LOG.info("Killing container {}".format(component_id))
- docker.kill(container=component_id)
- if delete:
- docker.remove_container(container=component_id)
-
+__CWD__ = os.path.dirname(os.path.abspath(__file__))
-def _exit(exit_number=0, error=None):
- for _container in CONTAINERS:
- LOG.warning("Deleting containers named {}...".format(_container))
- # print(40 * "-" + _container)
- try:
- # print(docker.logs(container=_container).decode("utf-8"))
- docker.kill(container=_container)
- except docker_errors.NotFound:
- LOG.error("The container {} was not found".format(_container))
- except docker_errors.APIError as e:
- LOG.error(e)
- else:
- docker.remove_container(container=_container)
- LOG.info("Moon orchestrator: offline")
-
- # TODO (asteroide): put in the debug log
- if error:
- LOG.info(str(error))
- sys.exit(exit_number)
-
-
-def __save_pid():
- try:
- open("/var/run/moon_orchestrator.pid", "w").write(str(os.getpid()))
- except PermissionError:
- LOG.warning("You don't have the right to write PID file in /var/run... Continuing anyway.")
- LOG.warning("Writing PID file in {}".format(os.getcwd()))
- open("./moon_orchestrator.pid", "w").write(str(os.getpid()))
-
-
-def server():
+def main():
configuration.init_logging()
- conf = configuration.add_component("orchestrator", "orchestrator")
- LOG.info("Starting main server {}".format(conf["components/orchestrator"]["hostname"]))
-
- docker_manager = DockerManager()
-
- docker_manager.load_all_containers()
- serv = messenger.Server(containers=CONTAINERS, docker_manager=docker_manager, slaves=SLAVES)
try:
- serv.run()
- finally:
- _exit(0)
-
-
-def main():
- server()
+ conf = configuration.get_configuration("components/orchestrator")
+ hostname = conf["components/orchestrator"].get("hostname", "orchestrator")
+ port = conf["components/orchestrator"].get("port", 80)
+ bind = conf["components/orchestrator"].get("bind", "127.0.0.1")
+ except exceptions.ConsulComponentNotFound:
+ hostname = "orchestrator"
+ bind = "127.0.0.1"
+ port = 80
+ configuration.add_component(uuid="orchestrator", name=hostname, port=port, bind=bind)
+ LOG.info("Starting server with IP {} on port {} bind to {}".format(hostname, port, bind))
+ server = HTTPServer(host=bind, port=port)
+ server.run()
if __name__ == '__main__':
diff --git a/moonv4/moon_orchestrator/requirements.txt b/moonv4/moon_orchestrator/requirements.txt
index c7653278..29885a49 100644
--- a/moonv4/moon_orchestrator/requirements.txt
+++ b/moonv4/moon_orchestrator/requirements.txt
@@ -1,12 +1,7 @@
-docker-py
-kombu !=4.0.1,!=4.0.0
-oslo.messaging !=5.14.0,!=5.13.0
-oslo.config
-oslo.log
-vine
-jinja2
-sqlalchemy
-pymysql
+flask
+flask_restful
+flask_cors
werkzeug
moon_utilities
-moon_db \ No newline at end of file
+moon_db
+kubernetes \ No newline at end of file