aboutsummaryrefslogtreecommitdiffstats
path: root/moon_engine/moon_engine/api/pipeline/validator.py
diff options
context:
space:
mode:
Diffstat (limited to 'moon_engine/moon_engine/api/pipeline/validator.py')
-rw-r--r--moon_engine/moon_engine/api/pipeline/validator.py125
1 files changed, 125 insertions, 0 deletions
diff --git a/moon_engine/moon_engine/api/pipeline/validator.py b/moon_engine/moon_engine/api/pipeline/validator.py
new file mode 100644
index 00000000..1fc8588a
--- /dev/null
+++ b/moon_engine/moon_engine/api/pipeline/validator.py
@@ -0,0 +1,125 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+from moon_cache.context import Context
+from moon_utilities import exceptions
+import itertools
+import logging
+import hug
+from moon_cache.cache import Cache
+from moon_engine.api.configuration import get_configuration
+
+LOGGER = logging.getLogger("moon.authz.api." + __name__)
+
+
+class Validator(object):
+ __CACHE = None
+
+ def __init__(self):
+
+ self.__CACHE = Cache.getInstance(manager_url=get_configuration("manager_url"),
+ incremental=get_configuration("incremental_updates"),
+ manager_api_key=get_configuration("api_token"))
+ self.context = None
+
+ def authz(self, subject_name, object_name, action_name):
+
+ ctx = {
+ "pdp_id": get_configuration("uuid"),
+ "subject_name": subject_name,
+ "object_name": object_name,
+ "action_name": action_name
+ }
+ self.context = Context(ctx, self.__CACHE)
+
+ self.context.set_cache(self.__CACHE)
+ self.context.increment_index()
+ response = self.__authz_request()
+ self.context.delete_cache()
+ return response
+
+ def __authz_request(self):
+
+ LOGGER.debug("self.context.pdp_set={}".format(self.context.pdp_set))
+ result, message = self.__check_rules()
+ if result:
+ if self.__exec_instructions(result):
+ return hug.HTTP_204
+ else:
+ self.context.current_state = "deny"
+ return hug.HTTP_403
+
+ def __check_rules(self):
+
+ scopes_list = list()
+ current_header_id = self.context.headers[self.context.index]
+ if not self.context.pdp_set:
+ raise exceptions.PdpUnknown
+ if current_header_id not in self.context.pdp_set:
+ raise Exception('Invalid index')
+ current_pdp = self.context.pdp_set[current_header_id]
+ category_list = list()
+ if 'meta_rules' not in current_pdp:
+ raise exceptions.PdpContentError
+ try:
+ category_list.extend(current_pdp["meta_rules"]["subject_categories"])
+ category_list.extend(current_pdp["meta_rules"]["object_categories"])
+ category_list.extend(current_pdp["meta_rules"]["action_categories"])
+ except Exception:
+ raise exceptions.MetaRuleContentError
+ if 'target' not in current_pdp:
+ raise exceptions.PdpContentError
+ for category in category_list:
+ scope = list(current_pdp['target'][category])
+ scopes_list.append(scope)
+ if self.context.current_policy_id not in self.__CACHE.rules:
+ raise exceptions.PolicyUnknown
+ if 'rules' not in self.__CACHE.rules[self.context.current_policy_id]:
+ raise exceptions.RuleUnknown
+
+ for item in itertools.product(*scopes_list):
+ req = list(item)
+ for rule in self.__CACHE.rules[self.context.current_policy_id]["rules"]:
+ if req == rule['rule']:
+ return rule['instructions'], ""
+ if not list(itertools.product(*scopes_list)):
+ LOGGER.error("There is an error in retrieved scopes ({})".format(scopes_list))
+ cat_list = []
+ categories = dict(self.__CACHE.subject_categories)
+ categories.update(dict(self.__CACHE.object_categories))
+ categories.update(dict(self.__CACHE.action_categories))
+ for category in category_list:
+ if category.startswith("attributes:"):
+ cat_list.append(category)
+ else:
+ cat_list.append(categories[category].get('name'))
+ LOGGER.error("Categories are ({})".format(", ".join(cat_list)))
+ return False, "There is an error in retrieved scopes"
+ LOGGER.warning("No rule match the request...")
+ return False, "No rule match the request..."
+
+ def __exec_instructions(self, instructions):
+
+ for instruction in instructions:
+ for key in instruction:
+ if key == "decision":
+ if instruction["decision"] == "grant":
+ self.context.current_state = "grant"
+ LOGGER.info("__exec_instructions True {}".format(
+ self.context.current_state))
+ return True
+ else:
+ self.context.current_state = instruction["decision"].lower()
+
+ LOGGER.info("__exec_instructions False {}".format(self.context.current_state))
+
+ return False