diff options
author | Thomas Duval <thomas.duval@orange.com> | 2020-06-03 10:06:52 +0200 |
---|---|---|
committer | Thomas Duval <thomas.duval@orange.com> | 2020-06-03 10:06:52 +0200 |
commit | 7bb53c64da2dcf88894bfd31503accdd81498f3d (patch) | |
tree | 4310e12366818af27947b5e2c80cb162da93a4b5 /moon_manager/moon_manager/api/json_utils.py | |
parent | cbea4e360e9bfaa9698cf7c61c83c96a1ba89b8c (diff) |
Update to new version 5.4HEADstable/jermamaster
Signed-off-by: Thomas Duval <thomas.duval@orange.com>
Change-Id: Idcd868133d75928a1ffd74d749ce98503e0555ea
Diffstat (limited to 'moon_manager/moon_manager/api/json_utils.py')
-rw-r--r-- | moon_manager/moon_manager/api/json_utils.py | 282 |
1 files changed, 0 insertions, 282 deletions
diff --git a/moon_manager/moon_manager/api/json_utils.py b/moon_manager/moon_manager/api/json_utils.py deleted file mode 100644 index 6a5830f1..00000000 --- a/moon_manager/moon_manager/api/json_utils.py +++ /dev/null @@ -1,282 +0,0 @@ -import logging -from moon_manager.api.base_exception import BaseException - -logger = logging.getLogger("moon.manager.api." + __name__) - - -class UnknownName(BaseException): - def __init__(self, message): - # Call the base class constructor with the parameters it needs - super(UnknownName, self).__init__(message) - - -class UnknownId(BaseException): - def __init__(self, message): - # Call the base class constructor with the parameters it needs - super(UnknownId, self).__init__(message) - - -class MissingIdOrName(BaseException): - def __init__(self, message): - # Call the base class constructor with the parameters it needs - super(MissingIdOrName, self).__init__(message) - - -class UnknownField(BaseException): - def __init__(self, message): - # Call the base class constructor with the parameters it needs - super(UnknownField, self).__init__(message) - - -class JsonUtils: - @staticmethod - def get_override(json_content): - if "override" in json_content: - return json_content["override"] - return False - - @staticmethod - def get_mandatory(json_content): - if "mandatory" in json_content: - return json_content["mandatory"] - return False - - @staticmethod - def copy_field_if_exists(json_in, json_out, field_name, type_field, default_value=None): - if field_name in json_in: - json_out[field_name] = json_in[field_name] - else: - if type_field is bool: - if default_value is None: - default_value = False - json_out[field_name] = default_value - if type_field is str: - if default_value is None: - default_value = "" - json_out[field_name] = default_value - if type_field is dict: - json_out[field_name] = dict() - if type_field is list: - json_out[field_name] = [] - - @staticmethod - def _get_element_in_db_from_id(element_type, element_id, user_id, policy_id, category_id, - meta_rule_id, manager): - # the item is supposed to be in the db, we check it exists! - if element_type == "model": - data_db = manager.get_models(user_id, model_id=element_id) - elif element_type == "policy": - data_db = manager.get_policies(user_id, policy_id=element_id) - elif element_type == "subject": - data_db = manager.get_subjects(user_id, policy_id, perimeter_id=element_id) - elif element_type == "object": - data_db = manager.get_objects(user_id, policy_id, perimeter_id=element_id) - elif element_type == "action": - data_db = manager.get_actions(user_id, policy_id, perimeter_id=element_id) - elif element_type == "subject_category": - data_db = manager.get_subject_categories(user_id, category_id=element_id) - elif element_type == "object_category": - data_db = manager.get_object_categories(user_id, category_id=element_id) - elif element_type == "action_category": - data_db = manager.get_action_categories(user_id, category_id=element_id) - elif element_type == "meta_rule": - data_db = manager.get_meta_rules(user_id, meta_rule_id=element_id) - elif element_type == "subject_data": - data_db = manager.get_subject_data(user_id, policy_id, data_id=element_id, - category_id=category_id) - elif element_type == "object_data": - data_db = manager.get_object_data(user_id, policy_id, data_id=element_id, - category_id=category_id) - elif element_type == "action_data": - data_db = manager.get_action_data(user_id, policy_id, data_id=element_id, - category_id=category_id) - elif element_type == "meta_rule": - data_db = manager.get_meta_rules(user_id, meta_rule_id=meta_rule_id) - else: - raise Exception("Conversion of {} not implemented yet!".format(element_type)) - - # logger.info(data_db) - - # do some post processing ... the result should be {key : { .... .... } } - if element_type == "subject_data" or element_type == "object_data" or element_type == "action_data": - if data_db is not None and isinstance(data_db, list): - # TODO remove comments after fixing the bug on moondb when adding metarule : we can have several identical entries ! - # if len(data_db) > 1: - # raise Exception("Several {} with the same id : {}".format(element_type, data_db)) - data_db = data_db[0] - - if data_db is not None and data_db["data"] is not None and isinstance(data_db["data"], - dict): - # TODO remove comments after fixing the bug on moondb when adding metarule : we can have several identical entries ! - # if len(data_db["data"].values()) != 1: - # raise Exception("Several {} with the same id : {}".format(element_type, data_db)) - # data_db = data_db["data"] - # TODO remove these two lines after fixing the bug on moondb when adding metarule : we can have several identical entries ! - list_values = list(data_db["data"].values()) - data_db = list_values[0] - # logger.info("subject data after postprocessing {}".format(data_db)) - return data_db - - @staticmethod - def _get_element_id_in_db_from_name(element_type, element_name, user_id, policy_id, category_id, - meta_rule_id, manager): - if element_type == "model": - data_db = manager.get_models(user_id) - elif element_type == "policy": - data_db = manager.get_policies(user_id) - elif element_type == "subject": - data_db = manager.get_subjects(user_id, policy_id) - elif element_type == "object": - data_db = manager.get_objects(user_id, policy_id) - elif element_type == "action": - data_db = manager.get_actions(user_id, policy_id) - elif element_type == "subject_category": - data_db = manager.get_subject_categories(user_id) - elif element_type == "object_category": - data_db = manager.get_object_categories(user_id) - elif element_type == "action_category": - data_db = manager.get_action_categories(user_id) - elif element_type == "meta_rule": - data_db = manager.get_meta_rules(user_id) - elif element_type == "subject_data": - data_db = manager.get_subject_data(user_id, policy_id, category_id=category_id) - elif element_type == "object_data": - data_db = manager.get_object_data(user_id, policy_id, category_id=category_id) - elif element_type == "action_data": - data_db = manager.get_action_data(user_id, policy_id, category_id=category_id) - elif element_type == "meta_rule": - data_db = manager.get_meta_rules(user_id) - elif element_type == "rule": - data_db = manager.get_rules(user_id, policy_id) - else: - raise BaseException("Conversion of {} not implemented yet!".format(element_type)) - - if isinstance(data_db, dict): - for key_id in data_db: - if isinstance(data_db[key_id], dict) and "name" in data_db[key_id]: - if data_db[key_id]["name"] == element_name: - return key_id - else: - for elt in data_db: - if isinstance(elt, - dict) and "data" in elt: # we handle here subject_data, object_data and action_data... - for data_key in elt["data"]: - # logger.info("data from the db {} ".format(elt["data"][data_key])) - data = elt["data"][data_key] - if "name" in data and data["name"] == element_name: - return data_key - if "value" in data and data["value"]["name"] == element_name: - return data_key - return None - - @staticmethod - def convert_name_to_id(json_in, json_out, field_name_in, field_name_out, element_type, manager, - user_id, policy_id=None, category_id=None, meta_rule_id=None, - field_mandatory=True): - if field_name_in not in json_in: - raise UnknownField("The field {} is not in the input json".format(field_name_in)) - - if "id" in json_in[field_name_in]: - data_db = JsonUtils._get_element_in_db_from_id(element_type, - json_in[field_name_in]["id"], user_id, - policy_id, category_id, meta_rule_id, - manager) - if data_db is None: - raise UnknownId("No {} with id {} found in database".format(element_type, - json_in[field_name_in]["id"])) - json_out[field_name_out] = json_in[field_name_in]["id"] - - elif "name" in json_in[field_name_in]: - id_in_db = JsonUtils._get_element_id_in_db_from_name(element_type, - json_in[field_name_in]["name"], - user_id, policy_id, category_id, - meta_rule_id, manager) - if id_in_db is None: - raise UnknownName( - "No {} with name {} found in database".format(element_type, - json_in[field_name_in]["name"])) - json_out[field_name_out] = id_in_db - elif field_mandatory is True: - raise MissingIdOrName("No id or name found in the input json {}".format(json_in)) - - @staticmethod - def convert_id_to_name(id_, json_out, field_name_out, element_type, manager, user_id, - policy_id=None, category_id=None, meta_rule_id=None): - json_out[field_name_out] = { - "name": JsonUtils.convert_id_to_name_string(id_, element_type, manager, user_id, - policy_id, category_id, meta_rule_id)} - - @staticmethod - def __convert_results_to_element(element): - if isinstance(element, dict) and "name" not in element and "value" not in element: - list_values = [v for v in element.values()] - elif isinstance(element, list): - list_values = element - else: - list_values = [] - list_values.append(element) - return list_values[0] - - @staticmethod - def convert_id_to_name_string(id_, element_type, manager, user_id, - policy_id=None, category_id=None, meta_rule_id=None): - - element = JsonUtils._get_element_in_db_from_id(element_type, id_, user_id, policy_id, - category_id, meta_rule_id, manager) - # logger.info(element) - if element is None: - raise UnknownId("No {} with id {} found in database".format(element_type, id_)) - res = JsonUtils.__convert_results_to_element(element) - # logger.info(res) - if "name" in res: - return res["name"] - if "value" in res and "name" in res["value"]: - return res["value"]["name"] - return None - - @staticmethod - def convert_names_to_ids(json_in, json_out, field_name_in, field_name_out, element_type, - manager, user_id, policy_id=None, category_id=None, meta_rule_id=None, - field_mandatory=True): - ids = [] - if field_name_in not in json_in: - raise UnknownField("The field {} is not in the input json".format(field_name_in)) - - for elt in json_in[field_name_in]: - if "id" in elt: - data_db = JsonUtils._get_element_in_db_from_id(element_type, elt["id"], user_id, - policy_id, category_id, - meta_rule_id, manager) - if data_db is None: - raise UnknownId( - "No {} with id {} found in database".format(element_type, elt["id"])) - ids.append(elt["id"]) - elif "name" in elt: - id_in_db = JsonUtils._get_element_id_in_db_from_name(element_type, elt["name"], - user_id, policy_id, - category_id, meta_rule_id, - manager) - if id_in_db is None: - raise UnknownName( - "No {} with name {} found in database".format(element_type, elt["name"])) - ids.append(id_in_db) - elif field_mandatory is True: - raise MissingIdOrName("No id or name found in the input json {}".format(elt)) - json_out[field_name_out] = ids - - @staticmethod - def convert_ids_to_names(ids, json_out, field_name_out, element_type, manager, user_id, - policy_id=None, category_id=None, meta_rule_id=None): - res_array = [] - for id_ in ids: - element = JsonUtils._get_element_in_db_from_id(element_type, id_, user_id, policy_id, - category_id, meta_rule_id, manager) - if element is None: - raise UnknownId("No {} with id {} found in database".format(element_type, id_)) - res = JsonUtils.__convert_results_to_element(element) - # logger.info(res) - if "name" in res: - res_array.append({"name": res["name"]}) - if "value" in res and "name" in res["value"]: - res_array.append({"name": res["value"]["name"]}) - json_out[field_name_out] = res_array |