diff options
Diffstat (limited to 'python_moondb/tests/unit_python/policies/test_policies.py')
-rwxr-xr-x | python_moondb/tests/unit_python/policies/test_policies.py | 237 |
1 files changed, 78 insertions, 159 deletions
diff --git a/python_moondb/tests/unit_python/policies/test_policies.py b/python_moondb/tests/unit_python/policies/test_policies.py index f1dd258f..07ee87fd 100755 --- a/python_moondb/tests/unit_python/policies/test_policies.py +++ b/python_moondb/tests/unit_python/policies/test_policies.py @@ -4,70 +4,14 @@ # or at 'http://www.apache.org/licenses/LICENSE-2.0'. import pytest -import policies.mock_data as mock_data +import helpers.mock_data as mock_data +import helpers.policy_helper as policy_helper from python_moonutilities.exceptions import * - - -def get_policies(): - from python_moondb.core import PolicyManager - return PolicyManager.get_policies("admin") - - -def add_policies(policy_id=None, value=None): - from python_moondb.core import PolicyManager - if not value: - value = { - "name": "test_policiy", - "model_id": "", - "genre": "authz", - "description": "test", - } - return PolicyManager.add_policy("admin", policy_id=policy_id, value=value) - - -def delete_policies(uuid=None, name=None): - from python_moondb.core import PolicyManager - if not uuid: - for policy_id, policy_value in get_policies(): - if name == policy_value['name']: - uuid = policy_id - break - PolicyManager.delete_policy("admin", uuid) - - -def update_policy(policy_id, value): - from python_moondb.core import PolicyManager - return PolicyManager.update_policy("admin", policy_id, value) - - -def get_policy_from_meta_rules(meta_rule_id): - from python_moondb.core import PolicyManager - return PolicyManager.get_policy_from_meta_rules("admin", meta_rule_id) - - -def get_rules(policy_id=None, meta_rule_id=None, rule_id=None): - from python_moondb.core import PolicyManager - return PolicyManager.get_rules("", policy_id, meta_rule_id, rule_id) - - -def add_rule(policy_id=None, meta_rule_id=None, value=None): - from python_moondb.core import PolicyManager - if not value: - value = { - "rule": ("high", "medium", "vm-action"), - "instructions": ({"decision": "grant"}), - "enabled": "", - } - return PolicyManager.add_rule("", policy_id, meta_rule_id, value) - - -def delete_rule(policy_id=None, rule_id=None): - from python_moondb.core import PolicyManager - PolicyManager.delete_rule("", policy_id, rule_id) +import helpers.pdp_helper as pdp_helper def test_get_policies(db): - policies = get_policies() + policies = policy_helper.get_policies() assert isinstance(policies, dict) assert not policies @@ -79,7 +23,7 @@ def test_add_policies(db): "genre": "authz", "description": "test", } - policies = add_policies(value=value) + policies = policy_helper.add_policies(value=value) assert isinstance(policies, dict) assert policies assert len(policies.keys()) == 1 @@ -97,9 +41,9 @@ def test_add_policies_twice_with_same_id(db): "genre": "authz", "description": "test", } - add_policies(policy_id, value) + policy_helper.add_policies(policy_id, value) with pytest.raises(PolicyExisting) as exception_info: - add_policies(policy_id, value) + policy_helper.add_policies(policy_id, value) # assert str(exception_info.value) == '409: Policy Error' @@ -110,9 +54,9 @@ def test_add_policies_twice_with_same_name(db): "genre": "authz", "description": "test", } - add_policies(value=value) + policy_helper.add_policies(value=value) with pytest.raises(Exception) as exception_info: - add_policies(value=value) + policy_helper.add_policies(value=value) # assert str(exception_info.value) == '409: Policy Error' @@ -123,7 +67,7 @@ def test_delete_policies(db): "genre": "authz", "description": "test", } - policies = add_policies(value=value) + policies = policy_helper.add_policies(value=value) policy_id1 = list(policies.keys())[0] value = { "name": "test_policy2", @@ -131,45 +75,23 @@ def test_delete_policies(db): "genre": "authz", "description": "test", } - policies = add_policies(value=value) + policies = policy_helper.add_policies(value=value) policy_id2 = list(policies.keys())[0] assert policy_id1 != policy_id2 - delete_policies(policy_id1) - policies = get_policies() + policy_helper.delete_policies(policy_id1) + policies = policy_helper.get_policies() assert policy_id1 not in policies def test_delete_policies_with_invalid_id(db): policy_id = 'policy_id_1' with pytest.raises(PolicyUnknown) as exception_info: - delete_policies(policy_id) + policy_helper.delete_policies(policy_id) # assert str(exception_info.value) == '400: Policy Unknown' -def test_delete_policies_with_pdp(db): - from python_moondb.core import PDPManager - value = { - "name": "test_policy1", - "model_id": "", - "genre": "authz", - "description": "test", - } - policies = add_policies(value=value) - policy_id1 = list(policies.keys())[0] - pdp_id = "pdp_id1" - value = { - "name": "test_pdp", - "security_pipeline": [policy_id1], - "keystone_project_id": "keystone_project_id1", - "description": "...", - } - PDPManager.add_pdp(user_id="admin" ,pdp_id=pdp_id, value=value) - with pytest.raises(DeletePolicyWithPdp) as exception_info: - delete_policies(policy_id1) - - def test_update_policy(db): - policies = add_policies() + policies = policy_helper.add_policies() policy_id = list(policies.keys())[0] value = { "name": "test_policy4", @@ -177,7 +99,7 @@ def test_update_policy(db): "genre": "authz", "description": "test-3", } - updated_policy = update_policy(policy_id, value) + updated_policy = policy_helper.update_policy(policy_id, value) assert updated_policy for key in ("genre", "name", "model_id", "description"): assert key in updated_policy[policy_id] @@ -193,32 +115,26 @@ def test_update_policy_with_invalid_id(db): "description": "test-3", } with pytest.raises(PolicyUnknown) as exception_info: - update_policy(policy_id, value) + policy_helper.update_policy(policy_id, value) # assert str(exception_info.value) == '400: Policy Unknown' def test_get_policy_from_meta_rules(db): - import models.test_models as test_models - import models.test_meta_rules as test_meta_rules - import test_pdp as test_pdp - meta_rule = test_meta_rules.add_meta_rule(value=mock_data.create_meta_rule()) - meta_rule_id = list(meta_rule.keys())[0] - model = test_models.add_model(value=mock_data.create_model(meta_rule_id)) - model_id = list(model.keys())[0] - value = mock_data.create_policy(model_id) - policy = add_policies(value=value) - assert policy - policy_id = list(policy.keys())[0] - pdp_ids = [policy_id, ] - pdp_obj = mock_data.create_pdp(pdp_ids) - test_pdp.add_pdp(value=pdp_obj) - matched_policy_id = get_policy_from_meta_rules(meta_rule_id) + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy( + subject_category_name="subject_category1", + object_category_name="object_category1", + action_category_name="action_category1", + meta_rule_name="meta_rule_1", + model_name="model1") + security_pipeline = [policy_id] + pdp_obj = mock_data.create_pdp(security_pipeline) + pdp_helper.add_pdp(value=pdp_obj) + matched_policy_id = policy_helper.get_policy_from_meta_rules(meta_rule_id) assert matched_policy_id assert policy_id == matched_policy_id def test_get_policy_from_meta_rules_with_no_policy_ids(db): - import test_pdp as test_pdp meta_rule_id = 'meta_rule_id' value = { "name": "test_pdp", @@ -226,58 +142,31 @@ def test_get_policy_from_meta_rules_with_no_policy_ids(db): "keystone_project_id": "keystone_project_id1", "description": "...", } - test_pdp.add_pdp(value=value) - matched_policy_id = get_policy_from_meta_rules(meta_rule_id) + pdp_helper.add_pdp(value=value) + matched_policy_id = policy_helper.get_policy_from_meta_rules(meta_rule_id) assert not matched_policy_id -def test_get_policy_from_meta_rules_with_no_policies(db): - import test_pdp as test_pdp - meta_rule_id = 'meta_rule_id' - policy_id = 'invalid' - pdp_ids = [policy_id, ] - pdp_obj = mock_data.create_pdp(pdp_ids) - test_pdp.add_pdp(value=pdp_obj) - with pytest.raises(Exception) as exception_info: - get_policy_from_meta_rules(meta_rule_id) - assert str(exception_info.value) == '400: Policy Unknown' - - -def test_get_policy_from_meta_rules_with_no_models(db): - import models.test_meta_rules as test_meta_rules - import test_pdp as test_pdp - meta_rule = test_meta_rules.add_meta_rule(value=mock_data.create_meta_rule()) - meta_rule_id = list(meta_rule.keys())[0] - model_id = 'invalid' - value = mock_data.create_policy(model_id) - policy = add_policies(value=value) - assert policy - policy_id = list(policy.keys())[0] - pdp_ids = [policy_id, ] - pdp_obj = mock_data.create_pdp(pdp_ids) - test_pdp.add_pdp(value=pdp_obj) - with pytest.raises(Exception) as exception_info: - get_policy_from_meta_rules(meta_rule_id) - assert str(exception_info.value) == '400: Model Unknown' - - def test_get_rules(db): + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy( + subject_category_name="subject_category12", + object_category_name="object_category12", + action_category_name="action_category12", + meta_rule_name="meta_rule_12", + model_name="model12") value = { "rule": ("low", "medium", "vm-action"), "instructions": ({"decision": "grant"}), "enabled": "", } - policy_id = mock_data.get_policy_id() - meta_rule_id = "1" - add_rule(policy_id, meta_rule_id, value) + policy_helper.add_rule(policy_id=policy_id, meta_rule_id=meta_rule_id, value=value) value = { "rule": ("low", "low", "vm-action"), "instructions": ({"decision": "grant"}), "enabled": "", } - meta_rule_id = "1" - add_rule(policy_id, meta_rule_id, value) - rules = get_rules(policy_id, meta_rule_id) + policy_helper.add_rule(policy_id=policy_id, meta_rule_id=meta_rule_id, value=value) + rules = policy_helper.get_rules(policy_id=policy_id, meta_rule_id=meta_rule_id) assert isinstance(rules, dict) assert rules obj = rules.get('rules') @@ -285,20 +174,25 @@ def test_get_rules(db): def test_get_rules_with_invalid_policy_id_failure(db): - rules = get_rules("invalid_policy_id", "meta_rule_id") + rules = policy_helper.get_rules("invalid_policy_id", "meta_rule_id") assert not rules.get('meta_rule-id') assert len(rules.get('rules')) == 0 def test_add_rule(db): + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy( + subject_category_name="subject_category1", + object_category_name="object_category1", + action_category_name="action_category1", + meta_rule_name="meta_rule_1", + model_name="model1") value = { "rule": ("high", "medium", "vm-action"), "instructions": ({"decision": "grant"}), "enabled": "", } - policy_id = mock_data.get_policy_id() - meta_rule_id = "1" - rules = add_rule(policy_id, meta_rule_id, value) + + rules = policy_helper.add_rule(policy_id=policy_id, meta_rule_id=meta_rule_id, value=value) assert rules assert len(rules) == 1 assert isinstance(rules, dict) @@ -308,19 +202,44 @@ def test_add_rule(db): assert rules[rule_id][key] == value[key] with pytest.raises(RuleExisting): - add_rule(policy_id, meta_rule_id, value) + policy_helper.add_rule(policy_id=policy_id, meta_rule_id=meta_rule_id, value=value) def test_delete_rule(db): + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy( + subject_category_name="subject_category14", + object_category_name="object_category14", + action_category_name="action_category14", + meta_rule_name="meta_rule_14", + model_name="model14") value = { "rule": ("low", "low", "vm-action"), "instructions": ({"decision": "grant"}), "enabled": "", } - policy_id = mock_data.get_policy_id() - meta_rule_id = "2" - rules = add_rule(policy_id, meta_rule_id, value) + rules = policy_helper.add_rule(policy_id, meta_rule_id, value) rule_id = list(rules.keys())[0] - delete_rule(policy_id, rule_id) - rules = get_rules(policy_id, meta_rule_id) + policy_helper.delete_rule(policy_id, rule_id) + rules = policy_helper.get_rules(policy_id, meta_rule_id) assert not rules.get('rules') + + +def test_delete_policies_with_pdp(db): + value = { + "name": "test_policy1", + "model_id": "", + "genre": "authz", + "description": "test", + } + policies = policy_helper.add_policies(value=value) + policy_id1 = list(policies.keys())[0] + pdp_id = "pdp_id1" + value = { + "name": "test_pdp", + "security_pipeline": [policy_id1], + "keystone_project_id": "keystone_project_id1", + "description": "...", + } + pdp_helper.add_pdp(pdp_id=pdp_id, value=value) + with pytest.raises(DeletePolicyWithPdp) as exception_info: + policy_helper.delete_policies(policy_id1) |