diff options
Diffstat (limited to 'moon_manager/moon_manager/api/json_utils.py')
-rw-r--r-- | moon_manager/moon_manager/api/json_utils.py | 255 |
1 files changed, 255 insertions, 0 deletions
diff --git a/moon_manager/moon_manager/api/json_utils.py b/moon_manager/moon_manager/api/json_utils.py new file mode 100644 index 00000000..cc4c8b0f --- /dev/null +++ b/moon_manager/moon_manager/api/json_utils.py @@ -0,0 +1,255 @@ +import logging +from moon_manager.api.base_exception import BaseException + +logger = logging.getLogger("moon.manager.api." + __name__) + + +class UnknownName(BaseException): + def __init__(self, message): + + # Call the base class constructor with the parameters it needs + super(UnknownName, self).__init__(message) + + +class UnknownId(BaseException): + def __init__(self, message): + + # Call the base class constructor with the parameters it needs + super(UnknownId, self).__init__(message) + + +class MissingIdOrName(BaseException): + def __init__(self, message): + + # Call the base class constructor with the parameters it needs + super(MissingIdOrName, self).__init__(message) + + +class UnknownField(BaseException): + def __init__(self, message): + + # Call the base class constructor with the parameters it needs + super(UnknownField, self).__init__(message) + + +class JsonUtils: + @staticmethod + def get_override(json_content): + if "override" in json_content: + return json_content["override"] + return False + + @staticmethod + def get_mandatory(json_content): + if "mandatory" in json_content: + return json_content["mandatory"] + return False + + @staticmethod + def copy_field_if_exists(json_in, json_out, field_name, type_field, default_value=None): + if field_name in json_in: + json_out[field_name] = json_in[field_name] + else: + if type_field is bool: + if default_value is None: + default_value = False + json_out[field_name] = default_value + if type_field is str: + if default_value is None: + default_value = "" + json_out[field_name] = default_value + if type_field is dict: + json_out[field_name] = dict() + if type_field is list: + json_out[field_name] = [] + + @staticmethod + def _get_element_in_db_from_id(element_type, element_id, user_id, policy_id, category_id, meta_rule_id, manager): + # the item is supposed to be in the db, we check it exists! + if element_type == "model": + data_db = manager.get_models(user_id, model_id=element_id) + elif element_type == "policy": + data_db = manager.get_policies(user_id, policy_id=element_id) + elif element_type == "subject": + data_db = manager.get_subjects(user_id, policy_id, perimeter_id=element_id) + elif element_type == "object": + data_db = manager.get_objects(user_id, policy_id, perimeter_id=element_id) + elif element_type == "action": + data_db = manager.get_actions(user_id, policy_id, perimeter_id=element_id) + elif element_type == "subject_category": + data_db = manager.get_subject_categories(user_id, category_id=element_id) + elif element_type == "object_category": + data_db = manager.get_object_categories(user_id, category_id=element_id) + elif element_type == "action_category": + data_db = manager.get_action_categories(user_id, category_id=element_id) + elif element_type == "meta_rule": + data_db = manager.get_meta_rules(user_id, meta_rule_id=element_id) + elif element_type == "subject_data": + data_db = manager.get_subject_data(user_id, policy_id, data_id=element_id, category_id=category_id) + elif element_type == "object_data": + data_db = manager.get_object_data(user_id, policy_id, data_id=element_id, category_id=category_id) + elif element_type == "action_data": + data_db = manager.get_action_data(user_id, policy_id, data_id=element_id, category_id=category_id) + elif element_type == "meta_rule": + data_db = manager.get_meta_rules(user_id, meta_rule_id=meta_rule_id) + else: + raise Exception("Conversion of {} not implemented yet!".format(element_type)) + + # logger.info(data_db) + + # do some post processing ... the result should be {key : { .... .... } } + if element_type == "subject_data" or element_type == "object_data" or element_type == "action_data": + if data_db is not None and isinstance(data_db, list): + # TODO remove comments after fixing the bug on moondb when adding metarule : we can have several identical entries ! + #if len(data_db) > 1: + # raise Exception("Several {} with the same id : {}".format(element_type, data_db)) + data_db = data_db[0] + + if data_db is not None and data_db["data"] is not None and isinstance(data_db["data"], dict): + # TODO remove comments after fixing the bug on moondb when adding metarule : we can have several identical entries ! + #if len(data_db["data"].values()) != 1: + # raise Exception("Several {} with the same id : {}".format(element_type, data_db)) + #data_db = data_db["data"] + # TODO remove these two lines after fixing the bug on moondb when adding metarule : we can have several identical entries ! + list_values = list(data_db["data"].values()) + data_db = list_values[0] + # logger.info("subject data after postprocessing {}".format(data_db)) + return data_db + + @staticmethod + def _get_element_id_in_db_from_name(element_type, element_name, user_id, policy_id, category_id, meta_rule_id, manager): + if element_type == "model": + data_db = manager.get_models(user_id) + elif element_type == "policy": + data_db = manager.get_policies(user_id) + elif element_type == "subject": + data_db = manager.get_subjects(user_id, policy_id) + elif element_type == "object": + data_db = manager.get_objects(user_id, policy_id) + elif element_type == "action": + data_db = manager.get_actions(user_id, policy_id) + elif element_type == "subject_category": + data_db = manager.get_subject_categories(user_id) + elif element_type == "object_category": + data_db = manager.get_object_categories(user_id) + elif element_type == "action_category": + data_db = manager.get_action_categories(user_id) + elif element_type == "meta_rule": + data_db = manager.get_meta_rules(user_id) + elif element_type == "subject_data": + data_db = manager.get_subject_data(user_id, policy_id, category_id=category_id) + elif element_type == "object_data": + data_db = manager.get_object_data(user_id, policy_id, category_id=category_id) + elif element_type == "action_data": + data_db = manager.get_action_data(user_id, policy_id, category_id=category_id) + elif element_type == "meta_rule": + data_db = manager.get_meta_rules(user_id) + elif element_type == "rule": + data_db = manager.get_rules(user_id, policy_id) + else: + raise BaseException("Conversion of {} not implemented yet!".format(element_type)) + + if isinstance(data_db, dict): + for key_id in data_db: + if isinstance(data_db[key_id], dict) and "name" in data_db[key_id]: + if data_db[key_id]["name"] == element_name: + return key_id + else: + for elt in data_db: + if isinstance(elt, dict) and "data" in elt: # we handle here subject_data, object_data and action_data... + for data_key in elt["data"]: + # logger.info("data from the db {} ".format(elt["data"][data_key])) + data = elt["data"][data_key] + if "name" in data and data["name"] == element_name: + return data_key + if "value" in data and data["value"]["name"] == element_name: + return data_key + return None + + @staticmethod + def convert_name_to_id(json_in, json_out, field_name_in, field_name_out, element_type, manager, user_id, policy_id=None, category_id=None, meta_rule_id=None, field_mandatory=True): + if field_name_in not in json_in: + raise UnknownField("The field {} is not in the input json".format(field_name_in)) + + if "id" in json_in[field_name_in]: + data_db = JsonUtils._get_element_in_db_from_id(element_type, json_in[field_name_in]["id"], user_id, policy_id, category_id, meta_rule_id, manager) + if data_db is None: + raise UnknownId("No {} with id {} found in database".format(element_type, json_in[field_name_in]["id"])) + json_out[field_name_out] = json_in[field_name_in]["id"] + + elif "name" in json_in[field_name_in]: + id_in_db = JsonUtils._get_element_id_in_db_from_name(element_type, json_in[field_name_in]["name"], user_id, policy_id, category_id, meta_rule_id, manager) + if id_in_db is None: + raise UnknownName("No {} with name {} found in database".format(element_type,json_in[field_name_in]["name"])) + json_out[field_name_out] = id_in_db + elif field_mandatory is True: + raise MissingIdOrName("No id or name found in the input json {}".format(json_in)) + + @staticmethod + def convert_id_to_name(id_, json_out, field_name_out, element_type, manager, user_id, + policy_id=None, category_id=None, meta_rule_id=None): + json_out[field_name_out] = {"name": JsonUtils.convert_id_to_name_string(id_, element_type, manager, user_id, policy_id, category_id, meta_rule_id)} + + @staticmethod + def __convert_results_to_element(element): + if isinstance(element, dict) and "name" not in element and "value" not in element: + list_values = [v for v in element.values()] + elif isinstance(element, list): + list_values = element + else: + list_values = [] + list_values.append(element) + return list_values[0] + + @staticmethod + def convert_id_to_name_string(id_, element_type, manager, user_id, + policy_id=None, category_id=None, meta_rule_id=None): + + element = JsonUtils._get_element_in_db_from_id(element_type, id_, user_id, policy_id, category_id, meta_rule_id, manager) + # logger.info(element) + if element is None: + raise UnknownId("No {} with id {} found in database".format(element_type, id_)) + res = JsonUtils.__convert_results_to_element(element) + # logger.info(res) + if "name" in res: + return res["name"] + if "value" in res and "name" in res["value"]: + return res["value"]["name"] + return None + + @staticmethod + def convert_names_to_ids(json_in, json_out, field_name_in, field_name_out, element_type, manager, user_id, policy_id=None, category_id=None, meta_rule_id=None, field_mandatory=True): + ids = [] + if field_name_in not in json_in: + raise UnknownField("The field {} is not in the input json".format(field_name_in)) + + for elt in json_in[field_name_in]: + if "id" in elt: + data_db = JsonUtils._get_element_in_db_from_id(element_type, elt["id"], user_id, policy_id, category_id, meta_rule_id, manager) + if data_db is None: + raise UnknownId("No {} with id {} found in database".format(element_type, elt["id"])) + ids.append(elt["id"]) + elif "name" in elt: + id_in_db = JsonUtils._get_element_id_in_db_from_name(element_type, elt["name"], user_id, policy_id, category_id, meta_rule_id, manager) + if id_in_db is None: + raise UnknownName("No {} with name {} found in database".format(element_type, elt["name"])) + ids.append(id_in_db) + elif field_mandatory is True: + raise MissingIdOrName("No id or name found in the input json {}".format(elt)) + json_out[field_name_out] = ids + + @staticmethod + def convert_ids_to_names(ids, json_out, field_name_out, element_type, manager, user_id, policy_id=None, category_id=None, meta_rule_id=None): + res_array = [] + for id_ in ids: + element = JsonUtils._get_element_in_db_from_id(element_type, id_, user_id, policy_id, category_id, meta_rule_id, manager) + if element is None: + raise UnknownId("No {} with id {} found in database".format(element_type, id_)) + res = JsonUtils.__convert_results_to_element(element) + # logger.info(res) + if "name" in res: + res_array.append({"name": res["name"]}) + if "value" in res and "name" in res["value"]: + res_array.append({"name": res["value"]["name"]}) + json_out[field_name_out] = res_array + |