From 7bb53c64da2dcf88894bfd31503accdd81498f3d Mon Sep 17 00:00:00 2001 From: Thomas Duval Date: Wed, 3 Jun 2020 10:06:52 +0200 Subject: Update to new version 5.4 Signed-off-by: Thomas Duval Change-Id: Idcd868133d75928a1ffd74d749ce98503e0555ea --- .../python_moonclient/core/models.py | 279 +++++++++++++++++++++ 1 file changed, 279 insertions(+) create mode 100644 old/python_moonclient/python_moonclient/core/models.py (limited to 'old/python_moonclient/python_moonclient/core/models.py') diff --git a/old/python_moonclient/python_moonclient/core/models.py b/old/python_moonclient/python_moonclient/core/models.py new file mode 100644 index 00000000..8d3c8858 --- /dev/null +++ b/old/python_moonclient/python_moonclient/core/models.py @@ -0,0 +1,279 @@ +import logging +import copy +import requests +from python_moonclient.core import config +from python_moonclient.core.check_tools import * + +LOGGER = logging.getLogger("moonclient.core.models") + +URL = None +HEADERS = None + +model_template = { + "name": "test_model", + "description": "test", + "meta_rules": [] +} + +category_template = { + "name": "name of the category", + "description": "description of the category" +} + +meta_rule_template = { + "name": "test_meta_rule", + "subject_categories": [], + "object_categories": [], + "action_categories": [] +} + + +def init(consul_host, consul_port): + conf_data = config.get_config_data(consul_host, consul_port) + global URL, HEADERS + URL = "http://{}:{}".format( + conf_data['manager_host'], + conf_data['manager_port']) + URL = URL + "{}" + HEADERS = {"content-type": "application/json"} + + +def check_model(model_id=None, do_check_model_name=True): + req = requests.get(URL.format("/models")) + req.raise_for_status() + result = req.json() + check_model_in_result(result) + if model_id: + check_model_name(model_template["name"], model_id, result, do_check_model_name) + return result + + +def add_model(name=None): + if name: + model_template['name'] = name + req = requests.post(URL.format("/models"), json=model_template, headers=HEADERS) + req.raise_for_status() + result = req.json() + check_model_in_result(result) + model_id = list(result['models'].keys())[0] + check_model_name(model_template["name"], model_id, result, True) + return model_id + + +def delete_model(model_id): + req = requests.delete(URL.format("/models/{}".format(model_id))) + req.raise_for_status() + result = req.json() + check_result(result) + + +def add_subject_category(name="subject_cat_1"): + category_template["name"] = name + req = requests.post(URL.format("/subject_categories"), json=category_template, headers=HEADERS) + req.raise_for_status() + result = req.json() + + check_subject_category_in_result(result) + category_id = list(result['subject_categories'].keys())[0] + check_optionnal_result(result) + check_subject_categories_name(category_template["name"], category_id, result) + return category_id + + +def check_subject_category(category_id=None): + req = requests.get(URL.format("/subject_categories")) + req.raise_for_status() + result = req.json() + + check_subject_category_in_result(result) + check_optionnal_result(result) + if category_id is not None: + check_subject_categories_name(category_template["name"], category_id, result) + return result + + +def delete_subject_category(category_id): + req = requests.delete(URL.format("/subject_categories/{}".format(category_id))) + req.raise_for_status() + result = req.json() + check_optionnal_result(result) + + +def add_object_category(name="object_cat_1"): + category_template["name"] = name + req = requests.post(URL.format("/object_categories"), json=category_template, headers=HEADERS) + req.raise_for_status() + result = req.json() + check_object_category_in_result(result) + category_id = list(result['object_categories'].keys())[0] + check_optionnal_result(result) + check_object_categories_name(category_template["name"], category_id, result) + return category_id + + +def check_object_category(category_id=None): + req = requests.get(URL.format("/object_categories")) + req.raise_for_status() + result = req.json() + check_object_category_in_result(result) + check_optionnal_result(result) + if category_id is not None: + check_object_categories_name(category_template["name"], category_id, result) + return result + + +def delete_object_category(category_id): + req = requests.delete(URL.format("/object_categories/{}".format(category_id))) + req.raise_for_status() + result = req.json() + check_optionnal_result(result) + + +def add_action_category(name="action_cat_1"): + category_template["name"] = name + req = requests.post(URL.format("/action_categories"), json=category_template, headers=HEADERS) + req.raise_for_status() + result = req.json() + check_action_category_in_result(result) + category_id = list(result['action_categories'].keys())[0] + check_optionnal_result(result) + check_action_categories_name(category_template["name"], category_id, result) + return category_id + + +def check_action_category(category_id=None): + req = requests.get(URL.format("/action_categories")) + req.raise_for_status() + result = req.json() + print(result) + check_action_category_in_result(result) + check_optionnal_result(result) + if category_id is not None: + check_action_categories_name(category_template["name"], category_id, result) + return result + + +def delete_action_category(category_id): + req = requests.delete(URL.format("/action_categories/{}".format(category_id))) + req.raise_for_status() + result = req.json() + check_optionnal_result(result) + + +def add_categories_and_meta_rule(name="test_meta_rule"): + scat_id = add_subject_category() + ocat_id = add_object_category() + acat_id = add_action_category() + _meta_rule_template = copy.deepcopy(meta_rule_template) + _meta_rule_template["name"] = name + _meta_rule_template["subject_categories"].append(scat_id) + _meta_rule_template["object_categories"].append(ocat_id) + _meta_rule_template["action_categories"].append(acat_id) + req = requests.post(URL.format("/meta_rules"), json=_meta_rule_template, headers=HEADERS) + req.raise_for_status() + result = req.json() + check_meta_rule_in_result(result) + meta_rule_id = list(result['meta_rules'].keys())[0] + check_optionnal_result(result) + check_meta_rules_name(_meta_rule_template["name"], meta_rule_id, result) + return meta_rule_id, scat_id, ocat_id, acat_id + + +def add_meta_rule(name="test_meta_rule", scat=[], ocat=[], acat=[]): + _meta_rule_template = copy.deepcopy(meta_rule_template) + _meta_rule_template["name"] = name + _meta_rule_template["subject_categories"] = [] + _meta_rule_template["subject_categories"].extend(scat) + _meta_rule_template["object_categories"] = [] + _meta_rule_template["object_categories"].extend(ocat) + _meta_rule_template["action_categories"] = [] + _meta_rule_template["action_categories"].extend(acat) + req = requests.post(URL.format("/meta_rules"), json=_meta_rule_template, headers=HEADERS) + req.raise_for_status() + result = req.json() + check_meta_rule_in_result(result) + meta_rule_id = list(result['meta_rules'].keys())[0] + check_optionnal_result(result) + check_meta_rules_name(_meta_rule_template["name"], meta_rule_id, result) + return meta_rule_id + + +def check_meta_rule(meta_rule_id, scat_id=None, ocat_id=None, acat_id=None): + req = requests.get(URL.format("/meta_rules")) + req.raise_for_status() + result = req.json() + check_meta_rule_in_result(result) + check_optionnal_result(result) + if not meta_rule_id: + return result + check_meta_rules_name(None, meta_rule_id, result) + if scat_id: + check_scat_id_in_dict(scat_id, result['meta_rules'][meta_rule_id]["subject_categories"]) + if ocat_id: + check_ocat_id_in_dict(ocat_id, result['meta_rules'][meta_rule_id]["object_categories"]) + if acat_id: + check_acat_id_in_dict(acat_id, result['meta_rules'][meta_rule_id]["action_categories"]) + return result + + +def delete_meta_rule(meta_rule_id): + req = requests.delete(URL.format("/meta_rules/{}".format(meta_rule_id))) + req.raise_for_status() + result = req.json() + check_optionnal_result(result) + + +def add_meta_rule_to_model(model_id, meta_rule_id): + model = check_model(model_id, do_check_model_name=False)['models'] + meta_rule_list = model[model_id]["meta_rules"] + if meta_rule_id not in meta_rule_list: + meta_rule_list.append(meta_rule_id) + req = requests.patch(URL.format("/models/{}".format(model_id)), + json={"meta_rules": meta_rule_list}, + headers=HEADERS) + req.raise_for_status() + result = req.json() + check_model_in_result(result) + model_id = list(result['models'].keys())[0] + check_optionnal_result(result) + check_meta_rules_list_in_model(meta_rule_list, model_id, result) + + +def create_model(scenario, model_id=None): + LOGGER.info("Creating model {}".format(scenario.model_name)) + if not model_id: + LOGGER.info("Add model") + model_id = add_model(name=scenario.model_name) + LOGGER.info("Add subject categories") + for cat in scenario.subject_categories: + scenario.subject_categories[cat] = add_subject_category(name=cat) + LOGGER.info("Add object categories") + for cat in scenario.object_categories: + scenario.object_categories[cat] = add_object_category(name=cat) + LOGGER.info("Add action categories") + for cat in scenario.action_categories: + scenario.action_categories[cat] = add_action_category(name=cat) + sub_cat = [] + ob_cat = [] + act_cat = [] + meta_rule_list = [] + for item_name, item_value in scenario.meta_rule.items(): + for item in item_value["value"]: + if item in scenario.subject_categories: + sub_cat.append(scenario.subject_categories[item]) + elif item in scenario.object_categories: + ob_cat.append(scenario.object_categories[item]) + elif item in scenario.action_categories: + act_cat.append(scenario.action_categories[item]) + meta_rules = check_meta_rule(meta_rule_id=None) + for _meta_rule_id, _meta_rule_value in meta_rules['meta_rules'].items(): + if _meta_rule_value['name'] == item_name: + meta_rule_id = _meta_rule_id + break + else: + LOGGER.info("Add meta rule") + meta_rule_id = add_meta_rule(item_name, sub_cat, ob_cat, act_cat) + item_value["id"] = meta_rule_id + if meta_rule_id not in meta_rule_list: + meta_rule_list.append(meta_rule_id) + return model_id, meta_rule_list -- cgit 1.2.3-korg