Diffstat (limited to 'old/python_moonclient/python_moonclient/core/policies.py')
-rw-r--r-- | old/python_moonclient/python_moonclient/core/policies.py | 673 |
1 files changed, 673 insertions, 0 deletions
diff --git a/old/python_moonclient/python_moonclient/core/policies.py b/old/python_moonclient/python_moonclient/core/policies.py
new file mode 100644
index 00000000..b9b05dd8
--- /dev/null
+++ b/old/python_moonclient/python_moonclient/core/policies.py
@@ -0,0 +1,673 @@ +import logging +import requests +from python_moonclient.core import models, config +from python_moonclient.core.check_tools import * + +LOGGER = logging.getLogger("moonclient.core.policies") + +URL = None +HEADERS = None + +policy_template = { + "name": "test_policy", + "model_id": "", + "genre": "authz", + "description": "test", +} + +subject_template = { + "name": "test_subject", + "description": "test", + "email": "mail", + "password": "my_pass", +} + +object_template = { + "name": "test_subject", + "description": "test" +} + +action_template = { + "name": "test_subject", + "description": "test" +} + +subject_data_template = { + "name": "subject_data1", + "description": "description of the data subject" +} + +object_data_template = { + "name": "object_data1", + "description": "description of the data subject" +} + +action_data_template = { + "name": "action_data1", + "description": "description of the data subject" +} + +subject_assignment_template = { + "id": "", + "category_id": "", + "scope_id": "" +} + + +def init(consul_host, consul_port): + conf_data = config.get_config_data(consul_host, consul_port) + global URL, HEADERS + URL = "http://{}:{}".format( + conf_data['manager_host'], + conf_data['manager_port']) + URL = URL + "{}" + HEADERS = {"content-type": "application/json"} + + +def check_policy(policy_id=None): + req = requests.get(URL.format("/policies")) + req.raise_for_status() + result = req.json() + check_policy_in_result(result) + if policy_id: + check_policy_name(policy_template["name"], policy_id, result) + return result + + +def add_policy(name="test_policy", genre="authz"): + policy_template["name"] = name + policy_template["genre"] = genre + req = requests.post(URL.format("/policies"), json=policy_template, headers=HEADERS) + req.raise_for_status() + result = req.json() + check_policy_in_result(result) + policy_id = list(result['policies'].keys())[0] + check_optionnal_result(result) + 
check_policy_name(policy_template["name"], policy_id, result) + return policy_id + + +def update_policy(policy_id, model_id): + req = requests.patch(URL.format("/policies/{}".format(policy_id)), + json={"model_id": model_id}, headers=HEADERS) + req.raise_for_status() + result = req.json() + check_policy_in_result(result) + policy_id = list(result['policies'].keys())[0] + check_optionnal_result(result) + check_policy_model_id(model_id, policy_id, result) + + +def delete_policy(policy_id): + req = requests.delete(URL.format("/policies/{}".format(policy_id))) + req.raise_for_status() + result = req.json() + check_result(result) + + +def add_subject(policy_id=None, name="test_subject"): + subject_template['name'] = name + if policy_id: + LOGGER.debug(URL.format("/policies/{}/subjects".format(policy_id))) + req = requests.post(URL.format("/policies/{}/subjects".format(policy_id)), + json=subject_template, headers=HEADERS) + else: + LOGGER.debug(URL.format("/subjects")) + req = requests.post(URL.format("/subjects"), json=subject_template, headers=HEADERS) + LOGGER.debug(req.text) + req.raise_for_status() + result = req.json() + check_subject_in_result(result) + subject_id = list(result['subjects'].keys())[0] + return subject_id + + +def update_subject(subject_id, policy_id=None, description=None): + if policy_id and not description: + req = requests.patch(URL.format("/policies/{}/subjects/{}".format(policy_id, subject_id)), + json={}) + elif policy_id and description: + req = requests.patch(URL.format("/policies/{}/subjects/{}".format(policy_id, subject_id)), + json={"description": description}) + else: + req = requests.patch(URL.format("/subjects/{}".format(subject_id)), + json={"description": description}) + req.raise_for_status() + result = req.json() + check_subject_name(subject_template["name"], subject_id, result) + check_subject_policy(policy_id, result["subjects"][subject_id]) + check_subject_description(description, result["subjects"][subject_id]) + + +def 
check_subject(subject_id=None, policy_id=None): + if policy_id: + req = requests.get(URL.format("/policies/{}/subjects".format(policy_id))) + else: + req = requests.get(URL.format("/subjects")) + req.raise_for_status() + result = req.json() + check_subject_name(subject_template["name"], subject_id, result) + check_subject_policy(policy_id, result["subjects"][subject_id]) + + +def delete_subject(subject_id, policy_id=None): + if policy_id: + req = requests.delete(URL.format("/policies/{}/subjects/{}".format(policy_id, subject_id))) + else: + req = requests.delete(URL.format("/subjects/{}".format(subject_id))) + req.raise_for_status() + result = req.json() + check_result(result) + + if policy_id: + req = requests.get(URL.format("/policies/{}/subjects".format(policy_id))) + else: + req = requests.get(URL.format("/subjects")) + req.raise_for_status() + result = req.json() + check_subject_in_result(result) + if subject_id in result["subjects"]: + check_subject_name(subject_template["name"], subject_id, result) + check_subject_policy(policy_id, result["subjects"][subject_id]) + + +def add_object(policy_id=None, name="test_object"): + object_template['name'] = name + if policy_id: + req = requests.post(URL.format("/policies/{}/objects".format(policy_id)), + json=object_template, headers=HEADERS) + else: + req = requests.post(URL.format("/objects"), json=object_template, headers=HEADERS) + req.raise_for_status() + result = req.json() + check_object_in_result(result) + object_id = list(result['objects'].keys())[0] + return object_id + + +def update_object(object_id, policy_id): + req = requests.patch(URL.format("/policies/{}/objects/{}".format(policy_id, object_id)), + json={}) + req.raise_for_status() + result = req.json() + check_object_in_result(result) + check_object_name(object_template["name"], object_id, result) + check_object_policy(policy_id, result["objects"][object_id]) + + +def check_object(object_id=None, policy_id=None): + if policy_id: + req = 
requests.get(URL.format("/policies/{}/objects".format(policy_id))) + else: + req = requests.get(URL.format("/objects")) + req.raise_for_status() + result = req.json() + check_object_in_result(result) + check_object_name(object_template["name"], object_id, result) + if policy_id: + check_object_policy(policy_id, result["objects"][object_id]) + + +def delete_object(object_id, policy_id=None): + if policy_id: + req = requests.delete(URL.format("/policies/{}/objects/{}".format(policy_id, object_id))) + else: + req = requests.delete(URL.format("/objects/{}".format(object_id))) + req.raise_for_status() + result = req.json() + check_result(result) + + if policy_id: + req = requests.get(URL.format("/policies/{}/objects".format(policy_id))) + else: + req = requests.get(URL.format("/objects")) + req.raise_for_status() + result = req.json() + check_object_in_result(result) + if object_id in result["objects"]: + check_object_name(object_template["name"], object_id, result) + if policy_id: + check_object_policy(policy_id, result["objects"][object_id]) + + +def add_action(policy_id=None, name="test_action"): + action_template['name'] = name + if policy_id: + req = requests.post(URL.format("/policies/{}/actions".format(policy_id)), + json=action_template, headers=HEADERS) + else: + req = requests.post(URL.format("/actions"), json=action_template, headers=HEADERS) + req.raise_for_status() + result = req.json() + check_action_in_result(result) + action_id = list(result['actions'].keys())[0] + return action_id + + +def update_action(action_id, policy_id): + req = requests.patch(URL.format("/policies/{}/actions/{}".format(policy_id, action_id)), + json={}) + req.raise_for_status() + result = req.json() + check_action_in_result(result) + check_action_name(action_template["name"], action_id, result) + check_action_policy(policy_id, result["actions"][action_id]) + + +def check_action(action_id=None, policy_id=None): + if policy_id: + req = 
requests.get(URL.format("/policies/{}/actions".format(policy_id))) + else: + req = requests.get(URL.format("/actions")) + req.raise_for_status() + result = req.json() + check_action_in_result(result) + check_action_name(action_template["name"], action_id, result) + if policy_id: + check_action_policy(policy_id, result["actions"][action_id]) + + +def delete_action(action_id, policy_id=None): + if policy_id: + req = requests.delete(URL.format("/policies/{}/actions/{}".format(policy_id, action_id))) + else: + req = requests.delete(URL.format("/actions/{}".format(action_id))) + req.raise_for_status() + result = req.json() + check_result(result) + + if policy_id: + req = requests.get(URL.format("/policies/{}/actions".format(policy_id))) + else: + req = requests.get(URL.format("/actions")) + req.raise_for_status() + result = req.json() + check_action_in_result(result) + if action_id in result["actions"]: + check_action_name(action_template["name"], action_id, result) + if policy_id: + check_action_policy(policy_id, result["actions"][action_id]) + + +def add_subject_data(policy_id, category_id, name="subject_data1"): + subject_data_template['name'] = name + req = requests.post(URL.format("/policies/{}/subject_data/{}".format(policy_id, category_id)), + json=subject_data_template, headers=HEADERS) + req.raise_for_status() + result = req.json() + check_subject_data_data(result) + subject_id = list(result['subject_data']['data'].keys())[0] + return subject_id + + +def check_subject_data(policy_id, data_id, category_id): + req = requests.get(URL.format("/policies/{}/subject_data/{}".format(policy_id, category_id))) + req.raise_for_status() + result = req.json() + print(result) + if data_id is not None: + check_id_in_subject_data_data(data_id, result) + check_category_id_in_subject_data_data(category_id, result) + return result + + +def delete_subject_data(policy_id, category_id, data_id): + req = requests.delete( + 
URL.format("/policies/{}/subject_data/{}/{}".format(policy_id, category_id, data_id)), + headers=HEADERS) + req.raise_for_status() + req = requests.get(URL.format("/policies/{}/subject_data/{}".format(policy_id, category_id))) + req.raise_for_status() + result = req.json() + check_id_not_in_subject_data_data(data_id, result) + check_category_id_in_subject_data_data(category_id, result) + + +def add_object_data(policy_id, category_id, name="object_data1"): + object_data_template['name'] = name + req = requests.post(URL.format("/policies/{}/object_data/{}".format(policy_id, category_id)), + json=object_data_template, headers=HEADERS) + req.raise_for_status() + result = req.json() + check_object_data_data(result) + object_id = list(result['object_data']['data'].keys())[0] + return object_id + + +def check_object_data(policy_id, data_id, category_id): + req = requests.get(URL.format("/policies/{}/object_data/{}".format(policy_id, category_id))) + req.raise_for_status() + result = req.json() + if data_id is not None: + check_id_in_object_data_data(data_id, result) + check_category_id_in_object_data_data(category_id, result) + return result + + +def delete_object_data(policy_id, category_id, data_id): + req = requests.delete( + URL.format("/policies/{}/object_data/{}/{}".format(policy_id, category_id, data_id)), + headers=HEADERS) + req.raise_for_status() + req = requests.get(URL.format("/policies/{}/object_data/{}".format(policy_id, category_id))) + req.raise_for_status() + result = req.json() + check_id_not_in_object_data_data(data_id, result) + check_category_id_in_object_data_data(category_id, result) + + +def add_action_data(policy_id, category_id, name="action_data1"): + action_data_template['name'] = name + req = requests.post(URL.format("/policies/{}/action_data/{}".format(policy_id, category_id)), + json=action_data_template, headers=HEADERS) + req.raise_for_status() + result = req.json() + check_action_data_data(result) + action_id = 
list(result['action_data']['data'].keys())[0] + return action_id + + +def check_action_data(policy_id, data_id, category_id): + req = requests.get(URL.format("/policies/{}/action_data/{}".format(policy_id, category_id))) + req.raise_for_status() + result = req.json() + print(result) + if data_id is not None: + check_id_in_action_data_data(data_id, result) + check_category_id_in_action_data_data(category_id, result) + return result + + +def delete_action_data(policy_id, category_id, data_id): + req = requests.delete( + URL.format("/policies/{}/action_data/{}/{}".format(policy_id, category_id, data_id)), + headers=HEADERS) + req.raise_for_status() + req = requests.get(URL.format("/policies/{}/action_data/{}".format(policy_id, category_id))) + req.raise_for_status() + result = req.json() + check_id_not_in_action_data_data(data_id, result) + check_category_id_in_action_data_data(category_id, result) + + +def add_subject_assignments(policy_id, subject_id, subject_cat_id, subject_data_id): + req = requests.post(URL.format("/policies/{}/subject_assignments".format(policy_id)), + json={ + "id": subject_id, + "category_id": subject_cat_id, + "data_id": subject_data_id + }, headers=HEADERS) + req.raise_for_status() + result = req.json() + check_subject_assignment_in_result(result) + + +def check_subject_assignments(policy_id, subject_id, subject_cat_id, subject_data_id): + req = requests.get(URL.format("/policies/{}/subject_assignments/{}/{}/{}".format( + policy_id, subject_id, subject_cat_id, subject_data_id))) + req.raise_for_status() + result = req.json() + check_subject_assignment_in_result(result) + check_subject_assignements(subject_id, subject_cat_id, subject_data_id, result) + + +def check_object_assignments(policy_id, object_id, object_cat_id, object_data_id): + req = requests.get(URL.format("/policies/{}/object_assignments/{}/{}/{}".format( + policy_id, object_id, object_cat_id, object_data_id))) + req.raise_for_status() + result = req.json() + 
check_object_assignment_in_result(result) + check_object_assignements(object_id, object_cat_id, object_data_id, result) + + +def check_action_assignments(policy_id, action_id, action_cat_id, action_data_id): + req = requests.get(URL.format("/policies/{}/action_assignments/{}/{}/{}".format( + policy_id, action_id, action_cat_id, action_data_id))) + req.raise_for_status() + result = req.json() + check_action_assignment_in_result(result) + check_action_assignements(action_id, action_cat_id, action_data_id, result) + + +def add_object_assignments(policy_id, object_id, object_cat_id, object_data_id): + req = requests.post(URL.format("/policies/{}/object_assignments".format(policy_id)), + json={ + "id": object_id, + "category_id": object_cat_id, + "data_id": object_data_id + }, headers=HEADERS) + req.raise_for_status() + result = req.json() + check_object_assignment_in_result(result) + + +def add_action_assignments(policy_id, action_id, action_cat_id, action_data_id): + req = requests.post(URL.format("/policies/{}/action_assignments".format(policy_id)), + json={ + "id": action_id, + "category_id": action_cat_id, + "data_id": action_data_id + }, headers=HEADERS) + req.raise_for_status() + result = req.json() + check_action_assignment_in_result(result) + + +def delete_subject_assignment(policy_id, subject_id, subject_cat_id, subject_data_id): + req = requests.delete(URL.format("/policies/{}/subject_assignments/{}/{}/{}".format( + policy_id, subject_id, subject_cat_id, subject_data_id))) + req.raise_for_status() + result = req.json() + check_result(result) + + req = requests.get(URL.format("/policies/{}/subject_assignments/{}/{}/{}".format( + policy_id, subject_id, subject_cat_id, subject_data_id))) + req.raise_for_status() + result = req.json() + check_subject_assignment_in_result(result) + check_not_subject_assignements(subject_id, subject_cat_id, subject_data_id, result) + + +def delete_object_assignment(policy_id, object_id, object_cat_id, object_data_id): + req = 
requests.delete(URL.format("/policies/{}/object_assignments/{}/{}/{}".format( + policy_id, object_id, object_cat_id, object_data_id))) + req.raise_for_status() + result = req.json() + check_result(result) + + req = requests.get(URL.format("/policies/{}/object_assignments/{}/{}/{}".format( + policy_id, object_id, object_cat_id, object_data_id))) + req.raise_for_status() + result = req.json() + check_object_assignment_in_result(result) + check_not_object_assignements(object_id, object_cat_id, object_data_id, result) + + +def delete_action_assignment(policy_id, action_id, action_cat_id, action_data_id): + req = requests.delete(URL.format("/policies/{}/action_assignments/{}/{}/{}".format( + policy_id, action_id, action_cat_id, action_data_id))) + req.raise_for_status() + result = req.json() + check_result(result) + + req = requests.get(URL.format("/policies/{}/action_assignments/{}/{}/{}".format( + policy_id, action_id, action_cat_id, action_data_id))) + req.raise_for_status() + result = req.json() + check_action_assignment_in_result(result) + check_not_action_assignements(action_id, action_cat_id, action_data_id, result) + + +def add_rule(policy_id, meta_rule_id, rule, + instructions={"chain": [{"security_pipeline": "rbac"}]}): + req = requests.post(URL.format("/policies/{}/rules".format(policy_id)), + json={ + "meta_rule_id": meta_rule_id, + "rule": rule, + "instructions": instructions, + "enabled": True + }, + headers=HEADERS) + req.raise_for_status() + result = req.json() + check_rule_in_result(result) + rule_id = list(result["rules"].keys())[0] + check_policy_id_in_dict(policy_id, result["rules"][rule_id]) + check_meta_rule_id_in_dict(meta_rule_id, result["rules"][rule_id]) + check_rule_in_dict(rule, result["rules"][rule_id]) + return rule_id + + +def check_rule(policy_id, meta_rule_id, rule_id, rule): + req = requests.get(URL.format("/policies/{}/rules".format(policy_id))) + req.raise_for_status() + result = req.json() + check_rule_in_result(result) + 
check_policy_id_in_dict(policy_id, result["rules"]) + check_rule_id_in_list(meta_rule_id, rule_id, rule, result["rules"]["rules"]) + + +def delete_rule(policy_id, rule_id): + req = requests.delete(URL.format("/policies/{}/rules/{}".format(policy_id, rule_id))) + req.raise_for_status() + result = req.json() + check_result(result) + req = requests.get(URL.format("/policies/{}/rules".format(policy_id))) + req.raise_for_status() + result = req.json() + check_rule_in_result(result) + check_policy_id_in_dict(policy_id, result["rules"]) + check_rule_id_not_in_list(rule_id, result["rules"]["rules"]) + + +def check_meta_rule(): + req = requests.get(URL.format("/meta_rules/")) + req.raise_for_status() + result = req.json() + print(result) + return result + + +def create_policy(scenario, model_id, meta_rule_list): + LOGGER.info("Creating policy {}".format(scenario.policy_name)) + _policies = check_policy() + for _policy_id, _policy_value in _policies["policies"].items(): + if _policy_value['name'] == scenario.policy_name: + policy_id = _policy_id + break + else: + policy_id = add_policy(name=scenario.policy_name, genre=scenario.policy_genre) + + update_policy(policy_id, model_id) + + for meta_rule_id in meta_rule_list: + LOGGER.debug("add_meta_rule_to_model {} {}".format(model_id, meta_rule_id)) + models.add_meta_rule_to_model(model_id, meta_rule_id) + + LOGGER.info("Add subject data") + for subject_cat_name in scenario.subject_data: + for subject_data_name in scenario.subject_data[subject_cat_name]: + data_id = scenario.subject_data[subject_cat_name][subject_data_name] = add_subject_data( + policy_id=policy_id, + category_id=scenario.subject_categories[subject_cat_name], name=subject_data_name) + scenario.subject_data[subject_cat_name][subject_data_name] = data_id + LOGGER.info("Add object data") + for object_cat_name in scenario.object_data: + for object_data_name in scenario.object_data[object_cat_name]: + data_id = scenario.object_data[object_cat_name][object_data_name] = 
add_object_data( + policy_id=policy_id, + category_id=scenario.object_categories[object_cat_name], name=object_data_name) + scenario.object_data[object_cat_name][object_data_name] = data_id + LOGGER.info("Add action data") + for action_cat_name in scenario.action_data: + for action_data_name in scenario.action_data[action_cat_name]: + data_id = scenario.action_data[action_cat_name][action_data_name] = add_action_data( + policy_id=policy_id, + category_id=scenario.action_categories[action_cat_name], name=action_data_name) + scenario.action_data[action_cat_name][action_data_name] = data_id + + LOGGER.info("Add subjects") + for name in scenario.subjects: + scenario.subjects[name] = add_subject(policy_id, name=name) + LOGGER.info("Add objects") + for name in scenario.objects: + scenario.objects[name] = add_object(policy_id, name=name) + LOGGER.info("Add actions") + for name in scenario.actions: + scenario.actions[name] = add_action(policy_id, name=name) + + LOGGER.info("Add subject assignments") + for subject_name in scenario.subject_assignments: + if type(scenario.subject_assignments[subject_name]) in (list, tuple): + for items in scenario.subject_assignments[subject_name]: + for subject_category_name in items: + subject_id = scenario.subjects[subject_name] + subject_cat_id = scenario.subject_categories[subject_category_name] + for data in scenario.subject_assignments[subject_name]: + subject_data_id = scenario.subject_data[subject_category_name][ + data[subject_category_name]] + add_subject_assignments(policy_id, subject_id, subject_cat_id, + subject_data_id) + else: + for subject_category_name in scenario.subject_assignments[subject_name]: + subject_id = scenario.subjects[subject_name] + subject_cat_id = scenario.subject_categories[subject_category_name] + subject_data_id = scenario.subject_data[subject_category_name][ + scenario.subject_assignments[subject_name][subject_category_name]] + add_subject_assignments(policy_id, subject_id, subject_cat_id, 
subject_data_id) + + LOGGER.info("Add object assignments") + for object_name in scenario.object_assignments: + if type(scenario.object_assignments[object_name]) in (list, tuple): + for items in scenario.object_assignments[object_name]: + for object_category_name in items: + object_id = scenario.objects[object_name] + object_cat_id = scenario.object_categories[object_category_name] + for data in scenario.object_assignments[object_name]: + object_data_id = scenario.object_data[object_category_name][ + data[object_category_name]] + add_object_assignments(policy_id, object_id, object_cat_id, object_data_id) + else: + for object_category_name in scenario.object_assignments[object_name]: + object_id = scenario.objects[object_name] + object_cat_id = scenario.object_categories[object_category_name] + object_data_id = scenario.object_data[object_category_name][ + scenario.object_assignments[object_name][object_category_name]] + add_object_assignments(policy_id, object_id, object_cat_id, object_data_id) + + LOGGER.info("Add action assignments") + for action_name in scenario.action_assignments: + if type(scenario.action_assignments[action_name]) in (list, tuple): + for items in scenario.action_assignments[action_name]: + for action_category_name in items: + action_id = scenario.actions[action_name] + action_cat_id = scenario.action_categories[action_category_name] + for data in scenario.action_assignments[action_name]: + action_data_id = scenario.action_data[action_category_name][ + data[action_category_name]] + add_action_assignments(policy_id, action_id, action_cat_id, action_data_id) + else: + for action_category_name in scenario.action_assignments[action_name]: + action_id = scenario.actions[action_name] + action_cat_id = scenario.action_categories[action_category_name] + action_data_id = scenario.action_data[action_category_name][ + scenario.action_assignments[action_name][action_category_name]] + add_action_assignments(policy_id, action_id, action_cat_id, 
action_data_id) + + LOGGER.info("Add rules") + for meta_rule_name in scenario.rules: + meta_rule_value = scenario.meta_rule[meta_rule_name] + for rule in scenario.rules[meta_rule_name]: + data_list = [] + _meta_rule = list(meta_rule_value["value"]) + for data_name in rule["rule"]: + category_name = _meta_rule.pop(0) + if category_name in scenario.subject_categories: + data_list.append(scenario.subject_data[category_name][data_name]) + elif category_name in scenario.object_categories: + data_list.append(scenario.object_data[category_name][data_name]) + elif category_name in scenario.action_categories: + data_list.append(scenario.action_data[category_name][data_name]) + instructions = rule["instructions"] + add_rule(policy_id, meta_rule_value["id"], data_list, instructions) + return policy_id |
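Review bot: `subject_template` carries a hard-coded credential, `"password": "my_pass"`, and `add_subject` posts that whole dict to the manager for every subject it creates, so each subject provisioned through `create_policy` ends up with the same known password, sent over the plain-http `URL` built in `init` with only a `content-type` header (unless the manager enforces authentication some other way, nothing in this module supplies one). Make the password a parameter instead of a module constant, e.g. `def add_subject(policy_id=None, name="test_subject", password=None):` followed by `subject_template['password'] = password or secrets.token_urlsafe(16)` (with `import secrets` at the top), and drop the literal from the template. The `print(result)` calls left in `check_subject_data`, `check_action_data` and `check_meta_rule` write full manager responses to stdout while the sibling `check_object_data` does not; `LOGGER.debug("%s", result)` keeps them consistent with the rest of the module and out of captured console output. `add_rule`'s `instructions={"chain": [{"security_pipeline": "rbac"}]}` default is a shared mutable dict — it is only serialized here, but a `None` default resolved inside the body is the safer idiom.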