aboutsummaryrefslogtreecommitdiffstats
path: root/moon_manager/moon_manager/api/json_utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'moon_manager/moon_manager/api/json_utils.py')
-rw-r--r--moon_manager/moon_manager/api/json_utils.py282
1 files changed, 0 insertions, 282 deletions
diff --git a/moon_manager/moon_manager/api/json_utils.py b/moon_manager/moon_manager/api/json_utils.py
deleted file mode 100644
index 6a5830f1..00000000
--- a/moon_manager/moon_manager/api/json_utils.py
+++ /dev/null
@@ -1,282 +0,0 @@
-import logging
-from moon_manager.api.base_exception import BaseException
-
-logger = logging.getLogger("moon.manager.api." + __name__)
-
-
-class UnknownName(BaseException):
- def __init__(self, message):
- # Call the base class constructor with the parameters it needs
- super(UnknownName, self).__init__(message)
-
-
-class UnknownId(BaseException):
- def __init__(self, message):
- # Call the base class constructor with the parameters it needs
- super(UnknownId, self).__init__(message)
-
-
-class MissingIdOrName(BaseException):
- def __init__(self, message):
- # Call the base class constructor with the parameters it needs
- super(MissingIdOrName, self).__init__(message)
-
-
-class UnknownField(BaseException):
- def __init__(self, message):
- # Call the base class constructor with the parameters it needs
- super(UnknownField, self).__init__(message)
-
-
-class JsonUtils:
- @staticmethod
- def get_override(json_content):
- if "override" in json_content:
- return json_content["override"]
- return False
-
- @staticmethod
- def get_mandatory(json_content):
- if "mandatory" in json_content:
- return json_content["mandatory"]
- return False
-
- @staticmethod
- def copy_field_if_exists(json_in, json_out, field_name, type_field, default_value=None):
- if field_name in json_in:
- json_out[field_name] = json_in[field_name]
- else:
- if type_field is bool:
- if default_value is None:
- default_value = False
- json_out[field_name] = default_value
- if type_field is str:
- if default_value is None:
- default_value = ""
- json_out[field_name] = default_value
- if type_field is dict:
- json_out[field_name] = dict()
- if type_field is list:
- json_out[field_name] = []
-
- @staticmethod
- def _get_element_in_db_from_id(element_type, element_id, user_id, policy_id, category_id,
- meta_rule_id, manager):
- # the item is supposed to be in the db, we check it exists!
- if element_type == "model":
- data_db = manager.get_models(user_id, model_id=element_id)
- elif element_type == "policy":
- data_db = manager.get_policies(user_id, policy_id=element_id)
- elif element_type == "subject":
- data_db = manager.get_subjects(user_id, policy_id, perimeter_id=element_id)
- elif element_type == "object":
- data_db = manager.get_objects(user_id, policy_id, perimeter_id=element_id)
- elif element_type == "action":
- data_db = manager.get_actions(user_id, policy_id, perimeter_id=element_id)
- elif element_type == "subject_category":
- data_db = manager.get_subject_categories(user_id, category_id=element_id)
- elif element_type == "object_category":
- data_db = manager.get_object_categories(user_id, category_id=element_id)
- elif element_type == "action_category":
- data_db = manager.get_action_categories(user_id, category_id=element_id)
- elif element_type == "meta_rule":
- data_db = manager.get_meta_rules(user_id, meta_rule_id=element_id)
- elif element_type == "subject_data":
- data_db = manager.get_subject_data(user_id, policy_id, data_id=element_id,
- category_id=category_id)
- elif element_type == "object_data":
- data_db = manager.get_object_data(user_id, policy_id, data_id=element_id,
- category_id=category_id)
- elif element_type == "action_data":
- data_db = manager.get_action_data(user_id, policy_id, data_id=element_id,
- category_id=category_id)
- elif element_type == "meta_rule":
- data_db = manager.get_meta_rules(user_id, meta_rule_id=meta_rule_id)
- else:
- raise Exception("Conversion of {} not implemented yet!".format(element_type))
-
- # logger.info(data_db)
-
- # do some post processing ... the result should be {key : { .... .... } }
- if element_type == "subject_data" or element_type == "object_data" or element_type == "action_data":
- if data_db is not None and isinstance(data_db, list):
- # TODO remove comments after fixing the bug on moondb when adding metarule : we can have several identical entries !
- # if len(data_db) > 1:
- # raise Exception("Several {} with the same id : {}".format(element_type, data_db))
- data_db = data_db[0]
-
- if data_db is not None and data_db["data"] is not None and isinstance(data_db["data"],
- dict):
- # TODO remove comments after fixing the bug on moondb when adding metarule : we can have several identical entries !
- # if len(data_db["data"].values()) != 1:
- # raise Exception("Several {} with the same id : {}".format(element_type, data_db))
- # data_db = data_db["data"]
- # TODO remove these two lines after fixing the bug on moondb when adding metarule : we can have several identical entries !
- list_values = list(data_db["data"].values())
- data_db = list_values[0]
- # logger.info("subject data after postprocessing {}".format(data_db))
- return data_db
-
- @staticmethod
- def _get_element_id_in_db_from_name(element_type, element_name, user_id, policy_id, category_id,
- meta_rule_id, manager):
- if element_type == "model":
- data_db = manager.get_models(user_id)
- elif element_type == "policy":
- data_db = manager.get_policies(user_id)
- elif element_type == "subject":
- data_db = manager.get_subjects(user_id, policy_id)
- elif element_type == "object":
- data_db = manager.get_objects(user_id, policy_id)
- elif element_type == "action":
- data_db = manager.get_actions(user_id, policy_id)
- elif element_type == "subject_category":
- data_db = manager.get_subject_categories(user_id)
- elif element_type == "object_category":
- data_db = manager.get_object_categories(user_id)
- elif element_type == "action_category":
- data_db = manager.get_action_categories(user_id)
- elif element_type == "meta_rule":
- data_db = manager.get_meta_rules(user_id)
- elif element_type == "subject_data":
- data_db = manager.get_subject_data(user_id, policy_id, category_id=category_id)
- elif element_type == "object_data":
- data_db = manager.get_object_data(user_id, policy_id, category_id=category_id)
- elif element_type == "action_data":
- data_db = manager.get_action_data(user_id, policy_id, category_id=category_id)
- elif element_type == "meta_rule":
- data_db = manager.get_meta_rules(user_id)
- elif element_type == "rule":
- data_db = manager.get_rules(user_id, policy_id)
- else:
- raise BaseException("Conversion of {} not implemented yet!".format(element_type))
-
- if isinstance(data_db, dict):
- for key_id in data_db:
- if isinstance(data_db[key_id], dict) and "name" in data_db[key_id]:
- if data_db[key_id]["name"] == element_name:
- return key_id
- else:
- for elt in data_db:
- if isinstance(elt,
- dict) and "data" in elt: # we handle here subject_data, object_data and action_data...
- for data_key in elt["data"]:
- # logger.info("data from the db {} ".format(elt["data"][data_key]))
- data = elt["data"][data_key]
- if "name" in data and data["name"] == element_name:
- return data_key
- if "value" in data and data["value"]["name"] == element_name:
- return data_key
- return None
-
- @staticmethod
- def convert_name_to_id(json_in, json_out, field_name_in, field_name_out, element_type, manager,
- user_id, policy_id=None, category_id=None, meta_rule_id=None,
- field_mandatory=True):
- if field_name_in not in json_in:
- raise UnknownField("The field {} is not in the input json".format(field_name_in))
-
- if "id" in json_in[field_name_in]:
- data_db = JsonUtils._get_element_in_db_from_id(element_type,
- json_in[field_name_in]["id"], user_id,
- policy_id, category_id, meta_rule_id,
- manager)
- if data_db is None:
- raise UnknownId("No {} with id {} found in database".format(element_type,
- json_in[field_name_in]["id"]))
- json_out[field_name_out] = json_in[field_name_in]["id"]
-
- elif "name" in json_in[field_name_in]:
- id_in_db = JsonUtils._get_element_id_in_db_from_name(element_type,
- json_in[field_name_in]["name"],
- user_id, policy_id, category_id,
- meta_rule_id, manager)
- if id_in_db is None:
- raise UnknownName(
- "No {} with name {} found in database".format(element_type,
- json_in[field_name_in]["name"]))
- json_out[field_name_out] = id_in_db
- elif field_mandatory is True:
- raise MissingIdOrName("No id or name found in the input json {}".format(json_in))
-
- @staticmethod
- def convert_id_to_name(id_, json_out, field_name_out, element_type, manager, user_id,
- policy_id=None, category_id=None, meta_rule_id=None):
- json_out[field_name_out] = {
- "name": JsonUtils.convert_id_to_name_string(id_, element_type, manager, user_id,
- policy_id, category_id, meta_rule_id)}
-
- @staticmethod
- def __convert_results_to_element(element):
- if isinstance(element, dict) and "name" not in element and "value" not in element:
- list_values = [v for v in element.values()]
- elif isinstance(element, list):
- list_values = element
- else:
- list_values = []
- list_values.append(element)
- return list_values[0]
-
- @staticmethod
- def convert_id_to_name_string(id_, element_type, manager, user_id,
- policy_id=None, category_id=None, meta_rule_id=None):
-
- element = JsonUtils._get_element_in_db_from_id(element_type, id_, user_id, policy_id,
- category_id, meta_rule_id, manager)
- # logger.info(element)
- if element is None:
- raise UnknownId("No {} with id {} found in database".format(element_type, id_))
- res = JsonUtils.__convert_results_to_element(element)
- # logger.info(res)
- if "name" in res:
- return res["name"]
- if "value" in res and "name" in res["value"]:
- return res["value"]["name"]
- return None
-
- @staticmethod
- def convert_names_to_ids(json_in, json_out, field_name_in, field_name_out, element_type,
- manager, user_id, policy_id=None, category_id=None, meta_rule_id=None,
- field_mandatory=True):
- ids = []
- if field_name_in not in json_in:
- raise UnknownField("The field {} is not in the input json".format(field_name_in))
-
- for elt in json_in[field_name_in]:
- if "id" in elt:
- data_db = JsonUtils._get_element_in_db_from_id(element_type, elt["id"], user_id,
- policy_id, category_id,
- meta_rule_id, manager)
- if data_db is None:
- raise UnknownId(
- "No {} with id {} found in database".format(element_type, elt["id"]))
- ids.append(elt["id"])
- elif "name" in elt:
- id_in_db = JsonUtils._get_element_id_in_db_from_name(element_type, elt["name"],
- user_id, policy_id,
- category_id, meta_rule_id,
- manager)
- if id_in_db is None:
- raise UnknownName(
- "No {} with name {} found in database".format(element_type, elt["name"]))
- ids.append(id_in_db)
- elif field_mandatory is True:
- raise MissingIdOrName("No id or name found in the input json {}".format(elt))
- json_out[field_name_out] = ids
-
- @staticmethod
- def convert_ids_to_names(ids, json_out, field_name_out, element_type, manager, user_id,
- policy_id=None, category_id=None, meta_rule_id=None):
- res_array = []
- for id_ in ids:
- element = JsonUtils._get_element_in_db_from_id(element_type, id_, user_id, policy_id,
- category_id, meta_rule_id, manager)
- if element is None:
- raise UnknownId("No {} with id {} found in database".format(element_type, id_))
- res = JsonUtils.__convert_results_to_element(element)
- # logger.info(res)
- if "name" in res:
- res_array.append({"name": res["name"]})
- if "value" in res and "name" in res["value"]:
- res_array.append({"name": res["value"]["name"]})
- json_out[field_name_out] = res_array