diff options
Diffstat (limited to 'moon_engine/moon_engine/api/pipeline/validator.py')
-rw-r--r-- | moon_engine/moon_engine/api/pipeline/validator.py | 125 |
1 files changed, 125 insertions, 0 deletions
diff --git a/moon_engine/moon_engine/api/pipeline/validator.py b/moon_engine/moon_engine/api/pipeline/validator.py new file mode 100644 index 00000000..1fc8588a --- /dev/null +++ b/moon_engine/moon_engine/api/pipeline/validator.py @@ -0,0 +1,125 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +from moon_cache.context import Context +from moon_utilities import exceptions +import itertools +import logging +import hug +from moon_cache.cache import Cache +from moon_engine.api.configuration import get_configuration + +LOGGER = logging.getLogger("moon.authz.api." + __name__) + + +class Validator(object): + __CACHE = None + + def __init__(self): + + self.__CACHE = Cache.getInstance(manager_url=get_configuration("manager_url"), + incremental=get_configuration("incremental_updates"), + manager_api_key=get_configuration("api_token")) + self.context = None + + def authz(self, subject_name, object_name, action_name): + + ctx = { + "pdp_id": get_configuration("uuid"), + "subject_name": subject_name, + "object_name": object_name, + "action_name": action_name + } + self.context = Context(ctx, self.__CACHE) + + self.context.set_cache(self.__CACHE) + self.context.increment_index() + response = self.__authz_request() + self.context.delete_cache() + return response + + def __authz_request(self): + + LOGGER.debug("self.context.pdp_set={}".format(self.context.pdp_set)) + result, message = self.__check_rules() + if result: + if self.__exec_instructions(result): + return hug.HTTP_204 + else: + self.context.current_state = "deny" + return hug.HTTP_403 + + def __check_rules(self): + + scopes_list = list() + current_header_id = self.context.headers[self.context.index] + if not self.context.pdp_set: + raise exceptions.PdpUnknown + if current_header_id not in self.context.pdp_set: + raise Exception('Invalid index') + current_pdp = self.context.pdp_set[current_header_id] + category_list = list() + if 'meta_rules' not in current_pdp: + raise exceptions.PdpContentError + try: + category_list.extend(current_pdp["meta_rules"]["subject_categories"]) + category_list.extend(current_pdp["meta_rules"]["object_categories"]) + category_list.extend(current_pdp["meta_rules"]["action_categories"]) + except Exception: + raise exceptions.MetaRuleContentError + if 'target' not in current_pdp: + raise exceptions.PdpContentError + for category in category_list: + scope = list(current_pdp['target'][category]) + scopes_list.append(scope) + if self.context.current_policy_id not in self.__CACHE.rules: + raise exceptions.PolicyUnknown + if 'rules' not in self.__CACHE.rules[self.context.current_policy_id]: + raise exceptions.RuleUnknown + + for item in itertools.product(*scopes_list): + req = list(item) + for rule in self.__CACHE.rules[self.context.current_policy_id]["rules"]: + if req == rule['rule']: + return rule['instructions'], "" + if not list(itertools.product(*scopes_list)): + LOGGER.error("There is an error in retrieved scopes ({})".format(scopes_list)) + cat_list = [] + categories = dict(self.__CACHE.subject_categories) + categories.update(dict(self.__CACHE.object_categories)) + categories.update(dict(self.__CACHE.action_categories)) + for category in category_list: + if category.startswith("attributes:"): + cat_list.append(category) + else: + cat_list.append(categories[category].get('name')) + LOGGER.error("Categories are ({})".format(", ".join(cat_list))) + return False, "There is an error in retrieved scopes" + LOGGER.warning("No rule match the request...") + return False, "No rule match the request..." + + def __exec_instructions(self, instructions): + + for instruction in instructions: + for key in instruction: + if key == "decision": + if instruction["decision"] == "grant": + self.context.current_state = "grant" + LOGGER.info("__exec_instructions True {}".format( + self.context.current_state)) + return True + else: + self.context.current_state = instruction["decision"].lower() + + LOGGER.info("__exec_instructions False {}".format(self.context.current_state)) + + return False |