aboutsummaryrefslogtreecommitdiffstats
path: root/python_moondb/tests/unit_python/policies/test_policies.py
diff options
context:
space:
mode:
Diffstat (limited to 'python_moondb/tests/unit_python/policies/test_policies.py')
-rwxr-xr-xpython_moondb/tests/unit_python/policies/test_policies.py237
1 files changed, 78 insertions, 159 deletions
diff --git a/python_moondb/tests/unit_python/policies/test_policies.py b/python_moondb/tests/unit_python/policies/test_policies.py
index f1dd258f..07ee87fd 100755
--- a/python_moondb/tests/unit_python/policies/test_policies.py
+++ b/python_moondb/tests/unit_python/policies/test_policies.py
@@ -4,70 +4,14 @@
# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
import pytest
-import policies.mock_data as mock_data
+import helpers.mock_data as mock_data
+import helpers.policy_helper as policy_helper
from python_moonutilities.exceptions import *
-
-
-def get_policies():
- from python_moondb.core import PolicyManager
- return PolicyManager.get_policies("admin")
-
-
-def add_policies(policy_id=None, value=None):
- from python_moondb.core import PolicyManager
- if not value:
- value = {
- "name": "test_policiy",
- "model_id": "",
- "genre": "authz",
- "description": "test",
- }
- return PolicyManager.add_policy("admin", policy_id=policy_id, value=value)
-
-
-def delete_policies(uuid=None, name=None):
- from python_moondb.core import PolicyManager
- if not uuid:
- for policy_id, policy_value in get_policies():
- if name == policy_value['name']:
- uuid = policy_id
- break
- PolicyManager.delete_policy("admin", uuid)
-
-
-def update_policy(policy_id, value):
- from python_moondb.core import PolicyManager
- return PolicyManager.update_policy("admin", policy_id, value)
-
-
-def get_policy_from_meta_rules(meta_rule_id):
- from python_moondb.core import PolicyManager
- return PolicyManager.get_policy_from_meta_rules("admin", meta_rule_id)
-
-
-def get_rules(policy_id=None, meta_rule_id=None, rule_id=None):
- from python_moondb.core import PolicyManager
- return PolicyManager.get_rules("", policy_id, meta_rule_id, rule_id)
-
-
-def add_rule(policy_id=None, meta_rule_id=None, value=None):
- from python_moondb.core import PolicyManager
- if not value:
- value = {
- "rule": ("high", "medium", "vm-action"),
- "instructions": ({"decision": "grant"}),
- "enabled": "",
- }
- return PolicyManager.add_rule("", policy_id, meta_rule_id, value)
-
-
-def delete_rule(policy_id=None, rule_id=None):
- from python_moondb.core import PolicyManager
- PolicyManager.delete_rule("", policy_id, rule_id)
+import helpers.pdp_helper as pdp_helper
def test_get_policies(db):
- policies = get_policies()
+ policies = policy_helper.get_policies()
assert isinstance(policies, dict)
assert not policies
@@ -79,7 +23,7 @@ def test_add_policies(db):
"genre": "authz",
"description": "test",
}
- policies = add_policies(value=value)
+ policies = policy_helper.add_policies(value=value)
assert isinstance(policies, dict)
assert policies
assert len(policies.keys()) == 1
@@ -97,9 +41,9 @@ def test_add_policies_twice_with_same_id(db):
"genre": "authz",
"description": "test",
}
- add_policies(policy_id, value)
+ policy_helper.add_policies(policy_id, value)
with pytest.raises(PolicyExisting) as exception_info:
- add_policies(policy_id, value)
+ policy_helper.add_policies(policy_id, value)
# assert str(exception_info.value) == '409: Policy Error'
@@ -110,9 +54,9 @@ def test_add_policies_twice_with_same_name(db):
"genre": "authz",
"description": "test",
}
- add_policies(value=value)
+ policy_helper.add_policies(value=value)
with pytest.raises(Exception) as exception_info:
- add_policies(value=value)
+ policy_helper.add_policies(value=value)
# assert str(exception_info.value) == '409: Policy Error'
@@ -123,7 +67,7 @@ def test_delete_policies(db):
"genre": "authz",
"description": "test",
}
- policies = add_policies(value=value)
+ policies = policy_helper.add_policies(value=value)
policy_id1 = list(policies.keys())[0]
value = {
"name": "test_policy2",
@@ -131,45 +75,23 @@ def test_delete_policies(db):
"genre": "authz",
"description": "test",
}
- policies = add_policies(value=value)
+ policies = policy_helper.add_policies(value=value)
policy_id2 = list(policies.keys())[0]
assert policy_id1 != policy_id2
- delete_policies(policy_id1)
- policies = get_policies()
+ policy_helper.delete_policies(policy_id1)
+ policies = policy_helper.get_policies()
assert policy_id1 not in policies
def test_delete_policies_with_invalid_id(db):
policy_id = 'policy_id_1'
with pytest.raises(PolicyUnknown) as exception_info:
- delete_policies(policy_id)
+ policy_helper.delete_policies(policy_id)
# assert str(exception_info.value) == '400: Policy Unknown'
-def test_delete_policies_with_pdp(db):
- from python_moondb.core import PDPManager
- value = {
- "name": "test_policy1",
- "model_id": "",
- "genre": "authz",
- "description": "test",
- }
- policies = add_policies(value=value)
- policy_id1 = list(policies.keys())[0]
- pdp_id = "pdp_id1"
- value = {
- "name": "test_pdp",
- "security_pipeline": [policy_id1],
- "keystone_project_id": "keystone_project_id1",
- "description": "...",
- }
- PDPManager.add_pdp(user_id="admin" ,pdp_id=pdp_id, value=value)
- with pytest.raises(DeletePolicyWithPdp) as exception_info:
- delete_policies(policy_id1)
-
-
def test_update_policy(db):
- policies = add_policies()
+ policies = policy_helper.add_policies()
policy_id = list(policies.keys())[0]
value = {
"name": "test_policy4",
@@ -177,7 +99,7 @@ def test_update_policy(db):
"genre": "authz",
"description": "test-3",
}
- updated_policy = update_policy(policy_id, value)
+ updated_policy = policy_helper.update_policy(policy_id, value)
assert updated_policy
for key in ("genre", "name", "model_id", "description"):
assert key in updated_policy[policy_id]
@@ -193,32 +115,26 @@ def test_update_policy_with_invalid_id(db):
"description": "test-3",
}
with pytest.raises(PolicyUnknown) as exception_info:
- update_policy(policy_id, value)
+ policy_helper.update_policy(policy_id, value)
# assert str(exception_info.value) == '400: Policy Unknown'
def test_get_policy_from_meta_rules(db):
- import models.test_models as test_models
- import models.test_meta_rules as test_meta_rules
- import test_pdp as test_pdp
- meta_rule = test_meta_rules.add_meta_rule(value=mock_data.create_meta_rule())
- meta_rule_id = list(meta_rule.keys())[0]
- model = test_models.add_model(value=mock_data.create_model(meta_rule_id))
- model_id = list(model.keys())[0]
- value = mock_data.create_policy(model_id)
- policy = add_policies(value=value)
- assert policy
- policy_id = list(policy.keys())[0]
- pdp_ids = [policy_id, ]
- pdp_obj = mock_data.create_pdp(pdp_ids)
- test_pdp.add_pdp(value=pdp_obj)
- matched_policy_id = get_policy_from_meta_rules(meta_rule_id)
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1",
+ model_name="model1")
+ security_pipeline = [policy_id]
+ pdp_obj = mock_data.create_pdp(security_pipeline)
+ pdp_helper.add_pdp(value=pdp_obj)
+ matched_policy_id = policy_helper.get_policy_from_meta_rules(meta_rule_id)
assert matched_policy_id
assert policy_id == matched_policy_id
def test_get_policy_from_meta_rules_with_no_policy_ids(db):
- import test_pdp as test_pdp
meta_rule_id = 'meta_rule_id'
value = {
"name": "test_pdp",
@@ -226,58 +142,31 @@ def test_get_policy_from_meta_rules_with_no_policy_ids(db):
"keystone_project_id": "keystone_project_id1",
"description": "...",
}
- test_pdp.add_pdp(value=value)
- matched_policy_id = get_policy_from_meta_rules(meta_rule_id)
+ pdp_helper.add_pdp(value=value)
+ matched_policy_id = policy_helper.get_policy_from_meta_rules(meta_rule_id)
assert not matched_policy_id
-def test_get_policy_from_meta_rules_with_no_policies(db):
- import test_pdp as test_pdp
- meta_rule_id = 'meta_rule_id'
- policy_id = 'invalid'
- pdp_ids = [policy_id, ]
- pdp_obj = mock_data.create_pdp(pdp_ids)
- test_pdp.add_pdp(value=pdp_obj)
- with pytest.raises(Exception) as exception_info:
- get_policy_from_meta_rules(meta_rule_id)
- assert str(exception_info.value) == '400: Policy Unknown'
-
-
-def test_get_policy_from_meta_rules_with_no_models(db):
- import models.test_meta_rules as test_meta_rules
- import test_pdp as test_pdp
- meta_rule = test_meta_rules.add_meta_rule(value=mock_data.create_meta_rule())
- meta_rule_id = list(meta_rule.keys())[0]
- model_id = 'invalid'
- value = mock_data.create_policy(model_id)
- policy = add_policies(value=value)
- assert policy
- policy_id = list(policy.keys())[0]
- pdp_ids = [policy_id, ]
- pdp_obj = mock_data.create_pdp(pdp_ids)
- test_pdp.add_pdp(value=pdp_obj)
- with pytest.raises(Exception) as exception_info:
- get_policy_from_meta_rules(meta_rule_id)
- assert str(exception_info.value) == '400: Model Unknown'
-
-
def test_get_rules(db):
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category12",
+ object_category_name="object_category12",
+ action_category_name="action_category12",
+ meta_rule_name="meta_rule_12",
+ model_name="model12")
value = {
"rule": ("low", "medium", "vm-action"),
"instructions": ({"decision": "grant"}),
"enabled": "",
}
- policy_id = mock_data.get_policy_id()
- meta_rule_id = "1"
- add_rule(policy_id, meta_rule_id, value)
+ policy_helper.add_rule(policy_id=policy_id, meta_rule_id=meta_rule_id, value=value)
value = {
"rule": ("low", "low", "vm-action"),
"instructions": ({"decision": "grant"}),
"enabled": "",
}
- meta_rule_id = "1"
- add_rule(policy_id, meta_rule_id, value)
- rules = get_rules(policy_id, meta_rule_id)
+ policy_helper.add_rule(policy_id=policy_id, meta_rule_id=meta_rule_id, value=value)
+ rules = policy_helper.get_rules(policy_id=policy_id, meta_rule_id=meta_rule_id)
assert isinstance(rules, dict)
assert rules
obj = rules.get('rules')
@@ -285,20 +174,25 @@ def test_get_rules(db):
def test_get_rules_with_invalid_policy_id_failure(db):
- rules = get_rules("invalid_policy_id", "meta_rule_id")
+ rules = policy_helper.get_rules("invalid_policy_id", "meta_rule_id")
assert not rules.get('meta_rule-id')
assert len(rules.get('rules')) == 0
def test_add_rule(db):
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1",
+ model_name="model1")
value = {
"rule": ("high", "medium", "vm-action"),
"instructions": ({"decision": "grant"}),
"enabled": "",
}
- policy_id = mock_data.get_policy_id()
- meta_rule_id = "1"
- rules = add_rule(policy_id, meta_rule_id, value)
+
+ rules = policy_helper.add_rule(policy_id=policy_id, meta_rule_id=meta_rule_id, value=value)
assert rules
assert len(rules) == 1
assert isinstance(rules, dict)
@@ -308,19 +202,44 @@ def test_add_rule(db):
assert rules[rule_id][key] == value[key]
with pytest.raises(RuleExisting):
- add_rule(policy_id, meta_rule_id, value)
+ policy_helper.add_rule(policy_id=policy_id, meta_rule_id=meta_rule_id, value=value)
def test_delete_rule(db):
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category14",
+ object_category_name="object_category14",
+ action_category_name="action_category14",
+ meta_rule_name="meta_rule_14",
+ model_name="model14")
value = {
"rule": ("low", "low", "vm-action"),
"instructions": ({"decision": "grant"}),
"enabled": "",
}
- policy_id = mock_data.get_policy_id()
- meta_rule_id = "2"
- rules = add_rule(policy_id, meta_rule_id, value)
+ rules = policy_helper.add_rule(policy_id, meta_rule_id, value)
rule_id = list(rules.keys())[0]
- delete_rule(policy_id, rule_id)
- rules = get_rules(policy_id, meta_rule_id)
+ policy_helper.delete_rule(policy_id, rule_id)
+ rules = policy_helper.get_rules(policy_id, meta_rule_id)
assert not rules.get('rules')
+
+
+def test_delete_policies_with_pdp(db):
+ value = {
+ "name": "test_policy1",
+ "model_id": "",
+ "genre": "authz",
+ "description": "test",
+ }
+ policies = policy_helper.add_policies(value=value)
+ policy_id1 = list(policies.keys())[0]
+ pdp_id = "pdp_id1"
+ value = {
+ "name": "test_pdp",
+ "security_pipeline": [policy_id1],
+ "keystone_project_id": "keystone_project_id1",
+ "description": "...",
+ }
+ pdp_helper.add_pdp(pdp_id=pdp_id, value=value)
+ with pytest.raises(DeletePolicyWithPdp) as exception_info:
+ policy_helper.delete_policies(policy_id1)