aboutsummaryrefslogtreecommitdiffstats
path: root/python_moondb/tests/unit_python/policies
diff options
context:
space:
mode:
authorRuan HE <ruan.he@orange.com>2017-12-26 15:04:20 +0000
committerGerrit Code Review <gerrit@opnfv.org>2017-12-26 15:04:20 +0000
commit11835452d085ed40f7d110427458430705c4980e (patch)
treeabd80ebca62f072cd227cd13273ffa2a97e6c58f /python_moondb/tests/unit_python/policies
parent69910cdd22ec3b27f3e1f608b317f9683de1dcf6 (diff)
parent1b1e0d77ac23476a2d5bea1d63388402b768cc1e (diff)
Merge "add unit test for policy module"
Diffstat (limited to 'python_moondb/tests/unit_python/policies')
-rw-r--r--python_moondb/tests/unit_python/policies/__init__.py0
-rw-r--r--python_moondb/tests/unit_python/policies/mock_data.py45
-rwxr-xr-xpython_moondb/tests/unit_python/policies/test_assignments.py245
-rwxr-xr-xpython_moondb/tests/unit_python/policies/test_data.py513
-rwxr-xr-xpython_moondb/tests/unit_python/policies/test_policies.py161
5 files changed, 964 insertions, 0 deletions
diff --git a/python_moondb/tests/unit_python/policies/__init__.py b/python_moondb/tests/unit_python/policies/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/python_moondb/tests/unit_python/policies/__init__.py
diff --git a/python_moondb/tests/unit_python/policies/mock_data.py b/python_moondb/tests/unit_python/policies/mock_data.py
new file mode 100644
index 00000000..b2642979
--- /dev/null
+++ b/python_moondb/tests/unit_python/policies/mock_data.py
@@ -0,0 +1,45 @@
+def create_meta_rule():
+ meta_rule_value = {
+ "name": "meta_rule1",
+ "algorithm": "name of the meta rule algorithm",
+ "subject_categories": ["subject_category_id1",
+ "subject_category_id2"],
+ "object_categories": ["object_category_id1"],
+ "action_categories": ["action_category_id1"]
+ }
+ return meta_rule_value
+
+
+def create_model(meta_rule_id):
+ value = {
+ "name": "test_model",
+ "description": "test",
+ "meta_rules": [meta_rule_id]
+
+ }
+ return value
+
+
+def create_policy(model_id):
+ value = {
+ "name": "policy_1",
+ "model_id": model_id,
+ "genre": "authz",
+ "description": "test",
+ }
+ return value
+
+
+def get_policy_id():
+ import policies.test_policies as test_policies
+ import models.test_models as test_models
+ import models.test_meta_rules as test_meta_rules
+ meta_rule = test_meta_rules.add_meta_rule(value=create_meta_rule())
+ meta_rule_id = list(meta_rule.keys())[0]
+ model = test_models.add_model(value=create_model(meta_rule_id))
+ model_id = list(model.keys())[0]
+ value = create_policy(model_id)
+ policy = test_policies.add_policies(value)
+ assert policy
+ policy_id = list(policy.keys())[0]
+ return policy_id
diff --git a/python_moondb/tests/unit_python/policies/test_assignments.py b/python_moondb/tests/unit_python/policies/test_assignments.py
new file mode 100755
index 00000000..ccac205a
--- /dev/null
+++ b/python_moondb/tests/unit_python/policies/test_assignments.py
@@ -0,0 +1,245 @@
+def get_action_assignments(policy_id, action_id=None, category_id=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.get_action_assignments("", policy_id, action_id, category_id)
+
+
+def add_action_assignment(policy_id, action_id, category_id, data_id):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.add_action_assignment("", policy_id, action_id, category_id, data_id)
+
+
+def delete_action_assignment(policy_id, action_id, category_id, data_id):
+ from python_moondb.core import PolicyManager
+ PolicyManager.delete_action_assignment("", policy_id, action_id, category_id, data_id)
+
+
+def get_object_assignments(policy_id, object_id=None, category_id=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.get_object_assignments("", policy_id, object_id, category_id)
+
+
+def add_object_assignment(policy_id, object_id, category_id, data_id):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.add_object_assignment("", policy_id, object_id, category_id, data_id)
+
+
+def delete_object_assignment(policy_id, object_id, category_id, data_id):
+ from python_moondb.core import PolicyManager
+ PolicyManager.delete_object_assignment("", policy_id, object_id, category_id, data_id)
+
+
+def get_subject_assignments(policy_id, subject_id=None, category_id=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.get_subject_assignments("", policy_id, subject_id, category_id)
+
+
+def add_subject_assignment(policy_id, subject_id, category_id, data_id):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.add_subject_assignment("", policy_id, subject_id, category_id, data_id)
+
+
+def delete_subject_assignment(policy_id, subject_id, category_id, data_id):
+ from python_moondb.core import PolicyManager
+ PolicyManager.delete_subject_assignment("", policy_id, subject_id, category_id, data_id)
+
+
+def test_get_action_assignments(db):
+ policy_id = "admin"
+ action_id = "action_id_1"
+ category_id = "category_id_1"
+ data_id = "data_id_1"
+ add_action_assignment(policy_id, action_id, category_id, data_id)
+ act_assignments = get_action_assignments(policy_id, action_id, category_id)
+ action_id_1 = list(act_assignments.keys())[0]
+ assert act_assignments[action_id_1]["policy_id"] == policy_id
+ assert act_assignments[action_id_1]["action_id"] == action_id
+ assert act_assignments[action_id_1]["category_id"] == category_id
+ assert len(act_assignments[action_id_1].get("assignments")) == 1
+ assert data_id in act_assignments[action_id_1].get("assignments")
+
+
+def test_get_action_assignments_by_policy_id(db):
+ policy_id = "admin"
+ action_id = "action_id_1"
+ category_id = "category_id_1"
+ data_id = "data_id_1"
+ add_action_assignment(policy_id, action_id, category_id, data_id)
+ data_id = "data_id_2"
+ add_action_assignment(policy_id, action_id, category_id, data_id)
+ data_id = "data_id_3"
+ add_action_assignment(policy_id, action_id, category_id, data_id)
+ act_assignments = get_action_assignments(policy_id)
+ action_id_1 = list(act_assignments.keys())[0]
+ assert act_assignments[action_id_1]["policy_id"] == policy_id
+ assert act_assignments[action_id_1]["action_id"] == action_id
+ assert act_assignments[action_id_1]["category_id"] == category_id
+ assert len(act_assignments[action_id_1].get("assignments")) == 3
+
+
+def test_add_action_assignments(db):
+ policy_id = "admin"
+ action_id = "action_id_1"
+ category_id = "category_id_1"
+ data_id = "data_id_1"
+ action_assignments = add_action_assignment(policy_id, action_id, category_id, data_id)
+ assert action_assignments
+ action_id_1 = list(action_assignments.keys())[0]
+ assert action_assignments[action_id_1]["policy_id"] == policy_id
+ assert action_assignments[action_id_1]["action_id"] == action_id
+ assert action_assignments[action_id_1]["category_id"] == category_id
+ assert len(action_assignments[action_id_1].get("assignments")) == 1
+ assert data_id in action_assignments[action_id_1].get("assignments")
+
+
+def test_delete_action_assignment(db):
+ policy_id = "admin_1"
+ add_action_assignment(policy_id, "", "", "")
+ policy_id = "admin_2"
+ action_id = "action_id_2"
+ category_id = "category_id_2"
+ data_id = "data_id_2"
+ add_action_assignment(policy_id, action_id, category_id, data_id)
+ delete_action_assignment(policy_id, "", "", "")
+ assignments = get_action_assignments(policy_id, )
+ assert len(assignments) == 1
+
+
+def test_delete_action_assignment_with_invalid_policy_id(db):
+ policy_id = "invalid_id"
+ delete_action_assignment(policy_id, "", "", "")
+ assignments = get_action_assignments(policy_id, )
+ assert len(assignments) == 0
+
+
+def test_get_object_assignments(db):
+ policy_id = "admin"
+ object_id = "object_id_1"
+ category_id = "category_id_1"
+ data_id = "data_id_1"
+ add_object_assignment(policy_id, object_id, category_id, data_id)
+ obj_assignments = get_object_assignments(policy_id, object_id, category_id)
+ object_id_1 = list(obj_assignments.keys())[0]
+ assert obj_assignments[object_id_1]["policy_id"] == policy_id
+ assert obj_assignments[object_id_1]["object_id"] == object_id
+ assert obj_assignments[object_id_1]["category_id"] == category_id
+ assert len(obj_assignments[object_id_1].get("assignments")) == 1
+ assert data_id in obj_assignments[object_id_1].get("assignments")
+
+
+def test_get_object_assignments_by_policy_id(db):
+ policy_id = "admin"
+ object_id_1 = "object_id_1"
+ category_id_1 = "category_id_1"
+ data_id = "data_id_1"
+ add_action_assignment(policy_id, object_id_1, category_id_1, data_id)
+ object_id_2 = "object_id_2"
+ category_id_2 = "category_id_2"
+ data_id = "data_id_2"
+ add_action_assignment(policy_id, object_id_2, category_id_2, data_id)
+ object_id_3 = "object_id_3"
+ category_id_3 = "category_id_3"
+ data_id = "data_id_3"
+ add_action_assignment(policy_id, object_id_3, category_id_3, data_id)
+ act_assignments = get_action_assignments(policy_id)
+ assert len(act_assignments) == 3
+
+
+def test_add_object_assignments(db):
+ policy_id = "admin"
+ object_id = "object_id_1"
+ category_id = "category_id_1"
+ data_id = "data_id_1"
+ object_assignments = add_object_assignment(policy_id, object_id, category_id, data_id)
+ assert object_assignments
+ object_id_1 = list(object_assignments.keys())[0]
+ assert object_assignments[object_id_1]["policy_id"] == policy_id
+ assert object_assignments[object_id_1]["object_id"] == object_id
+ assert object_assignments[object_id_1]["category_id"] == category_id
+ assert len(object_assignments[object_id_1].get("assignments")) == 1
+ assert data_id in object_assignments[object_id_1].get("assignments")
+
+
+def test_delete_object_assignment(db):
+ policy_id = "admin_1"
+ add_object_assignment(policy_id, "", "", "")
+ object_id = "action_id_2"
+ category_id = "category_id_2"
+ data_id = "data_id_2"
+ add_object_assignment(policy_id, object_id, category_id, data_id)
+ delete_object_assignment(policy_id, "", "", "")
+ assignments = get_object_assignments(policy_id, )
+ assert len(assignments) == 1
+
+
+def test_delete_object_assignment_with_invalid_policy_id(db):
+ policy_id = "invalid_id"
+ delete_object_assignment(policy_id, "", "", "")
+ assignments = get_object_assignments(policy_id, )
+ assert len(assignments) == 0
+
+
+def test_get_subject_assignments(db):
+ policy_id = "admin"
+ subject_id = "object_id_1"
+ category_id = "category_id_1"
+ data_id = "data_id_1"
+ add_subject_assignment(policy_id, subject_id, category_id, data_id)
+ subj_assignments = get_subject_assignments(policy_id, subject_id, category_id)
+ subject_id_1 = list(subj_assignments.keys())[0]
+ assert subj_assignments[subject_id_1]["policy_id"] == policy_id
+ assert subj_assignments[subject_id_1]["subject_id"] == subject_id
+ assert subj_assignments[subject_id_1]["category_id"] == category_id
+ assert len(subj_assignments[subject_id_1].get("assignments")) == 1
+ assert data_id in subj_assignments[subject_id_1].get("assignments")
+
+
+def test_get_subject_assignments_by_policy_id(db):
+ policy_id = "admin"
+ subject_id_1 = "subject_id_1"
+ category_id_1 = "category_id_1"
+ data_id = "data_id_1"
+ add_subject_assignment(policy_id, subject_id_1, category_id_1, data_id)
+ subject_id_2 = "subject_id_2"
+ category_id_2 = "category_id_2"
+ data_id = "data_id_2"
+ add_subject_assignment(policy_id, subject_id_2, category_id_2, data_id)
+ subject_id_3 = "subject_id_3"
+ category_id_3 = "category_id_3"
+ data_id = "data_id_3"
+ add_subject_assignment(policy_id, subject_id_3, category_id_3, data_id)
+ subj_assignments = get_subject_assignments(policy_id)
+ assert len(subj_assignments) == 3
+
+
+def test_add_subject_assignments(db):
+ policy_id = "admin"
+ subject_id = "subject_id_1"
+ category_id = "category_id_1"
+ data_id = "data_id_1"
+ subject_assignments = add_subject_assignment(policy_id, subject_id, category_id, data_id)
+ assert subject_assignments
+ subject_id_1 = list(subject_assignments.keys())[0]
+ assert subject_assignments[subject_id_1]["policy_id"] == policy_id
+ assert subject_assignments[subject_id_1]["subject_id"] == subject_id
+ assert subject_assignments[subject_id_1]["category_id"] == category_id
+ assert len(subject_assignments[subject_id_1].get("assignments")) == 1
+ assert data_id in subject_assignments[subject_id_1].get("assignments")
+
+
+def test_delete_subject_assignment(db):
+ policy_id = "admin_1"
+ add_subject_assignment(policy_id, "", "", "")
+ subject_id = "subject_id_2"
+ category_id = "category_id_2"
+ data_id = "data_id_2"
+ add_subject_assignment(policy_id, subject_id, category_id, data_id)
+ delete_subject_assignment(policy_id, "", "", "")
+ assignments = get_subject_assignments(policy_id, )
+ assert len(assignments) == 1
+
+
+def test_delete_subject_assignment_with_invalid_policy_id(db):
+ policy_id = "invalid_id"
+ delete_subject_assignment(policy_id, "", "", "")
+ assignments = get_subject_assignments(policy_id, )
+ assert len(assignments) == 0
diff --git a/python_moondb/tests/unit_python/policies/test_data.py b/python_moondb/tests/unit_python/policies/test_data.py
new file mode 100755
index 00000000..68b1d2a0
--- /dev/null
+++ b/python_moondb/tests/unit_python/policies/test_data.py
@@ -0,0 +1,513 @@
+import policies.mock_data as mock_data
+import pytest
+
+
+def get_action_data(policy_id, data_id=None, category_id=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.get_action_data("", policy_id, data_id, category_id)
+
+
+def add_action_data(policy_id, data_id=None, category_id=None, value=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.add_action_data("", policy_id, data_id, category_id, value)
+
+
+def delete_action_data(policy_id, data_id):
+ from python_moondb.core import PolicyManager
+ PolicyManager.delete_action_data("", policy_id, data_id)
+
+
+def get_object_data(policy_id, data_id=None, category_id=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.get_object_data("", policy_id, data_id, category_id)
+
+
+def add_object_data(policy_id, data_id=None, category_id=None, value=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.add_object_data("", policy_id, data_id, category_id, value)
+
+
+def delete_object_data(policy_id, data_id):
+ from python_moondb.core import PolicyManager
+ PolicyManager.delete_object_data("", policy_id, data_id)
+
+
+def get_subject_data(policy_id, data_id=None, category_id=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.get_subject_data("", policy_id, data_id, category_id)
+
+
+def add_subject_data(policy_id, data_id=None, category_id=None, value=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.set_subject_data("", policy_id, data_id, category_id, value)
+
+
+def delete_subject_data(policy_id, data_id):
+ from python_moondb.core import PolicyManager
+ PolicyManager.delete_subject_data("", policy_id, data_id)
+
+
+def get_actions(policy_id, perimeter_id=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.get_actions("", policy_id, perimeter_id)
+
+
+def add_action(policy_id, perimeter_id=None, value=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.add_action("", policy_id, perimeter_id, value)
+
+
+def delete_action(policy_id, perimeter_id):
+ from python_moondb.core import PolicyManager
+ PolicyManager.delete_action("", policy_id, perimeter_id)
+
+
+def get_objects(policy_id, perimeter_id=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.get_objects("", policy_id, perimeter_id)
+
+
+def add_object(policy_id, perimeter_id=None, value=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.add_object("", policy_id, perimeter_id, value)
+
+
+def delete_object(policy_id, perimeter_id):
+ from python_moondb.core import PolicyManager
+ PolicyManager.delete_object("", policy_id, perimeter_id)
+
+
+def get_subjects(policy_id, perimeter_id=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.get_subjects("", policy_id, perimeter_id)
+
+
+def add_subject(policy_id, perimeter_id=None, value=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.add_subject("", policy_id, perimeter_id, value)
+
+
+def delete_subject(policy_id, perimeter_id):
+ from python_moondb.core import PolicyManager
+ PolicyManager.delete_subject("", policy_id, perimeter_id)
+
+
+def get_available_metadata(policy_id):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.get_available_metadata("", policy_id)
+
+
+def test_get_action_data(db):
+ policy_id = mock_data.get_policy_id()
+ get_available_metadata(policy_id)
+
+ policy_id = policy_id
+ data_id = "data_id_1"
+ category_id = "action_category_id1"
+ value = {
+ "name": "action-type",
+ "description": {"vm-action": "", "storage-action": "", },
+ }
+ add_action_data(policy_id, data_id, category_id, value)
+ action_data = get_action_data(policy_id, data_id, category_id)
+ assert action_data
+ assert len(action_data[0]['data']) == 1
+
+
+def test_get_action_data_with_invalid_category_id(db):
+ policy_id = mock_data.get_policy_id()
+ get_available_metadata(policy_id)
+
+ policy_id = policy_id
+ data_id = "data_id_1"
+ category_id = "action_category_id1"
+ value = {
+ "name": "action-type",
+ "description": {"vm-action": "", "storage-action": "", },
+ }
+ add_action_data(policy_id, data_id, category_id, value)
+ action_data = get_action_data(policy_id)
+ assert action_data
+ assert len(action_data[0]['data']) == 1
+
+
+def test_add_action_data(db):
+ policy_id = "policy_id_1"
+ data_id = "data_id_1"
+ category_id = "category_id_1"
+ value = {
+ "name": "action-type",
+ "description": {"vm-action": "", "storage-action": "", },
+ }
+ action_data = add_action_data(policy_id, data_id, category_id, value).get('data')
+ assert action_data
+ action_data_id = list(action_data.keys())[0]
+ assert action_data[action_data_id].get('policy_id') == policy_id
+
+
+def test_delete_action_data(db):
+ policy_id = mock_data.get_policy_id()
+ get_available_metadata(policy_id)
+ data_id = "data_id_1"
+ category_id = "category_id_1"
+ value = {
+ "name": "action-type",
+ "description": {"vm-action": "", "storage-action": "", },
+ }
+ action_data = add_action_data(policy_id, data_id, category_id, value).get('data')
+ action_data_id = list(action_data.keys())[0]
+ delete_action_data(action_data[action_data_id].get('policy_id'), None)
+ new_action_data = get_action_data(policy_id)
+ assert len(new_action_data[0]['data']) == 0
+
+
+def test_get_object_data(db):
+ policy_id = mock_data.get_policy_id()
+ get_available_metadata(policy_id)
+
+ policy_id = policy_id
+ data_id = "data_id_1"
+ category_id = "object_category_id1"
+ value = {
+ "name": "object-security-level",
+ "description": {"low": "", "medium": "", "high": ""},
+ }
+ add_object_data(policy_id, data_id, category_id, value)
+ object_data = get_object_data(policy_id, data_id, category_id)
+ assert object_data
+ assert len(object_data[0]['data']) == 1
+
+
+def test_get_object_data_with_invalid_category_id(db):
+ policy_id = mock_data.get_policy_id()
+ get_available_metadata(policy_id)
+
+ policy_id = policy_id
+ data_id = "data_id_1"
+ category_id = "object_category_id1"
+ value = {
+ "name": "object-security-level",
+ "description": {"low": "", "medium": "", "high": ""},
+ }
+ add_object_data(policy_id, data_id, category_id, value)
+ object_data = get_object_data(policy_id)
+ assert object_data
+ assert len(object_data[0]['data']) == 1
+
+
+def test_add_object_data(db):
+ policy_id = "policy_id_1"
+ data_id = "data_id_1"
+ category_id = "object_category_id1"
+ value = {
+ "name": "object-security-level",
+ "description": {"low": "", "medium": "", "high": ""},
+ }
+ object_data = add_object_data(policy_id, data_id, category_id, value).get('data')
+ assert object_data
+ object_data_id = list(object_data.keys())[0]
+ assert object_data[object_data_id].get('policy_id') == policy_id
+
+
+def test_delete_object_data(db):
+ policy_id = mock_data.get_policy_id()
+ get_available_metadata(policy_id)
+ data_id = "data_id_1"
+ category_id = "object_category_id1"
+ value = {
+ "name": "object-security-level",
+ "description": {"low": "", "medium": "", "high": ""},
+ }
+ object_data = add_object_data(policy_id, data_id, category_id, value).get('data')
+ object_data_id = list(object_data.keys())[0]
+ delete_object_data(object_data[object_data_id].get('policy_id'), data_id)
+ new_object_data = get_object_data(policy_id)
+ assert len(new_object_data[0]['data']) == 0
+
+
+def test_get_subject_data(db):
+ policy_id = mock_data.get_policy_id()
+ get_available_metadata(policy_id)
+
+ policy_id = policy_id
+ data_id = "data_id_1"
+ category_id = "subject_category_id1"
+ value = {
+ "name": "subject-security-level",
+ "description": {"low": "", "medium": "", "high": ""},
+ }
+ add_subject_data(policy_id, data_id, category_id, value)
+ subject_data = get_subject_data(policy_id, data_id, category_id)
+ assert subject_data
+ assert len(subject_data[0]['data']) == 1
+
+
+def test_get_subject_data_with_invalid_category_id(db):
+ policy_id = mock_data.get_policy_id()
+ get_available_metadata(policy_id)
+
+ policy_id = policy_id
+ data_id = "data_id_1"
+ category_id = "subject_category_id1"
+ value = {
+ "name": "subject-security-level",
+ "description": {"low": "", "medium": "", "high": ""},
+ }
+ add_subject_data(policy_id, data_id, category_id, value)
+ subject_data = get_subject_data(policy_id)
+ assert subject_data
+ assert len(subject_data[0]['data']) == 1
+
+
+def test_add_subject_data(db):
+ policy_id = "policy_id_1"
+ data_id = "data_id_1"
+ category_id = "subject_category_id1"
+ value = {
+ "name": "subject-security-level",
+ "description": {"low": "", "medium": "", "high": ""},
+ }
+ subject_data = add_object_data(policy_id, data_id, category_id, value).get('data')
+ assert subject_data
+ subject_data_id = list(subject_data.keys())[0]
+ assert subject_data[subject_data_id].get('policy_id') == policy_id
+
+
+def test_delete_subject_data(db):
+ policy_id = mock_data.get_policy_id()
+ get_available_metadata(policy_id)
+ data_id = "data_id_1"
+ category_id = "subject_category_id1"
+ value = {
+ "name": "subject-security-level",
+ "description": {"low": "", "medium": "", "high": ""},
+ }
+ subject_data = add_subject_data(policy_id, data_id, category_id, value).get('data')
+ subject_data_id = list(subject_data.keys())[0]
+ delete_subject_data(subject_data[subject_data_id].get('policy_id'), data_id)
+ new_subject_data = get_subject_data(policy_id)
+ assert len(new_subject_data[0]['data']) == 0
+
+
+def test_get_actions(db):
+ policy_id = "policy_id_1"
+ value = {
+ "name": "test_action",
+ "description": "test",
+ }
+ add_action(policy_id=policy_id, value=value)
+ actions = get_actions(policy_id, )
+ assert actions
+ assert len(actions) == 1
+ action_id = list(actions.keys())[0]
+ assert actions[action_id].get('policy_list')[0] == policy_id
+
+
+def test_add_action(db):
+ policy_id = "policy_id_1"
+ value = {
+ "name": "test_action",
+ "description": "test",
+ }
+ action = add_action(policy_id=policy_id, value=value)
+ assert action
+ action_id = list(action.keys())[0]
+ assert len(action[action_id].get('policy_list')) == 1
+
+
+def test_add_action_multiple_times(db):
+ policy_id = "policy_id_1"
+ value = {
+ "name": "test_action",
+ "description": "test",
+ }
+ action = add_action(policy_id=policy_id, value=value)
+ action_id = list(action.keys())[0]
+ perimeter_id = action[action_id].get('id')
+ assert action
+ value = {
+ "name": "test_action",
+ "description": "test",
+ "policy_list": ['policy_id_3', 'policy_id_4']
+ }
+ action = add_action('policy_id_7', perimeter_id, value)
+ assert action
+ action_id = list(action.keys())[0]
+ assert len(action[action_id].get('policy_list')) == 2
+
+
+def test_delete_action(db):
+ policy_id = "policy_id_1"
+ value = {
+ "name": "test_action",
+ "description": "test",
+ }
+ action = add_action(policy_id=policy_id, value=value)
+ action_id = list(action.keys())[0]
+ delete_action(policy_id, action_id)
+ actions = get_actions(policy_id, )
+ assert not actions
+
+
+def test_delete_action_with_invalid_perimeter_id(db):
+ policy_id = "invalid"
+ perimeter_id = "invalid"
+ with pytest.raises(Exception) as exception_info:
+ delete_action(policy_id, perimeter_id)
+ assert str(exception_info.value) == '400: Action Unknown'
+
+
+def test_get_objects(db):
+ policy_id = "policy_id_1"
+ value = {
+ "name": "test_object",
+ "description": "test",
+ }
+ add_object(policy_id=policy_id, value=value)
+ objects = get_objects(policy_id, )
+ assert objects
+ assert len(objects) == 1
+ object_id = list(objects.keys())[0]
+ assert objects[object_id].get('policy_list')[0] == policy_id
+
+
+def test_add_object(db):
+ policy_id = "policy_id_1"
+ value = {
+ "name": "test_object",
+ "description": "test",
+ }
+ added_object = add_object(policy_id=policy_id, value=value)
+ assert added_object
+ object_id = list(added_object.keys())[0]
+ assert len(added_object[object_id].get('policy_list')) == 1
+
+
+def test_add_objects_multiple_times(db):
+ policy_id = "policy_id_1"
+ value = {
+ "name": "test_object",
+ "description": "test",
+ }
+ added_object = add_object(policy_id=policy_id, value=value)
+ object_id = list(added_object.keys())[0]
+ perimeter_id = added_object[object_id].get('id')
+ assert added_object
+ value = {
+ "name": "test_object",
+ "description": "test",
+ "policy_list": ['policy_id_3', 'policy_id_4']
+ }
+ added_object = add_object('policy_id_7', perimeter_id, value)
+ assert added_object
+ object_id = list(added_object.keys())[0]
+ assert len(added_object[object_id].get('policy_list')) == 2
+
+
+def test_delete_object(db):
+ policy_id = "policy_id_1"
+ value = {
+ "name": "test_object",
+ "description": "test",
+ }
+ added_object = add_object(policy_id=policy_id, value=value)
+ object_id = list(added_object.keys())[0]
+ delete_object(policy_id, object_id)
+ objects = get_objects(policy_id, )
+ assert not objects
+
+
+def test_delete_object_with_invalid_perimeter_id(db):
+ policy_id = "invalid"
+ perimeter_id = "invalid"
+ with pytest.raises(Exception) as exception_info:
+ delete_object(policy_id, perimeter_id)
+ assert str(exception_info.value) == '400: Object Unknown'
+
+
+def test_get_subjects(db):
+ policy_id = "policy_id_1"
+ value = {
+ "name": "testuser",
+ "description": "test",
+ }
+ add_subject(policy_id=policy_id, value=value)
+ subjects = get_subjects(policy_id, )
+ assert subjects
+ assert len(subjects) == 1
+ subject_id = list(subjects.keys())[0]
+ assert subjects[subject_id].get('policy_list')[0] == policy_id
+
+
+def test_add_subject(db):
+ policy_id = "policy_id_1"
+ value = {
+ "name": "testuser",
+ "description": "test",
+ }
+ subject = add_subject(policy_id=policy_id, value=value)
+ assert subject
+ subject_id = list(subject.keys())[0]
+ assert len(subject[subject_id].get('policy_list')) == 1
+
+
+def test_add_subjects_multiple_times(db):
+ policy_id = "policy_id_1"
+ value = {
+ "name": "testuser",
+ "description": "test",
+ }
+ subject = add_subject(policy_id=policy_id, value=value)
+ subject_id = list(subject.keys())[0]
+ perimeter_id = subject[subject_id].get('id')
+ assert subject
+ value = {
+ "name": "testuser",
+ "description": "test",
+ "policy_list": ['policy_id_3', 'policy_id_4']
+ }
+ subject = add_subject('policy_id_7', perimeter_id, value)
+ assert subject
+ subject_id = list(subject.keys())[0]
+ assert len(subject[subject_id].get('policy_list')) == 2
+
+
+def test_delete_subject(db):
+ policy_id = "policy_id_1"
+ value = {
+ "name": "testuser",
+ "description": "test",
+ }
+ subject = add_subject(policy_id=policy_id, value=value)
+ subject_id = list(subject.keys())[0]
+ delete_subject(policy_id, subject_id)
+ subjects = get_subjects(policy_id, )
+ assert not subjects
+
+
+def test_delete_subject_with_invalid_perimeter_id(db):
+ policy_id = "invalid"
+ perimeter_id = "invalid"
+ with pytest.raises(Exception) as exception_info:
+ delete_subject(policy_id, perimeter_id)
+ assert str(exception_info.value) == '400: Subject Unknown'
+
+
+def test_get_available_metadata(db):
+ policy_id = mock_data.get_policy_id()
+ metadata = get_available_metadata(policy_id)
+ assert metadata
+ assert metadata['object'][0] == "object_category_id1"
+ assert metadata['subject'][0] == "subject_category_id1"
+ assert metadata['subject'][1] == "subject_category_id2"
+
+
+def test_get_available_metadata_empty_model(db):
+ import policies.test_policies as test_policies
+ policy_id = mock_data.get_policy_id()
+ value = mock_data.create_policy("invalid")
+ policy = test_policies.add_policies(value)
+ assert policy
+ policy_id = list(policy.keys())[0]
+ metadata = get_available_metadata(policy_id)
+ assert metadata \ No newline at end of file
diff --git a/python_moondb/tests/unit_python/policies/test_policies.py b/python_moondb/tests/unit_python/policies/test_policies.py
new file mode 100755
index 00000000..acd5d7a8
--- /dev/null
+++ b/python_moondb/tests/unit_python/policies/test_policies.py
@@ -0,0 +1,161 @@
+# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
+# This software is distributed under the terms and conditions of the 'Apache-2.0'
+# license which can be found in the file 'LICENSE' in this package distribution
+# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
+
+
+def get_policies():
+ from python_moondb.core import PolicyManager
+ return PolicyManager.get_policies("admin")
+
+
+def add_policies(value=None):
+ from python_moondb.core import PolicyManager
+ if not value:
+ value = {
+ "name": "test_policiy",
+ "model_id": "",
+ "genre": "authz",
+ "description": "test",
+ }
+ return PolicyManager.add_policy("admin", value=value)
+
+
+def delete_policies(uuid=None, name=None):
+ from python_moondb.core import PolicyManager
+ if not uuid:
+ for policy_id, policy_value in get_policies():
+ if name == policy_value['name']:
+ uuid = policy_id
+ break
+ PolicyManager.delete_policy("admin", uuid)
+
+
+def get_rules(policy_id=None, meta_rule_id=None, rule_id=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.get_rules("", policy_id, meta_rule_id, rule_id)
+
+
+def add_rule(policy_id=None, meta_rule_id=None, value=None):
+ from python_moondb.core import PolicyManager
+ if not value:
+ value = {
+ "rule": ("high", "medium", "vm-action"),
+ "instructions": ({"decision": "grant"}),
+ "enabled": "",
+ }
+ return PolicyManager.add_rule("", policy_id, meta_rule_id, value)
+
+
+def delete_rule(policy_id=None, rule_id=None):
+ from python_moondb.core import PolicyManager
+ PolicyManager.delete_rule("", policy_id, rule_id)
+
+
+def test_get_policies(db):
+ policies = get_policies()
+ assert isinstance(policies, dict)
+ assert not policies
+
+
+def test_add_policies(db):
+ value = {
+ "name": "test_policy",
+ "model_id": "",
+ "genre": "authz",
+ "description": "test",
+ }
+ policies = add_policies(value)
+ assert isinstance(policies, dict)
+ assert policies
+ assert len(policies.keys()) == 1
+ policy_id = list(policies.keys())[0]
+ for key in ("genre", "name", "model_id", "description"):
+ assert key in policies[policy_id]
+ assert policies[policy_id][key] == value[key]
+
+
+def test_delete_policies(db):
+ value = {
+ "name": "test_policy1",
+ "model_id": "",
+ "genre": "authz",
+ "description": "test",
+ }
+ policies = add_policies(value)
+ policy_id1 = list(policies.keys())[0]
+ value = {
+ "name": "test_policy2",
+ "model_id": "",
+ "genre": "authz",
+ "description": "test",
+ }
+ policies = add_policies(value)
+ policy_id2 = list(policies.keys())[0]
+ assert policy_id1 != policy_id2
+ delete_policies(policy_id1)
+ policies = get_policies()
+ assert policy_id1 not in policies
+
+
+def test_get_rules(db):
+ value = {
+ "rule": ("low", "medium", "vm-action"),
+ "instructions": ({"decision": "grant"}),
+ "enabled": "",
+ }
+ policy_id = "1"
+ meta_rule_id = "1"
+ add_rule(policy_id, meta_rule_id, value)
+ value = {
+ "rule": ("low", "low", "vm-action"),
+ "instructions": ({"decision": "grant"}),
+ "enabled": "",
+ }
+ policy_id = "1"
+ meta_rule_id = "1"
+ add_rule(policy_id, meta_rule_id, value)
+ rules = get_rules(policy_id, meta_rule_id)
+ assert isinstance(rules, dict)
+ assert rules
+ obj = rules.get('rules')
+ assert len(obj) == 2
+
+
+def test_get_rules_with_invalid_policy_id_failure(db):
+ rules = get_rules("invalid_policy_id", "meta_rule_id")
+ assert not rules.get('meta_rule-id')
+ assert len(rules.get('rules')) == 0
+
+
+def test_add_rule(db):
+ value = {
+ "rule": ("high", "medium", "vm-action"),
+ "instructions": ({"decision": "grant"}),
+ "enabled": "",
+ }
+ policy_id = "1"
+ meta_rule_id = "1"
+ rules = add_rule(policy_id, meta_rule_id, value)
+ assert rules
+ assert len(rules) == 1
+ assert isinstance(rules, dict)
+ rule_id = list(rules.keys())[0]
+ for key in ("rule", "instructions", "enabled"):
+ assert key in rules[rule_id]
+ assert rules[rule_id][key] == value[key]
+
+
+def test_delete_rule(db):
+ value = {
+ "rule": ("low", "low", "vm-action"),
+ "instructions": ({"decision": "grant"}),
+ "enabled": "",
+ }
+ policy_id = "2"
+ meta_rule_id = "2"
+ rules = add_rule(policy_id, meta_rule_id, value)
+ rule_id = list(rules.keys())[0]
+ delete_rule(policy_id, rule_id)
+ rules = get_rules(policy_id, meta_rule_id)
+ assert not rules.get('rules')