diff options
Diffstat (limited to 'moon_manager/moon_manager/api/json_export.py')
-rw-r--r-- | moon_manager/moon_manager/api/json_export.py | 282 |
1 files changed, 20 insertions, 262 deletions
diff --git a/moon_manager/moon_manager/api/json_export.py b/moon_manager/moon_manager/api/json_export.py index 069e5884..06a7ba71 100644 --- a/moon_manager/moon_manager/api/json_export.py +++ b/moon_manager/moon_manager/api/json_export.py @@ -1,279 +1,37 @@ -# Copyright 2018 Open Platform for NFV Project, Inc. and its contributors -# This software is distributed under the terms and conditions of the 'Apache-2.0' -# license which can be found in the file 'LICENSE' in this package distribution -# or at 'http://www.apache.org/licenses/LICENSE-2.0'. +# Software Name: MOON -import logging -from flask_restful import Resource -from python_moonutilities.security_functions import check_auth -from python_moondb.core import PDPManager -from python_moondb.core import PolicyManager -from python_moondb.core import ModelManager -from moon_manager.api.json_utils import JsonUtils, BaseException - -__version__ = "4.5.0" - -logger = logging.getLogger("moon.manager.api." + __name__) - - -class JsonExport(Resource): - __urls__ = ( - "/export", - "/export/", - ) - - def _export_rules(self, json_content): - policies = PolicyManager.get_policies(self._user_id) - rules_array = [] - - for policy_key in policies: - rules = PolicyManager.get_rules(self._user_id, policy_key) - rules = rules["rules"] - # logger.info(rules) - for rule in rules: - rule_dict = dict() - JsonUtils.copy_field_if_exists(rule, rule_dict, "instructions", dict) - JsonUtils.copy_field_if_exists(rule, rule_dict, "enabled", True) - JsonUtils.convert_id_to_name(rule["meta_rule_id"], rule_dict, "meta_rule", - "meta_rule", ModelManager, self._user_id) - JsonUtils.convert_id_to_name(policy_key, rule_dict, "policy", "policy", - PolicyManager, self._user_id) - ids = rule["rule"] - rule_description = dict() - meta_rule = ModelManager.get_meta_rules(self._user_id, rule["meta_rule_id"]) - meta_rule = [v for v in meta_rule.values()] - meta_rule = meta_rule[0] - index_subject_data = len(meta_rule["subject_categories"]) - 1 - index_object_data = len(meta_rule["subject_categories"]) + len( - meta_rule["object_categories"]) - 1 - index_action_data = len(meta_rule["subject_categories"]) + len( - meta_rule["object_categories"]) + len(meta_rule["action_categories"]) - 1 - ids_subject_data = [ids[0]] if len(meta_rule["subject_categories"]) == 1 else ids[ - 0:index_subject_data] - ids_object_data = [ids[index_object_data]] if len( - meta_rule["object_categories"]) == 1 else ids[ - index_subject_data + 1:index_object_data] - ids_action_date = [ids[index_action_data]] if len( - meta_rule["action_categories"]) == 1 else ids[ - index_object_data + 1:index_action_data] - JsonUtils.convert_ids_to_names(ids_subject_data, rule_description, "subject_data", - "subject_data", PolicyManager, self._user_id, - policy_key) - JsonUtils.convert_ids_to_names(ids_object_data, rule_description, "object_data", - "object_data", PolicyManager, self._user_id, - policy_key) - JsonUtils.convert_ids_to_names(ids_action_date, rule_description, "action_data", - "action_data", PolicyManager, self._user_id, - policy_key) - rule_dict["rule"] = rule_description - rules_array.append(rule_dict) - - if len(rules_array) > 0: - json_content['rules'] = rules_array - - def _export_meta_rules(self, json_content): - meta_rules = ModelManager.get_meta_rules(self._user_id) - meta_rules_array = [] - # logger.info(meta_rules) - for meta_rule_key in meta_rules: - # logger.info(meta_rules[meta_rule_key]) - meta_rule_dict = dict() - JsonUtils.copy_field_if_exists(meta_rules[meta_rule_key], meta_rule_dict, "name", str) - JsonUtils.copy_field_if_exists(meta_rules[meta_rule_key], meta_rule_dict, "description", - str) - JsonUtils.convert_ids_to_names(meta_rules[meta_rule_key]["subject_categories"], - meta_rule_dict, "subject_categories", "subject_category", - ModelManager, self._user_id) - JsonUtils.convert_ids_to_names(meta_rules[meta_rule_key]["object_categories"], - meta_rule_dict, "object_categories", "object_category", - ModelManager, self._user_id) - JsonUtils.convert_ids_to_names(meta_rules[meta_rule_key]["action_categories"], - meta_rule_dict, "action_categories", "action_category", - ModelManager, self._user_id) - logger.info("Exporting meta rule {}".format(meta_rule_dict)) - meta_rules_array.append(meta_rule_dict) - if len(meta_rules_array) > 0: - json_content['meta_rules'] = meta_rules_array +# Version: 5.4 - def _export_subject_object_action_assignments(self, type_element, json_content): - export_method_data = getattr(PolicyManager, 'get_' + type_element + '_assignments') - policies = PolicyManager.get_policies(self._user_id) - element_assignments_array = [] - for policy_key in policies: - assignments = export_method_data(self._user_id, policy_key) - # logger.info(assignments) - for assignment_key in assignments: - assignment_dict = dict() - JsonUtils.convert_id_to_name(assignments[assignment_key][type_element + "_id"], - assignment_dict, type_element, type_element, - PolicyManager, self._user_id, policy_key) - JsonUtils.convert_id_to_name(assignments[assignment_key]["category_id"], - assignment_dict, "category", - type_element + "_category", ModelManager, - self._user_id, policy_key) - JsonUtils.convert_ids_to_names(assignments[assignment_key]["assignments"], - assignment_dict, "assignments", - type_element + "_data", PolicyManager, self._user_id, - policy_key) - element_assignments_array.append(assignment_dict) - logger.info("Exporting {} assignment {}".format(type_element, assignment_dict)) - if len(element_assignments_array) > 0: - json_content[type_element + '_assignments'] = element_assignments_array +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 - def _export_subject_object_action_datas(self, type_element, json_content): - export_method_data = getattr(PolicyManager, 'get_' + type_element + '_data') - policies = PolicyManager.get_policies(self._user_id) - element_datas_array = [] - for policy_key in policies: - datas = export_method_data(self._user_id, policy_key) - # logger.info("data found : {}".format(datas)) - for data_group in datas: - policy_id = data_group["policy_id"] - category_id = data_group["category_id"] - # logger.info(data_group["data"]) - for data_key in data_group["data"]: - data_dict = dict() - if type_element == 'subject': - JsonUtils.copy_field_if_exists(data_group["data"][data_key], data_dict, - "name", str) - JsonUtils.copy_field_if_exists(data_group["data"][data_key], data_dict, - "description", str) - else: - JsonUtils.copy_field_if_exists(data_group["data"][data_key], data_dict, - "name", str) - JsonUtils.copy_field_if_exists(data_group["data"][data_key], data_dict, - "description", str) +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. - JsonUtils.convert_id_to_name(policy_id, data_dict, "policy", "policy", - PolicyManager, self._user_id) - JsonUtils.convert_id_to_name(category_id, data_dict, "category", - type_element + "_category", ModelManager, - self._user_id, policy_key) - logger.info("Exporting {} data {}".format(type_element, data_dict)) - element_datas_array.append(data_dict) - if len(element_datas_array) > 0: - json_content[type_element + '_data'] = element_datas_array - - def _export_subject_object_action_categories(self, type_element, json_content): - export_method = getattr(ModelManager, 'get_' + type_element + '_categories') - element_categories = export_method(self._user_id) - element_categories_array = [] - for element_category_key in element_categories: - element_category = dict() - JsonUtils.copy_field_if_exists(element_categories[element_category_key], - element_category, "name", str) - JsonUtils.copy_field_if_exists(element_categories[element_category_key], - element_category, "description", str) - element_categories_array.append(element_category) - logger.info("Exporting {} category {}".format(type_element, element_category)) - if len(element_categories_array) > 0: - json_content[type_element + '_categories'] = element_categories_array - - def _export_subject_object_action(self, type_element, json_content): - export_method = getattr(PolicyManager, 'get_' + type_element + 's') - policies = PolicyManager.get_policies(self._user_id) - element_dict = dict() - elements_array = [] - for policy_key in policies: - elements = export_method(self._user_id, policy_key) - for element_key in elements: - # logger.info("Exporting {}".format(elements[element_key])) - element = dict() - JsonUtils.copy_field_if_exists(elements[element_key], element, "name", str) - JsonUtils.copy_field_if_exists(elements[element_key], element, "description", str) - JsonUtils.copy_field_if_exists(elements[element_key], element, "extra", dict) - if element["name"] not in element_dict: - element["policies"] = [] - element_dict[element["name"]] = element - current_element = element_dict[element["name"]] - current_element["policies"].append({"name": JsonUtils.convert_id_to_name_string( - policy_key, "policy", PolicyManager, self._user_id)}) - - for key in element_dict: - logger.info("Exporting {} {}".format(type_element, element_dict[key])) - elements_array.append(element_dict[key]) - - if len(elements_array) > 0: - json_content[type_element + 's'] = elements_array - - def _export_policies(self, json_content): - policies = PolicyManager.get_policies(self._user_id) - policies_array = [] - for policy_key in policies: - policy = dict() - JsonUtils.copy_field_if_exists(policies[policy_key], policy, "name", str) - JsonUtils.copy_field_if_exists(policies[policy_key], policy, "genre", str) - JsonUtils.copy_field_if_exists(policies[policy_key], policy, "description", str) - JsonUtils.convert_id_to_name(policies[policy_key]["model_id"], policy, "model", "model", - ModelManager, self._user_id) - logger.info("Exporting policy {}".format(policy)) - policies_array.append(policy) - if len(policies_array) > 0: - json_content["policies"] = policies_array - - def _export_models(self, json_content): - models = ModelManager.get_models(self._user_id) - models_array = [] - for model_key in models: - model = dict() - JsonUtils.copy_field_if_exists(models[model_key], model, "name", str) - JsonUtils.copy_field_if_exists(models[model_key], model, "description", str) - # logger.info(models[model_key]["meta_rules"]) - JsonUtils.convert_ids_to_names(models[model_key]["meta_rules"], model, "meta_rules", - "meta_rule", ModelManager, self._user_id) - logger.info("Exporting model {}".format(model)) - models_array.append(model) - if len(models_array) > 0: - json_content["models"] = models_array - - def _export_pdps(self, json_content): - pdps = PDPManager.get_pdp(self._user_id) - pdps_array = [] - for pdp_key in pdps: - logger.info("Exporting pdp {}".format(pdps[pdp_key])) - pdps_array.append(pdps[pdp_key]) - if len(pdps_array) > 0: - json_content["pdps"] = pdps_array +import hug +import logging +from moon_manager import db_driver as driver +from moon_utilities.auth_functions import api_key_authentication +from moon_utilities.json_utils import JsonExport - def _export_json(self, user_id): - self._user_id = user_id - json_content = dict() +logger = logging.getLogger("moon.manager.api." + __name__) - logger.info("Exporting pdps...") - self._export_pdps(json_content) - logger.info("Exporting policies...") - self._export_policies(json_content) - logger.info("Exporting models...") - self._export_models(json_content) - # export subjects, subject_data, subject_categories, subject_assignements idem for object and action - list_element = [{"key": "subject"}, {"key": "object"}, {"key": "action"}] - for elt in list_element: - logger.info("Exporting {}s...".format(elt["key"])) - self._export_subject_object_action(elt["key"], json_content) - logger.info("Exporting {} categories...".format(elt["key"])) - self._export_subject_object_action_categories(elt["key"], json_content) - logger.info("Exporting {} data...".format(elt["key"])) - self._export_subject_object_action_datas(elt["key"], json_content) - logger.info("Exporting {} assignments...".format(elt["key"])) - self._export_subject_object_action_assignments(elt["key"], json_content) - logger.info("Exporting meta rules...") - self._export_meta_rules(json_content) - logger.info("Exporting rules...") - self._export_rules(json_content) - return json_content +class Export(object): - @check_auth - def get(self, user_id=None): + @staticmethod + @hug.get("/export", requires=api_key_authentication) + def get(authed_user: hug.directives.user = None): """Import file. - :param user_id: user ID who do the request + :param authed_user: user ID who do the request :return: { } :internal_api: """ - json_file = self._export_json(user_id) - logger.info(json_file) + json_file = JsonExport(driver_name="db", driver=driver).export_json( + moon_user_id=authed_user) return {"content": json_file} |