aboutsummaryrefslogtreecommitdiffstats
path: root/old/python_moonclient/python_moonclient/core/policies.py
diff options
context:
space:
mode:
Diffstat (limited to 'old/python_moonclient/python_moonclient/core/policies.py')
-rw-r--r--old/python_moonclient/python_moonclient/core/policies.py673
1 files changed, 673 insertions, 0 deletions
diff --git a/old/python_moonclient/python_moonclient/core/policies.py b/old/python_moonclient/python_moonclient/core/policies.py
new file mode 100644
index 00000000..b9b05dd8
--- /dev/null
+++ b/old/python_moonclient/python_moonclient/core/policies.py
@@ -0,0 +1,673 @@
+import logging
+import requests
+from python_moonclient.core import models, config
+from python_moonclient.core.check_tools import *
+
+LOGGER = logging.getLogger("moonclient.core.policies")
+
+URL = None
+HEADERS = None
+
+policy_template = {
+ "name": "test_policy",
+ "model_id": "",
+ "genre": "authz",
+ "description": "test",
+}
+
+subject_template = {
+ "name": "test_subject",
+ "description": "test",
+ "email": "mail",
+ "password": "my_pass",
+}
+
+object_template = {
+ "name": "test_subject",
+ "description": "test"
+}
+
+action_template = {
+ "name": "test_subject",
+ "description": "test"
+}
+
+subject_data_template = {
+ "name": "subject_data1",
+ "description": "description of the data subject"
+}
+
+object_data_template = {
+ "name": "object_data1",
+ "description": "description of the data subject"
+}
+
+action_data_template = {
+ "name": "action_data1",
+ "description": "description of the data subject"
+}
+
+subject_assignment_template = {
+ "id": "",
+ "category_id": "",
+ "scope_id": ""
+}
+
+
+def init(consul_host, consul_port):
+ conf_data = config.get_config_data(consul_host, consul_port)
+ global URL, HEADERS
+ URL = "http://{}:{}".format(
+ conf_data['manager_host'],
+ conf_data['manager_port'])
+ URL = URL + "{}"
+ HEADERS = {"content-type": "application/json"}
+
+
+def check_policy(policy_id=None):
+ req = requests.get(URL.format("/policies"))
+ req.raise_for_status()
+ result = req.json()
+ check_policy_in_result(result)
+ if policy_id:
+ check_policy_name(policy_template["name"], policy_id, result)
+ return result
+
+
+def add_policy(name="test_policy", genre="authz"):
+ policy_template["name"] = name
+ policy_template["genre"] = genre
+ req = requests.post(URL.format("/policies"), json=policy_template, headers=HEADERS)
+ req.raise_for_status()
+ result = req.json()
+ check_policy_in_result(result)
+ policy_id = list(result['policies'].keys())[0]
+ check_optionnal_result(result)
+ check_policy_name(policy_template["name"], policy_id, result)
+ return policy_id
+
+
+def update_policy(policy_id, model_id):
+ req = requests.patch(URL.format("/policies/{}".format(policy_id)),
+ json={"model_id": model_id}, headers=HEADERS)
+ req.raise_for_status()
+ result = req.json()
+ check_policy_in_result(result)
+ policy_id = list(result['policies'].keys())[0]
+ check_optionnal_result(result)
+ check_policy_model_id(model_id, policy_id, result)
+
+
+def delete_policy(policy_id):
+ req = requests.delete(URL.format("/policies/{}".format(policy_id)))
+ req.raise_for_status()
+ result = req.json()
+ check_result(result)
+
+
+def add_subject(policy_id=None, name="test_subject"):
+ subject_template['name'] = name
+ if policy_id:
+ LOGGER.debug(URL.format("/policies/{}/subjects".format(policy_id)))
+ req = requests.post(URL.format("/policies/{}/subjects".format(policy_id)),
+ json=subject_template, headers=HEADERS)
+ else:
+ LOGGER.debug(URL.format("/subjects"))
+ req = requests.post(URL.format("/subjects"), json=subject_template, headers=HEADERS)
+ LOGGER.debug(req.text)
+ req.raise_for_status()
+ result = req.json()
+ check_subject_in_result(result)
+ subject_id = list(result['subjects'].keys())[0]
+ return subject_id
+
+
+def update_subject(subject_id, policy_id=None, description=None):
+ if policy_id and not description:
+ req = requests.patch(URL.format("/policies/{}/subjects/{}".format(policy_id, subject_id)),
+ json={})
+ elif policy_id and description:
+ req = requests.patch(URL.format("/policies/{}/subjects/{}".format(policy_id, subject_id)),
+ json={"description": description})
+ else:
+ req = requests.patch(URL.format("/subjects/{}".format(subject_id)),
+ json={"description": description})
+ req.raise_for_status()
+ result = req.json()
+ check_subject_name(subject_template["name"], subject_id, result)
+ check_subject_policy(policy_id, result["subjects"][subject_id])
+ check_subject_description(description, result["subjects"][subject_id])
+
+
+def check_subject(subject_id=None, policy_id=None):
+ if policy_id:
+ req = requests.get(URL.format("/policies/{}/subjects".format(policy_id)))
+ else:
+ req = requests.get(URL.format("/subjects"))
+ req.raise_for_status()
+ result = req.json()
+ check_subject_name(subject_template["name"], subject_id, result)
+ check_subject_policy(policy_id, result["subjects"][subject_id])
+
+
+def delete_subject(subject_id, policy_id=None):
+ if policy_id:
+ req = requests.delete(URL.format("/policies/{}/subjects/{}".format(policy_id, subject_id)))
+ else:
+ req = requests.delete(URL.format("/subjects/{}".format(subject_id)))
+ req.raise_for_status()
+ result = req.json()
+ check_result(result)
+
+ if policy_id:
+ req = requests.get(URL.format("/policies/{}/subjects".format(policy_id)))
+ else:
+ req = requests.get(URL.format("/subjects"))
+ req.raise_for_status()
+ result = req.json()
+ check_subject_in_result(result)
+ if subject_id in result["subjects"]:
+ check_subject_name(subject_template["name"], subject_id, result)
+ check_subject_policy(policy_id, result["subjects"][subject_id])
+
+
+def add_object(policy_id=None, name="test_object"):
+ object_template['name'] = name
+ if policy_id:
+ req = requests.post(URL.format("/policies/{}/objects".format(policy_id)),
+ json=object_template, headers=HEADERS)
+ else:
+ req = requests.post(URL.format("/objects"), json=object_template, headers=HEADERS)
+ req.raise_for_status()
+ result = req.json()
+ check_object_in_result(result)
+ object_id = list(result['objects'].keys())[0]
+ return object_id
+
+
+def update_object(object_id, policy_id):
+ req = requests.patch(URL.format("/policies/{}/objects/{}".format(policy_id, object_id)),
+ json={})
+ req.raise_for_status()
+ result = req.json()
+ check_object_in_result(result)
+ check_object_name(object_template["name"], object_id, result)
+ check_object_policy(policy_id, result["objects"][object_id])
+
+
+def check_object(object_id=None, policy_id=None):
+ if policy_id:
+ req = requests.get(URL.format("/policies/{}/objects".format(policy_id)))
+ else:
+ req = requests.get(URL.format("/objects"))
+ req.raise_for_status()
+ result = req.json()
+ check_object_in_result(result)
+ check_object_name(object_template["name"], object_id, result)
+ if policy_id:
+ check_object_policy(policy_id, result["objects"][object_id])
+
+
+def delete_object(object_id, policy_id=None):
+ if policy_id:
+ req = requests.delete(URL.format("/policies/{}/objects/{}".format(policy_id, object_id)))
+ else:
+ req = requests.delete(URL.format("/objects/{}".format(object_id)))
+ req.raise_for_status()
+ result = req.json()
+ check_result(result)
+
+ if policy_id:
+ req = requests.get(URL.format("/policies/{}/objects".format(policy_id)))
+ else:
+ req = requests.get(URL.format("/objects"))
+ req.raise_for_status()
+ result = req.json()
+ check_object_in_result(result)
+ if object_id in result["objects"]:
+ check_object_name(object_template["name"], object_id, result)
+ if policy_id:
+ check_object_policy(policy_id, result["objects"][object_id])
+
+
+def add_action(policy_id=None, name="test_action"):
+ action_template['name'] = name
+ if policy_id:
+ req = requests.post(URL.format("/policies/{}/actions".format(policy_id)),
+ json=action_template, headers=HEADERS)
+ else:
+ req = requests.post(URL.format("/actions"), json=action_template, headers=HEADERS)
+ req.raise_for_status()
+ result = req.json()
+ check_action_in_result(result)
+ action_id = list(result['actions'].keys())[0]
+ return action_id
+
+
+def update_action(action_id, policy_id):
+ req = requests.patch(URL.format("/policies/{}/actions/{}".format(policy_id, action_id)),
+ json={})
+ req.raise_for_status()
+ result = req.json()
+ check_action_in_result(result)
+ check_action_name(action_template["name"], action_id, result)
+ check_action_policy(policy_id, result["actions"][action_id])
+
+
+def check_action(action_id=None, policy_id=None):
+ if policy_id:
+ req = requests.get(URL.format("/policies/{}/actions".format(policy_id)))
+ else:
+ req = requests.get(URL.format("/actions"))
+ req.raise_for_status()
+ result = req.json()
+ check_action_in_result(result)
+ check_action_name(action_template["name"], action_id, result)
+ if policy_id:
+ check_action_policy(policy_id, result["actions"][action_id])
+
+
+def delete_action(action_id, policy_id=None):
+ if policy_id:
+ req = requests.delete(URL.format("/policies/{}/actions/{}".format(policy_id, action_id)))
+ else:
+ req = requests.delete(URL.format("/actions/{}".format(action_id)))
+ req.raise_for_status()
+ result = req.json()
+ check_result(result)
+
+ if policy_id:
+ req = requests.get(URL.format("/policies/{}/actions".format(policy_id)))
+ else:
+ req = requests.get(URL.format("/actions"))
+ req.raise_for_status()
+ result = req.json()
+ check_action_in_result(result)
+ if action_id in result["actions"]:
+ check_action_name(action_template["name"], action_id, result)
+ if policy_id:
+ check_action_policy(policy_id, result["actions"][action_id])
+
+
+def add_subject_data(policy_id, category_id, name="subject_data1"):
+ subject_data_template['name'] = name
+ req = requests.post(URL.format("/policies/{}/subject_data/{}".format(policy_id, category_id)),
+ json=subject_data_template, headers=HEADERS)
+ req.raise_for_status()
+ result = req.json()
+ check_subject_data_data(result)
+ subject_id = list(result['subject_data']['data'].keys())[0]
+ return subject_id
+
+
+def check_subject_data(policy_id, data_id, category_id):
+ req = requests.get(URL.format("/policies/{}/subject_data/{}".format(policy_id, category_id)))
+ req.raise_for_status()
+ result = req.json()
+ print(result)
+ if data_id is not None:
+ check_id_in_subject_data_data(data_id, result)
+ check_category_id_in_subject_data_data(category_id, result)
+ return result
+
+
+def delete_subject_data(policy_id, category_id, data_id):
+ req = requests.delete(
+ URL.format("/policies/{}/subject_data/{}/{}".format(policy_id, category_id, data_id)),
+ headers=HEADERS)
+ req.raise_for_status()
+ req = requests.get(URL.format("/policies/{}/subject_data/{}".format(policy_id, category_id)))
+ req.raise_for_status()
+ result = req.json()
+ check_id_not_in_subject_data_data(data_id, result)
+ check_category_id_in_subject_data_data(category_id, result)
+
+
+def add_object_data(policy_id, category_id, name="object_data1"):
+ object_data_template['name'] = name
+ req = requests.post(URL.format("/policies/{}/object_data/{}".format(policy_id, category_id)),
+ json=object_data_template, headers=HEADERS)
+ req.raise_for_status()
+ result = req.json()
+ check_object_data_data(result)
+ object_id = list(result['object_data']['data'].keys())[0]
+ return object_id
+
+
+def check_object_data(policy_id, data_id, category_id):
+ req = requests.get(URL.format("/policies/{}/object_data/{}".format(policy_id, category_id)))
+ req.raise_for_status()
+ result = req.json()
+ if data_id is not None:
+ check_id_in_object_data_data(data_id, result)
+ check_category_id_in_object_data_data(category_id, result)
+ return result
+
+
+def delete_object_data(policy_id, category_id, data_id):
+ req = requests.delete(
+ URL.format("/policies/{}/object_data/{}/{}".format(policy_id, category_id, data_id)),
+ headers=HEADERS)
+ req.raise_for_status()
+ req = requests.get(URL.format("/policies/{}/object_data/{}".format(policy_id, category_id)))
+ req.raise_for_status()
+ result = req.json()
+ check_id_not_in_object_data_data(data_id, result)
+ check_category_id_in_object_data_data(category_id, result)
+
+
+def add_action_data(policy_id, category_id, name="action_data1"):
+ action_data_template['name'] = name
+ req = requests.post(URL.format("/policies/{}/action_data/{}".format(policy_id, category_id)),
+ json=action_data_template, headers=HEADERS)
+ req.raise_for_status()
+ result = req.json()
+ check_action_data_data(result)
+ action_id = list(result['action_data']['data'].keys())[0]
+ return action_id
+
+
+def check_action_data(policy_id, data_id, category_id):
+ req = requests.get(URL.format("/policies/{}/action_data/{}".format(policy_id, category_id)))
+ req.raise_for_status()
+ result = req.json()
+ print(result)
+ if data_id is not None:
+ check_id_in_action_data_data(data_id, result)
+ check_category_id_in_action_data_data(category_id, result)
+ return result
+
+
+def delete_action_data(policy_id, category_id, data_id):
+ req = requests.delete(
+ URL.format("/policies/{}/action_data/{}/{}".format(policy_id, category_id, data_id)),
+ headers=HEADERS)
+ req.raise_for_status()
+ req = requests.get(URL.format("/policies/{}/action_data/{}".format(policy_id, category_id)))
+ req.raise_for_status()
+ result = req.json()
+ check_id_not_in_action_data_data(data_id, result)
+ check_category_id_in_action_data_data(category_id, result)
+
+
+def add_subject_assignments(policy_id, subject_id, subject_cat_id, subject_data_id):
+ req = requests.post(URL.format("/policies/{}/subject_assignments".format(policy_id)),
+ json={
+ "id": subject_id,
+ "category_id": subject_cat_id,
+ "data_id": subject_data_id
+ }, headers=HEADERS)
+ req.raise_for_status()
+ result = req.json()
+ check_subject_assignment_in_result(result)
+
+
+def check_subject_assignments(policy_id, subject_id, subject_cat_id, subject_data_id):
+ req = requests.get(URL.format("/policies/{}/subject_assignments/{}/{}/{}".format(
+ policy_id, subject_id, subject_cat_id, subject_data_id)))
+ req.raise_for_status()
+ result = req.json()
+ check_subject_assignment_in_result(result)
+ check_subject_assignements(subject_id, subject_cat_id, subject_data_id, result)
+
+
+def check_object_assignments(policy_id, object_id, object_cat_id, object_data_id):
+ req = requests.get(URL.format("/policies/{}/object_assignments/{}/{}/{}".format(
+ policy_id, object_id, object_cat_id, object_data_id)))
+ req.raise_for_status()
+ result = req.json()
+ check_object_assignment_in_result(result)
+ check_object_assignements(object_id, object_cat_id, object_data_id, result)
+
+
+def check_action_assignments(policy_id, action_id, action_cat_id, action_data_id):
+ req = requests.get(URL.format("/policies/{}/action_assignments/{}/{}/{}".format(
+ policy_id, action_id, action_cat_id, action_data_id)))
+ req.raise_for_status()
+ result = req.json()
+ check_action_assignment_in_result(result)
+ check_action_assignements(action_id, action_cat_id, action_data_id, result)
+
+
+def add_object_assignments(policy_id, object_id, object_cat_id, object_data_id):
+ req = requests.post(URL.format("/policies/{}/object_assignments".format(policy_id)),
+ json={
+ "id": object_id,
+ "category_id": object_cat_id,
+ "data_id": object_data_id
+ }, headers=HEADERS)
+ req.raise_for_status()
+ result = req.json()
+ check_object_assignment_in_result(result)
+
+
+def add_action_assignments(policy_id, action_id, action_cat_id, action_data_id):
+ req = requests.post(URL.format("/policies/{}/action_assignments".format(policy_id)),
+ json={
+ "id": action_id,
+ "category_id": action_cat_id,
+ "data_id": action_data_id
+ }, headers=HEADERS)
+ req.raise_for_status()
+ result = req.json()
+ check_action_assignment_in_result(result)
+
+
+def delete_subject_assignment(policy_id, subject_id, subject_cat_id, subject_data_id):
+ req = requests.delete(URL.format("/policies/{}/subject_assignments/{}/{}/{}".format(
+ policy_id, subject_id, subject_cat_id, subject_data_id)))
+ req.raise_for_status()
+ result = req.json()
+ check_result(result)
+
+ req = requests.get(URL.format("/policies/{}/subject_assignments/{}/{}/{}".format(
+ policy_id, subject_id, subject_cat_id, subject_data_id)))
+ req.raise_for_status()
+ result = req.json()
+ check_subject_assignment_in_result(result)
+ check_not_subject_assignements(subject_id, subject_cat_id, subject_data_id, result)
+
+
+def delete_object_assignment(policy_id, object_id, object_cat_id, object_data_id):
+ req = requests.delete(URL.format("/policies/{}/object_assignments/{}/{}/{}".format(
+ policy_id, object_id, object_cat_id, object_data_id)))
+ req.raise_for_status()
+ result = req.json()
+ check_result(result)
+
+ req = requests.get(URL.format("/policies/{}/object_assignments/{}/{}/{}".format(
+ policy_id, object_id, object_cat_id, object_data_id)))
+ req.raise_for_status()
+ result = req.json()
+ check_object_assignment_in_result(result)
+ check_not_object_assignements(object_id, object_cat_id, object_data_id, result)
+
+
+def delete_action_assignment(policy_id, action_id, action_cat_id, action_data_id):
+ req = requests.delete(URL.format("/policies/{}/action_assignments/{}/{}/{}".format(
+ policy_id, action_id, action_cat_id, action_data_id)))
+ req.raise_for_status()
+ result = req.json()
+ check_result(result)
+
+ req = requests.get(URL.format("/policies/{}/action_assignments/{}/{}/{}".format(
+ policy_id, action_id, action_cat_id, action_data_id)))
+ req.raise_for_status()
+ result = req.json()
+ check_action_assignment_in_result(result)
+ check_not_action_assignements(action_id, action_cat_id, action_data_id, result)
+
+
+def add_rule(policy_id, meta_rule_id, rule,
+ instructions={"chain": [{"security_pipeline": "rbac"}]}):
+ req = requests.post(URL.format("/policies/{}/rules".format(policy_id)),
+ json={
+ "meta_rule_id": meta_rule_id,
+ "rule": rule,
+ "instructions": instructions,
+ "enabled": True
+ },
+ headers=HEADERS)
+ req.raise_for_status()
+ result = req.json()
+ check_rule_in_result(result)
+ rule_id = list(result["rules"].keys())[0]
+ check_policy_id_in_dict(policy_id, result["rules"][rule_id])
+ check_meta_rule_id_in_dict(meta_rule_id, result["rules"][rule_id])
+ check_rule_in_dict(rule, result["rules"][rule_id])
+ return rule_id
+
+
+def check_rule(policy_id, meta_rule_id, rule_id, rule):
+ req = requests.get(URL.format("/policies/{}/rules".format(policy_id)))
+ req.raise_for_status()
+ result = req.json()
+ check_rule_in_result(result)
+ check_policy_id_in_dict(policy_id, result["rules"])
+ check_rule_id_in_list(meta_rule_id, rule_id, rule, result["rules"]["rules"])
+
+
+def delete_rule(policy_id, rule_id):
+ req = requests.delete(URL.format("/policies/{}/rules/{}".format(policy_id, rule_id)))
+ req.raise_for_status()
+ result = req.json()
+ check_result(result)
+ req = requests.get(URL.format("/policies/{}/rules".format(policy_id)))
+ req.raise_for_status()
+ result = req.json()
+ check_rule_in_result(result)
+ check_policy_id_in_dict(policy_id, result["rules"])
+ check_rule_id_not_in_list(rule_id, result["rules"]["rules"])
+
+
+def check_meta_rule():
+ req = requests.get(URL.format("/meta_rules/"))
+ req.raise_for_status()
+ result = req.json()
+ print(result)
+ return result
+
+
+def create_policy(scenario, model_id, meta_rule_list):
+ LOGGER.info("Creating policy {}".format(scenario.policy_name))
+ _policies = check_policy()
+ for _policy_id, _policy_value in _policies["policies"].items():
+ if _policy_value['name'] == scenario.policy_name:
+ policy_id = _policy_id
+ break
+ else:
+ policy_id = add_policy(name=scenario.policy_name, genre=scenario.policy_genre)
+
+ update_policy(policy_id, model_id)
+
+ for meta_rule_id in meta_rule_list:
+ LOGGER.debug("add_meta_rule_to_model {} {}".format(model_id, meta_rule_id))
+ models.add_meta_rule_to_model(model_id, meta_rule_id)
+
+ LOGGER.info("Add subject data")
+ for subject_cat_name in scenario.subject_data:
+ for subject_data_name in scenario.subject_data[subject_cat_name]:
+ data_id = scenario.subject_data[subject_cat_name][subject_data_name] = add_subject_data(
+ policy_id=policy_id,
+ category_id=scenario.subject_categories[subject_cat_name], name=subject_data_name)
+ scenario.subject_data[subject_cat_name][subject_data_name] = data_id
+ LOGGER.info("Add object data")
+ for object_cat_name in scenario.object_data:
+ for object_data_name in scenario.object_data[object_cat_name]:
+ data_id = scenario.object_data[object_cat_name][object_data_name] = add_object_data(
+ policy_id=policy_id,
+ category_id=scenario.object_categories[object_cat_name], name=object_data_name)
+ scenario.object_data[object_cat_name][object_data_name] = data_id
+ LOGGER.info("Add action data")
+ for action_cat_name in scenario.action_data:
+ for action_data_name in scenario.action_data[action_cat_name]:
+ data_id = scenario.action_data[action_cat_name][action_data_name] = add_action_data(
+ policy_id=policy_id,
+ category_id=scenario.action_categories[action_cat_name], name=action_data_name)
+ scenario.action_data[action_cat_name][action_data_name] = data_id
+
+ LOGGER.info("Add subjects")
+ for name in scenario.subjects:
+ scenario.subjects[name] = add_subject(policy_id, name=name)
+ LOGGER.info("Add objects")
+ for name in scenario.objects:
+ scenario.objects[name] = add_object(policy_id, name=name)
+ LOGGER.info("Add actions")
+ for name in scenario.actions:
+ scenario.actions[name] = add_action(policy_id, name=name)
+
+ LOGGER.info("Add subject assignments")
+ for subject_name in scenario.subject_assignments:
+ if type(scenario.subject_assignments[subject_name]) in (list, tuple):
+ for items in scenario.subject_assignments[subject_name]:
+ for subject_category_name in items:
+ subject_id = scenario.subjects[subject_name]
+ subject_cat_id = scenario.subject_categories[subject_category_name]
+ for data in scenario.subject_assignments[subject_name]:
+ subject_data_id = scenario.subject_data[subject_category_name][
+ data[subject_category_name]]
+ add_subject_assignments(policy_id, subject_id, subject_cat_id,
+ subject_data_id)
+ else:
+ for subject_category_name in scenario.subject_assignments[subject_name]:
+ subject_id = scenario.subjects[subject_name]
+ subject_cat_id = scenario.subject_categories[subject_category_name]
+ subject_data_id = scenario.subject_data[subject_category_name][
+ scenario.subject_assignments[subject_name][subject_category_name]]
+ add_subject_assignments(policy_id, subject_id, subject_cat_id, subject_data_id)
+
+ LOGGER.info("Add object assignments")
+ for object_name in scenario.object_assignments:
+ if type(scenario.object_assignments[object_name]) in (list, tuple):
+ for items in scenario.object_assignments[object_name]:
+ for object_category_name in items:
+ object_id = scenario.objects[object_name]
+ object_cat_id = scenario.object_categories[object_category_name]
+ for data in scenario.object_assignments[object_name]:
+ object_data_id = scenario.object_data[object_category_name][
+ data[object_category_name]]
+ add_object_assignments(policy_id, object_id, object_cat_id, object_data_id)
+ else:
+ for object_category_name in scenario.object_assignments[object_name]:
+ object_id = scenario.objects[object_name]
+ object_cat_id = scenario.object_categories[object_category_name]
+ object_data_id = scenario.object_data[object_category_name][
+ scenario.object_assignments[object_name][object_category_name]]
+ add_object_assignments(policy_id, object_id, object_cat_id, object_data_id)
+
+ LOGGER.info("Add action assignments")
+ for action_name in scenario.action_assignments:
+ if type(scenario.action_assignments[action_name]) in (list, tuple):
+ for items in scenario.action_assignments[action_name]:
+ for action_category_name in items:
+ action_id = scenario.actions[action_name]
+ action_cat_id = scenario.action_categories[action_category_name]
+ for data in scenario.action_assignments[action_name]:
+ action_data_id = scenario.action_data[action_category_name][
+ data[action_category_name]]
+ add_action_assignments(policy_id, action_id, action_cat_id, action_data_id)
+ else:
+ for action_category_name in scenario.action_assignments[action_name]:
+ action_id = scenario.actions[action_name]
+ action_cat_id = scenario.action_categories[action_category_name]
+ action_data_id = scenario.action_data[action_category_name][
+ scenario.action_assignments[action_name][action_category_name]]
+ add_action_assignments(policy_id, action_id, action_cat_id, action_data_id)
+
+ LOGGER.info("Add rules")
+ for meta_rule_name in scenario.rules:
+ meta_rule_value = scenario.meta_rule[meta_rule_name]
+ for rule in scenario.rules[meta_rule_name]:
+ data_list = []
+ _meta_rule = list(meta_rule_value["value"])
+ for data_name in rule["rule"]:
+ category_name = _meta_rule.pop(0)
+ if category_name in scenario.subject_categories:
+ data_list.append(scenario.subject_data[category_name][data_name])
+ elif category_name in scenario.object_categories:
+ data_list.append(scenario.object_data[category_name][data_name])
+ elif category_name in scenario.action_categories:
+ data_list.append(scenario.action_data[category_name][data_name])
+ instructions = rule["instructions"]
+ add_rule(policy_id, meta_rule_value["id"], data_list, instructions)
+ return policy_id