aboutsummaryrefslogtreecommitdiffstats
path: root/moonv4/moon_interface/tests
diff options
context:
space:
mode:
Diffstat (limited to 'moonv4/moon_interface/tests')
-rw-r--r--moonv4/moon_interface/tests/apitests/README.md33
-rw-r--r--moonv4/moon_interface/tests/apitests/populate_default_values.py149
-rw-r--r--moonv4/moon_interface/tests/apitests/scenario/mls.py44
-rw-r--r--moonv4/moon_interface/tests/apitests/scenario/rbac.py32
-rw-r--r--moonv4/moon_interface/tests/apitests/scenario/rbac_mls.py50
-rw-r--r--moonv4/moon_interface/tests/apitests/test_models.py37
-rw-r--r--moonv4/moon_interface/tests/apitests/test_pdp.py16
-rw-r--r--moonv4/moon_interface/tests/apitests/test_policies.py154
-rw-r--r--moonv4/moon_interface/tests/apitests/utils/__init__.py0
-rw-r--r--moonv4/moon_interface/tests/apitests/utils/models.py260
-rw-r--r--moonv4/moon_interface/tests/apitests/utils/pdp.py145
-rw-r--r--moonv4/moon_interface/tests/apitests/utils/policies.py581
12 files changed, 1501 insertions, 0 deletions
diff --git a/moonv4/moon_interface/tests/apitests/README.md b/moonv4/moon_interface/tests/apitests/README.md
new file mode 100644
index 00000000..ef74a1e3
--- /dev/null
+++ b/moonv4/moon_interface/tests/apitests/README.md
@@ -0,0 +1,33 @@
+Test directory
+==============
+
+API tests
+---------
+To test all interfaces, you can use :
+
+```bash
+$ cd moonv4/moon_interface/tests/apitests
+$ pytest
+============================================================================= test session starts ==============================================================================
+platform linux -- Python 3.5.2, pytest-3.0.7, py-1.4.33, pluggy-0.4.0
+rootdir: /home/vdsq3226/projets/opnfv/moonv4/moon_interface, inifile:
+collected 15 items
+
+test_models.py .....
+test_pdp.py .
+test_policies.py .........
+
+```
+
+Populate default variables for a particular demonstration
+---------------------------------------------------------
+
+```bash
+$ cd moonv4/moon_interface/tests/apitests
+$ python3 populate_default_values.py scenario/rbac.py -v
+Loading: scenario/rbac.py
+2017-03-31 09:57:17,243 WARNING Creating model RBAC
+2017-03-31 09:57:17,507 WARNING Creating policy Multi policy example
+2017-03-31 09:57:18,690 WARNING Creating PDP pdp1
+
+``` \ No newline at end of file
diff --git a/moonv4/moon_interface/tests/apitests/populate_default_values.py b/moonv4/moon_interface/tests/apitests/populate_default_values.py
new file mode 100644
index 00000000..f58a7444
--- /dev/null
+++ b/moonv4/moon_interface/tests/apitests/populate_default_values.py
@@ -0,0 +1,149 @@
+import argparse
+import logging
+from importlib.machinery import SourceFileLoader
+from utils.pdp import *
+from utils.models import *
+from utils.policies import *
+
+parser = argparse.ArgumentParser()
+parser.add_argument('filename', help='scenario filename', nargs=1)
+parser.add_argument("--verbose", "-v", action='store_true', help="verbose mode")
+args = parser.parse_args()
+
+FORMAT = '%(asctime)-15s %(levelname)s %(message)s'
+logging.basicConfig(
+ format=FORMAT,
+ level=logging.WARNING)
+
+logger = logging.getLogger(__name__)
+
+if args.filename:
+ print("Loading: {}".format(args.filename[0]))
+
+m = SourceFileLoader("scenario", args.filename[0])
+
+scenario = m.load_module()
+
+
+def create_model():
+ if args.verbose:
+ logger.warning("Creating model {}".format(scenario.model_name))
+ _model_id = add_model(name=scenario.model_name)
+ for cat in scenario.subject_categories:
+ scenario.subject_categories[cat] = add_subject_category(name=cat)
+ for cat in scenario.object_categories:
+ scenario.object_categories[cat] = add_object_category(name=cat)
+ for cat in scenario.action_categories:
+ scenario.action_categories[cat] = add_action_category(name=cat)
+ sub_cat = []
+ ob_cat = []
+ act_cat = []
+ meta_rule_list = []
+ for item_name, item_value in scenario.meta_rule.items():
+ for item in item_value["value"]:
+ if item in scenario.subject_categories:
+ sub_cat.append(scenario.subject_categories[item])
+ elif item in scenario.object_categories:
+ ob_cat.append(scenario.object_categories[item])
+ elif item in scenario.action_categories:
+ act_cat.append(scenario.action_categories[item])
+ meta_rule_id = add_meta_rule(item_name, sub_cat, ob_cat, act_cat)
+ item_value["id"] = meta_rule_id
+ meta_rule_list.append(meta_rule_id)
+ return _model_id, meta_rule_list
+
+
+def create_policy(model_id, meta_rule_list):
+ if args.verbose:
+ logger.warning("Creating policy {}".format(scenario.policy_name))
+ policy_id = add_policy(name=scenario.policy_name)
+
+ update_policy(policy_id, model_id)
+
+ for meta_rule_id in meta_rule_list:
+ add_meta_rule_to_model(model_id, meta_rule_id)
+
+ for subject_cat_name in scenario.subject_data:
+ for subject_data_name in scenario.subject_data[subject_cat_name]:
+ data_id = scenario.subject_data[subject_cat_name][subject_data_name] = add_subject_data(
+ policy_id=policy_id,
+ category_id=scenario.subject_categories[subject_cat_name], name=subject_data_name)
+ scenario.subject_data[subject_cat_name][subject_data_name] = data_id
+ for object_cat_name in scenario.object_data:
+ for object_data_name in scenario.object_data[object_cat_name]:
+ data_id = scenario.object_data[object_cat_name][object_data_name] = add_object_data(
+ policy_id=policy_id,
+ category_id=scenario.object_categories[object_cat_name], name=object_data_name)
+ scenario.object_data[object_cat_name][object_data_name] = data_id
+ for action_cat_name in scenario.action_data:
+ for action_data_name in scenario.action_data[action_cat_name]:
+ data_id = scenario.action_data[action_cat_name][action_data_name] = add_action_data(
+ policy_id=policy_id,
+ category_id=scenario.action_categories[action_cat_name], name=action_data_name)
+ scenario.action_data[action_cat_name][action_data_name] = data_id
+
+ for name in scenario.subjects:
+ scenario.subjects[name] = add_subject(policy_id, name=name)
+ for name in scenario.objects:
+ scenario.objects[name] = add_object(policy_id, name=name)
+ for name in scenario.actions:
+ scenario.actions[name] = add_action(policy_id, name=name)
+
+ for subject_name in scenario.subject_assignments:
+ for subject_category_name in scenario.subject_assignments[subject_name]:
+ subject_id = scenario.subjects[subject_name]
+ subject_cat_id = scenario.subject_categories[subject_category_name]
+ subject_data_id = scenario.subject_data[subject_category_name][scenario.subject_assignments[subject_name][subject_category_name]]
+ add_subject_assignments(policy_id, subject_id, subject_cat_id, subject_data_id)
+ for object_name in scenario.object_assignments:
+ for object_category_name in scenario.object_assignments[object_name]:
+ object_id = scenario.objects[object_name]
+ object_cat_id = scenario.object_categories[object_category_name]
+ object_data_id = scenario.object_data[object_category_name][scenario.object_assignments[object_name][object_category_name]]
+ add_object_assignments(policy_id, object_id, object_cat_id, object_data_id)
+ for action_name in scenario.action_assignments:
+ for action_category_name in scenario.action_assignments[action_name]:
+ action_id = scenario.actions[action_name]
+ action_cat_id = scenario.action_categories[action_category_name]
+ action_data_id = scenario.action_data[action_category_name][scenario.action_assignments[action_name][action_category_name]]
+ add_action_assignments(policy_id, action_id, action_cat_id, action_data_id)
+
+ for meta_rule_name in scenario.rules:
+ data_list = []
+ meta_rule_value = scenario.meta_rule[meta_rule_name]
+ for rule in scenario.rules[meta_rule_name]:
+ _meta_rule = list(meta_rule_value["value"])
+ for data_name in rule:
+ category_name = _meta_rule.pop(0)
+ if category_name in scenario.subject_categories:
+ data_list.append(scenario.subject_data[category_name][data_name])
+ elif category_name in scenario.object_categories:
+ data_list.append(scenario.object_data[category_name][data_name])
+ elif category_name in scenario.action_categories:
+ data_list.append(scenario.action_data[category_name][data_name])
+ add_rule(policy_id, meta_rule_value["id"], data_list)
+ return policy_id
+
+
+def create_pdp(policy_id=None):
+ if args.verbose:
+ logger.warning("Creating PDP {}".format(scenario.pdp_name))
+ projects = get_keystone_projects()
+ admin_project_id = None
+ for _project in projects['projects']:
+ if _project['name'] == "admin":
+ admin_project_id = _project['id']
+ assert admin_project_id
+ pdps = check_pdp()["pdps"]
+ for pdp_id, pdp_value in pdps.items():
+ if scenario.pdp_name == pdp_value["name"]:
+ update_pdp(pdp_id, policy_id=policy_id)
+ return pdp_id
+ _pdp_id = add_pdp(name=scenario.pdp_name, policy_id=policy_id)
+ map_to_keystone(pdp_id=_pdp_id, keystone_project_id=admin_project_id)
+ return _pdp_id
+
+if __name__ == "__main__":
+ model_id, meta_rule_list = create_model()
+ policy_id = create_policy(model_id, meta_rule_list)
+ pdp_id = create_pdp(policy_id)
diff --git a/moonv4/moon_interface/tests/apitests/scenario/mls.py b/moonv4/moon_interface/tests/apitests/scenario/mls.py
new file mode 100644
index 00000000..fab1d528
--- /dev/null
+++ b/moonv4/moon_interface/tests/apitests/scenario/mls.py
@@ -0,0 +1,44 @@
+
+pdp_name = "pdp1"
+policy_name = "MLS Policy example"
+model_name = "MLS"
+
+subjects = {"user0": "", "user1": "", "user2": "", }
+objects = {"vm0": "", "vm1": "", }
+actions = {"start": "", "stop": ""}
+
+subject_categories = {"subject-security-level": "", }
+object_categories = {"object-security-level": "", }
+action_categories = {"action-type": "", }
+
+subject_data = {
+ "subject-security-level": {"low": "", "medium": "", "high": ""},
+}
+object_data = {
+ "object-security-level": {"low": "", "medium": "", "high": ""},
+}
+action_data = {"action-type": {"vm-action": "", "storage-action": "", }}
+
+subject_assignments = {
+ "user0": {"subject-security-level": "high"},
+ "user1": {"subject-security-level": "medium"},
+}
+object_assignments = {
+ "vm0": {"object-security-level": "medium"},
+ "vm1": {"object-security-level": "low"},
+}
+action_assignments = {
+ "start": {"action-type": "vm-action"},
+ "stop": {"action-type": "vm-action"}
+}
+
+meta_rule = {
+ "mls": {"id": "", "value": ("subject-security-level", "object-security-level", "action-type")},
+}
+
+rules = {
+ "mls": (
+ ("high", "medium", "vm-action"),
+ ("medium", "low", "vm-action"),
+ )
+}
diff --git a/moonv4/moon_interface/tests/apitests/scenario/rbac.py b/moonv4/moon_interface/tests/apitests/scenario/rbac.py
new file mode 100644
index 00000000..073f1d65
--- /dev/null
+++ b/moonv4/moon_interface/tests/apitests/scenario/rbac.py
@@ -0,0 +1,32 @@
+
+pdp_name = "pdp1"
+policy_name = "RBAC policy example"
+model_name = "RBAC"
+
+subjects = {"user0": "", "user1": "", }
+objects = {"vm0": "", }
+actions = {"start": "", "stop": ""}
+
+subject_categories = {"role": "", }
+object_categories = {"id": "", }
+action_categories = {"action-type": "", }
+
+subject_data = {"role": {"admin": "", "employee": ""}}
+object_data = {"id": {"vm1": "", "vm2": ""}}
+action_data = {"action-type": {"vm-action": "", }}
+
+subject_assignments = {"user0": {"role": "admin"}, "user1": {"role": "employee"}, }
+object_assignments = {"vm0": {"id": "vm1"}}
+action_assignments = {"start": {"action-type": "vm-action"}, "stop": {"action-type": "vm-action"}}
+
+meta_rule = {
+ "rbac": {"id": "", "value": ("role", "id", "action-type")},
+}
+
+rules = {
+ "rbac": (
+ ("admin", "vm1", "vm-action"),
+ )
+}
+
+
diff --git a/moonv4/moon_interface/tests/apitests/scenario/rbac_mls.py b/moonv4/moon_interface/tests/apitests/scenario/rbac_mls.py
new file mode 100644
index 00000000..8a5362ea
--- /dev/null
+++ b/moonv4/moon_interface/tests/apitests/scenario/rbac_mls.py
@@ -0,0 +1,50 @@
+
+pdp_name = "pdp1"
+policy_name = "Multi policy example"
+model_name = "RBAC"
+
+subjects = {"user0": "", "user1": "", "user2": "", }
+objects = {"vm0": "", "vm1": "", }
+actions = {"start": "", "stop": ""}
+
+subject_categories = {"role": "", "subject-security-level": "", }
+object_categories = {"id": "", "object-security-level": "", }
+action_categories = {"action-type": "", }
+
+subject_data = {
+ "role": {"admin": "", "employee": ""},
+ "subject-security-level": {"low": "", "medium": "", "high": ""},
+}
+object_data = {
+ "id": {"vm1": "", "vm2": ""},
+ "object-security-level": {"low": "", "medium": "", "high": ""},
+}
+action_data = {"action-type": {"vm-action": "", "storage-action": "", }}
+
+subject_assignments = {
+ "user0": {"role": "admin", "subject-security-level": "high"},
+ "user1": {"role": "employee", "subject-security-level": "medium"},
+}
+object_assignments = {
+ "vm0": {"id": "vm1", "object-security-level": "medium"},
+ "vm1": {"id": "vm2", "object-security-level": "low"},
+}
+action_assignments = {
+ "start": {"action-type": "vm-action"},
+ "stop": {"action-type": "vm-action"}
+}
+
+meta_rule = {
+ "rbac": {"id": "", "value": ("role", "id", "action-type")},
+ "mls": {"id": "", "value": ("subject-security-level", "object-security-level", "action-type")},
+}
+
+rules = {
+ "rbac": (
+ ("admin", "vm1", "vm-action"),
+ ),
+ "mls": (
+ ("high", "medium", "vm-action"),
+ ("medium", "low", "vm-action"),
+ )
+}
diff --git a/moonv4/moon_interface/tests/apitests/test_models.py b/moonv4/moon_interface/tests/apitests/test_models.py
new file mode 100644
index 00000000..0da40ce5
--- /dev/null
+++ b/moonv4/moon_interface/tests/apitests/test_models.py
@@ -0,0 +1,37 @@
+from utils.models import *
+
+
+def test_models():
+ check_model()
+ model_id = add_model()
+ check_model(model_id)
+ delete_model(model_id)
+
+
+def test_meta_data_subject():
+ category_id = add_subject_category()
+ check_subject_category(category_id)
+ # TODO (asteroide): must implement the deletion of linked data
+ # delete_subject_category(category_id)
+
+
+def test_meta_data_object():
+ category_id = add_object_category()
+ check_object_category(category_id)
+ # TODO (asteroide): must implement the deletion of linked data
+ # delete_object_category(category_id)
+
+
+def test_meta_data_action():
+ category_id = add_action_category()
+ check_action_category(category_id)
+ # TODO (asteroide): must implement the deletion of linked data
+ # delete_action_category(category_id)
+
+
+def test_meta_rule():
+ meta_rule_id, scat_id, ocat_id, acat_id = add_categories_and_meta_rule()
+ check_meta_rule(meta_rule_id, scat_id, ocat_id, acat_id)
+ delete_meta_rule(meta_rule_id)
+
+
diff --git a/moonv4/moon_interface/tests/apitests/test_pdp.py b/moonv4/moon_interface/tests/apitests/test_pdp.py
new file mode 100644
index 00000000..6cd5365b
--- /dev/null
+++ b/moonv4/moon_interface/tests/apitests/test_pdp.py
@@ -0,0 +1,16 @@
+from utils.pdp import *
+
+
+def test_pdp():
+ projects = get_keystone_projects()
+ admin_project_id = None
+ for _project in projects['projects']:
+ if _project['name'] == "admin":
+ admin_project_id = _project['id']
+ assert admin_project_id
+ check_pdp()
+ pdp_id = add_pdp()
+ check_pdp(pdp_id)
+ map_to_keystone(pdp_id=pdp_id, keystone_project_id=admin_project_id)
+ check_pdp(pdp_id=pdp_id, keystone_project_id=admin_project_id)
+ delete_pdp(pdp_id)
diff --git a/moonv4/moon_interface/tests/apitests/test_policies.py b/moonv4/moon_interface/tests/apitests/test_policies.py
new file mode 100644
index 00000000..310aad66
--- /dev/null
+++ b/moonv4/moon_interface/tests/apitests/test_policies.py
@@ -0,0 +1,154 @@
+from utils.policies import *
+from utils.models import *
+
+
+def test_policies():
+ check_policy()
+ policy_id = add_policy()
+ check_policy(policy_id)
+ delete_policy(policy_id)
+
+
+def test_subjects():
+ policy_id = add_policy()
+ subject_id = add_subject()
+
+ update_subject(subject_id=subject_id, policy_id=policy_id)
+
+ check_subject(subject_id=subject_id, policy_id=policy_id)
+
+ delete_subject(subject_id, policy_id=policy_id)
+ delete_subject(subject_id)
+
+
+def test_objects():
+ policy_id = add_policy()
+ object_id = add_object()
+
+ update_object(object_id=object_id, policy_id=policy_id)
+ check_object(object_id=object_id, policy_id=policy_id)
+
+ delete_object(object_id=object_id, policy_id=policy_id)
+ delete_object(object_id=object_id)
+
+
+def test_actions():
+ policy_id = add_policy()
+ action_id = add_action()
+
+ update_action(action_id=action_id, policy_id=policy_id)
+ check_action(action_id=action_id, policy_id=policy_id)
+
+ delete_action(action_id=action_id, policy_id=policy_id)
+ delete_action(action_id=action_id)
+
+
+def test_subject_data():
+ policy_id = add_policy()
+
+ model_id = add_model()
+
+ update_policy(policy_id, model_id)
+
+ meta_rule_id, subject_cat_id, object_cat_id, action_cat_id = add_categories_and_meta_rule()
+ add_meta_rule_to_model(model_id, meta_rule_id)
+
+ subject_data_id = add_subject_data(policy_id=policy_id, category_id=subject_cat_id)
+ check_subject_data(policy_id=policy_id, data_id=subject_data_id, category_id=subject_cat_id)
+
+
+def test_object_data():
+ policy_id = add_policy()
+
+ model_id = add_model()
+
+ update_policy(policy_id, model_id)
+
+ meta_rule_id, object_cat_id, object_cat_id, action_cat_id = add_categories_and_meta_rule()
+ add_meta_rule_to_model(model_id, meta_rule_id)
+
+ object_data_id = add_object_data(policy_id=policy_id, category_id=object_cat_id)
+ check_object_data(policy_id=policy_id, data_id=object_data_id, category_id=object_cat_id)
+
+
+def test_action_data():
+ policy_id = add_policy()
+
+ model_id = add_model()
+
+ update_policy(policy_id, model_id)
+
+ meta_rule_id, action_cat_id, action_cat_id, action_cat_id = add_categories_and_meta_rule()
+ add_meta_rule_to_model(model_id, meta_rule_id)
+
+ action_data_id = add_action_data(policy_id=policy_id, category_id=action_cat_id)
+ check_action_data(policy_id=policy_id, data_id=action_data_id, category_id=action_cat_id)
+
+
+def test_assignments():
+ policy_id = add_policy()
+
+ model_id = add_model()
+
+ update_policy(policy_id, model_id)
+
+ meta_rule_id, subject_cat_id, object_cat_id, action_cat_id = add_categories_and_meta_rule()
+ add_meta_rule_to_model(model_id, meta_rule_id)
+
+ subject_data_id = add_subject_data(policy_id=policy_id, category_id=subject_cat_id)
+ subject_data_id_bis = add_subject_data(policy_id=policy_id, category_id=subject_cat_id)
+ object_data_id = add_object_data(policy_id=policy_id, category_id=object_cat_id)
+ object_data_id_bis = add_object_data(policy_id=policy_id, category_id=object_cat_id)
+ action_data_id = add_action_data(policy_id=policy_id, category_id=action_cat_id)
+ action_data_id_bis = add_action_data(policy_id=policy_id, category_id=action_cat_id)
+
+ subject_id = add_subject(policy_id)
+ object_id = add_object(policy_id)
+ action_id = add_action(policy_id)
+
+ add_subject_assignments(policy_id, subject_id, subject_cat_id, subject_data_id)
+ add_subject_assignments(policy_id, subject_id, subject_cat_id, subject_data_id_bis)
+ add_object_assignments(policy_id, object_id, object_cat_id, object_data_id)
+ add_object_assignments(policy_id, object_id, object_cat_id, object_data_id_bis)
+ add_action_assignments(policy_id, action_id, action_cat_id, action_data_id)
+ add_action_assignments(policy_id, action_id, action_cat_id, action_data_id_bis)
+
+ check_subject_assignments(policy_id, subject_id, subject_cat_id, subject_data_id)
+ check_subject_assignments(policy_id, subject_id, subject_cat_id, subject_data_id_bis)
+ check_object_assignments(policy_id, object_id, object_cat_id, object_data_id)
+ check_object_assignments(policy_id, object_id, object_cat_id, object_data_id_bis)
+ check_action_assignments(policy_id, action_id, action_cat_id, action_data_id)
+ check_action_assignments(policy_id, action_id, action_cat_id, action_data_id_bis)
+
+ delete_subject_assignment(policy_id, subject_id, subject_cat_id, subject_data_id)
+ delete_object_assignment(policy_id, object_id, object_cat_id, object_data_id)
+ delete_action_assignment(policy_id, action_id, action_cat_id, action_data_id)
+
+
+def test_rule():
+ policy_id = add_policy()
+
+ model_id = add_model()
+
+ update_policy(policy_id, model_id)
+
+ meta_rule_id, subject_cat_id, object_cat_id, action_cat_id = add_categories_and_meta_rule()
+ add_meta_rule_to_model(model_id, meta_rule_id)
+
+ subject_data_id = add_subject_data(policy_id=policy_id, category_id=subject_cat_id)
+ object_data_id = add_object_data(policy_id=policy_id, category_id=object_cat_id)
+ action_data_id = add_action_data(policy_id=policy_id, category_id=action_cat_id)
+
+ subject_id = add_subject(policy_id)
+ object_id = add_object(policy_id)
+ action_id = add_action(policy_id)
+
+ add_subject_assignments(policy_id, subject_id, subject_cat_id, subject_data_id)
+ add_object_assignments(policy_id, object_id, object_cat_id, object_data_id)
+ add_action_assignments(policy_id, action_id, action_cat_id, action_data_id)
+
+ rule_id = add_rule(policy_id, meta_rule_id, [subject_data_id, object_data_id, action_data_id])
+ check_rule(policy_id, meta_rule_id, rule_id, [subject_data_id, object_data_id, action_data_id])
+
+ delete_rule(policy_id, rule_id)
+
diff --git a/moonv4/moon_interface/tests/apitests/utils/__init__.py b/moonv4/moon_interface/tests/apitests/utils/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/moonv4/moon_interface/tests/apitests/utils/__init__.py
diff --git a/moonv4/moon_interface/tests/apitests/utils/models.py b/moonv4/moon_interface/tests/apitests/utils/models.py
new file mode 100644
index 00000000..1ba6e440
--- /dev/null
+++ b/moonv4/moon_interface/tests/apitests/utils/models.py
@@ -0,0 +1,260 @@
+import requests
+import copy
+
+URL = "http://172.18.0.11:38001{}"
+HEADERS = {"content-type": "application/json"}
+
+model_template = {
+ "name": "test_model",
+ "description": "test",
+ "meta_rules": []
+}
+
+category_template = {
+ "name": "name of the category",
+ "description": "description of the category"
+}
+
+meta_rule_template = {
+ "name": "test_meta_rule",
+ "subject_categories": [],
+ "object_categories": [],
+ "action_categories": []
+}
+
+
+def check_model(model_id=None):
+ req = requests.get(URL.format("/models"))
+ assert req.status_code == 200
+ result = req.json()
+ assert type(result) is dict
+ assert "models" in result
+ if model_id:
+ assert result["models"]
+ assert model_id in result['models']
+ assert "name" in result['models'][model_id]
+ assert model_template["name"] == result['models'][model_id]["name"]
+ return result['models']
+
+
+def add_model(name=None):
+ if name:
+ model_template['name'] = name
+ req = requests.post(URL.format("/models"), json=model_template, headers=HEADERS)
+ assert req.status_code == 200
+ result = req.json()
+ assert type(result) is dict
+ model_id = list(result['models'].keys())[0]
+ if "result" in result:
+ assert result["result"]
+ assert "name" in result['models'][model_id]
+ assert model_template["name"] == result['models'][model_id]["name"]
+ return model_id
+
+
+def delete_model(model_id):
+ req = requests.delete(URL.format("/models/{}".format(model_id)))
+ assert req.status_code == 200
+ result = req.json()
+ assert type(result) is dict
+ assert "result" in result
+ assert result["result"]
+
+
+def add_subject_category(name="subject_cat_1"):
+ category_template["name"] = name
+ req = requests.post(URL.format("/subject_categories"), json=category_template, headers=HEADERS)
+ assert req.status_code == 200
+ result = req.json()
+ assert type(result) is dict
+ assert "subject_categories" in result
+ category_id = list(result['subject_categories'].keys())[0]
+ if "result" in result:
+ assert result["result"]
+ assert "name" in result['subject_categories'][category_id]
+ assert category_template["name"] == result['subject_categories'][category_id]["name"]
+ return category_id
+
+
+def check_subject_category(category_id):
+ req = requests.get(URL.format("/subject_categories"))
+ assert req.status_code == 200
+ result = req.json()
+ assert type(result) is dict
+ assert "subject_categories" in result
+ if "result" in result:
+ assert result["result"]
+ assert category_id in result['subject_categories']
+ assert "name" in result['subject_categories'][category_id]
+ assert category_template["name"] == result['subject_categories'][category_id]["name"]
+
+
+def delete_subject_category(category_id):
+ req = requests.delete(URL.format("/subject_categories/{}".format(category_id)))
+ assert req.status_code == 200
+ result = req.json()
+ assert type(result) is dict
+ if "result" in result:
+ assert result["result"]
+
+
+def add_object_category(name="object_cat_1"):
+ category_template["name"] = name
+ req = requests.post(URL.format("/object_categories"), json=category_template, headers=HEADERS)
+ assert req.status_code == 200
+ result = req.json()
+ assert type(result) is dict
+ assert "object_categories" in result
+ category_id = list(result['object_categories'].keys())[0]
+ if "result" in result:
+ assert result["result"]
+ assert "name" in result['object_categories'][category_id]
+ assert category_template["name"] == result['object_categories'][category_id]["name"]
+ return category_id
+
+
+def check_object_category(category_id):
+ req = requests.get(URL.format("/object_categories"))
+ assert req.status_code == 200
+ result = req.json()
+ assert type(result) is dict
+ assert "object_categories" in result
+ if "result" in result:
+ assert result["result"]
+ assert category_id in result['object_categories']
+ assert "name" in result['object_categories'][category_id]
+ assert category_template["name"] == result['object_categories'][category_id]["name"]
+
+
+def delete_object_category(category_id):
+ req = requests.delete(URL.format("/object_categories/{}".format(category_id)))
+ assert req.status_code == 200
+ result = req.json()
+ assert type(result) is dict
+ if "result" in result:
+ assert result["result"]
+
+
+def add_action_category(name="action_cat_1"):
+ category_template["name"] = name
+ req = requests.post(URL.format("/action_categories"), json=category_template, headers=HEADERS)
+ assert req.status_code == 200
+ result = req.json()
+ assert type(result) is dict
+ assert "action_categories" in result
+ category_id = list(result['action_categories'].keys())[0]
+ if "result" in result:
+ assert result["result"]
+ assert "name" in result['action_categories'][category_id]
+ assert category_template["name"] == result['action_categories'][category_id]["name"]
+ return category_id
+
+
+def check_action_category(category_id):
+ req = requests.get(URL.format("/action_categories"))
+ assert req.status_code == 200
+ result = req.json()
+ assert type(result) is dict
+ assert "action_categories" in result
+ if "result" in result:
+ assert result["result"]
+ assert category_id in result['action_categories']
+ assert "name" in result['action_categories'][category_id]
+ assert category_template["name"] == result['action_categories'][category_id]["name"]
+
+
+def delete_action_category(category_id):
+ req = requests.delete(URL.format("/action_categories/{}".format(category_id)))
+ assert req.status_code == 200
+ result = req.json()
+ assert type(result) is dict
+ if "result" in result:
+ assert result["result"]
+
+
+def add_categories_and_meta_rule(name="test_meta_rule"):
+ scat_id = add_subject_category()
+ ocat_id = add_object_category()
+ acat_id = add_action_category()
+ _meta_rule_template = copy.deepcopy(meta_rule_template)
+ _meta_rule_template["name"] = name
+ _meta_rule_template["subject_categories"].append(scat_id)
+ _meta_rule_template["object_categories"].append(ocat_id)
+ _meta_rule_template["action_categories"].append(acat_id)
+ req = requests.post(URL.format("/meta_rules"), json=_meta_rule_template, headers=HEADERS)
+ assert req.status_code == 200
+ result = req.json()
+ assert type(result) is dict
+ assert "meta_rules" in result
+ meta_rule_id = list(result['meta_rules'].keys())[0]
+ if "result" in result:
+ assert result["result"]
+ assert "name" in result['meta_rules'][meta_rule_id]
+ assert _meta_rule_template["name"] == result['meta_rules'][meta_rule_id]["name"]
+ return meta_rule_id, scat_id, ocat_id, acat_id
+
+
+def add_meta_rule(name="test_meta_rule", scat=[], ocat=[], acat=[]):
+ _meta_rule_template = copy.deepcopy(meta_rule_template)
+ _meta_rule_template["name"] = name
+ _meta_rule_template["subject_categories"] = []
+ _meta_rule_template["subject_categories"].extend(scat)
+ _meta_rule_template["object_categories"] = []
+ _meta_rule_template["object_categories"].extend(ocat)
+ _meta_rule_template["action_categories"] = []
+ _meta_rule_template["action_categories"].extend(acat)
+ req = requests.post(URL.format("/meta_rules"), json=_meta_rule_template, headers=HEADERS)
+ assert req.status_code == 200
+ result = req.json()
+ assert type(result) is dict
+ assert "meta_rules" in result
+ meta_rule_id = list(result['meta_rules'].keys())[0]
+ if "result" in result:
+ assert result["result"]
+ assert "name" in result['meta_rules'][meta_rule_id]
+ assert _meta_rule_template["name"] == result['meta_rules'][meta_rule_id]["name"]
+ return meta_rule_id
+
+
+def check_meta_rule(meta_rule_id, scat_id=None, ocat_id=None, acat_id=None):
+ req = requests.get(URL.format("/meta_rules"))
+ assert req.status_code == 200
+ result = req.json()
+ assert type(result) is dict
+ assert "meta_rules" in result
+ if "result" in result:
+ assert result["result"]
+ assert meta_rule_id in result['meta_rules']
+ assert "name" in result['meta_rules'][meta_rule_id]
+ if scat_id:
+ assert scat_id in result['meta_rules'][meta_rule_id]["subject_categories"]
+ if ocat_id:
+ assert ocat_id in result['meta_rules'][meta_rule_id]["object_categories"]
+ if acat_id:
+ assert acat_id in result['meta_rules'][meta_rule_id]["action_categories"]
+
+
+def delete_meta_rule(meta_rule_id):
+ req = requests.delete(URL.format("/meta_rules/{}".format(meta_rule_id)))
+ assert req.status_code == 200
+ result = req.json()
+ assert type(result) is dict
+ if "result" in result:
+ assert result["result"]
+
+
+def add_meta_rule_to_model(model_id, meta_rule_id):
+ model = check_model(model_id)
+ meta_rule_list = model[model_id]["meta_rules"]
+ meta_rule_list.append(meta_rule_id)
+ req = requests.patch(URL.format("/models/{}".format(model_id)),
+ json={"meta_rules": meta_rule_list},
+ headers=HEADERS)
+ assert req.status_code == 200
+ result = req.json()
+ assert type(result) is dict
+ model_id = list(result['models'].keys())[0]
+ if "result" in result:
+ assert result["result"]
+ assert "meta_rules" in result['models'][model_id]
+ assert meta_rule_list == result['models'][model_id]["meta_rules"]
diff --git a/moonv4/moon_interface/tests/apitests/utils/pdp.py b/moonv4/moon_interface/tests/apitests/utils/pdp.py
new file mode 100644
index 00000000..81b50e38
--- /dev/null
+++ b/moonv4/moon_interface/tests/apitests/utils/pdp.py
@@ -0,0 +1,145 @@
+import requests
+
+URL = "http://172.18.0.11:38001{}"
+HEADERS = {"content-type": "application/json"}
+KEYSTONE_USER = "admin"
+KEYSTONE_PASSWORD = "p4ssw0rd"
+KEYSTONE_PROJECT = "admin"
+
+pdp_template = {
+ "name": "test_pdp",
+ "security_pipeline": [],
+ "keystone_project_id": "",
+ "description": "test",
+}
+
+
+def get_keystone_projects():
+ server = "keystone"
+
+ HEADERS = {
+ "Content-Type": "application/json"
+ }
+
+ data_auth = {
+ "auth": {
+ "identity": {
+ "methods": [
+ "password"
+ ],
+ "password": {
+ "user": {
+ "domain": {
+ "id": "Default"
+ },
+ "name": KEYSTONE_USER,
+ "password": KEYSTONE_PASSWORD
+ }
+ }
+ },
+ "scope": {
+ "project": {
+ "domain": {
+ "id": "Default"
+ },
+ "name": KEYSTONE_PROJECT
+ }
+ }
+ }
+ }
+
+ req = requests.post("http://{}:5000/v3/auth/tokens".format(server), json=data_auth, headers=HEADERS)
+
+ assert req.status_code in (200, 201)
+ TOKEN = req.headers['X-Subject-Token']
+ HEADERS['X-Auth-Token'] = TOKEN
+ req = requests.get("http://{}:5000/v3/projects".format(server), headers=HEADERS)
+ assert req.status_code in (200, 201)
+ return req.json()
+
+
+def check_pdp(pdp_id=None, keystone_project_id=None):
+ req = requests.get(URL.format("/pdp"))
+ assert req.status_code == 200
+ result = req.json()
+ assert type(result) is dict
+ assert "pdps" in result
+ if pdp_id:
+ assert result["pdps"]
+ assert pdp_id in result['pdps']
+ assert "name" in result['pdps'][pdp_id]
+ assert pdp_template["name"] == result['pdps'][pdp_id]["name"]
+ if keystone_project_id:
+ assert result["pdps"]
+ assert pdp_id in result['pdps']
+ assert "keystone_project_id" in result['pdps'][pdp_id]
+ assert keystone_project_id == result['pdps'][pdp_id]["keystone_project_id"]
+ return result
+
+
+def add_pdp(name="test_pdp", policy_id=None):
+ pdp_template['name'] = name
+ if policy_id:
+ pdp_template['security_pipeline'].append(policy_id)
+ req = requests.post(URL.format("/pdp"), json=pdp_template, headers=HEADERS)
+ assert req.status_code == 200
+ result = req.json()
+ assert type(result) is dict
+ pdp_id = list(result['pdps'].keys())[0]
+ if "result" in result:
+ assert result["result"]
+ assert "name" in result['pdps'][pdp_id]
+ assert pdp_template["name"] == result['pdps'][pdp_id]["name"]
+ return pdp_id
+
+
+def update_pdp(pdp_id, policy_id=None):
+ req = requests.get(URL.format("/pdp/{}".format(pdp_id)))
+ assert req.status_code == 200
+ result = req.json()
+ assert type(result) is dict
+ assert "pdps" in result
+ assert pdp_id in result['pdps']
+ pipeline = result['pdps'][pdp_id]["security_pipeline"]
+ if policy_id not in pipeline:
+ pipeline.append(policy_id)
+ req = requests.patch(URL.format("/pdp/{}".format(pdp_id)),
+ json={"security_pipeline": pipeline})
+ assert req.status_code == 200
+ result = req.json()
+ assert type(result) is dict
+ assert "pdps" in result
+ assert pdp_id in result['pdps']
+
+ req = requests.get(URL.format("/pdp/{}".format(pdp_id)))
+ assert req.status_code == 200
+ result = req.json()
+ assert type(result) is dict
+ assert "pdps" in result
+ assert pdp_id in result['pdps']
+ assert policy_id in pipeline
+
+
+def map_to_keystone(pdp_id, keystone_project_id):
+ req = requests.patch(URL.format("/pdp/{}".format(pdp_id)), json={"keystone_project_id": keystone_project_id},
+ headers=HEADERS)
+ assert req.status_code == 200
+ result = req.json()
+ assert type(result) is dict
+ if "result" in result:
+ assert result["result"]
+ assert pdp_id in result['pdps']
+ assert "name" in result['pdps'][pdp_id]
+ assert pdp_template["name"] == result['pdps'][pdp_id]["name"]
+ return pdp_id
+
+
+def delete_pdp(pdp_id):
+ req = requests.delete(URL.format("/pdp/{}".format(pdp_id)))
+ assert req.status_code == 200
+ result = req.json()
+ assert type(result) is dict
+ assert "result" in result
+ assert result["result"]
+
+
diff --git a/moonv4/moon_interface/tests/apitests/utils/policies.py b/moonv4/moon_interface/tests/apitests/utils/policies.py
new file mode 100644
index 00000000..67be91e5
--- /dev/null
+++ b/moonv4/moon_interface/tests/apitests/utils/policies.py
@@ -0,0 +1,581 @@
+import requests
+
+URL = "http://172.18.0.11:38001{}"
+HEADERS = {"content-type": "application/json"}
+FILE = open("/tmp/test.log", "w")
+
+policy_template = {
+ "name": "test_policy",
+ "model_id": "",
+ "genre": "authz",
+ "description": "test",
+}
+
+subject_template = {
+ "name": "test_subject",
+ "description": "test",
+ "email": "mail",
+ "password": "my_pass",
+}
+
+object_template = {
+ "name": "test_subject",
+ "description": "test"
+}
+
+action_template = {
+ "name": "test_subject",
+ "description": "test"
+}
+
+subject_data_template = {
+ "name": "subject_data1",
+ "description": "description of the data subject"
+}
+
+object_data_template = {
+ "name": "object_data1",
+ "description": "description of the data subject"
+}
+
+action_data_template = {
+ "name": "action_data1",
+ "description": "description of the data subject"
+}
+
+subject_assignment_template = {
+ "id": "",
+ "category_id": "",
+ "scope_id": ""
+}
+
+
+def check_policy(policy_id=None):
+ req = requests.get(URL.format("/policies"))
+ assert req.status_code == 200
+ result = req.json()
+ assert type(result) is dict
+ assert "policies" in result
+ if policy_id:
+ assert result["policies"]
+ assert policy_id in result['policies']
+ assert "name" in result['policies'][policy_id]
+ assert policy_template["name"] == result['policies'][policy_id]["name"]
+
+
+def add_policy(name="test_policy"):
+ policy_template["name"] = name
+ req = requests.post(URL.format("/policies"), json=policy_template, headers=HEADERS)
+ assert req.status_code == 200
+ result = req.json()
+ assert type(result) is dict
+ policy_id = list(result['policies'].keys())[0]
+ if "result" in result:
+ assert result["result"]
+ assert "name" in result['policies'][policy_id]
+ assert policy_template["name"] == result['policies'][policy_id]["name"]
+ return policy_id
+
+
+def update_policy(policy_id, model_id):
+ req = requests.patch(URL.format("/policies/{}".format(policy_id)),
+ json={"model_id": model_id}, headers=HEADERS)
+ assert req.status_code == 200
+ result = req.json()
+ assert type(result) is dict
+ policy_id = list(result['policies'].keys())[0]
+ if "result" in result:
+ assert result["result"]
+ assert "model_id" in result['policies'][policy_id]
+ assert model_id == result['policies'][policy_id]["model_id"]
+
+
+def delete_policy(policy_id):
+ req = requests.delete(URL.format("/policies/{}".format(policy_id)))
+ assert req.status_code == 200
+ result = req.json()
+ assert type(result) is dict
+ assert "result" in result
+ assert result["result"]
+
+
+def add_subject(policy_id=None, name="test_subject"):
+ subject_template['name'] = name
+ if policy_id:
+ req = requests.post(URL.format("/policies/{}/subjects".format(policy_id)),
+ json=subject_template, headers=HEADERS)
+ else:
+ req = requests.post(URL.format("/subjects"), json=subject_template, headers=HEADERS)
+ assert req.status_code == 200
+ result = req.json()
+ assert "subjects" in result
+ subject_id = list(result['subjects'].keys())[0]
+ return subject_id
+
+
+def update_subject(subject_id, policy_id=None, description=None):
+ if policy_id and not description:
+ req = requests.patch(URL.format("/policies/{}/subjects/{}".format(policy_id, subject_id)),
+ json={})
+ elif policy_id and description:
+ req = requests.patch(URL.format("/policies/{}/subjects/{}".format(policy_id, subject_id)),
+ json={"description": description})
+ else:
+ req = requests.patch(URL.format("/subjects/{}".format(subject_id)),
+ json={"description": description})
+ assert req.status_code == 200
+ result = req.json()
+ assert "subjects" in result
+ assert "name" in result["subjects"][subject_id]
+ assert subject_template["name"] == result["subjects"][subject_id]["name"]
+ assert "policy_list" in result["subjects"][subject_id]
+ if policy_id:
+ assert policy_id in result["subjects"][subject_id]["policy_list"]
+ if description:
+ assert description in result["subjects"][subject_id]["description"]
+
+
+def check_subject(subject_id=None, policy_id=None):
+ if policy_id:
+ req = requests.get(URL.format("/policies/{}/subjects".format(policy_id)))
+ else:
+ req = requests.get(URL.format("/subjects"))
+ assert req.status_code == 200
+ result = req.json()
+ assert "subjects" in result
+ assert "name" in result["subjects"][subject_id]
+ assert subject_template["name"] == result["subjects"][subject_id]["name"]
+ if policy_id:
+ assert "policy_list" in result["subjects"][subject_id]
+ assert policy_id in result["subjects"][subject_id]["policy_list"]
+
+
+def delete_subject(subject_id, policy_id=None):
+ if policy_id:
+ req = requests.delete(URL.format("/policies/{}/subjects/{}".format(policy_id, subject_id)))
+ else:
+ req = requests.delete(URL.format("/subjects/{}".format(subject_id)))
+ assert req.status_code == 200
+ result = req.json()
+ assert type(result) is dict
+ assert "result" in result
+ assert result["result"]
+
+ if policy_id:
+ req = requests.get(URL.format("/policies/{}/subjects".format(policy_id)))
+ else:
+ req = requests.get(URL.format("/subjects"))
+ assert req.status_code == 200
+ result = req.json()
+ assert "subjects" in result
+ if subject_id in result["subjects"]:
+ assert "name" in result["subjects"][subject_id]
+ assert subject_template["name"] == result["subjects"][subject_id]["name"]
+ if policy_id:
+ assert "policy_list" in result["subjects"][subject_id]
+ assert policy_id not in result["subjects"][subject_id]["policy_list"]
+
+
+def add_object(policy_id=None, name="test_object"):
+ object_template['name'] = name
+ if policy_id:
+ req = requests.post(URL.format("/policies/{}/objects".format(policy_id)),
+ json=object_template, headers=HEADERS)
+ else:
+ req = requests.post(URL.format("/objects"), json=object_template, headers=HEADERS)
+ assert req.status_code == 200
+ result = req.json()
+ assert "objects" in result
+ object_id = list(result['objects'].keys())[0]
+ return object_id
+
+
+def update_object(object_id, policy_id):
+ req = requests.patch(URL.format("/policies/{}/objects/{}".format(policy_id, object_id)), json={})
+ assert req.status_code == 200
+ result = req.json()
+ assert "objects" in result
+ assert "name" in result["objects"][object_id]
+ assert object_template["name"] == result["objects"][object_id]["name"]
+ assert "policy_list" in result["objects"][object_id]
+ assert policy_id in result["objects"][object_id]["policy_list"]
+
+
+def check_object(object_id=None, policy_id=None):
+ if policy_id:
+ req = requests.get(URL.format("/policies/{}/objects".format(policy_id)))
+ else:
+ req = requests.get(URL.format("/objects"))
+ assert req.status_code == 200
+ result = req.json()
+ assert "objects" in result
+ assert "name" in result["objects"][object_id]
+ assert object_template["name"] == result["objects"][object_id]["name"]
+ if policy_id:
+ assert "policy_list" in result["objects"][object_id]
+ assert policy_id in result["objects"][object_id]["policy_list"]
+
+
+def delete_object(object_id, policy_id=None):
+ if policy_id:
+ req = requests.delete(URL.format("/policies/{}/objects/{}".format(policy_id, object_id)))
+ else:
+ req = requests.delete(URL.format("/objects/{}".format(object_id)))
+ assert req.status_code == 200
+ result = req.json()
+ assert type(result) is dict
+ assert "result" in result
+ assert result["result"]
+
+ if policy_id:
+ req = requests.get(URL.format("/policies/{}/objects".format(policy_id)))
+ else:
+ req = requests.get(URL.format("/objects"))
+ assert req.status_code == 200
+ result = req.json()
+ assert "objects" in result
+ if object_id in result["objects"]:
+ assert "name" in result["objects"][object_id]
+ assert object_template["name"] == result["objects"][object_id]["name"]
+ if policy_id:
+ assert "policy_list" in result["objects"][object_id]
+ assert policy_id not in result["objects"][object_id]["policy_list"]
+
+
+def add_action(policy_id=None, name="test_action"):
+ action_template['name'] = name
+ if policy_id:
+ req = requests.post(URL.format("/policies/{}/actions".format(policy_id)),
+ json=action_template, headers=HEADERS)
+ else:
+ req = requests.post(URL.format("/actions"), json=action_template, headers=HEADERS)
+ assert req.status_code == 200
+ result = req.json()
+ assert "actions" in result
+ action_id = list(result['actions'].keys())[0]
+ return action_id
+
+
+def update_action(action_id, policy_id):
+ req = requests.patch(URL.format("/policies/{}/actions/{}".format(policy_id, action_id)), json={})
+ assert req.status_code == 200
+ result = req.json()
+ assert "actions" in result
+ assert "name" in result["actions"][action_id]
+ assert action_template["name"] == result["actions"][action_id]["name"]
+ assert "policy_list" in result["actions"][action_id]
+ assert policy_id in result["actions"][action_id]["policy_list"]
+
+
+def check_action(action_id=None, policy_id=None):
+ if policy_id:
+ req = requests.get(URL.format("/policies/{}/actions".format(policy_id)))
+ else:
+ req = requests.get(URL.format("/actions"))
+ assert req.status_code == 200
+ result = req.json()
+ assert "actions" in result
+ assert "name" in result["actions"][action_id]
+ assert action_template["name"] == result["actions"][action_id]["name"]
+ if policy_id:
+ assert "policy_list" in result["actions"][action_id]
+ assert policy_id in result["actions"][action_id]["policy_list"]
+
+
+def delete_action(action_id, policy_id=None):
+ if policy_id:
+ req = requests.delete(URL.format("/policies/{}/actions/{}".format(policy_id, action_id)))
+ else:
+ req = requests.delete(URL.format("/actions/{}".format(action_id)))
+ assert req.status_code == 200
+ result = req.json()
+ assert type(result) is dict
+ assert "result" in result
+ assert result["result"]
+
+ if policy_id:
+ req = requests.get(URL.format("/policies/{}/actions".format(policy_id)))
+ else:
+ req = requests.get(URL.format("/actions"))
+ assert req.status_code == 200
+ result = req.json()
+ assert "actions" in result
+ if action_id in result["actions"]:
+ assert "name" in result["actions"][action_id]
+ assert action_template["name"] == result["actions"][action_id]["name"]
+ if policy_id:
+ assert "policy_list" in result["actions"][action_id]
+ assert policy_id not in result["actions"][action_id]["policy_list"]
+
+
+def add_subject_data(policy_id, category_id, name="subject_data1"):
+ subject_data_template['name'] = name
+ req = requests.post(URL.format("/policies/{}/subject_data/{}".format(policy_id, category_id)),
+ json=subject_data_template, headers=HEADERS)
+ assert req.status_code == 200
+ result = req.json()
+ assert "subject_data" in result
+ subject_id = list(result['subject_data']['data'].keys())[0]
+ return subject_id
+
+
+def check_subject_data(policy_id, data_id, category_id):
+ req = requests.get(URL.format("/policies/{}/subject_data/{}".format(policy_id, category_id)))
+ assert req.status_code == 200
+ result = req.json()
+ assert "subject_data" in result
+ for _data in result['subject_data']:
+ assert data_id in list(_data['data'].keys())
+ assert category_id == _data["category_id"]
+
+
+def add_object_data(policy_id, category_id, name="object_data1"):
+ object_data_template['name'] = name
+ req = requests.post(URL.format("/policies/{}/object_data/{}".format(policy_id, category_id)),
+ json=object_data_template, headers=HEADERS)
+ assert req.status_code == 200
+ result = req.json()
+ assert "object_data" in result
+ object_id = list(result['object_data']['data'].keys())[0]
+ return object_id
+
+
+def check_object_data(policy_id, data_id, category_id):
+ req = requests.get(URL.format("/policies/{}/object_data/{}".format(policy_id, category_id)))
+ assert req.status_code == 200
+ result = req.json()
+ assert "object_data" in result
+ for _data in result['object_data']:
+ assert data_id in list(_data['data'].keys())
+ assert category_id == _data["category_id"]
+
+
+def add_action_data(policy_id, category_id, name="action_data1"):
+ action_data_template['name'] = name
+ req = requests.post(URL.format("/policies/{}/action_data/{}".format(policy_id, category_id)),
+ json=action_data_template, headers=HEADERS)
+ assert req.status_code == 200
+ result = req.json()
+ assert "action_data" in result
+ action_id = list(result['action_data']['data'].keys())[0]
+ return action_id
+
+
+def check_action_data(policy_id, data_id, category_id):
+ req = requests.get(URL.format("/policies/{}/action_data/{}".format(policy_id, category_id)))
+ assert req.status_code == 200
+ result = req.json()
+ assert "action_data" in result
+ for _data in result['action_data']:
+ assert data_id in list(_data['data'].keys())
+ assert category_id == _data["category_id"]
+
+
+def add_subject_assignments(policy_id, subject_id, subject_cat_id, subject_data_id):
+ req = requests.post(URL.format("/policies/{}/subject_assignments".format(policy_id)),
+ json={
+ "id": subject_id,
+ "category_id": subject_cat_id,
+ "data_id": subject_data_id
+ }, headers=HEADERS)
+ assert req.status_code == 200
+ result = req.json()
+ assert "subject_assignments" in result
+ assert result["subject_assignments"]
+
+
+def check_subject_assignments(policy_id, subject_id, subject_cat_id, subject_data_id):
+ req = requests.get(URL.format("/policies/{}/subject_assignments/{}/{}/{}".format(
+ policy_id, subject_id, subject_cat_id, subject_data_id)))
+ assert req.status_code == 200
+ result = req.json()
+ assert "subject_assignments" in result
+ assert result["subject_assignments"]
+ for key in result["subject_assignments"]:
+ assert "subject_id" in result["subject_assignments"][key]
+ assert "category_id" in result["subject_assignments"][key]
+ assert "assignments" in result["subject_assignments"][key]
+ if result["subject_assignments"][key]['subject_id'] == subject_id and \
+ result["subject_assignments"][key]["category_id"] == subject_cat_id:
+ assert subject_data_id in result["subject_assignments"][key]["assignments"]
+
+
+def check_object_assignments(policy_id, object_id, object_cat_id, object_data_id):
+ req = requests.get(URL.format("/policies/{}/object_assignments/{}/{}/{}".format(
+ policy_id, object_id, object_cat_id, object_data_id)))
+ assert req.status_code == 200
+ result = req.json()
+ assert "object_assignments" in result
+ assert result["object_assignments"]
+ for key in result["object_assignments"]:
+ assert "object_id" in result["object_assignments"][key]
+ assert "category_id" in result["object_assignments"][key]
+ assert "assignments" in result["object_assignments"][key]
+ if result["object_assignments"][key]['object_id'] == object_id and \
+ result["object_assignments"][key]["category_id"] == object_cat_id:
+ assert object_data_id in result["object_assignments"][key]["assignments"]
+
+
+def check_action_assignments(policy_id, action_id, action_cat_id, action_data_id):
+ req = requests.get(URL.format("/policies/{}/action_assignments/{}/{}/{}".format(
+ policy_id, action_id, action_cat_id, action_data_id)))
+ assert req.status_code == 200
+ result = req.json()
+ assert "action_assignments" in result
+ assert result["action_assignments"]
+ for key in result["action_assignments"]:
+ assert "action_id" in result["action_assignments"][key]
+ assert "category_id" in result["action_assignments"][key]
+ assert "assignments" in result["action_assignments"][key]
+ if result["action_assignments"][key]['action_id'] == action_id and \
+ result["action_assignments"][key]["category_id"] == action_cat_id:
+ assert action_data_id in result["action_assignments"][key]["assignments"]
+
+
+def add_object_assignments(policy_id, object_id, object_cat_id, object_data_id):
+ req = requests.post(URL.format("/policies/{}/object_assignments".format(policy_id)),
+ json={
+ "id": object_id,
+ "category_id": object_cat_id,
+ "data_id": object_data_id
+ }, headers=HEADERS)
+ assert req.status_code == 200
+ result = req.json()
+ assert "object_assignments" in result
+ assert result["object_assignments"]
+
+
+def add_action_assignments(policy_id, action_id, action_cat_id, action_data_id):
+ req = requests.post(URL.format("/policies/{}/action_assignments".format(policy_id)),
+ json={
+ "id": action_id,
+ "category_id": action_cat_id,
+ "data_id": action_data_id
+ }, headers=HEADERS)
+ assert req.status_code == 200
+ result = req.json()
+ assert "action_assignments" in result
+ assert result["action_assignments"]
+
+
+def delete_subject_assignment(policy_id, subject_id, subject_cat_id, subject_data_id):
+ req = requests.delete(URL.format("/policies/{}/subject_assignments/{}/{}/{}".format(
+ policy_id, subject_id, subject_cat_id, subject_data_id)))
+ assert req.status_code == 200
+ result = req.json()
+ assert "result" in result
+ assert result["result"]
+
+ req = requests.get(URL.format("/policies/{}/subject_assignments/{}/{}/{}".format(
+ policy_id, subject_id, subject_cat_id, subject_data_id)))
+ assert req.status_code == 200
+ result = req.json()
+ assert "subject_assignments" in result
+ assert result["subject_assignments"]
+ for key in result["subject_assignments"]:
+ assert "subject_id" in result["subject_assignments"][key]
+ assert "category_id" in result["subject_assignments"][key]
+ assert "assignments" in result["subject_assignments"][key]
+ if result["subject_assignments"][key]['subject_id'] == subject_id and \
+ result["subject_assignments"][key]["category_id"] == subject_cat_id:
+ assert subject_data_id not in result["subject_assignments"][key]["assignments"]
+
+
+def delete_object_assignment(policy_id, object_id, object_cat_id, object_data_id):
+ req = requests.delete(URL.format("/policies/{}/object_assignments/{}/{}/{}".format(
+ policy_id, object_id, object_cat_id, object_data_id)))
+ assert req.status_code == 200
+ result = req.json()
+ assert "result" in result
+ assert result["result"]
+
+ req = requests.get(URL.format("/policies/{}/object_assignments/{}/{}/{}".format(
+ policy_id, object_id, object_cat_id, object_data_id)))
+ assert req.status_code == 200
+ result = req.json()
+ assert "object_assignments" in result
+ assert result["object_assignments"]
+ for key in result["object_assignments"]:
+ assert "object_id" in result["object_assignments"][key]
+ assert "category_id" in result["object_assignments"][key]
+ assert "assignments" in result["object_assignments"][key]
+ if result["object_assignments"][key]['object_id'] == object_id and \
+ result["object_assignments"][key]["category_id"] == object_cat_id:
+ assert object_data_id not in result["object_assignments"][key]["assignments"]
+
+
+def delete_action_assignment(policy_id, action_id, action_cat_id, action_data_id):
+ req = requests.delete(URL.format("/policies/{}/action_assignments/{}/{}/{}".format(
+ policy_id, action_id, action_cat_id, action_data_id)))
+ assert req.status_code == 200
+ result = req.json()
+ assert "result" in result
+ assert result["result"]
+
+ req = requests.get(URL.format("/policies/{}/action_assignments/{}/{}/{}".format(
+ policy_id, action_id, action_cat_id, action_data_id)))
+ assert req.status_code == 200
+ result = req.json()
+ assert "action_assignments" in result
+ assert result["action_assignments"]
+ for key in result["action_assignments"]:
+ assert "action_id" in result["action_assignments"][key]
+ assert "category_id" in result["action_assignments"][key]
+ assert "assignments" in result["action_assignments"][key]
+ if result["action_assignments"][key]['action_id'] == action_id and \
+ result["action_assignments"][key]["category_id"] == action_cat_id:
+ assert action_data_id not in result["action_assignments"][key]["assignments"]
+
+
+def add_rule(policy_id, meta_rule_id, rule):
+ req = requests.post(URL.format("/policies/{}/rules".format(policy_id)),
+ json={
+ "meta_rule_id": meta_rule_id,
+ "rule": rule,
+ "enabled": True
+ },
+ headers=HEADERS)
+ assert req.status_code == 200
+ result = req.json()
+ assert "rules" in result
+ rule_id = list(result["rules"].keys())[0]
+ assert "policy_id" in result["rules"][rule_id]
+ assert policy_id == result["rules"][rule_id]["policy_id"]
+ assert "meta_rule_id" in result["rules"][rule_id]
+ assert meta_rule_id == result["rules"][rule_id]["meta_rule_id"]
+ assert rule == result["rules"][rule_id]["rule"]
+ return rule_id
+
+
+def check_rule(policy_id, meta_rule_id, rule_id, rule):
+ req = requests.get(URL.format("/policies/{}/rules".format(policy_id)))
+ assert req.status_code == 200
+ result = req.json()
+ assert "rules" in result
+ assert "policy_id" in result["rules"]
+ assert policy_id == result["rules"]["policy_id"]
+ for item in result["rules"]["rules"]:
+ assert "meta_rule_id" in item
+ if meta_rule_id == item["meta_rule_id"]:
+ if rule_id == item["id"]:
+ assert rule == item["rule"]
+
+
+def delete_rule(policy_id, rule_id):
+ req = requests.delete(URL.format("/policies/{}/rules/{}".format(policy_id, rule_id)))
+ assert req.status_code == 200
+ result = req.json()
+ assert "result" in result
+ assert result["result"]
+
+ req = requests.get(URL.format("/policies/{}/rules".format(policy_id)))
+ assert req.status_code == 200
+ result = req.json()
+ assert "rules" in result
+ assert "policy_id" in result["rules"]
+ assert policy_id == result["rules"]["policy_id"]
+ found_rule = False
+ for item in result["rules"]["rules"]:
+ if rule_id == item["id"]:
+ found_rule = True
+ assert not found_rule