diff options
Diffstat (limited to 'templates/moon_forming/utils/policies.py')
-rw-r--r-- | templates/moon_forming/utils/policies.py | 635 |
1 files changed, 0 insertions, 635 deletions
diff --git a/templates/moon_forming/utils/policies.py b/templates/moon_forming/utils/policies.py deleted file mode 100644 index bd08291a..00000000 --- a/templates/moon_forming/utils/policies.py +++ /dev/null @@ -1,635 +0,0 @@ -import logging -import requests -import utils.config - -config = utils.config.get_config_data() -logger = logging.getLogger("moonforming.utils.policies") - -URL = "http://{}:{}".format(config['components']['manager']['hostname'], config['components']['manager']['port']) -URL = URL + "{}" -HEADERS = {"content-type": "application/json"} -FILE = open("/tmp/test.log", "w") - -policy_template = { - "name": "test_policy", - "model_id": "", - "genre": "authz", - "description": "test", -} - -subject_template = { - "name": "test_subject", - "description": "test", - "email": "mail", - "password": "my_pass", -} - -object_template = { - "name": "test_subject", - "description": "test" -} - -action_template = { - "name": "test_subject", - "description": "test" -} - -subject_data_template = { - "name": "subject_data1", - "description": "description of the data subject" -} - -object_data_template = { - "name": "object_data1", - "description": "description of the data subject" -} - -action_data_template = { - "name": "action_data1", - "description": "description of the data subject" -} - -subject_assignment_template = { - "id": "", - "category_id": "", - "scope_id": "" -} - - -def check_policy(policy_id=None): - req = requests.get(URL.format("/policies")) - assert req.status_code == 200 - result = req.json() - assert type(result) is dict - assert "policies" in result - if policy_id: - assert result["policies"] - assert policy_id in result['policies'] - assert "name" in result['policies'][policy_id] - assert policy_template["name"] == result['policies'][policy_id]["name"] - return result - - -def add_policy(name="test_policy", genre="authz"): - policy_template["name"] = name - policy_template["genre"] = genre - req = requests.post(URL.format("/policies"), json=policy_template, headers=HEADERS) - assert req.status_code == 200 - result = req.json() - assert type(result) is dict - policy_id = list(result['policies'].keys())[0] - if "result" in result: - assert result["result"] - assert "name" in result['policies'][policy_id] - assert policy_template["name"] == result['policies'][policy_id]["name"] - return policy_id - - -def update_policy(policy_id, model_id): - req = requests.patch(URL.format("/policies/{}".format(policy_id)), - json={"model_id": model_id}, headers=HEADERS) - assert req.status_code == 200 - result = req.json() - assert type(result) is dict - policy_id = list(result['policies'].keys())[0] - if "result" in result: - assert result["result"] - assert "model_id" in result['policies'][policy_id] - assert model_id == result['policies'][policy_id]["model_id"] - - -def delete_policy(policy_id): - req = requests.delete(URL.format("/policies/{}".format(policy_id))) - assert req.status_code == 200 - result = req.json() - assert type(result) is dict - assert "result" in result - assert result["result"] - - -def add_subject(policy_id=None, name="test_subject"): - subject_template['name'] = name - if policy_id: - logger.debug(URL.format("/policies/{}/subjects".format(policy_id))) - req = requests.post(URL.format("/policies/{}/subjects".format(policy_id)), - json=subject_template, headers=HEADERS) - else: - logger.debug(URL.format("/subjects")) - req = requests.post(URL.format("/subjects"), json=subject_template, headers=HEADERS) - logger.debug(req.text) - assert req.status_code == 200 - result = req.json() - assert "subjects" in result - subject_id = list(result['subjects'].keys())[0] - return subject_id - - -def update_subject(subject_id, policy_id=None, description=None): - if policy_id and not description: - req = requests.patch(URL.format("/policies/{}/subjects/{}".format(policy_id, subject_id)), - json={}) - elif policy_id and description: - req = requests.patch(URL.format("/policies/{}/subjects/{}".format(policy_id, subject_id)), - json={"description": description}) - else: - req = requests.patch(URL.format("/subjects/{}".format(subject_id)), - json={"description": description}) - assert req.status_code == 200 - result = req.json() - assert "subjects" in result - assert "name" in result["subjects"][subject_id] - assert subject_template["name"] == result["subjects"][subject_id]["name"] - assert "policy_list" in result["subjects"][subject_id] - if policy_id: - assert policy_id in result["subjects"][subject_id]["policy_list"] - if description: - assert description in result["subjects"][subject_id]["description"] - - -def check_subject(subject_id=None, policy_id=None): - if policy_id: - req = requests.get(URL.format("/policies/{}/subjects".format(policy_id))) - else: - req = requests.get(URL.format("/subjects")) - assert req.status_code == 200 - result = req.json() - assert "subjects" in result - assert "name" in result["subjects"][subject_id] - assert subject_template["name"] == result["subjects"][subject_id]["name"] - if policy_id: - assert "policy_list" in result["subjects"][subject_id] - assert policy_id in result["subjects"][subject_id]["policy_list"] - - -def delete_subject(subject_id, policy_id=None): - if policy_id: - req = requests.delete(URL.format("/policies/{}/subjects/{}".format(policy_id, subject_id))) - else: - req = requests.delete(URL.format("/subjects/{}".format(subject_id))) - assert req.status_code == 200 - result = req.json() - assert type(result) is dict - assert "result" in result - assert result["result"] - - if policy_id: - req = requests.get(URL.format("/policies/{}/subjects".format(policy_id))) - else: - req = requests.get(URL.format("/subjects")) - assert req.status_code == 200 - result = req.json() - assert "subjects" in result - if subject_id in result["subjects"]: - assert "name" in result["subjects"][subject_id] - assert subject_template["name"] == result["subjects"][subject_id]["name"] - if policy_id: - assert "policy_list" in result["subjects"][subject_id] - assert policy_id not in result["subjects"][subject_id]["policy_list"] - - -def add_object(policy_id=None, name="test_object"): - object_template['name'] = name - if policy_id: - req = requests.post(URL.format("/policies/{}/objects".format(policy_id)), - json=object_template, headers=HEADERS) - else: - req = requests.post(URL.format("/objects"), json=object_template, headers=HEADERS) - assert req.status_code == 200 - result = req.json() - assert "objects" in result - object_id = list(result['objects'].keys())[0] - return object_id - - -def update_object(object_id, policy_id): - req = requests.patch(URL.format("/policies/{}/objects/{}".format(policy_id, object_id)), json={}) - assert req.status_code == 200 - result = req.json() - assert "objects" in result - assert "name" in result["objects"][object_id] - assert object_template["name"] == result["objects"][object_id]["name"] - assert "policy_list" in result["objects"][object_id] - assert policy_id in result["objects"][object_id]["policy_list"] - - -def check_object(object_id=None, policy_id=None): - if policy_id: - req = requests.get(URL.format("/policies/{}/objects".format(policy_id))) - else: - req = requests.get(URL.format("/objects")) - assert req.status_code == 200 - result = req.json() - assert "objects" in result - assert "name" in result["objects"][object_id] - assert object_template["name"] == result["objects"][object_id]["name"] - if policy_id: - assert "policy_list" in result["objects"][object_id] - assert policy_id in result["objects"][object_id]["policy_list"] - - -def delete_object(object_id, policy_id=None): - if policy_id: - req = requests.delete(URL.format("/policies/{}/objects/{}".format(policy_id, object_id))) - else: - req = requests.delete(URL.format("/objects/{}".format(object_id))) - assert req.status_code == 200 - result = req.json() - assert type(result) is dict - assert "result" in result - assert result["result"] - - if policy_id: - req = requests.get(URL.format("/policies/{}/objects".format(policy_id))) - else: - req = requests.get(URL.format("/objects")) - assert req.status_code == 200 - result = req.json() - assert "objects" in result - if object_id in result["objects"]: - assert "name" in result["objects"][object_id] - assert object_template["name"] == result["objects"][object_id]["name"] - if policy_id: - assert "policy_list" in result["objects"][object_id] - assert policy_id not in result["objects"][object_id]["policy_list"] - - -def add_action(policy_id=None, name="test_action"): - action_template['name'] = name - if policy_id: - req = requests.post(URL.format("/policies/{}/actions".format(policy_id)), - json=action_template, headers=HEADERS) - else: - req = requests.post(URL.format("/actions"), json=action_template, headers=HEADERS) - assert req.status_code == 200 - result = req.json() - assert "actions" in result - action_id = list(result['actions'].keys())[0] - return action_id - - -def update_action(action_id, policy_id): - req = requests.patch(URL.format("/policies/{}/actions/{}".format(policy_id, action_id)), json={}) - assert req.status_code == 200 - result = req.json() - assert "actions" in result - assert "name" in result["actions"][action_id] - assert action_template["name"] == result["actions"][action_id]["name"] - assert "policy_list" in result["actions"][action_id] - assert policy_id in result["actions"][action_id]["policy_list"] - - -def check_action(action_id=None, policy_id=None): - if policy_id: - req = requests.get(URL.format("/policies/{}/actions".format(policy_id))) - else: - req = requests.get(URL.format("/actions")) - assert req.status_code == 200 - result = req.json() - assert "actions" in result - assert "name" in result["actions"][action_id] - assert action_template["name"] == result["actions"][action_id]["name"] - if policy_id: - assert "policy_list" in result["actions"][action_id] - assert policy_id in result["actions"][action_id]["policy_list"] - - -def delete_action(action_id, policy_id=None): - if policy_id: - req = requests.delete(URL.format("/policies/{}/actions/{}".format(policy_id, action_id))) - else: - req = requests.delete(URL.format("/actions/{}".format(action_id))) - assert req.status_code == 200 - result = req.json() - assert type(result) is dict - assert "result" in result - assert result["result"] - - if policy_id: - req = requests.get(URL.format("/policies/{}/actions".format(policy_id))) - else: - req = requests.get(URL.format("/actions")) - assert req.status_code == 200 - result = req.json() - assert "actions" in result - if action_id in result["actions"]: - assert "name" in result["actions"][action_id] - assert action_template["name"] == result["actions"][action_id]["name"] - if policy_id: - assert "policy_list" in result["actions"][action_id] - assert policy_id not in result["actions"][action_id]["policy_list"] - - -def add_subject_data(policy_id, category_id, name="subject_data1"): - subject_data_template['name'] = name - req = requests.post(URL.format("/policies/{}/subject_data/{}".format(policy_id, category_id)), - json=subject_data_template, headers=HEADERS) - assert req.status_code == 200 - result = req.json() - assert "subject_data" in result - subject_id = list(result['subject_data']['data'].keys())[0] - return subject_id - - -def check_subject_data(policy_id, data_id, category_id): - req = requests.get(URL.format("/policies/{}/subject_data/{}".format(policy_id, category_id))) - assert req.status_code == 200 - result = req.json() - assert "subject_data" in result - for _data in result['subject_data']: - assert data_id in list(_data['data'].keys()) - assert category_id == _data["category_id"] - - -def delete_subject_data(policy_id, category_id, data_id): - req = requests.delete(URL.format("/policies/{}/subject_data/{}/{}".format(policy_id, category_id, data_id)), - headers=HEADERS) - assert req.status_code == 200 - req = requests.get(URL.format("/policies/{}/subject_data/{}".format(policy_id, category_id))) - assert req.status_code == 200 - result = req.json() - assert "subject_data" in result - for _data in result['subject_data']: - assert data_id not in list(_data['data'].keys()) - assert category_id == _data["category_id"] - - -def add_object_data(policy_id, category_id, name="object_data1"): - object_data_template['name'] = name - req = requests.post(URL.format("/policies/{}/object_data/{}".format(policy_id, category_id)), - json=object_data_template, headers=HEADERS) - assert req.status_code == 200 - result = req.json() - assert "object_data" in result - object_id = list(result['object_data']['data'].keys())[0] - return object_id - - -def check_object_data(policy_id, data_id, category_id): - req = requests.get(URL.format("/policies/{}/object_data/{}".format(policy_id, category_id))) - assert req.status_code == 200 - result = req.json() - assert "object_data" in result - for _data in result['object_data']: - assert data_id in list(_data['data'].keys()) - assert category_id == _data["category_id"] - - -def delete_object_data(policy_id, category_id, data_id): - req = requests.delete(URL.format("/policies/{}/object_data/{}/{}".format(policy_id, category_id, data_id)), - headers=HEADERS) - assert req.status_code == 200 - req = requests.get(URL.format("/policies/{}/object_data/{}".format(policy_id, category_id))) - assert req.status_code == 200 - result = req.json() - assert "object_data" in result - for _data in result['object_data']: - assert data_id not in list(_data['data'].keys()) - assert category_id == _data["category_id"] - - -def add_action_data(policy_id, category_id, name="action_data1"): - action_data_template['name'] = name - req = requests.post(URL.format("/policies/{}/action_data/{}".format(policy_id, category_id)), - json=action_data_template, headers=HEADERS) - assert req.status_code == 200 - result = req.json() - assert "action_data" in result - action_id = list(result['action_data']['data'].keys())[0] - return action_id - - -def check_action_data(policy_id, data_id, category_id): - req = requests.get(URL.format("/policies/{}/action_data/{}".format(policy_id, category_id))) - assert req.status_code == 200 - result = req.json() - assert "action_data" in result - for _data in result['action_data']: - assert data_id in list(_data['data'].keys()) - assert category_id == _data["category_id"] - - -def delete_action_data(policy_id, category_id, data_id): - req = requests.delete(URL.format("/policies/{}/action_data/{}/{}".format(policy_id, category_id, data_id)), - headers=HEADERS) - assert req.status_code == 200 - req = requests.get(URL.format("/policies/{}/action_data/{}".format(policy_id, category_id))) - assert req.status_code == 200 - result = req.json() - assert "action_data" in result - for _data in result['action_data']: - assert data_id not in list(_data['data'].keys()) - assert category_id == _data["category_id"] - - -def add_subject_assignments(policy_id, subject_id, subject_cat_id, subject_data_id): - req = requests.post(URL.format("/policies/{}/subject_assignments".format(policy_id)), - json={ - "id": subject_id, - "category_id": subject_cat_id, - "data_id": subject_data_id - }, headers=HEADERS) - assert req.status_code == 200 - result = req.json() - assert "subject_assignments" in result - assert result["subject_assignments"] - - -def check_subject_assignments(policy_id, subject_id, subject_cat_id, subject_data_id): - req = requests.get(URL.format("/policies/{}/subject_assignments/{}/{}/{}".format( - policy_id, subject_id, subject_cat_id, subject_data_id))) - assert req.status_code == 200 - result = req.json() - assert "subject_assignments" in result - assert result["subject_assignments"] - for key in result["subject_assignments"]: - assert "subject_id" in result["subject_assignments"][key] - assert "category_id" in result["subject_assignments"][key] - assert "assignments" in result["subject_assignments"][key] - if result["subject_assignments"][key]['subject_id'] == subject_id and \ - result["subject_assignments"][key]["category_id"] == subject_cat_id: - assert subject_data_id in result["subject_assignments"][key]["assignments"] - - -def check_object_assignments(policy_id, object_id, object_cat_id, object_data_id): - req = requests.get(URL.format("/policies/{}/object_assignments/{}/{}/{}".format( - policy_id, object_id, object_cat_id, object_data_id))) - assert req.status_code == 200 - result = req.json() - assert "object_assignments" in result - assert result["object_assignments"] - for key in result["object_assignments"]: - assert "object_id" in result["object_assignments"][key] - assert "category_id" in result["object_assignments"][key] - assert "assignments" in result["object_assignments"][key] - if result["object_assignments"][key]['object_id'] == object_id and \ - result["object_assignments"][key]["category_id"] == object_cat_id: - assert object_data_id in result["object_assignments"][key]["assignments"] - - -def check_action_assignments(policy_id, action_id, action_cat_id, action_data_id): - req = requests.get(URL.format("/policies/{}/action_assignments/{}/{}/{}".format( - policy_id, action_id, action_cat_id, action_data_id))) - assert req.status_code == 200 - result = req.json() - assert "action_assignments" in result - assert result["action_assignments"] - for key in result["action_assignments"]: - assert "action_id" in result["action_assignments"][key] - assert "category_id" in result["action_assignments"][key] - assert "assignments" in result["action_assignments"][key] - if result["action_assignments"][key]['action_id'] == action_id and \ - result["action_assignments"][key]["category_id"] == action_cat_id: - assert action_data_id in result["action_assignments"][key]["assignments"] - - -def add_object_assignments(policy_id, object_id, object_cat_id, object_data_id): - req = requests.post(URL.format("/policies/{}/object_assignments".format(policy_id)), - json={ - "id": object_id, - "category_id": object_cat_id, - "data_id": object_data_id - }, headers=HEADERS) - assert req.status_code == 200 - result = req.json() - assert "object_assignments" in result - assert result["object_assignments"] - - -def add_action_assignments(policy_id, action_id, action_cat_id, action_data_id): - req = requests.post(URL.format("/policies/{}/action_assignments".format(policy_id)), - json={ - "id": action_id, - "category_id": action_cat_id, - "data_id": action_data_id - }, headers=HEADERS) - assert req.status_code == 200 - result = req.json() - assert "action_assignments" in result - assert result["action_assignments"] - - -def delete_subject_assignment(policy_id, subject_id, subject_cat_id, subject_data_id): - req = requests.delete(URL.format("/policies/{}/subject_assignments/{}/{}/{}".format( - policy_id, subject_id, subject_cat_id, subject_data_id))) - assert req.status_code == 200 - result = req.json() - assert "result" in result - assert result["result"] - - req = requests.get(URL.format("/policies/{}/subject_assignments/{}/{}/{}".format( - policy_id, subject_id, subject_cat_id, subject_data_id))) - assert req.status_code == 200 - result = req.json() - assert "subject_assignments" in result - assert result["subject_assignments"] - for key in result["subject_assignments"]: - assert "subject_id" in result["subject_assignments"][key] - assert "category_id" in result["subject_assignments"][key] - assert "assignments" in result["subject_assignments"][key] - if result["subject_assignments"][key]['subject_id'] == subject_id and \ - result["subject_assignments"][key]["category_id"] == subject_cat_id: - assert subject_data_id not in result["subject_assignments"][key]["assignments"] - - -def delete_object_assignment(policy_id, object_id, object_cat_id, object_data_id): - req = requests.delete(URL.format("/policies/{}/object_assignments/{}/{}/{}".format( - policy_id, object_id, object_cat_id, object_data_id))) - assert req.status_code == 200 - result = req.json() - assert "result" in result - assert result["result"] - - req = requests.get(URL.format("/policies/{}/object_assignments/{}/{}/{}".format( - policy_id, object_id, object_cat_id, object_data_id))) - assert req.status_code == 200 - result = req.json() - assert "object_assignments" in result - assert result["object_assignments"] - for key in result["object_assignments"]: - assert "object_id" in result["object_assignments"][key] - assert "category_id" in result["object_assignments"][key] - assert "assignments" in result["object_assignments"][key] - if result["object_assignments"][key]['object_id'] == object_id and \ - result["object_assignments"][key]["category_id"] == object_cat_id: - assert object_data_id not in result["object_assignments"][key]["assignments"] - - -def delete_action_assignment(policy_id, action_id, action_cat_id, action_data_id): - req = requests.delete(URL.format("/policies/{}/action_assignments/{}/{}/{}".format( - policy_id, action_id, action_cat_id, action_data_id))) - assert req.status_code == 200 - result = req.json() - assert "result" in result - assert result["result"] - - req = requests.get(URL.format("/policies/{}/action_assignments/{}/{}/{}".format( - policy_id, action_id, action_cat_id, action_data_id))) - assert req.status_code == 200 - result = req.json() - assert "action_assignments" in result - assert result["action_assignments"] - for key in result["action_assignments"]: - assert "action_id" in result["action_assignments"][key] - assert "category_id" in result["action_assignments"][key] - assert "assignments" in result["action_assignments"][key] - if result["action_assignments"][key]['action_id'] == action_id and \ - result["action_assignments"][key]["category_id"] == action_cat_id: - assert action_data_id not in result["action_assignments"][key]["assignments"] - - -def add_rule(policy_id, meta_rule_id, rule, instructions={"chain": [{"security_pipeline": "rbac"}]}): - req = requests.post(URL.format("/policies/{}/rules".format(policy_id)), - json={ - "meta_rule_id": meta_rule_id, - "rule": rule, - "instructions": instructions, - "enabled": True - }, - headers=HEADERS) - assert req.status_code == 200 - result = req.json() - assert "rules" in result - try: - rule_id = list(result["rules"].keys())[0] - except Exception as e: - return False - assert "policy_id" in result["rules"][rule_id] - assert policy_id == result["rules"][rule_id]["policy_id"] - assert "meta_rule_id" in result["rules"][rule_id] - assert meta_rule_id == result["rules"][rule_id]["meta_rule_id"] - assert rule == result["rules"][rule_id]["rule"] - return rule_id - - -def check_rule(policy_id, meta_rule_id, rule_id, rule): - req = requests.get(URL.format("/policies/{}/rules".format(policy_id))) - assert req.status_code == 200 - result = req.json() - assert "rules" in result - assert "policy_id" in result["rules"] - assert policy_id == result["rules"]["policy_id"] - for item in result["rules"]["rules"]: - assert "meta_rule_id" in item - if meta_rule_id == item["meta_rule_id"]: - if rule_id == item["id"]: - assert rule == item["rule"] - - -def delete_rule(policy_id, rule_id): - req = requests.delete(URL.format("/policies/{}/rules/{}".format(policy_id, rule_id))) - assert req.status_code == 200 - result = req.json() - assert "result" in result - assert result["result"] - - req = requests.get(URL.format("/policies/{}/rules".format(policy_id))) - assert req.status_code == 200 - result = req.json() - assert "rules" in result - assert "policy_id" in result["rules"] - assert policy_id == result["rules"]["policy_id"] - found_rule = False - for item in result["rules"]["rules"]: - if rule_id == item["id"]: - found_rule = True - assert not found_rule |