diff options
-rw-r--r-- | python_moondb/python_moondb/api/policy.py | 13 | ||||
-rwxr-xr-x | python_moondb/tests/unit_python/test_pdp.py | 43 |
2 files changed, 56 insertions, 0 deletions
diff --git a/python_moondb/python_moondb/api/policy.py b/python_moondb/python_moondb/api/policy.py index 9781fda8..97866bfd 100644 --- a/python_moondb/python_moondb/api/policy.py +++ b/python_moondb/python_moondb/api/policy.py @@ -7,6 +7,7 @@ from uuid import uuid4 import logging from python_moonutilities.security_functions import enforce from python_moondb.api.managers import Managers +from python_moonutilities import exceptions logger = logging.getLogger("moon.db.api.policy") @@ -22,21 +23,31 @@ class PolicyManager(Managers): models = self.ModelManager.get_models("admin") for pdp_key, pdp_value in self.PDPManager.get_pdp(user_id).items(): for policy_id in pdp_value["security_pipeline"]: + if not policies: + raise exceptions.PolicyUnknown model_id = policies[policy_id]["model_id"] + if not models: + raise exceptions.ModelUnknown if meta_rule_id in models[model_id]["meta_rules"]: return policy_id @enforce(("read", "write"), "policies") def update_policy(self, user_id, policy_id, value): + if policy_id not in self.driver.get_policies(policy_id=policy_id): + raise exceptions.PolicyUnknown return self.driver.update_policy(policy_id=policy_id, value=value) @enforce(("read", "write"), "policies") def delete_policy(self, user_id, policy_id): # TODO (asteroide): unmap PDP linked to that policy + if policy_id not in self.driver.get_policies(policy_id=policy_id): + raise exceptions.PolicyUnknown return self.driver.delete_policy(policy_id=policy_id) @enforce(("read", "write"), "policies") def add_policy(self, user_id, policy_id=None, value=None): + if policy_id in self.driver.get_policies(policy_id=policy_id): + raise exceptions.PolicyExisting if not policy_id: policy_id = uuid4().hex return self.driver.add_policy(policy_id=policy_id, value=value) @@ -235,6 +246,8 @@ class PolicyManager(Managers): "action": [] } policy = self.driver.get_policies(policy_id=policy_id) + if not policy: + raise exceptions.PolicyUnknown model_id = policy[policy_id]["model_id"] model = Managers.ModelManager.get_models(user_id=user_id, model_id=model_id) try: diff --git a/python_moondb/tests/unit_python/test_pdp.py b/python_moondb/tests/unit_python/test_pdp.py index cb206d3d..5134c0fb 100755 --- a/python_moondb/tests/unit_python/test_pdp.py +++ b/python_moondb/tests/unit_python/test_pdp.py @@ -1,3 +1,6 @@ +import pytest + + def update_pdp(pdp_id, value): from python_moondb.core import PDPManager return PDPManager.update_pdp("", pdp_id, value) @@ -31,6 +34,19 @@ def test_update_pdp(db): assert pdp +def test_update_pdp_with_invalid_id(db): + pdp_id = "pdp_id1" + value = { + "name": "test_pdp", + "security_pipeline": ["policy_id_1", "policy_id_2"], + "keystone_project_id": "keystone_project_id1", + "description": "...", + } + with pytest.raises(Exception) as exception_info: + update_pdp(pdp_id, value) + assert str(exception_info.value) == '400: Pdp Unknown' + + def test_delete_pdp(db): pdp_id = "pdp_id1" value = { @@ -44,6 +60,13 @@ def test_delete_pdp(db): assert len(get_pdp(pdp_id)) == 0 +def test_delete_pdp_with_invalid_id(db): + pdp_id = "pdp_id1" + with pytest.raises(Exception) as exception_info: + delete_pdp(pdp_id) + assert str(exception_info.value) == '400: Pdp Unknown' + + def test_add_pdp(db): pdp_id = "pdp_id1" value = { @@ -56,6 +79,20 @@ def test_add_pdp(db): assert pdp +def test_add_pdp_twice_with_same_id(db): + pdp_id = "pdp_id1" + value = { + "name": "test_pdp", + "security_pipeline": ["policy_id_1", "policy_id_2"], + "keystone_project_id": "keystone_project_id1", + "description": "...", + } + add_pdp(pdp_id, value) + with pytest.raises(Exception) as exception_info: + add_pdp(pdp_id, value) + assert str(exception_info.value) == '409: Pdp Error' + + def test_get_pdp(db): pdp_id = "pdp_id1" value = { @@ -67,3 +104,9 @@ def test_get_pdp(db): add_pdp(pdp_id, value) pdp = get_pdp(pdp_id) assert len(pdp) == 1 + + +def test_get_pdp_with_invalid_id(db): + pdp_id = "invalid" + pdp = get_pdp(pdp_id) + assert len(pdp) == 0 |