aboutsummaryrefslogtreecommitdiffstats
path: root/old/external_policy_checker/external_policy_checker/server.py
blob: cbb4a933ccae571e1c7d800764e645f62291f788 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# Copyright 2018 Orange
# This software is distributed under the terms and conditions of the 'Apache-2.0'
# license which can be found in the file 'LICENSE' in this package distribution
# or at 'http://www.apache.org/licenses/LICENSE-2.0'.

import sys
import flask
from flask import Flask
from flask import request
import json
import logging
import random

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Configure the root logger so that records of level INFO and above are emitted.
logging.basicConfig(level=logging.INFO)
# Flask application on which the route decorators in this file register views.
app = Flask(__name__)


def test_target(data, result):
    if "resource_id" in data:
        result["resource_id"] = data['object_id']
    if "object_id" in data:
        result["resource_id"] = data['object_id']
    if 'project_id' in data:
        result["project_id"] = data['project_id']
    if 'user_id' in data:
        result["user_id"] = data['user_id']


def test_credentials(data, result):
    if 'project_id' in data:
        result["project_id"] = data['project_id']
    if 'user_id' in data:
        result["user_id"] = data['user_id']
    if 'project_domain_id' in data:
        result["domain_id"] = data['project_domain_id']


def test_rule(data, result):
    result['action_name'] = data


def _read_json_field(data, field):
    """Return the decoded JSON value stored under ``field`` in ``data``.

    The value is accepted either as a JSON-encoded string (as sent in
    form-encoded requests) or as an already-decoded JSON object (a nested
    value of a JSON request body).

    Raises:
        Exception: "Error reading <field>" when the field is missing or
            cannot be decoded; the underlying error is chained as the cause.
    """
    try:
        raw = data.get(field)
        if isinstance(raw, dict):
            # Nested object coming from a JSON body: nothing left to decode.
            return raw
        # A missing field yields None, which json.loads rejects.
        return json.loads(raw)
    except Exception as err:
        raise Exception("Error reading {}".format(field)) from err


def test_data():
    """Check that the current request carries every required attribute.

    The payload is taken from the form data, or from the JSON-decoded
    request body when no form fields were sent. The ``target``,
    ``credentials`` and ``rule`` fields are then used to fill in the user,
    project, action, resource and domain identifiers.

    Returns:
        bool: True when every attribute except ``domain_id`` is non-empty.
        An empty ``domain_id`` is only reported with a warning.

    Raises:
        Exception: when ``target`` or ``credentials`` is missing or cannot
            be decoded.
    """
    data = request.form
    if not data:
        data = json.loads(request.data.decode("utf-8"))
    result = {
        "user_id": "",
        "project_id": "",
        "action_name": "",
        "resource_id": "",
        "domain_id": "",
    }
    target = _read_json_field(data, 'target')
    credentials = _read_json_field(data, 'credentials')
    rule = data.get('rule', "")
    test_target(target, result)
    test_credentials(credentials, result)
    test_rule(rule, result)
    logger.info("Analysing request with %s", rule)
    information_is_complete = True
    for key, value in result.items():
        if value:
            continue
        if key == "domain_id":
            # The domain is optional: report it without failing the check.
            logger.warning("Attribute %s is missing.", key)
        else:
            information_is_complete = False
            logger.error("Attribute %s is absent", key)
    return information_is_complete


@app.route("/policy_checker", methods=["POST"])
def checker():
    """Answer "True", or "False" with status 403, depending on ``test_data``.

    Any exception raised while analysing the request is logged and treated
    as incomplete information.
    """
    try:
        complete = test_data()
    except Exception as e:
        logger.exception(e)
        complete = False
    reply = flask.make_response("True" if complete else "False")
    reply.headers['content-type'] = 'application/octet-stream'
    if complete:
        return reply
    return reply, 403


def get_target():
    """Return the decoded ``target`` field of the current request.

    The payload is taken from the form data, or from the JSON-decoded
    request body when no form fields were sent. The field is accepted either
    as a JSON-encoded string or as an already-decoded JSON object (a nested
    value of a JSON request body).

    Raises:
        Exception: "Error reading target" when the field is missing or
            cannot be decoded; the underlying error is chained as the cause.
    """
    data = request.form
    if not data:
        data = json.loads(request.data.decode("utf-8"))
    try:
        target = data.get('target')
        if isinstance(target, dict):
            # Nested object coming from a JSON body: nothing left to decode.
            return target
        # A missing field yields None, which json.loads rejects.
        return json.loads(target)
    except Exception as err:
        raise Exception("Error reading target") from err


@app.route("/authz/grant", methods=["POST"])
def wrapper_grant():
    """Log the target of the request and always answer "True"."""
    target = get_target()
    logger.info("Requesting wrapper authz with {}".format(target))
    reply = flask.make_response("True")
    reply.headers['content-type'] = 'application/octet-stream'
    return reply


@app.route("/authz/deny", methods=["POST"])
def wrapper_deny():
    """Log the target of the request and always answer "False" with 403."""
    target = get_target()
    logger.info("Requesting wrapper authz with {}".format(target))
    reply = flask.make_response("False")
    reply.headers['content-type'] = 'application/octet-stream'
    return reply, 403


def main():
    """Run the Flask application on all interfaces.

    The listening port defaults to 8080 and can be overridden by the first
    command-line argument; a non-integer argument is logged as an error and
    the process exits with status 1.
    """
    cli_args = sys.argv[1:]
    port = 8080
    if cli_args:
        try:
            port = int(cli_args[0])
        except ValueError:
            logger.error("Argument for Port in command line is not an integer")
            sys.exit(1)
    app.run(host="0.0.0.0", port=port)


# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    main()