aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--python_moondb/python_moondb/api/policy.py13
-rwxr-xr-xpython_moondb/tests/unit_python/test_pdp.py43
2 files changed, 56 insertions, 0 deletions
diff --git a/python_moondb/python_moondb/api/policy.py b/python_moondb/python_moondb/api/policy.py
index 9781fda8..97866bfd 100644
--- a/python_moondb/python_moondb/api/policy.py
+++ b/python_moondb/python_moondb/api/policy.py
@@ -7,6 +7,7 @@ from uuid import uuid4
import logging
from python_moonutilities.security_functions import enforce
from python_moondb.api.managers import Managers
+from python_moonutilities import exceptions
logger = logging.getLogger("moon.db.api.policy")
@@ -22,21 +23,31 @@ class PolicyManager(Managers):
models = self.ModelManager.get_models("admin")
for pdp_key, pdp_value in self.PDPManager.get_pdp(user_id).items():
for policy_id in pdp_value["security_pipeline"]:
+ if not policies:
+ raise exceptions.PolicyUnknown
model_id = policies[policy_id]["model_id"]
+ if not models:
+ raise exceptions.ModelUnknown
if meta_rule_id in models[model_id]["meta_rules"]:
return policy_id
@enforce(("read", "write"), "policies")
def update_policy(self, user_id, policy_id, value):
+ if policy_id not in self.driver.get_policies(policy_id=policy_id):
+ raise exceptions.PolicyUnknown
return self.driver.update_policy(policy_id=policy_id, value=value)
@enforce(("read", "write"), "policies")
def delete_policy(self, user_id, policy_id):
# TODO (asteroide): unmap PDP linked to that policy
+ if policy_id not in self.driver.get_policies(policy_id=policy_id):
+ raise exceptions.PolicyUnknown
return self.driver.delete_policy(policy_id=policy_id)
@enforce(("read", "write"), "policies")
def add_policy(self, user_id, policy_id=None, value=None):
+ if policy_id in self.driver.get_policies(policy_id=policy_id):
+ raise exceptions.PolicyExisting
if not policy_id:
policy_id = uuid4().hex
return self.driver.add_policy(policy_id=policy_id, value=value)
@@ -235,6 +246,8 @@ class PolicyManager(Managers):
"action": []
}
policy = self.driver.get_policies(policy_id=policy_id)
+ if not policy:
+ raise exceptions.PolicyUnknown
model_id = policy[policy_id]["model_id"]
model = Managers.ModelManager.get_models(user_id=user_id, model_id=model_id)
try:
diff --git a/python_moondb/tests/unit_python/test_pdp.py b/python_moondb/tests/unit_python/test_pdp.py
index cb206d3d..5134c0fb 100755
--- a/python_moondb/tests/unit_python/test_pdp.py
+++ b/python_moondb/tests/unit_python/test_pdp.py
@@ -1,3 +1,6 @@
+import pytest
+
+
def update_pdp(pdp_id, value):
from python_moondb.core import PDPManager
return PDPManager.update_pdp("", pdp_id, value)
@@ -31,6 +34,19 @@ def test_update_pdp(db):
assert pdp
+def test_update_pdp_with_invalid_id(db):
+ pdp_id = "pdp_id1"
+ value = {
+ "name": "test_pdp",
+ "security_pipeline": ["policy_id_1", "policy_id_2"],
+ "keystone_project_id": "keystone_project_id1",
+ "description": "...",
+ }
+ with pytest.raises(Exception) as exception_info:
+ update_pdp(pdp_id, value)
+ assert str(exception_info.value) == '400: Pdp Unknown'
+
+
def test_delete_pdp(db):
pdp_id = "pdp_id1"
value = {
@@ -44,6 +60,13 @@ def test_delete_pdp(db):
assert len(get_pdp(pdp_id)) == 0
+def test_delete_pdp_with_invalid_id(db):
+ pdp_id = "pdp_id1"
+ with pytest.raises(Exception) as exception_info:
+ delete_pdp(pdp_id)
+ assert str(exception_info.value) == '400: Pdp Unknown'
+
+
def test_add_pdp(db):
pdp_id = "pdp_id1"
value = {
@@ -56,6 +79,20 @@ def test_add_pdp(db):
assert pdp
+def test_add_pdp_twice_with_same_id(db):
+ pdp_id = "pdp_id1"
+ value = {
+ "name": "test_pdp",
+ "security_pipeline": ["policy_id_1", "policy_id_2"],
+ "keystone_project_id": "keystone_project_id1",
+ "description": "...",
+ }
+ add_pdp(pdp_id, value)
+ with pytest.raises(Exception) as exception_info:
+ add_pdp(pdp_id, value)
+ assert str(exception_info.value) == '409: Pdp Error'
+
+
def test_get_pdp(db):
pdp_id = "pdp_id1"
value = {
@@ -67,3 +104,9 @@ def test_get_pdp(db):
add_pdp(pdp_id, value)
pdp = get_pdp(pdp_id)
assert len(pdp) == 1
+
+
+def test_get_pdp_with_invalid_id(db):
+ pdp_id = "invalid"
+ pdp = get_pdp(pdp_id)
+ assert len(pdp) == 0