1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
|
# Software Name: MOON
# Version: 5.4
# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
# SPDX-License-Identifier: Apache-2.0
# This software is distributed under the 'Apache License 2.0',
# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
# or see the "LICENSE" file for more details.
from moon_cache.context import Context
from moon_utilities import exceptions
import itertools
import logging
import hug
from moon_cache.cache import Cache
from moon_engine.api.configuration import get_configuration
# Module-level logger shared by everything in this file; its name is the
# "moon.authz.api." prefix followed by this module's __name__.
LOGGER = logging.getLogger("moon.authz.api." + __name__)
class Validator(object):
    """Evaluate authorization requests against the rules held in the cache.

    A request is described by a subject, an object and an action name.  The
    validator builds a ``Context`` for it, looks for a rule whose value list
    matches one combination of the scopes found in the current PDP target,
    and executes the instructions attached to that rule.
    """

    # Class-level placeholder; every instance overwrites it in ``__init__``.
    __CACHE = None

    def __init__(self):
        """Obtain the cache instance using the engine configuration.

        The manager URL, the incremental-update setting and the API token are
        read through ``get_configuration`` and handed to ``Cache.getInstance``.
        """
        self.__CACHE = Cache.getInstance(
            manager_url=get_configuration("manager_url"),
            incremental=get_configuration("incremental_updates"),
            manager_api_key=get_configuration("api_token"),
        )
        # Context of the request being evaluated; populated by ``authz``.
        self.context = None

    def authz(self, subject_name, object_name, action_name):
        """Decide whether the request is granted.

        :param subject_name: name of the subject issuing the request
        :param object_name: name of the object the request targets
        :param action_name: name of the requested action
        :return: ``hug.HTTP_204`` when a matching rule grants the request,
                 ``hug.HTTP_403`` in every other case
        :raises: any exception raised by the rule check (unknown PDP,
                 malformed PDP or meta-rule content, unknown policy or rules)
        """
        request = {
            "pdp_id": get_configuration("uuid"),
            "subject_name": subject_name,
            "object_name": object_name,
            "action_name": action_name,
        }
        self.context = Context(request, self.__CACHE)
        self.context.set_cache(self.__CACHE)
        self.context.increment_index()
        try:
            return self.__authz_request()
        finally:
            # Run the context cleanup even when the rule check raises, so a
            # failing request does not skip it.
            self.context.delete_cache()

    def __authz_request(self):
        """Check the rules for the current context and return an HTTP status.

        NOTE(review): the nesting below was reconstructed from a copy of the
        file that had lost its indentation; it assumes the ``else`` branch
        belonged to the outer ``if result`` test -- confirm against upstream.

        :return: ``hug.HTTP_204`` if the matched instructions grant the
                 request, otherwise ``hug.HTTP_403``
        """
        # Lazy %-style arguments: the PDP set is only rendered when debug
        # logging is actually enabled.
        LOGGER.debug("self.context.pdp_set=%s", self.context.pdp_set)
        instructions, _ = self.__check_rules()
        if not instructions:
            # No rule matched (or scopes were inconsistent): explicit deny.
            self.context.current_state = "deny"
            return hug.HTTP_403
        if self.__exec_instructions(instructions):
            return hug.HTTP_204
        # Instructions were found but did not grant; ``current_state`` has
        # been left as ``__exec_instructions`` set it.
        return hug.HTTP_403

    def __check_rules(self):
        """Search the current policy for a rule matching the request scopes.

        The categories listed in the PDP meta rules (subjects, then objects,
        then actions) are mapped to their scopes in the PDP ``target``.  Every
        combination taking one scope per category is compared with the
        ``rule`` list of each cached rule of the current policy.

        :return: ``(instructions, "")`` for the first matching rule, or
                 ``(False, message)`` when nothing matches
        :raises exceptions.PdpUnknown: the context has no PDP set
        :raises Exception: the current header id is not in the PDP set
        :raises exceptions.PdpContentError: the PDP lacks ``meta_rules`` or
                 ``target``
        :raises exceptions.MetaRuleContentError: the meta rules cannot be read
        :raises exceptions.PolicyUnknown: no cached rules for the policy
        :raises exceptions.RuleUnknown: the cached entry has no ``rules`` key
        """
        context = self.context
        current_header_id = context.headers[context.index]
        if not context.pdp_set:
            raise exceptions.PdpUnknown
        if current_header_id not in context.pdp_set:
            raise Exception('Invalid index')
        current_pdp = context.pdp_set[current_header_id]

        if 'meta_rules' not in current_pdp:
            raise exceptions.PdpContentError
        category_list = []
        try:
            meta_rules = current_pdp["meta_rules"]
            for kind in ("subject_categories", "object_categories",
                         "action_categories"):
                category_list.extend(meta_rules[kind])
        except Exception as err:
            # Any failure while reading the meta rules is reported as a
            # content error; the original cause stays attached for debugging.
            raise exceptions.MetaRuleContentError from err

        if 'target' not in current_pdp:
            raise exceptions.PdpContentError
        target = current_pdp['target']
        scopes_list = [list(target[category]) for category in category_list]

        policy_id = context.current_policy_id
        if policy_id not in self.__CACHE.rules:
            raise exceptions.PolicyUnknown
        if 'rules' not in self.__CACHE.rules[policy_id]:
            raise exceptions.RuleUnknown
        # Loop-invariant: fetch the rule list once instead of per combination.
        rules = self.__CACHE.rules[policy_id]["rules"]

        for combination in itertools.product(*scopes_list):
            candidate = list(combination)
            for rule in rules:
                if candidate == rule['rule']:
                    return rule['instructions'], ""

        # The Cartesian product is empty exactly when at least one category
        # has no scope, so there is no need to build the product a second time.
        if any(not scope for scope in scopes_list):
            LOGGER.error("There is an error in retrieved scopes (%s)",
                         scopes_list)
            LOGGER.error("Categories are (%s)",
                         ", ".join(self.__category_labels(category_list)))
            return False, "There is an error in retrieved scopes"

        LOGGER.warning("No rule match the request...")
        return False, "No rule match the request..."

    def __category_labels(self, category_list):
        """Return one printable label per category id, for log messages.

        Ids starting with ``"attributes:"`` are used as-is; other ids are
        replaced by the ``name`` of the cached category.  An id that is not in
        the cache, or whose entry has no name, is reported by its raw id so
        that logging an error can never raise by itself.
        """
        known = dict(self.__CACHE.subject_categories)
        known.update(dict(self.__CACHE.object_categories))
        known.update(dict(self.__CACHE.action_categories))
        labels = []
        for category in category_list:
            if category.startswith("attributes:"):
                labels.append(category)
                continue
            name = (known.get(category) or {}).get('name')
            labels.append(str(name) if name is not None else str(category))
        return labels

    def __exec_instructions(self, instructions):
        """Apply the ``decision`` entries of *instructions* to the context.

        NOTE(review): the nesting was reconstructed from a copy without
        indentation; it assumes a non-grant decision only records the state
        and lets the loop continue -- confirm against upstream.

        :param instructions: iterable of mappings; only those containing a
                             ``"decision"`` key are considered
        :return: ``True`` as soon as a decision equals ``"grant"``, otherwise
                 ``False`` once all instructions have been examined
        """
        for instruction in instructions:
            if "decision" not in instruction:
                continue
            decision = instruction["decision"]
            # NOTE(review): the grant test is case-sensitive while other
            # decisions are lower-cased before being stored, so "Grant" is
            # stored as "grant" yet does not grant.  Left unchanged on purpose.
            if decision == "grant":
                self.context.current_state = "grant"
                LOGGER.info("__exec_instructions True %s",
                            self.context.current_state)
                return True
            self.context.current_state = decision.lower()
        LOGGER.info("__exec_instructions False %s", self.context.current_state)
        return False
|