aboutsummaryrefslogtreecommitdiffstats
path: root/python_moondb
diff options
context:
space:
mode:
Diffstat (limited to 'python_moondb')
-rw-r--r--python_moondb/Changelog49
-rw-r--r--python_moondb/MANIFEST.in2
-rw-r--r--python_moondb/python_moondb/__init__.py3
-rw-r--r--python_moondb/python_moondb/api/model.py79
-rw-r--r--python_moondb/python_moondb/api/pdp.py8
-rw-r--r--python_moondb/python_moondb/api/policy.py75
-rw-r--r--python_moondb/python_moondb/backends/sql.py847
-rw-r--r--python_moondb/python_moondb/migrate_repo/versions/001_moon.py75
-rw-r--r--python_moondb/tests/unit_python/helpers/__init__.py0
-rw-r--r--python_moondb/tests/unit_python/helpers/assignment_helper.py49
-rw-r--r--python_moondb/tests/unit_python/helpers/category_helper.py54
-rw-r--r--python_moondb/tests/unit_python/helpers/data_helper.py98
-rw-r--r--python_moondb/tests/unit_python/helpers/meta_rule_helper.py48
-rw-r--r--python_moondb/tests/unit_python/helpers/mock_data.py144
-rw-r--r--python_moondb/tests/unit_python/helpers/model_helper.py50
-rw-r--r--python_moondb/tests/unit_python/helpers/pdp_helper.py23
-rw-r--r--python_moondb/tests/unit_python/helpers/policy_helper.py61
-rw-r--r--python_moondb/tests/unit_python/models/test_categories.py79
-rw-r--r--python_moondb/tests/unit_python/models/test_meta_rules.py161
-rw-r--r--python_moondb/tests/unit_python/models/test_models.py400
-rw-r--r--python_moondb/tests/unit_python/policies/mock_data.py55
-rwxr-xr-xpython_moondb/tests/unit_python/policies/test_assignments.py302
-rwxr-xr-xpython_moondb/tests/unit_python/policies/test_data.py574
-rwxr-xr-xpython_moondb/tests/unit_python/policies/test_policies.py236
-rw-r--r--python_moondb/tests/unit_python/requirements.txt1
-rwxr-xr-xpython_moondb/tests/unit_python/test_pdp.py115
26 files changed, 2302 insertions, 1286 deletions
diff --git a/python_moondb/Changelog b/python_moondb/Changelog
index a7d10b17..f4feef62 100644
--- a/python_moondb/Changelog
+++ b/python_moondb/Changelog
@@ -57,3 +57,52 @@ CHANGES
-----
- Code cleaning
+1.2.6
+-----
+- Remove some code duplication in moon_db
+- handle the extra field for the perimeter
+
+1.2.7
+-----
+- Fix some bugs
+
+1.2.8
+-----
+- Add unique constraints on db tables
+
+1.2.9
+-----
+- Add some verifications when deleting some elements in database
+
+1.2.10
+-----
+- Update the migration script because of a bug introduced in 1.2.8 in rule table
+- Fix bugs due to the previous version
+
+1.2.11
+------
+- adding test cases for perimeter
+- adding subject_object_action to model_test
+- update import of exception
+- add unit_test to test_model
+- add validation for not accepting blank perimeter name or category name
+
+1.2.12
+------
+- Fix the SubjectExisting exception problem
+
+1.2.13
+------
+- Add validations and refactor test cases
+
+1.2.14
+------
+- Fix some bugs for the manager and clean the code
+
+1.2.15
+------
+- Fix test cases after removing syntax error in exceptions
+
+1.2.16
+------
+- Fix the "key length error" in meta_rule table
diff --git a/python_moondb/MANIFEST.in b/python_moondb/MANIFEST.in
index 82b40140..02655837 100644
--- a/python_moondb/MANIFEST.in
+++ b/python_moondb/MANIFEST.in
@@ -3,7 +3,7 @@
# license which can be found in the file 'LICENSE' in this package distribution
# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
-include README.rst
+include README.md
include LICENSE
include setup.py
include requirements.txt
diff --git a/python_moondb/python_moondb/__init__.py b/python_moondb/python_moondb/__init__.py
index de7c772e..e2e16287 100644
--- a/python_moondb/python_moondb/__init__.py
+++ b/python_moondb/python_moondb/__init__.py
@@ -3,5 +3,4 @@
# license which can be found in the file 'LICENSE' in this package distribution
# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
-__version__ = "1.2.5"
-
+__version__ = "1.2.16"
diff --git a/python_moondb/python_moondb/api/model.py b/python_moondb/python_moondb/api/model.py
index bd475365..c1603b83 100644
--- a/python_moondb/python_moondb/api/model.py
+++ b/python_moondb/python_moondb/api/model.py
@@ -9,7 +9,6 @@ from python_moonutilities import exceptions
from python_moonutilities.security_functions import filter_input, enforce
from python_moondb.api.managers import Managers
-
logger = logging.getLogger("moon.db.api.model")
@@ -23,6 +22,10 @@ class ModelManager(Managers):
def update_model(self, user_id, model_id, value):
if model_id not in self.driver.get_models(model_id=model_id):
raise exceptions.ModelUnknown
+ if value and 'meta_rules' in value:
+ for meta_rule_id in value['meta_rules']:
+ if not self.driver.get_meta_rules(meta_rule_id=meta_rule_id):
+ raise exceptions.MetaRuleUnknown
return self.driver.update_model(model_id=model_id, value=value)
@enforce(("read", "write"), "models")
@@ -30,6 +33,10 @@ class ModelManager(Managers):
if model_id not in self.driver.get_models(model_id=model_id):
raise exceptions.ModelUnknown
# TODO (asteroide): check that no policy is connected to this model
+ policies = Managers.PolicyManager.get_policies(user_id=user_id)
+ for policy in policies:
+ if policies[policy]['model_id'] == model_id:
+ raise exceptions.DeleteModelWithPolicy
return self.driver.delete_model(model_id=model_id)
@enforce(("read", "write"), "models")
@@ -38,6 +45,10 @@ class ModelManager(Managers):
raise exceptions.ModelExisting
if not model_id:
model_id = uuid4().hex
+ if value and 'meta_rules' in value:
+ for meta_rule_id in value['meta_rules']:
+ if not self.driver.get_meta_rules(meta_rule_id=meta_rule_id):
+ raise exceptions.MetaRuleUnknown
return self.driver.add_model(model_id=model_id, value=value)
@enforce("read", "models")
@@ -48,6 +59,19 @@ class ModelManager(Managers):
def set_meta_rule(self, user_id, meta_rule_id, value):
if meta_rule_id not in self.driver.get_meta_rules(meta_rule_id=meta_rule_id):
raise exceptions.MetaRuleUnknown
+ if value:
+ if 'subject_categories' in value:
+ for subject_category_id in value['subject_categories']:
+ if not self.driver.get_subject_categories(category_id=subject_category_id):
+ raise exceptions.SubjectCategoryUnknown
+ if 'object_categories' in value:
+ for object_category_id in value['object_categories']:
+ if not self.driver.get_object_categories(category_id=object_category_id):
+ raise exceptions.ObjectCategoryUnknown
+ if 'action_categories' in value:
+ for action_category_id in value['action_categories']:
+ if not self.driver.get_action_categories(category_id=action_category_id):
+ raise exceptions.ActionCategoryUnknown
return self.driver.set_meta_rule(meta_rule_id=meta_rule_id, value=value)
@enforce("read", "meta_rules")
@@ -58,9 +82,19 @@ class ModelManager(Managers):
def add_meta_rule(self, user_id, meta_rule_id=None, value=None):
if meta_rule_id in self.driver.get_meta_rules(meta_rule_id=meta_rule_id):
raise exceptions.MetaRuleExisting
- if not meta_rule_id:
- meta_rule_id = uuid4().hex
- logger.info("add_meta_rule {}".format(value))
+ if value:
+ if 'subject_categories' in value:
+ for subject_category_id in value['subject_categories']:
+ if not self.driver.get_subject_categories(category_id=subject_category_id):
+ raise exceptions.SubjectCategoryUnknown
+ if 'object_categories' in value:
+ for object_category_id in value['object_categories']:
+ if not self.driver.get_object_categories(category_id=object_category_id):
+ raise exceptions.ObjectCategoryUnknown
+ if 'action_categories' in value:
+ for action_category_id in value['action_categories']:
+ if not self.driver.get_action_categories(category_id=action_category_id):
+ raise exceptions.ActionCategoryUnknown
return self.driver.set_meta_rule(meta_rule_id=meta_rule_id, value=value)
@enforce(("read", "write"), "meta_rules")
@@ -68,6 +102,10 @@ class ModelManager(Managers):
if meta_rule_id not in self.driver.get_meta_rules(meta_rule_id=meta_rule_id):
raise exceptions.MetaRuleUnknown
# TODO (asteroide): check and/or delete data and assignments and rules linked to that meta_rule
+ models = self.get_models(user_id=user_id)
+ for model_id in models:
+ if models[model_id]['meta_rules'] == meta_rule_id:
+ raise exceptions.DeleteMetaRuleWithModel
return self.driver.delete_meta_rule(meta_rule_id=meta_rule_id)
@enforce("read", "meta_data")
@@ -78,8 +116,6 @@ class ModelManager(Managers):
def add_subject_category(self, user_id, category_id=None, value=None):
if category_id in self.driver.get_subject_categories(category_id=category_id):
raise exceptions.SubjectCategoryExisting
- # if not category_id:
- # category_id = uuid4().hex
return self.driver.add_subject_category(name=value["name"], description=value["description"], uuid=category_id)
@enforce(("read", "write"), "meta_data")
@@ -88,6 +124,16 @@ class ModelManager(Managers):
# TODO (asteroide): delete all meta_rules linked to that category
if category_id not in self.driver.get_subject_categories(category_id=category_id):
raise exceptions.SubjectCategoryUnknown
+ meta_rules = self.get_meta_rules(user_id=user_id)
+ for meta_rule_id in meta_rules:
+ for subject_category_id in meta_rules[meta_rule_id]['subject_categories']:
+ logger.info("delete_subject_category {} {}".format(subject_category_id, meta_rule_id))
+ logger.info("delete_subject_category {}".format(meta_rules[meta_rule_id]))
+ if subject_category_id == category_id:
+ self.delete_meta_rule(user_id, meta_rule_id)
+ # raise exceptions.DeleteCategoryWithMetaRule
+ if self.driver.is_subject_data_exist(category_id=category_id):
+ raise exceptions.DeleteCategoryWithData
return self.driver.delete_subject_category(category_id=category_id)
@enforce("read", "meta_data")
@@ -98,8 +144,6 @@ class ModelManager(Managers):
def add_object_category(self, user_id, category_id=None, value=None):
if category_id in self.driver.get_object_categories(category_id=category_id):
raise exceptions.ObjectCategoryExisting
- # if not category_id:
- # category_id = uuid4().hex
return self.driver.add_object_category(name=value["name"], description=value["description"], uuid=category_id)
@enforce(("read", "write"), "meta_data")
@@ -108,6 +152,13 @@ class ModelManager(Managers):
# TODO (asteroide): delete all meta_rules linked to that category
if category_id not in self.driver.get_object_categories(category_id=category_id):
raise exceptions.ObjectCategoryUnknown
+ meta_rules = self.get_meta_rules(user_id=user_id)
+ for meta_rule_id in meta_rules:
+ for object_category_id in meta_rules[meta_rule_id]['object_categories']:
+ if object_category_id == category_id:
+ self.delete_meta_rule(user_id, meta_rule_id)
+ if self.driver.is_object_data_exist(category_id=category_id):
+ raise exceptions.DeleteCategoryWithData
return self.driver.delete_object_category(category_id=category_id)
@enforce("read", "meta_data")
@@ -118,8 +169,6 @@ class ModelManager(Managers):
def add_action_category(self, user_id, category_id=None, value=None):
if category_id in self.driver.get_action_categories(category_id=category_id):
raise exceptions.ActionCategoryExisting
- # if not category_id:
- # category_id = uuid4().hex
return self.driver.add_action_category(name=value["name"], description=value["description"], uuid=category_id)
@enforce(("read", "write"), "meta_data")
@@ -127,6 +176,12 @@ class ModelManager(Managers):
# TODO (asteroide): delete all data linked to that category
# TODO (asteroide): delete all meta_rules linked to that category
if category_id not in self.driver.get_action_categories(category_id=category_id):
- raise exceptions.ActionCategoryExisting
+ raise exceptions.ActionCategoryUnknown
+ meta_rules = self.get_meta_rules(user_id=user_id)
+ for meta_rule_id in meta_rules:
+ for action_category_id in meta_rules[meta_rule_id]['action_categories']:
+ if action_category_id == category_id:
+ self.delete_meta_rule(user_id, meta_rule_id)
+ if self.driver.is_action_data_exist(category_id=category_id):
+ raise exceptions.DeleteCategoryWithData
return self.driver.delete_action_category(category_id=category_id)
-
diff --git a/python_moondb/python_moondb/api/pdp.py b/python_moondb/python_moondb/api/pdp.py
index 7e852ca8..d0a071c9 100644
--- a/python_moondb/python_moondb/api/pdp.py
+++ b/python_moondb/python_moondb/api/pdp.py
@@ -22,6 +22,10 @@ class PDPManager(Managers):
def update_pdp(self, user_id, pdp_id, value):
if pdp_id not in self.driver.get_pdp(pdp_id=pdp_id):
raise exceptions.PdpUnknown
+ if value and 'security_pipeline' in value:
+ for policy_id in value['security_pipeline']:
+ if not Managers.PolicyManager.get_policies(user_id=user_id, policy_id=policy_id):
+ raise exceptions.PolicyUnknown
return self.driver.update_pdp(pdp_id=pdp_id, value=value)
@enforce(("read", "write"), "pdp")
@@ -36,6 +40,10 @@ class PDPManager(Managers):
raise exceptions.PdpExisting
if not pdp_id:
pdp_id = uuid4().hex
+ if value and 'security_pipeline' in value:
+ for policy_id in value['security_pipeline']:
+ if not Managers.PolicyManager.get_policies(user_id=user_id, policy_id=policy_id):
+ raise exceptions.PolicyUnknown
return self.driver.add_pdp(pdp_id=pdp_id, value=value)
@enforce("read", "pdp")
diff --git a/python_moondb/python_moondb/api/policy.py b/python_moondb/python_moondb/api/policy.py
index ca313f9a..05c2b7d5 100644
--- a/python_moondb/python_moondb/api/policy.py
+++ b/python_moondb/python_moondb/api/policy.py
@@ -8,6 +8,7 @@ import logging
from python_moonutilities.security_functions import enforce
from python_moondb.api.managers import Managers
from python_moonutilities import exceptions
+# from python_moondb.core import PDPManager
logger = logging.getLogger("moon.db.api.policy")
@@ -39,6 +40,9 @@ class PolicyManager(Managers):
def update_policy(self, user_id, policy_id, value):
if policy_id not in self.driver.get_policies(policy_id=policy_id):
raise exceptions.PolicyUnknown
+ if value and 'model_id' in value and value['model_id'] != "":
+ if not Managers.ModelManager.get_models(user_id, model_id=value['model_id']):
+ raise exceptions.ModelUnknown
return self.driver.update_policy(policy_id=policy_id, value=value)
@enforce(("read", "write"), "policies")
@@ -46,6 +50,11 @@ class PolicyManager(Managers):
# TODO (asteroide): unmap PDP linked to that policy
if policy_id not in self.driver.get_policies(policy_id=policy_id):
raise exceptions.PolicyUnknown
+ pdps = self.PDPManager.get_pdp(user_id=user_id)
+ for pdp in pdps:
+ for policy_id in pdps[pdp]['security_pipeline']:
+ if policy_id == policy_id:
+ raise exceptions.DeletePolicyWithPdp
return self.driver.delete_policy(policy_id=policy_id)
@enforce(("read", "write"), "policies")
@@ -54,6 +63,9 @@ class PolicyManager(Managers):
raise exceptions.PolicyExisting
if not policy_id:
policy_id = uuid4().hex
+ if value and 'model_id' in value and value['model_id'] != "":
+ if not Managers.ModelManager.get_models(user_id, model_id=value['model_id']):
+ raise exceptions.ModelUnknown
return self.driver.add_policy(policy_id=policy_id, value=value)
@enforce("read", "policies")
@@ -62,10 +74,19 @@ class PolicyManager(Managers):
@enforce("read", "perimeter")
def get_subjects(self, user_id, policy_id, perimeter_id=None):
+ if not policy_id:
+ pass
+ elif not (policy_id and self.get_policies(user_id=user_id, policy_id=policy_id)):
+ raise exceptions.PolicyUnknown
return self.driver.get_subjects(policy_id=policy_id, perimeter_id=perimeter_id)
@enforce(("read", "write"), "perimeter")
def add_subject(self, user_id, policy_id, perimeter_id=None, value=None):
+ if not value or "name" not in value or not value["name"].strip():
+ raise exceptions.PerimeterNameInvalid
+ if value["name"] in map(lambda x: x['name'],
+ self.get_subjects(user_id, policy_id, perimeter_id).values()):
+ raise exceptions.SubjectExisting
k_user = Managers.KeystoneManager.get_user_by_name(value.get('name'))
if not k_user['users']:
k_user = Managers.KeystoneManager.create_user(value)
@@ -88,10 +109,16 @@ class PolicyManager(Managers):
@enforce(("read", "write"), "perimeter")
def delete_subject(self, user_id, policy_id, perimeter_id):
+ if policy_id and not self.get_policies(user_id=user_id, policy_id=policy_id):
+ raise exceptions.PolicyUnknown
return self.driver.delete_subject(policy_id=policy_id, perimeter_id=perimeter_id)
@enforce("read", "perimeter")
def get_objects(self, user_id, policy_id, perimeter_id=None):
+ if not policy_id:
+ pass
+ elif not (policy_id and self.get_policies(user_id=user_id, policy_id=policy_id)):
+ raise exceptions.PolicyUnknown
return self.driver.get_objects(policy_id=policy_id, perimeter_id=perimeter_id)
@enforce(("read", "write"), "perimeter")
@@ -104,22 +131,30 @@ class PolicyManager(Managers):
@enforce(("read", "write"), "perimeter")
def delete_object(self, user_id, policy_id, perimeter_id):
+ if policy_id and not self.get_policies(user_id=user_id, policy_id=policy_id):
+ raise exceptions.PolicyUnknown
return self.driver.delete_object(policy_id=policy_id, perimeter_id=perimeter_id)
@enforce("read", "perimeter")
def get_actions(self, user_id, policy_id, perimeter_id=None):
+ if not policy_id:
+ pass
+ elif not (policy_id and self.get_policies(user_id=user_id, policy_id=policy_id)):
+ raise exceptions.PolicyUnknown
return self.driver.get_actions(policy_id=policy_id, perimeter_id=perimeter_id)
@enforce(("read", "write"), "perimeter")
def add_action(self, user_id, policy_id, perimeter_id=None, value=None):
+ logger.debug("add_action {}".format(policy_id))
if not self.get_policies(user_id=user_id, policy_id=policy_id):
raise exceptions.PolicyUnknown
- if not perimeter_id:
- perimeter_id = uuid4().hex
return self.driver.set_action(policy_id=policy_id, perimeter_id=perimeter_id, value=value)
@enforce(("read", "write"), "perimeter")
def delete_action(self, user_id, policy_id, perimeter_id):
+ logger.debug("delete_action {} {} {}".format(policy_id, perimeter_id, self.get_policies(user_id=user_id, policy_id=policy_id)))
+ if policy_id and not self.get_policies(user_id=user_id, policy_id=policy_id):
+ raise exceptions.PolicyUnknown
return self.driver.delete_action(policy_id=policy_id, perimeter_id=perimeter_id)
@enforce("read", "data")
@@ -139,6 +174,8 @@ class PolicyManager(Managers):
def set_subject_data(self, user_id, policy_id, data_id=None, category_id=None, value=None):
if not category_id:
raise Exception('Invalid category id')
+ if not Managers.ModelManager.get_subject_categories(user_id=user_id, category_id=category_id):
+ raise exceptions.MetaDataUnknown
if not self.get_policies(user_id=user_id, policy_id=policy_id):
raise exceptions.PolicyUnknown
if not data_id:
@@ -148,6 +185,9 @@ class PolicyManager(Managers):
@enforce(("read", "write"), "data")
def delete_subject_data(self, user_id, policy_id, data_id):
# TODO (asteroide): check and/or delete assignments linked to that data
+ subject_assignments = self.get_subject_assignments(user_id=user_id, policy_id=policy_id, subject_id=data_id)
+ if subject_assignments:
+ raise exceptions.DeleteData
return self.driver.delete_subject_data(policy_id=policy_id, data_id=data_id)
@enforce("read", "data")
@@ -167,6 +207,8 @@ class PolicyManager(Managers):
def add_object_data(self, user_id, policy_id, data_id=None, category_id=None, value=None):
if not category_id:
raise Exception('Invalid category id')
+ if not Managers.ModelManager.get_object_categories(user_id=user_id, category_id=category_id):
+ raise exceptions.MetaDataUnknown
if not self.get_policies(user_id=user_id, policy_id=policy_id):
raise exceptions.PolicyUnknown
if not data_id:
@@ -176,6 +218,9 @@ class PolicyManager(Managers):
@enforce(("read", "write"), "data")
def delete_object_data(self, user_id, policy_id, data_id):
# TODO (asteroide): check and/or delete assignments linked to that data
+ object_assignments = self.get_object_assignments(user_id=user_id, policy_id=policy_id, object_id=data_id)
+ if object_assignments:
+ raise exceptions.DeleteData
return self.driver.delete_object_data(policy_id=policy_id, data_id=data_id)
@enforce("read", "data")
@@ -195,6 +240,8 @@ class PolicyManager(Managers):
def add_action_data(self, user_id, policy_id, data_id=None, category_id=None, value=None):
if not category_id:
raise Exception('Invalid category id')
+ if not Managers.ModelManager.get_action_categories(user_id=user_id, category_id=category_id):
+ raise exceptions.MetaDataUnknown
if not self.get_policies(user_id=user_id, policy_id=policy_id):
raise exceptions.PolicyUnknown
if not data_id:
@@ -204,6 +251,9 @@ class PolicyManager(Managers):
@enforce(("read", "write"), "data")
def delete_action_data(self, user_id, policy_id, data_id):
# TODO (asteroide): check and/or delete assignments linked to that data
+ action_assignments = self.get_action_assignments(user_id=user_id, policy_id=policy_id, action_id=data_id)
+ if action_assignments:
+ raise exceptions.DeleteData
return self.driver.delete_action_data(policy_id=policy_id, data_id=data_id)
@enforce("read", "assignments")
@@ -214,6 +264,12 @@ class PolicyManager(Managers):
def add_subject_assignment(self, user_id, policy_id, subject_id, category_id, data_id):
if not self.get_policies(user_id=user_id, policy_id=policy_id):
raise exceptions.PolicyUnknown
+ if not self.get_subjects(user_id=user_id, policy_id=policy_id, perimeter_id=subject_id):
+ raise exceptions.SubjectUnknown
+ if not Managers.ModelManager.get_subject_categories(user_id=user_id, category_id=category_id):
+ raise exceptions.MetaDataUnknown
+ if not self.get_subject_data(user_id=user_id, policy_id=policy_id, data_id=data_id):
+ raise exceptions.DataUnknown
return self.driver.add_subject_assignment(policy_id=policy_id, subject_id=subject_id,
category_id=category_id, data_id=data_id)
@@ -230,6 +286,12 @@ class PolicyManager(Managers):
def add_object_assignment(self, user_id, policy_id, object_id, category_id, data_id):
if not self.get_policies(user_id=user_id, policy_id=policy_id):
raise exceptions.PolicyUnknown
+ if not self.get_objects(user_id=user_id, policy_id=policy_id, perimeter_id=object_id):
+ raise exceptions.ObjectUnknown
+ if not Managers.ModelManager.get_object_categories(user_id=user_id, category_id=category_id):
+ raise exceptions.MetaDataUnknown
+ if not self.get_object_data(user_id=user_id, policy_id=policy_id, data_id=data_id):
+ raise exceptions.DataUnknown
return self.driver.add_object_assignment(policy_id=policy_id, object_id=object_id,
category_id=category_id, data_id=data_id)
@@ -246,6 +308,12 @@ class PolicyManager(Managers):
def add_action_assignment(self, user_id, policy_id, action_id, category_id, data_id):
if not self.get_policies(user_id=user_id, policy_id=policy_id):
raise exceptions.PolicyUnknown
+ if not self.get_actions(user_id=user_id, policy_id=policy_id, perimeter_id=action_id):
+ raise exceptions.ActionUnknown
+ if not Managers.ModelManager.get_action_categories(user_id=user_id, category_id=category_id):
+ raise exceptions.MetaDataUnknown
+ if not self.get_action_data(user_id=user_id, policy_id=policy_id, data_id=data_id):
+ raise exceptions.DataUnknown
return self.driver.add_action_assignment(policy_id=policy_id, action_id=action_id,
category_id=category_id, data_id=data_id)
@@ -257,11 +325,14 @@ class PolicyManager(Managers):
@enforce("read", "rules")
def get_rules(self, user_id, policy_id, meta_rule_id=None, rule_id=None):
return self.driver.get_rules(policy_id=policy_id, meta_rule_id=meta_rule_id, rule_id=rule_id)
+ logger.info("delete_subject_data: {} {}".format(policy_id, data_id))
@enforce(("read", "write"), "rules")
def add_rule(self, user_id, policy_id, meta_rule_id, value):
if not self.get_policies(user_id=user_id, policy_id=policy_id):
raise exceptions.PolicyUnknown
+ if not self.ModelManager.get_meta_rules(user_id=user_id, meta_rule_id=meta_rule_id):
+ raise exceptions.MetaRuleUnknown
return self.driver.add_rule(policy_id=policy_id, meta_rule_id=meta_rule_id, value=value)
@enforce(("read", "write"), "rules")
diff --git a/python_moondb/python_moondb/backends/sql.py b/python_moondb/python_moondb/backends/sql.py
index 1ce8d016..7310e7f3 100644
--- a/python_moondb/python_moondb/backends/sql.py
+++ b/python_moondb/python_moondb/backends/sql.py
@@ -9,13 +9,14 @@ from uuid import uuid4
import sqlalchemy as sql
import logging
from sqlalchemy.orm import sessionmaker
-from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.ext.declarative import declarative_base, declared_attr
from sqlalchemy import create_engine
from contextlib import contextmanager
from sqlalchemy import types as sql_types
from python_moonutilities import configuration
-from python_moonutilities.exceptions import *
+from python_moonutilities import exceptions
from python_moondb.core import PDPDriver, PolicyDriver, ModelDriver
+import sqlalchemy
logger = logging.getLogger("moon.db.driver.sql")
Base = declarative_base()
@@ -61,13 +62,14 @@ class JsonBlob(sql_types.TypeDecorator):
class Model(Base, DictBase):
__tablename__ = 'models'
- attributes = ['id', 'value']
+ attributes = ['id', 'name', 'value']
id = sql.Column(sql.String(64), primary_key=True)
+ name = sql.Column(sql.String(256), nullable=False)
value = sql.Column(JsonBlob(), nullable=True)
def to_dict(self):
return {
- "name": self.value.get("name"),
+ "name": self.name,
"description": self.value.get("description", ""),
"meta_rules": self.value.get("meta_rules", list()),
}
@@ -75,240 +77,198 @@ class Model(Base, DictBase):
class Policy(Base, DictBase):
__tablename__ = 'policies'
- attributes = ['id', 'value']
+ attributes = ['id', 'name', 'model_id', 'value']
id = sql.Column(sql.String(64), primary_key=True)
+ name = sql.Column(sql.String(256), nullable=False)
+ model_id = sql.Column(sql.String(64), nullable=True, default="")
value = sql.Column(JsonBlob(), nullable=True)
def to_dict(self):
return {
- "name": self.value.get("name"),
"description": self.value.get("description", ""),
- "model_id": self.value.get("model_id", ""),
"genre": self.value.get("genre", ""),
+ "model_id": self.model_id,
+ "name": self.name
}
class PDP(Base, DictBase):
__tablename__ = 'pdp'
- attributes = ['id', 'value']
+ attributes = ['id', 'name', 'keystone_project_id', 'value']
id = sql.Column(sql.String(64), primary_key=True)
+ name = sql.Column(sql.String(256), nullable=False)
+ keystone_project_id = sql.Column(sql.String(64), nullable=True, default="")
value = sql.Column(JsonBlob(), nullable=True)
def to_dict(self):
return {
- "name": self.value.get("name"),
+ "name": self.name,
"description": self.value.get("description", ""),
- "keystone_project_id": self.value.get("keystone_project_id", ""),
+ "keystone_project_id": self.keystone_project_id,
"security_pipeline": self.value.get("security_pipeline", []),
}
-class SubjectCategory(Base, DictBase):
- __tablename__ = 'subject_categories'
+class PerimeterCategoryBase(DictBase):
attributes = ['id', 'name', 'description']
id = sql.Column(sql.String(64), primary_key=True)
name = sql.Column(sql.String(256), nullable=False)
description = sql.Column(sql.String(256), nullable=True)
-class ObjectCategory(Base, DictBase):
+class SubjectCategory(Base, PerimeterCategoryBase):
+ __tablename__ = 'subject_categories'
+
+
+class ObjectCategory(Base, PerimeterCategoryBase):
__tablename__ = 'object_categories'
- attributes = ['id', 'name', 'description']
- id = sql.Column(sql.String(64), primary_key=True)
- name = sql.Column(sql.String(256), nullable=False)
- description = sql.Column(sql.String(256), nullable=True)
-class ActionCategory(Base, DictBase):
+class ActionCategory(Base, PerimeterCategoryBase):
__tablename__ = 'action_categories'
- attributes = ['id', 'name', 'description']
- id = sql.Column(sql.String(64), primary_key=True)
- name = sql.Column(sql.String(256), nullable=False)
- description = sql.Column(sql.String(256), nullable=True)
-class Subject(Base, DictBase):
- __tablename__ = 'subjects'
- attributes = ['id', 'value']
+class PerimeterBase(DictBase):
+ attributes = ['id', 'name', 'value']
id = sql.Column(sql.String(64), primary_key=True)
+ name = sql.Column(sql.String(256), nullable=False)
value = sql.Column(JsonBlob(), nullable=True)
+ __mapper_args__ = {'concrete': True}
def __repr__(self):
- return "{}: {}".format(self.id, json.dumps(self.value))
+ return "{} with name {} : {}".format(self.id, self.name, json.dumps(self.value))
def to_return(self):
return {
'id': self.id,
- 'name': self.value.get("name", ""),
+ 'name': self.name,
'description': self.value.get("description", ""),
'email': self.value.get("email", ""),
- 'partner_id': self.value.get("partner_id", ""),
+ 'extra': self.value.get("extra", dict()),
'policy_list': self.value.get("policy_list", [])
}
def to_dict(self):
+ dict_value = copy.deepcopy(self.value)
+ dict_value["name"] = self.name
return {
'id': self.id,
- 'value': self.value
+ 'value': dict_value
}
-class Object(Base, DictBase):
- __tablename__ = 'objects'
- attributes = ['id', 'value']
- id = sql.Column(sql.String(64), primary_key=True)
- value = sql.Column(JsonBlob(), nullable=True)
-
- def __repr__(self):
- return "{}: {}".format(self.id, json.dumps(self.value))
+class Subject(Base, PerimeterBase):
+ __tablename__ = 'subjects'
- def to_dict(self):
- return {
- 'id': self.id,
- 'value': self.value
- }
- def to_return(self):
- return {
- 'id': self.id,
- 'name': self.value.get("name", ""),
- 'description': self.value.get("description", ""),
- 'partner_id': self.value.get("partner_id", ""),
- 'policy_list': self.value.get("policy_list", [])
- }
+class Object(Base, PerimeterBase):
+ __tablename__ = 'objects'
-class Action(Base, DictBase):
+class Action(Base, PerimeterBase):
__tablename__ = 'actions'
- attributes = ['id', 'value']
- id = sql.Column(sql.String(64), primary_key=True)
- value = sql.Column(JsonBlob(), nullable=True)
- def __repr__(self):
- return "{}: {}".format(self.id, json.dumps(self.value))
- def to_dict(self):
- return {
- 'id': self.id,
- 'value': self.value
- }
-
- def to_return(self):
- return {
- 'id': self.id,
- 'name': self.value.get("name", ""),
- 'description': self.value.get("description", ""),
- 'partner_id': self.value.get("partner_id", ""),
- 'policy_list': self.value.get("policy_list", [])
- }
-
-
-class SubjectData(Base, DictBase):
- __tablename__ = 'subject_data'
- attributes = ['id', 'value', 'category_id', 'policy_id']
+class PerimeterDataBase(DictBase):
+ attributes = ['id', 'name', 'value', 'category_id', 'policy_id']
id = sql.Column(sql.String(64), primary_key=True)
+ name = sql.Column(sql.String(256), nullable=False)
value = sql.Column(JsonBlob(), nullable=True)
- category_id = sql.Column(sql.ForeignKey("subject_categories.id"), nullable=False)
- policy_id = sql.Column(sql.ForeignKey("policies.id"), nullable=False)
+ @declared_attr
+ def policy_id(cls):
+ return sql.Column(sql.ForeignKey("policies.id"), nullable=False)
def to_dict(self):
return {
'id': self.id,
- 'name': self.value.get("name", ""),
+ 'name': self.name,
'description': self.value.get("description", ""),
'category_id': self.category_id,
'policy_id': self.policy_id
}
-class ObjectData(Base, DictBase):
+class SubjectData(Base, PerimeterDataBase):
+ __tablename__ = 'subject_data'
+ category_id = sql.Column(sql.ForeignKey("subject_categories.id"), nullable=False)
+
+
+class ObjectData(Base, PerimeterDataBase):
__tablename__ = 'object_data'
- attributes = ['id', 'value', 'category_id', 'policy_id']
- id = sql.Column(sql.String(64), primary_key=True)
- value = sql.Column(JsonBlob(), nullable=True)
category_id = sql.Column(sql.ForeignKey("object_categories.id"), nullable=False)
- policy_id = sql.Column(sql.ForeignKey("policies.id"), nullable=False)
-class ActionData(Base, DictBase):
+class ActionData(Base, PerimeterDataBase):
__tablename__ = 'action_data'
- attributes = ['id', 'value', 'category_id', 'policy_id']
- id = sql.Column(sql.String(64), primary_key=True)
- value = sql.Column(JsonBlob(), nullable=True)
category_id = sql.Column(sql.ForeignKey("action_categories.id"), nullable=False)
- policy_id = sql.Column(sql.ForeignKey("policies.id"), nullable=False)
-class SubjectAssignment(Base, DictBase):
- __tablename__ = 'subject_assignments'
+class PerimeterAssignmentBase(DictBase):
attributes = ['id', 'assignments', 'policy_id', 'subject_id', 'category_id']
id = sql.Column(sql.String(64), primary_key=True)
assignments = sql.Column(JsonBlob(), nullable=True)
- policy_id = sql.Column(sql.ForeignKey("policies.id"), nullable=False)
- subject_id = sql.Column(sql.ForeignKey("subjects.id"), nullable=False)
- category_id = sql.Column(sql.ForeignKey("subject_categories.id"), nullable=False)
+ category_id = None
- def to_dict(self):
+ @declared_attr
+ def policy_id(cls):
+ return sql.Column(sql.ForeignKey("policies.id"), nullable=False)
+
+ def _to_dict(self, element_key, element_value):
return {
"id": self.id,
"policy_id": self.policy_id,
- "subject_id": self.subject_id,
+ element_key: element_value,
"category_id": self.category_id,
"assignments": self.assignments,
}
-class ObjectAssignment(Base, DictBase):
+class SubjectAssignment(Base, PerimeterAssignmentBase):
+ __tablename__ = 'subject_assignments'
+ subject_id = sql.Column(sql.ForeignKey("subjects.id"), nullable=False)
+ category_id = sql.Column(sql.ForeignKey("subject_categories.id"), nullable=False)
+
+ def to_dict(self):
+ return self._to_dict("subject_id", self.subject_id)
+
+
+class ObjectAssignment(Base, PerimeterAssignmentBase):
__tablename__ = 'object_assignments'
attributes = ['id', 'assignments', 'policy_id', 'object_id', 'category_id']
- id = sql.Column(sql.String(64), primary_key=True)
- assignments = sql.Column(JsonBlob(), nullable=True)
- policy_id = sql.Column(sql.ForeignKey("policies.id"), nullable=False)
object_id = sql.Column(sql.ForeignKey("objects.id"), nullable=False)
category_id = sql.Column(sql.ForeignKey("object_categories.id"), nullable=False)
def to_dict(self):
- return {
- "id": self.id,
- "policy_id": self.policy_id,
- "object_id": self.object_id,
- "category_id": self.category_id,
- "assignments": self.assignments,
- }
+ return self._to_dict("object_id", self.object_id)
-class ActionAssignment(Base, DictBase):
+class ActionAssignment(Base, PerimeterAssignmentBase):
__tablename__ = 'action_assignments'
attributes = ['id', 'assignments', 'policy_id', 'action_id', 'category_id']
- id = sql.Column(sql.String(64), primary_key=True)
- assignments = sql.Column(JsonBlob(), nullable=True)
- policy_id = sql.Column(sql.ForeignKey("policies.id"), nullable=False)
action_id = sql.Column(sql.ForeignKey("actions.id"), nullable=False)
category_id = sql.Column(sql.ForeignKey("action_categories.id"), nullable=False)
def to_dict(self):
- return {
- "id": self.id,
- "policy_id": self.policy_id,
- "action_id": self.action_id,
- "category_id": self.category_id,
- "assignments": self.assignments,
- }
+ return self._to_dict("action_id", self.action_id)
class MetaRule(Base, DictBase):
__tablename__ = 'meta_rules'
- attributes = ['id', 'value']
+ attributes = ['id', 'name', 'subject_categories', 'object_categories', 'action_categories', 'value']
id = sql.Column(sql.String(64), primary_key=True)
+ name = sql.Column(sql.String(256), nullable=False)
+ subject_categories = sql.Column(JsonBlob(), nullable=True)
+ object_categories = sql.Column(JsonBlob(), nullable=True)
+ action_categories = sql.Column(JsonBlob(), nullable=True)
value = sql.Column(JsonBlob(), nullable=True)
def to_dict(self):
return {
- "name": self.value["name"],
+ "name": self.name,
"description": self.value.get("description", ""),
- "subject_categories": self.value.get("subject_categories", list()),
- "object_categories": self.value.get("object_categories", list()),
- "action_categories": self.value.get("action_categories", list()),
+ "subject_categories": self.subject_categories,
+ "object_categories": self.object_categories,
+ "action_categories": self.action_categories,
}
@@ -378,15 +338,23 @@ class BaseConnector(object):
class PDPConnector(BaseConnector, PDPDriver):
def update_pdp(self, pdp_id, value):
- with self.get_session_for_write() as session:
- query = session.query(PDP)
- query = query.filter_by(id=pdp_id)
- ref = query.first()
- if ref:
- d = dict(ref.value)
- d.update(value)
- setattr(ref, "value", d)
- return {ref.id: ref.to_dict()}
+ try:
+ with self.get_session_for_write() as session:
+ query = session.query(PDP)
+ query = query.filter_by(id=pdp_id)
+ ref = query.first()
+ if ref:
+ value_wo_name = copy.deepcopy(value)
+ value_wo_name.pop("name", None)
+ value_wo_name.pop("keystone_project_id", None)
+ ref.name = value["name"]
+ ref.keystone_project_id = value["keystone_project_id"]
+ d = dict(ref.value)
+ d.update(value_wo_name)
+ setattr(ref, "value", d)
+ return {ref.id: ref.to_dict()}
+ except sqlalchemy.exc.IntegrityError:
+ raise exceptions.PdpExisting
def delete_pdp(self, pdp_id):
with self.get_session_for_write() as session:
@@ -394,13 +362,21 @@ class PDPConnector(BaseConnector, PDPDriver):
session.delete(ref)
def add_pdp(self, pdp_id=None, value=None):
- with self.get_session_for_write() as session:
- new = PDP.from_dict({
- "id": pdp_id if pdp_id else uuid4().hex,
- "value": value
- })
- session.add(new)
- return {new.id: new.to_dict()}
+ try:
+ with self.get_session_for_write() as session:
+ value_wo_name = copy.deepcopy(value)
+ value_wo_name.pop("name", None)
+ value_wo_name.pop("keystone_project_id", None)
+ new = PDP.from_dict({
+ "id": pdp_id if pdp_id else uuid4().hex,
+ "name": value["name"],
+ "keystone_project_id": value["keystone_project_id"],
+ "value": value_wo_name
+ })
+ session.add(new)
+ return {new.id: new.to_dict()}
+ except sqlalchemy.exc.IntegrityError:
+ raise exceptions.PdpExisting
def get_pdp(self, pdp_id=None):
with self.get_session_for_read() as session:
@@ -419,8 +395,13 @@ class PolicyConnector(BaseConnector, PolicyDriver):
query = query.filter_by(id=policy_id)
ref = query.first()
if ref:
+ value_wo_other_info = copy.deepcopy(value)
+ value_wo_other_info.pop("name", None)
+ value_wo_other_info.pop("model_id", None)
+ ref.name = value["name"]
+ ref.model_id= value["model_id"]
d = dict(ref.value)
- d.update(value)
+ d.update(value_wo_other_info)
setattr(ref, "value", d)
return {ref.id: ref.to_dict()}
@@ -431,9 +412,14 @@ class PolicyConnector(BaseConnector, PolicyDriver):
def add_policy(self, policy_id=None, value=None):
with self.get_session_for_write() as session:
+ value_wo_other_info = copy.deepcopy(value)
+ value_wo_other_info.pop("name", None)
+ value_wo_other_info.pop("model_id", None)
new = Policy.from_dict({
"id": policy_id if policy_id else uuid4().hex,
- "value": value
+ "name": value["name"],
+ "model_id": value.get("model_id", ""),
+ "value": value_wo_other_info
})
session.add(new)
return {new.id: new.to_dict()}
@@ -446,9 +432,9 @@ class PolicyConnector(BaseConnector, PolicyDriver):
ref_list = query.all()
return {_ref.id: _ref.to_dict() for _ref in ref_list}
- def get_subjects(self, policy_id, perimeter_id=None):
+ def __get_perimeters(self, ClassType, policy_id, perimeter_id=None):
with self.get_session_for_read() as session:
- query = session.query(Subject)
+ query = session.query(ClassType)
ref_list = copy.deepcopy(query.all())
if perimeter_id:
for _ref in ref_list:
@@ -467,212 +453,148 @@ class PolicyConnector(BaseConnector, PolicyDriver):
return {_ref.id: _ref.to_return() for _ref in results}
return {_ref.id: _ref.to_return() for _ref in ref_list}
- def set_subject(self, policy_id, perimeter_id=None, value=None):
- _subject = None
+ def __set_perimeter(self, ClassType, ClassTypeException, policy_id, perimeter_id=None, value=None):
+ if not value or "name" not in value or not value["name"].strip():
+ raise exceptions.PerimeterNameInvalid
with self.get_session_for_write() as session:
+ _perimeter = None
if perimeter_id:
- query = session.query(Subject)
+ query = session.query(ClassType)
query = query.filter_by(id=perimeter_id)
- _subject = query.first()
- if not _subject:
+ _perimeter = query.first()
+ if not perimeter_id and not _perimeter:
+ query = session.query(ClassType)
+ query = query.filter_by(name=value['name'])
+ _perimeter = query.first()
+ if _perimeter:
+ raise ClassTypeException
+ if not _perimeter:
if "policy_list" not in value or type(value["policy_list"]) is not list:
value["policy_list"] = []
if policy_id and policy_id not in value["policy_list"]:
value["policy_list"] = [policy_id, ]
- new = Subject.from_dict({
+
+ value_wo_name = copy.deepcopy(value)
+ value_wo_name.pop("name", None)
+ new = ClassType.from_dict({
"id": perimeter_id if perimeter_id else uuid4().hex,
- "value": value
+ "name": value["name"],
+ "value": value_wo_name
})
session.add(new)
return {new.id: new.to_return()}
else:
- _value = copy.deepcopy(_subject.to_dict())
+ _value = copy.deepcopy(_perimeter.to_dict())
if "policy_list" not in _value["value"] or type(_value["value"]["policy_list"]) is not list:
_value["value"]["policy_list"] = []
if policy_id and policy_id not in _value["value"]["policy_list"]:
_value["value"]["policy_list"].append(policy_id)
- new_subject = Subject.from_dict(_value)
- # setattr(_subject, "value", _value["value"])
- setattr(_subject, "value", getattr(new_subject, "value"))
- return {_subject.id: _subject.to_return()}
+ _value["value"].update(value)
+
+ name = _value["value"]["name"]
+ _value["value"].pop("name")
+ new_perimeter = ClassType.from_dict({
+ "id": _value["id"],
+ "name": name,
+ "value": _value["value"]
+ })
+ _perimeter.value = new_perimeter.value
+ _perimeter.name = new_perimeter.name
+ return {_perimeter.id: _perimeter.to_return()}
- def delete_subject(self, policy_id, perimeter_id):
+ def __delete_perimeter(self, ClassType, ClassUnknownException, policy_id, perimeter_id):
with self.get_session_for_write() as session:
- query = session.query(Subject)
+ query = session.query(ClassType)
query = query.filter_by(id=perimeter_id)
- _subject = query.first()
- if not _subject:
- raise SubjectUnknown
- old_subject = copy.deepcopy(_subject.to_dict())
- # value = _subject.to_dict()
+ _perimeter = query.first()
+ if not _perimeter:
+ raise ClassUnknownException
+ old_perimeter = copy.deepcopy(_perimeter.to_dict())
try:
- old_subject["value"]["policy_list"].remove(policy_id)
- new_user = Subject.from_dict(old_subject)
- setattr(_subject, "value", getattr(new_user, "value"))
+ old_perimeter["value"]["policy_list"].remove(policy_id)
+ new_perimeter = ClassType.from_dict(old_perimeter)
+ setattr(_perimeter, "value", getattr(new_perimeter, "value"))
except ValueError:
- if not _subject.value["policy_list"]:
- session.delete(_subject)
+ if not _perimeter.value["policy_list"]:
+ session.delete(_perimeter)
+
+ def get_subjects(self, policy_id, perimeter_id=None):
+ return self.__get_perimeters(Subject, policy_id, perimeter_id)
+
+ def set_subject(self, policy_id, perimeter_id=None, value=None):
+ try:
+ return self.__set_perimeter(Subject, exceptions.SubjectExisting, policy_id, perimeter_id=perimeter_id, value=value)
+ except sqlalchemy.exc.IntegrityError:
+ raise exceptions.SubjectExisting
+
+ def delete_subject(self, policy_id, perimeter_id):
+ self.__delete_perimeter(Subject, exceptions.SubjectUnknown, policy_id, perimeter_id)
def get_objects(self, policy_id, perimeter_id=None):
- with self.get_session_for_read() as session:
- query = session.query(Object)
- ref_list = copy.deepcopy(query.all())
- if perimeter_id:
- for _ref in ref_list:
- _ref_value = _ref.to_return()
- if perimeter_id == _ref.id:
- if policy_id and policy_id in _ref_value["policy_list"]:
- return {_ref.id: _ref_value}
- else:
- return {}
- elif policy_id:
- results = []
- for _ref in ref_list:
- _ref_value = _ref.to_return()
- if policy_id in _ref_value["policy_list"]:
- results.append(_ref)
- return {_ref.id: _ref.to_return() for _ref in results}
- return {_ref.id: _ref.to_return() for _ref in ref_list}
+ return self.__get_perimeters(Object, policy_id, perimeter_id)
def set_object(self, policy_id, perimeter_id=None, value=None):
- _object = None
- with self.get_session_for_write() as session:
- if perimeter_id:
- query = session.query(Object)
- query = query.filter_by(id=perimeter_id)
- _object = query.first()
- if not _object:
- if "policy_list" not in value or type(value["policy_list"]) is not list:
- value["policy_list"] = []
- if policy_id and policy_id not in value["policy_list"]:
- value["policy_list"] = [policy_id, ]
- new = Object.from_dict({
- "id": perimeter_id if perimeter_id else uuid4().hex,
- "value": value
- })
- session.add(new)
- return {new.id: new.to_return()}
- else:
- _value = copy.deepcopy(_object.to_dict())
- if "policy_list" not in _value["value"] or type(_value["value"]["policy_list"]) is not list:
- _value["value"]["policy_list"] = []
- if policy_id and policy_id not in _value["value"]["policy_list"]:
- _value["value"]["policy_list"].append(policy_id)
- new_object = Object.from_dict(_value)
- # setattr(_object, "value", _value["value"])
- setattr(_object, "value", getattr(new_object, "value"))
- return {_object.id: _object.to_return()}
+ try:
+ return self.__set_perimeter(Object, exceptions.ObjectExisting, policy_id, perimeter_id=perimeter_id, value=value)
+ except sqlalchemy.exc.IntegrityError as e:
+ logger.exception("IntegrityError {}".format(e))
+ raise exceptions.ObjectExisting
def delete_object(self, policy_id, perimeter_id):
- with self.get_session_for_write() as session:
- query = session.query(Object)
- query = query.filter_by(id=perimeter_id)
- _object = query.first()
- if not _object:
- raise ObjectUnknown
- old_object = copy.deepcopy(_object.to_dict())
- # value = _object.to_dict()
- try:
- old_object["value"]["policy_list"].remove(policy_id)
- new_user = Object.from_dict(old_object)
- setattr(_object, "value", getattr(new_user, "value"))
- except ValueError:
- if not _object.value["policy_list"]:
- session.delete(_object)
+ self.__delete_perimeter(Object, exceptions.ObjectUnknown, policy_id, perimeter_id)
def get_actions(self, policy_id, perimeter_id=None):
- with self.get_session_for_read() as session:
- query = session.query(Action)
- ref_list = copy.deepcopy(query.all())
- if perimeter_id:
- for _ref in ref_list:
- _ref_value = _ref.to_return()
- if perimeter_id == _ref.id:
- if policy_id and policy_id in _ref_value["policy_list"]:
- return {_ref.id: _ref_value}
- else:
- return {}
- elif policy_id:
- results = []
- for _ref in ref_list:
- _ref_value = _ref.to_return()
- if policy_id in _ref_value["policy_list"]:
- results.append(_ref)
- return {_ref.id: _ref.to_return() for _ref in results}
- return {_ref.id: _ref.to_return() for _ref in ref_list}
+ return self.__get_perimeters(Action, policy_id, perimeter_id)
def set_action(self, policy_id, perimeter_id=None, value=None):
- _action = None
- with self.get_session_for_write() as session:
- if perimeter_id:
- query = session.query(Action)
- query = query.filter_by(id=perimeter_id)
- _action = query.first()
- if not _action:
- if "policy_list" not in value or type(value["policy_list"]) is not list:
- value["policy_list"] = []
- if policy_id and policy_id not in value["policy_list"]:
- value["policy_list"] = [policy_id, ]
- new = Action.from_dict({
- "id": perimeter_id if perimeter_id else uuid4().hex,
- "value": value
- })
- session.add(new)
- return {new.id: new.to_return()}
- else:
- _value = copy.deepcopy(_action.to_dict())
- if "policy_list" not in _value["value"] or type(_value["value"]["policy_list"]) is not list:
- _value["value"]["policy_list"] = []
- if policy_id and policy_id not in _value["value"]["policy_list"]:
- _value["value"]["policy_list"].append(policy_id)
- new_action = Action.from_dict(_value)
- # setattr(_action, "value", _value["value"])
- setattr(_action, "value", getattr(new_action, "value"))
- return {_action.id: _action.to_return()}
+ try:
+ return self.__set_perimeter(Action, exceptions.ActionExisting, policy_id, perimeter_id=perimeter_id, value=value)
+ except sqlalchemy.exc.IntegrityError:
+ raise exceptions.ActionExisting
def delete_action(self, policy_id, perimeter_id):
- with self.get_session_for_write() as session:
- query = session.query(Action)
- query = query.filter_by(id=perimeter_id)
- _action = query.first()
- if not _action:
- raise ActionUnknown
- old_action = copy.deepcopy(_action.to_dict())
- # value = _action.to_dict()
- try:
- old_action["value"]["policy_list"].remove(policy_id)
- new_user = Action.from_dict(old_action)
- setattr(_action, "value", getattr(new_user, "value"))
- except ValueError:
- if not _action.value["policy_list"]:
- session.delete(_action)
+ self.__delete_perimeter(Action, exceptions.ActionUnknown, policy_id, perimeter_id)
- def get_subject_data(self, policy_id, data_id=None, category_id=None):
- logger.info("driver {} {} {}".format(policy_id, data_id, category_id))
+ def __is_data_exist(self, ClassType, data_id=None, category_id=None):
+ if not data_id:
+ return False
with self.get_session_for_read() as session:
- query = session.query(SubjectData)
- if data_id:
+ query = session.query(ClassType)
+ query = query.filter_by(category_id=category_id)
+ ref_list = query.all()
+ if ref_list:
+ return True
+ return False
+
+ def __get_data(self, ClassType, policy_id, data_id=None, category_id=None):
+ with self.get_session_for_read() as session:
+ query = session.query(ClassType)
+ if policy_id and data_id and category_id:
query = query.filter_by(policy_id=policy_id, id=data_id, category_id=category_id)
- else:
+ elif policy_id and category_id:
query = query.filter_by(policy_id=policy_id, category_id=category_id)
+ else:
+ query = query.filter_by(category_id=category_id)
ref_list = query.all()
- logger.info("ref_list={}".format(ref_list))
return {
"policy_id": policy_id,
"category_id": category_id,
"data": {_ref.id: _ref.to_dict() for _ref in ref_list}
}
- def set_subject_data(self, policy_id, data_id=None, category_id=None, value=None):
+ def __set_data(self, ClassType, ClassTypeData, policy_id, data_id=None, category_id=None, value=None):
with self.get_session_for_write() as session:
- query = session.query(SubjectData)
+ query = session.query(ClassTypeData)
query = query.filter_by(policy_id=policy_id, id=data_id, category_id=category_id)
ref = query.first()
if not ref:
- new_ref = SubjectData.from_dict(
+ value_wo_name = copy.deepcopy(value)
+ value_wo_name.pop("name", None)
+ new_ref = ClassTypeData.from_dict(
{
"id": data_id if data_id else uuid4().hex,
- 'value': value,
+ 'name': value["name"],
+ 'value': value_wo_name,
'category_id': category_id,
'policy_id': policy_id,
}
@@ -680,7 +602,7 @@ class PolicyConnector(BaseConnector, PolicyDriver):
session.add(new_ref)
ref = new_ref
else:
- for attr in Subject.attributes:
+ for attr in ClassType.attributes:
if attr != 'id':
setattr(ref, attr, getattr(ref, attr))
# session.flush()
@@ -690,116 +612,64 @@ class PolicyConnector(BaseConnector, PolicyDriver):
"data": {ref.id: ref.to_dict()}
}
- def delete_subject_data(self, policy_id, data_id):
+ def __delete_data(self, ClassType, policy_id, data_id):
with self.get_session_for_write() as session:
- query = session.query(SubjectData)
+ query = session.query(ClassType)
query = query.filter_by(policy_id=policy_id, id=data_id)
ref = query.first()
if ref:
session.delete(ref)
+ def is_subject_data_exist(self, data_id=None, category_id=None):
+ return self.__is_data_exist(SubjectData, data_id=data_id, category_id=category_id)
+
+ def get_subject_data(self, policy_id, data_id=None, category_id=None):
+ return self.__get_data(SubjectData, policy_id, data_id=data_id, category_id=category_id)
+
+ def set_subject_data(self, policy_id, data_id=None, category_id=None, value=None):
+ try:
+ return self.__set_data(Subject, SubjectData, policy_id, data_id=data_id, category_id=category_id, value=value)
+ except sqlalchemy.exc.IntegrityError:
+ raise exceptions.SubjectScopeExisting
+
+ def delete_subject_data(self, policy_id, data_id):
+ return self.__delete_data(SubjectData, policy_id, data_id)
+
+ def is_object_data_exist(self, data_id=None, category_id=None):
+ return self.__is_data_exist(ObjectData, data_id=data_id, category_id=category_id)
+
def get_object_data(self, policy_id, data_id=None, category_id=None):
- with self.get_session_for_read() as session:
- query = session.query(ObjectData)
- if data_id:
- query = query.filter_by(policy_id=policy_id, id=data_id, category_id=category_id)
- else:
- query = query.filter_by(policy_id=policy_id, category_id=category_id)
- ref_list = query.all()
- return {
- "policy_id": policy_id,
- "category_id": category_id,
- "data": {_ref.id: _ref.to_dict() for _ref in ref_list}
- }
+ return self.__get_data(ObjectData, policy_id, data_id=data_id, category_id=category_id)
def set_object_data(self, policy_id, data_id=None, category_id=None, value=None):
- with self.get_session_for_write() as session:
- query = session.query(ObjectData)
- query = query.filter_by(policy_id=policy_id, id=data_id, category_id=category_id)
- ref = query.first()
- if not ref:
- new_ref = ObjectData.from_dict(
- {
- "id": data_id if data_id else uuid4().hex,
- 'value': value,
- 'category_id': category_id,
- 'policy_id': policy_id,
- }
- )
- session.add(new_ref)
- ref = new_ref
- else:
- for attr in Object.attributes:
- if attr != 'id':
- setattr(ref, attr, getattr(ref, attr))
- # session.flush()
- return {
- "policy_id": policy_id,
- "category_id": category_id,
- "data": {ref.id: ref.to_dict()}
- }
+ try:
+ return self.__set_data(Object, ObjectData, policy_id, data_id=data_id, category_id=category_id, value=value)
+ except sqlalchemy.exc.IntegrityError:
+ raise exceptions.ObjectScopeExisting
def delete_object_data(self, policy_id, data_id):
- with self.get_session_for_write() as session:
- query = session.query(ObjectData)
- query = query.filter_by(policy_id=policy_id, id=data_id)
- ref = query.first()
- if ref:
- session.delete(ref)
+ return self.__delete_data(ObjectData, policy_id, data_id)
+
+ def is_action_data_exist(self, data_id=None,category_id=None):
+ return self.__is_data_exist(ActionData, data_id=data_id, category_id=category_id)
def get_action_data(self, policy_id, data_id=None, category_id=None):
- with self.get_session_for_read() as session:
- query = session.query(ActionData)
- if data_id:
- query = query.filter_by(policy_id=policy_id, id=data_id, category_id=category_id)
- else:
- query = query.filter_by(policy_id=policy_id, category_id=category_id)
- ref_list = query.all()
- return {
- "policy_id": policy_id,
- "category_id": category_id,
- "data": {_ref.id: _ref.to_dict() for _ref in ref_list}
- }
+ return self.__get_data(ActionData, policy_id, data_id=data_id, category_id=category_id)
def set_action_data(self, policy_id, data_id=None, category_id=None, value=None):
- with self.get_session_for_write() as session:
- query = session.query(ActionData)
- query = query.filter_by(policy_id=policy_id, id=data_id, category_id=category_id)
- ref = query.first()
- if not ref:
- new_ref = ActionData.from_dict(
- {
- "id": data_id if data_id else uuid4().hex,
- 'value': value,
- 'category_id': category_id,
- 'policy_id': policy_id,
- }
- )
- session.add(new_ref)
- ref = new_ref
- else:
- for attr in Action.attributes:
- if attr != 'id':
- setattr(ref, attr, getattr(ref, attr))
- # session.flush()
- return {
- "policy_id": policy_id,
- "category_id": category_id,
- "data": {ref.id: ref.to_dict()}
- }
+ try:
+ return self.__set_data(Action, ActionData, policy_id, data_id=data_id, category_id=category_id, value=value)
+ except sqlalchemy.exc.IntegrityError:
+ raise exceptions.ActionScopeExisting
def delete_action_data(self, policy_id, data_id):
- with self.get_session_for_write() as session:
- query = session.query(ActionData)
- query = query.filter_by(policy_id=policy_id, id=data_id)
- ref = query.first()
- if ref:
- session.delete(ref)
+ return self.__delete_data(ActionData, policy_id, data_id)
def get_subject_assignments(self, policy_id, subject_id=None, category_id=None):
with self.get_session_for_write() as session:
query = session.query(SubjectAssignment)
if subject_id and category_id:
+ #TODO change the subject_id to perimeter_id to allow code refactoring
query = query.filter_by(policy_id=policy_id, subject_id=subject_id, category_id=category_id)
elif subject_id:
query = query.filter_by(policy_id=policy_id, subject_id=subject_id)
@@ -819,6 +689,8 @@ class PolicyConnector(BaseConnector, PolicyDriver):
if data_id not in assignments:
assignments.append(data_id)
setattr(ref, "assignments", assignments)
+ else:
+ raise exceptions.SubjectAssignmentExisting
else:
ref = SubjectAssignment.from_dict(
{
@@ -852,6 +724,7 @@ class PolicyConnector(BaseConnector, PolicyDriver):
with self.get_session_for_write() as session:
query = session.query(ObjectAssignment)
if object_id and category_id:
+ #TODO change the object_id to perimeter_id to allow code refactoring
query = query.filter_by(policy_id=policy_id, object_id=object_id, category_id=category_id)
elif object_id:
query = query.filter_by(policy_id=policy_id, object_id=object_id)
@@ -871,6 +744,8 @@ class PolicyConnector(BaseConnector, PolicyDriver):
if data_id not in assignments:
assignments.append(data_id)
setattr(ref, "assignments", assignments)
+ else:
+ raise exceptions.ObjectAssignmentExisting
else:
ref = ObjectAssignment.from_dict(
{
@@ -904,6 +779,7 @@ class PolicyConnector(BaseConnector, PolicyDriver):
with self.get_session_for_write() as session:
query = session.query(ActionAssignment)
if action_id and category_id:
+ # TODO change the action_id to perimeter_id to allow code refactoring
query = query.filter_by(policy_id=policy_id, action_id=action_id, category_id=category_id)
elif action_id:
query = query.filter_by(policy_id=policy_id, action_id=action_id)
@@ -923,6 +799,8 @@ class PolicyConnector(BaseConnector, PolicyDriver):
if data_id not in assignments:
assignments.append(data_id)
setattr(ref, "assignments", assignments)
+ else:
+ raise exceptions.ActionAssignmentExisting
else:
ref = ActionAssignment.from_dict(
{
@@ -976,13 +854,12 @@ class PolicyConnector(BaseConnector, PolicyDriver):
}
def add_rule(self, policy_id, meta_rule_id, value):
- with self.get_session_for_write() as session:
- query = session.query(Rule)
- query = query.filter_by(policy_id=policy_id, meta_rule_id=meta_rule_id)
- ref_list = query.all()
- rules = list(map(lambda x: x.rule, ref_list))
- if not rules or value not in rules:
- logger.info("add_rule IN IF")
+ try:
+ rules = self.get_rules(policy_id, meta_rule_id=meta_rule_id)
+ for _rule in map(lambda x: x["rule"], rules["rules"]):
+ if list(value.get('rule')) == list(_rule):
+ raise exceptions.RuleExisting
+ with self.get_session_for_write() as session:
ref = Rule.from_dict(
{
"id": uuid4().hex,
@@ -993,7 +870,8 @@ class PolicyConnector(BaseConnector, PolicyDriver):
)
session.add(ref)
return {ref.id: ref.to_dict()}
- return {}
+ except sqlalchemy.exc.IntegrityError:
+ raise exceptions.RuleExisting
def delete_rule(self, policy_id, rule_id):
with self.get_session_for_write() as session:
@@ -1013,8 +891,11 @@ class ModelConnector(BaseConnector, ModelDriver):
query = query.filter_by(id=model_id)
ref = query.first()
if ref:
+ value_wo_name = copy.deepcopy(value)
+ value_wo_name.pop("name", None)
+ setattr(ref, "name", value["name"])
d = dict(ref.value)
- d.update(value)
+ d.update(value_wo_name)
setattr(ref, "value", d)
return {ref.id: ref.to_dict()}
@@ -1024,13 +905,19 @@ class ModelConnector(BaseConnector, ModelDriver):
session.delete(ref)
def add_model(self, model_id=None, value=None):
- with self.get_session_for_write() as session:
- new = Model.from_dict({
- "id": model_id if model_id else uuid4().hex,
- "value": value
- })
- session.add(new)
- return {new.id: new.to_dict()}
+ try:
+ with self.get_session_for_write() as session:
+ value_wo_name = copy.deepcopy(value)
+ value_wo_name.pop("name", None)
+ new = Model.from_dict({
+ "id": model_id if model_id else uuid4().hex,
+ "name": value["name"],
+ "value": value_wo_name
+ })
+ session.add(new)
+ return {new.id: new.to_dict()}
+ except sqlalchemy.exc.IntegrityError as e:
+ raise exceptions.ModelExisting
def get_models(self, model_id=None):
with self.get_session_for_read() as session:
@@ -1039,23 +926,41 @@ class ModelConnector(BaseConnector, ModelDriver):
ref_list = query.filter(Model.id == model_id)
else:
ref_list = query.all()
- return {_ref.id: _ref.to_dict() for _ref in ref_list}
+
+ r = {_ref.id: _ref.to_dict() for _ref in ref_list}
+ return r
def set_meta_rule(self, meta_rule_id, value):
with self.get_session_for_write() as session:
- query = session.query(MetaRule)
- query = query.filter_by(id=meta_rule_id)
- ref = query.first()
- if not ref:
- ref = MetaRule.from_dict(
- {
- "id": meta_rule_id if meta_rule_id else uuid4().hex,
- "value": value
- }
- )
- session.add(ref)
+ value_wo_other_data = copy.deepcopy(value)
+ value_wo_other_data.pop("name", None)
+ value_wo_other_data.pop("subject_categories", None)
+ value_wo_other_data.pop("object_categories", None)
+ value_wo_other_data.pop("action_categories", None)
+ if meta_rule_id is None:
+ try:
+ ref = MetaRule.from_dict(
+ {
+ "id": uuid4().hex,
+ "name": value["name"],
+ "subject_categories": value["subject_categories"],
+ "object_categories": value["object_categories"],
+ "action_categories": value["action_categories"],
+ "value": value_wo_other_data
+ }
+ )
+ session.add(ref)
+ except sqlalchemy.exc.IntegrityError as e:
+ raise exceptions.MetaRuleExisting
else:
- setattr(ref, "value", value)
+ query = session.query(MetaRule)
+ query = query.filter_by(id=meta_rule_id)
+ ref = query.first()
+ setattr(ref, "name", value["name"])
+ setattr(ref, "subject_categories", value["subject_categories"])
+ setattr(ref, "object_categories", value["object_categories"])
+ setattr(ref, "action_categories", value["action_categories"])
+ setattr(ref, "value", value_wo_other_data)
return {ref.id: ref.to_dict()}
def get_meta_rules(self, meta_rule_id=None):
@@ -1074,101 +979,73 @@ class ModelConnector(BaseConnector, ModelDriver):
if ref:
session.delete(ref)
- def get_subject_categories(self, category_id=None):
+ def __get_perimeter_categories(self, ClassType, category_id=None):
with self.get_session_for_read() as session:
- query = session.query(SubjectCategory)
+ query = session.query(ClassType)
if category_id:
query = query.filter_by(id=category_id)
ref_list = query.all()
return {_ref.id: _ref.to_dict() for _ref in ref_list}
- def add_subject_category(self, name, description, uuid=None):
+ def __add_perimeter_category(self, ClassType, name, description, uuid=None):
+ if not name.strip():
+ raise exceptions.CategoryNameInvalid
with self.get_session_for_write() as session:
- query = session.query(SubjectCategory)
- query = query.filter_by(name=name)
- ref = query.first()
- if not ref:
- ref = SubjectCategory.from_dict(
- {
- "id": uuid if uuid else uuid4().hex,
- "name": name,
- "description": description
- }
- )
- session.add(ref)
+ ref = ClassType.from_dict(
+ {
+ "id": uuid if uuid else uuid4().hex,
+ "name": name,
+ "description": description
+ }
+ )
+ session.add(ref)
return {ref.id: ref.to_dict()}
- def delete_subject_category(self, category_id):
+ def __delete_perimeter_category(self, ClassType, category_id):
with self.get_session_for_write() as session:
- query = session.query(SubjectCategory)
+ query = session.query(ClassType)
query = query.filter_by(id=category_id)
ref = query.first()
if ref:
session.delete(ref)
+ def get_subject_categories(self, category_id=None):
+ return self.__get_perimeter_categories(SubjectCategory, category_id=category_id)
+
+ def add_subject_category(self, name, description, uuid=None):
+ try:
+ return self.__add_perimeter_category(SubjectCategory, name, description, uuid=uuid)
+ except sql.exc.IntegrityError as e:
+ raise exceptions.SubjectCategoryExisting()
+
+ def delete_subject_category(self, category_id):
+ self.__delete_perimeter_category(SubjectCategory, category_id)
+
def get_object_categories(self, category_id=None):
- with self.get_session_for_read() as session:
- query = session.query(ObjectCategory)
- if category_id:
- query = query.filter_by(id=category_id)
- ref_list = query.all()
- return {_ref.id: _ref.to_dict() for _ref in ref_list}
+ return self.__get_perimeter_categories(ObjectCategory, category_id=category_id)
def add_object_category(self, name, description, uuid=None):
- with self.get_session_for_write() as session:
- query = session.query(ObjectCategory)
- query = query.filter_by(name=name)
- ref = query.first()
- if not ref:
- ref = ObjectCategory.from_dict(
- {
- "id": uuid if uuid else uuid4().hex,
- "name": name,
- "description": description
- }
- )
- session.add(ref)
- return {ref.id: ref.to_dict()}
+ try:
+ return self.__add_perimeter_category(ObjectCategory, name, description, uuid=uuid)
+ except sql.exc.IntegrityError as e:
+ raise exceptions.ObjectCategoryExisting()
def delete_object_category(self, category_id):
- with self.get_session_for_write() as session:
- query = session.query(ObjectCategory)
- query = query.filter_by(id=category_id)
- ref = query.first()
- if ref:
- session.delete(ref)
+ self.__delete_perimeter_category(ObjectCategory, category_id)
def get_action_categories(self, category_id=None):
- with self.get_session_for_read() as session:
- query = session.query(ActionCategory)
- if category_id:
- query = query.filter_by(id=category_id)
- ref_list = query.all()
- return {_ref.id: _ref.to_dict() for _ref in ref_list}
+
+ return self.__get_perimeter_categories(ActionCategory, category_id=category_id)
def add_action_category(self, name, description, uuid=None):
- with self.get_session_for_write() as session:
- query = session.query(ActionCategory)
- query = query.filter_by(name=name)
- ref = query.first()
- if not ref:
- ref = ActionCategory.from_dict(
- {
- "id": uuid if uuid else uuid4().hex,
- "name": name,
- "description": description
- }
- )
- session.add(ref)
- return {ref.id: ref.to_dict()}
+ try:
+ return self.__add_perimeter_category(ActionCategory, name, description, uuid=uuid)
+ except sql.exc.IntegrityError as e:
+ raise exceptions.ActionCategoryExisting()
def delete_action_category(self, category_id):
- with self.get_session_for_write() as session:
- query = session.query(ActionCategory)
- query = query.filter_by(id=category_id)
- ref = query.first()
- if ref:
- session.delete(ref)
+ self.__delete_perimeter_category(ActionCategory, category_id)
+
# Getter and Setter for subject_category
# def get_subject_categories_dict(self, intra_extension_id):
diff --git a/python_moondb/python_moondb/migrate_repo/versions/001_moon.py b/python_moondb/python_moondb/migrate_repo/versions/001_moon.py
index 2cc36140..1bfb2ffa 100644
--- a/python_moondb/python_moondb/migrate_repo/versions/001_moon.py
+++ b/python_moondb/python_moondb/migrate_repo/versions/001_moon.py
@@ -3,7 +3,19 @@
# license which can be found in the file 'LICENSE' in this package distribution
# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
+import json
import sqlalchemy as sql
+from sqlalchemy import types as sql_types
+
+class JsonBlob(sql_types.TypeDecorator):
+
+ impl = sql.Text
+
+ def process_bind_param(self, value, dialect):
+ return json.dumps(value)
+
+ def process_result_value(self, value, dialect):
+ return json.loads(value)
def upgrade(migrate_engine):
@@ -14,7 +26,10 @@ def upgrade(migrate_engine):
'pdp',
meta,
sql.Column('id', sql.String(64), primary_key=True),
- sql.Column('value', sql.Text(), nullable=True),
+ sql.Column('name', sql.String(256), nullable=False),
+ sql.Column('keystone_project_id', sql.String(64), nullable=True, default=""),
+ sql.Column('value', JsonBlob(), nullable=True),
+ sql.UniqueConstraint('name', 'keystone_project_id', name='unique_constraint_models'),
mysql_engine='InnoDB',
mysql_charset='utf8')
table.create(migrate_engine, checkfirst=True)
@@ -23,7 +38,10 @@ def upgrade(migrate_engine):
'policies',
meta,
sql.Column('id', sql.String(64), primary_key=True),
- sql.Column('value', sql.Text(), nullable=True),
+ sql.Column('name', sql.String(256), nullable=False),
+ sql.Column('model_id', sql.String(64), nullable=True, default=""),
+ sql.Column('value', JsonBlob(), nullable=True),
+ sql.UniqueConstraint('name', 'model_id', name='unique_constraint_models'),
mysql_engine='InnoDB',
mysql_charset='utf8')
table.create(migrate_engine, checkfirst=True)
@@ -32,7 +50,9 @@ def upgrade(migrate_engine):
'models',
meta,
sql.Column('id', sql.String(64), primary_key=True),
- sql.Column('value', sql.Text(), nullable=True),
+ sql.Column('name', sql.String(256), nullable=False),
+ sql.Column('value', JsonBlob(), nullable=True),
+ sql.UniqueConstraint('name', name='unique_constraint_models'),
mysql_engine='InnoDB',
mysql_charset='utf8')
table.create(migrate_engine, checkfirst=True)
@@ -43,6 +63,8 @@ def upgrade(migrate_engine):
sql.Column('id', sql.String(64), primary_key=True),
sql.Column('name', sql.String(256), nullable=False),
sql.Column('description', sql.String(256), nullable=True),
+
+ sql.UniqueConstraint('name', name='unique_constraint_subject_categories'),
mysql_engine='InnoDB',
mysql_charset='utf8')
subject_categories_table.create(migrate_engine, checkfirst=True)
@@ -53,6 +75,8 @@ def upgrade(migrate_engine):
sql.Column('id', sql.String(64), primary_key=True),
sql.Column('name', sql.String(256), nullable=False),
sql.Column('description', sql.String(256), nullable=True),
+
+ sql.UniqueConstraint('name', name='unique_constraint_object_categories'),
mysql_engine='InnoDB',
mysql_charset='utf8')
object_categories_table.create(migrate_engine, checkfirst=True)
@@ -63,6 +87,8 @@ def upgrade(migrate_engine):
sql.Column('id', sql.String(64), primary_key=True),
sql.Column('name', sql.String(256), nullable=False),
sql.Column('description', sql.String(256), nullable=True),
+
+ sql.UniqueConstraint('name', name='unique_constraint_action_categories'),
mysql_engine='InnoDB',
mysql_charset='utf8')
action_categories_table.create(migrate_engine, checkfirst=True)
@@ -71,7 +97,9 @@ def upgrade(migrate_engine):
'subjects',
meta,
sql.Column('id', sql.String(64), primary_key=True),
- sql.Column('value', sql.Text(), nullable=True),
+ sql.Column('name', sql.String(256), nullable=False),
+ sql.Column('value', JsonBlob(), nullable=True),
+ sql.UniqueConstraint('name', name='unique_constraint_subjects'),
mysql_engine='InnoDB',
mysql_charset='utf8')
subjects_table.create(migrate_engine, checkfirst=True)
@@ -80,7 +108,9 @@ def upgrade(migrate_engine):
'objects',
meta,
sql.Column('id', sql.String(64), primary_key=True),
- sql.Column('value', sql.Text(), nullable=True),
+ sql.Column('name', sql.String(256), nullable=False),
+ sql.Column('value', JsonBlob(), nullable=True),
+ sql.UniqueConstraint('name', name='unique_constraint_objects'),
mysql_engine='InnoDB',
mysql_charset='utf8')
objects_table.create(migrate_engine, checkfirst=True)
@@ -89,7 +119,9 @@ def upgrade(migrate_engine):
'actions',
meta,
sql.Column('id', sql.String(64), primary_key=True),
- sql.Column('value', sql.Text(), nullable=True),
+ sql.Column('name', sql.String(256), nullable=False),
+ sql.Column('value', JsonBlob(), nullable=True),
+ sql.UniqueConstraint('name', name='unique_constraint_actions'),
mysql_engine='InnoDB',
mysql_charset='utf8')
actions_table.create(migrate_engine, checkfirst=True)
@@ -98,9 +130,11 @@ def upgrade(migrate_engine):
'subject_data',
meta,
sql.Column('id', sql.String(64), primary_key=True),
- sql.Column('value', sql.Text(), nullable=True),
+ sql.Column('name', sql.String(256), nullable=False),
+ sql.Column('value', JsonBlob(), nullable=True),
sql.Column('category_id', sql.ForeignKey("subject_categories.id"), nullable=False),
sql.Column('policy_id', sql.ForeignKey("policies.id"), nullable=False),
+ sql.UniqueConstraint('name', 'category_id', 'policy_id', name='unique_constraint_subject_data'),
mysql_engine='InnoDB',
mysql_charset='utf8')
subject_data_table.create(migrate_engine, checkfirst=True)
@@ -109,9 +143,11 @@ def upgrade(migrate_engine):
'object_data',
meta,
sql.Column('id', sql.String(64), primary_key=True),
- sql.Column('value', sql.Text(), nullable=True),
+ sql.Column('name', sql.String(256), nullable=False),
+ sql.Column('value', JsonBlob(), nullable=True),
sql.Column('category_id', sql.ForeignKey("object_categories.id"), nullable=False),
sql.Column('policy_id', sql.ForeignKey("policies.id"), nullable=False),
+ sql.UniqueConstraint('name', 'category_id', 'policy_id', name='unique_constraint_object_data'),
mysql_engine='InnoDB',
mysql_charset='utf8')
object_data_table.create(migrate_engine, checkfirst=True)
@@ -120,9 +156,11 @@ def upgrade(migrate_engine):
'action_data',
meta,
sql.Column('id', sql.String(64), primary_key=True),
- sql.Column('value', sql.Text(), nullable=True),
+ sql.Column('name', sql.String(256), nullable=False),
+ sql.Column('value', JsonBlob(), nullable=True),
sql.Column('category_id', sql.ForeignKey("action_categories.id"), nullable=False),
sql.Column('policy_id', sql.ForeignKey("policies.id"), nullable=False),
+ sql.UniqueConstraint('name', 'category_id', 'policy_id', name='unique_constraint_action_data'),
mysql_engine='InnoDB',
mysql_charset='utf8')
action_data_table.create(migrate_engine, checkfirst=True)
@@ -131,10 +169,11 @@ def upgrade(migrate_engine):
'subject_assignments',
meta,
sql.Column('id', sql.String(64), primary_key=True),
- sql.Column('assignments', sql.Text(), nullable=True),
+ sql.Column('assignments', sql.String(256), nullable=True),
sql.Column('policy_id', sql.ForeignKey("policies.id"), nullable=False),
sql.Column('subject_id', sql.ForeignKey("subjects.id"), nullable=False),
sql.Column('category_id', sql.ForeignKey("subject_categories.id"), nullable=False),
+ sql.UniqueConstraint('policy_id', 'subject_id', 'category_id', name='unique_constraint_subject_assignment'),
mysql_engine='InnoDB',
mysql_charset='utf8')
subject_assignments_table.create(migrate_engine, checkfirst=True)
@@ -143,10 +182,11 @@ def upgrade(migrate_engine):
'object_assignments',
meta,
sql.Column('id', sql.String(64), primary_key=True),
- sql.Column('assignments', sql.Text(), nullable=True),
+ sql.Column('assignments', sql.String(256), nullable=True),
sql.Column('policy_id', sql.ForeignKey("policies.id"), nullable=False),
sql.Column('object_id', sql.ForeignKey("objects.id"), nullable=False),
sql.Column('category_id', sql.ForeignKey("object_categories.id"), nullable=False),
+ sql.UniqueConstraint('policy_id', 'object_id', 'category_id', name='unique_constraint_object_assignment'),
mysql_engine='InnoDB',
mysql_charset='utf8')
object_assignments_table.create(migrate_engine, checkfirst=True)
@@ -155,10 +195,11 @@ def upgrade(migrate_engine):
'action_assignments',
meta,
sql.Column('id', sql.String(64), primary_key=True),
- sql.Column('assignments', sql.Text(), nullable=True),
+ sql.Column('assignments', sql.String(256), nullable=True),
sql.Column('policy_id', sql.ForeignKey("policies.id"), nullable=False),
sql.Column('action_id', sql.ForeignKey("actions.id"), nullable=False),
sql.Column('category_id', sql.ForeignKey("action_categories.id"), nullable=False),
+ sql.UniqueConstraint('policy_id', 'action_id', 'category_id', name='unique_constraint_action_assignment'),
mysql_engine='InnoDB',
mysql_charset='utf8')
action_assignments_table.create(migrate_engine, checkfirst=True)
@@ -167,7 +208,13 @@ def upgrade(migrate_engine):
'meta_rules',
meta,
sql.Column('id', sql.String(64), primary_key=True),
- sql.Column('value', sql.Text(), nullable=True),
+ sql.Column('name', sql.String(256), nullable=False),
+ sql.Column('subject_categories', JsonBlob(), nullable=False),
+ sql.Column('object_categories', JsonBlob(), nullable=False),
+ sql.Column('action_categories', JsonBlob(), nullable=False),
+ sql.Column('value', JsonBlob(), nullable=True),
+ sql.UniqueConstraint('name', name='unique_constraint_meta_rule_name'),
+ # sql.UniqueConstraint('subject_categories', 'object_categories', 'action_categories', name='unique_constraint_meta_rule_def'),
mysql_engine='InnoDB',
mysql_charset='utf8')
meta_rules_table.create(migrate_engine, checkfirst=True)
@@ -176,7 +223,7 @@ def upgrade(migrate_engine):
'rules',
meta,
sql.Column('id', sql.String(64), primary_key=True),
- sql.Column('rule', sql.Text(), nullable=True),
+ sql.Column('rule', JsonBlob(), nullable=True),
sql.Column('policy_id', sql.ForeignKey("policies.id"), nullable=False),
sql.Column('meta_rule_id', sql.ForeignKey("meta_rules.id"), nullable=False),
mysql_engine='InnoDB',
diff --git a/python_moondb/tests/unit_python/helpers/__init__.py b/python_moondb/tests/unit_python/helpers/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/python_moondb/tests/unit_python/helpers/__init__.py
diff --git a/python_moondb/tests/unit_python/helpers/assignment_helper.py b/python_moondb/tests/unit_python/helpers/assignment_helper.py
new file mode 100644
index 00000000..22a56e38
--- /dev/null
+++ b/python_moondb/tests/unit_python/helpers/assignment_helper.py
@@ -0,0 +1,49 @@
+# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
+# This software is distributed under the terms and conditions of the 'Apache-2.0'
+# license which can be found in the file 'LICENSE' in this package distribution
+# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
+
+def get_action_assignments(policy_id, action_id=None, category_id=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.get_action_assignments("", policy_id, action_id, category_id)
+
+
+def add_action_assignment(policy_id, action_id, category_id, data_id):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.add_action_assignment("", policy_id, action_id, category_id, data_id)
+
+
+def delete_action_assignment(policy_id, action_id, category_id, data_id):
+ from python_moondb.core import PolicyManager
+ PolicyManager.delete_action_assignment("", policy_id, action_id, category_id, data_id)
+
+
+def get_object_assignments(policy_id, object_id=None, category_id=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.get_object_assignments("", policy_id, object_id, category_id)
+
+
+def add_object_assignment(policy_id, object_id, category_id, data_id):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.add_object_assignment("", policy_id, object_id, category_id, data_id)
+
+
+def delete_object_assignment(policy_id, object_id, category_id, data_id):
+ from python_moondb.core import PolicyManager
+ PolicyManager.delete_object_assignment("", policy_id, object_id, category_id, data_id)
+
+
+def get_subject_assignments(policy_id, subject_id=None, category_id=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.get_subject_assignments("", policy_id, subject_id, category_id)
+
+
+def add_subject_assignment(policy_id, subject_id, category_id, data_id):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.add_subject_assignment("", policy_id, subject_id, category_id, data_id)
+
+
+def delete_subject_assignment(policy_id, subject_id, category_id, data_id):
+ from python_moondb.core import PolicyManager
+ PolicyManager.delete_subject_assignment("", policy_id, subject_id, category_id, data_id)
+
diff --git a/python_moondb/tests/unit_python/helpers/category_helper.py b/python_moondb/tests/unit_python/helpers/category_helper.py
new file mode 100644
index 00000000..55e95d91
--- /dev/null
+++ b/python_moondb/tests/unit_python/helpers/category_helper.py
@@ -0,0 +1,54 @@
+# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
+# This software is distributed under the terms and conditions of the 'Apache-2.0'
+# license which can be found in the file 'LICENSE' in this package distribution
+# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
+
+def add_subject_category(cat_id=None, value=None):
+ from python_moondb.core import ModelManager
+ category = ModelManager.add_subject_category(user_id=None, category_id=cat_id, value=value)
+ return category
+
+
+def get_subject_category(cat_id=None):
+ from python_moondb.core import ModelManager
+ category = ModelManager.get_subject_categories(user_id=None, category_id=cat_id)
+ return category
+
+
+def add_object_category(cat_id=None, value=None):
+ from python_moondb.core import ModelManager
+ category = ModelManager.add_object_category(user_id=None, category_id=cat_id, value=value)
+ return category
+
+
+def get_object_category(cat_id=None):
+ from python_moondb.core import ModelManager
+ category = ModelManager.get_object_categories(user_id=None, category_id=cat_id)
+ return category
+
+
+def add_action_category(cat_id=None, value=None):
+ from python_moondb.core import ModelManager
+ category = ModelManager.add_action_category(user_id=None, category_id=cat_id, value=value)
+ return category
+
+
+def get_action_category(cat_id=None):
+ from python_moondb.core import ModelManager
+ category = ModelManager.get_action_categories(user_id=None, category_id=cat_id)
+ return category
+
+
+def delete_subject_category(category_id=None):
+ from python_moondb.core import ModelManager
+ return ModelManager.delete_subject_category("", category_id=category_id)
+
+
+def delete_object_category(category_id=None):
+ from python_moondb.core import ModelManager
+ return ModelManager.delete_object_category("", category_id=category_id)
+
+
+def delete_action_category(category_id=None):
+ from python_moondb.core import ModelManager
+ return ModelManager.delete_action_category("", category_id=category_id)
diff --git a/python_moondb/tests/unit_python/helpers/data_helper.py b/python_moondb/tests/unit_python/helpers/data_helper.py
new file mode 100644
index 00000000..20d9ae9a
--- /dev/null
+++ b/python_moondb/tests/unit_python/helpers/data_helper.py
@@ -0,0 +1,98 @@
+# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
+# This software is distributed under the terms and conditions of the 'Apache-2.0'
+# license which can be found in the file 'LICENSE' in this package distribution
+# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
+
+def get_action_data(policy_id, data_id=None, category_id=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.get_action_data("", policy_id, data_id, category_id)
+
+
+def add_action_data(policy_id, data_id=None, category_id=None, value=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.add_action_data("", policy_id, data_id, category_id, value)
+
+
+def delete_action_data(policy_id, data_id):
+ from python_moondb.core import PolicyManager
+ PolicyManager.delete_action_data("", policy_id, data_id)
+
+
+def get_object_data(policy_id, data_id=None, category_id=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.get_object_data("", policy_id, data_id, category_id)
+
+
+def add_object_data(policy_id, data_id=None, category_id=None, value=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.add_object_data("", policy_id, data_id, category_id, value)
+
+
+def delete_object_data(policy_id, data_id):
+ from python_moondb.core import PolicyManager
+ PolicyManager.delete_object_data("", policy_id, data_id)
+
+
+def get_subject_data(policy_id, data_id=None, category_id=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.get_subject_data("", policy_id, data_id, category_id)
+
+
+def add_subject_data(policy_id, data_id=None, category_id=None, value=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.set_subject_data("", policy_id, data_id, category_id, value)
+
+
+def delete_subject_data(policy_id, data_id):
+ from python_moondb.core import PolicyManager
+ PolicyManager.delete_subject_data("", policy_id, data_id)
+
+
+def get_actions(policy_id, perimeter_id=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.get_actions("", policy_id, perimeter_id)
+
+
+def add_action(policy_id, perimeter_id=None, value=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.add_action("", policy_id, perimeter_id, value)
+
+
+def delete_action(policy_id, perimeter_id):
+ from python_moondb.core import PolicyManager
+ PolicyManager.delete_action("", policy_id, perimeter_id)
+
+
+def get_objects(policy_id, perimeter_id=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.get_objects("", policy_id, perimeter_id)
+
+
+def add_object(policy_id, perimeter_id=None, value=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.add_object("", policy_id, perimeter_id, value)
+
+
+def delete_object(policy_id, perimeter_id):
+ from python_moondb.core import PolicyManager
+ PolicyManager.delete_object("", policy_id, perimeter_id)
+
+
+def get_subjects(policy_id, perimeter_id=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.get_subjects("", policy_id, perimeter_id)
+
+
+def add_subject(policy_id, perimeter_id=None, value=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.add_subject("", policy_id, perimeter_id, value)
+
+
+def delete_subject(policy_id, perimeter_id):
+ from python_moondb.core import PolicyManager
+ PolicyManager.delete_subject("", policy_id, perimeter_id)
+
+
+def get_available_metadata(policy_id):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.get_available_metadata("", policy_id)
diff --git a/python_moondb/tests/unit_python/helpers/meta_rule_helper.py b/python_moondb/tests/unit_python/helpers/meta_rule_helper.py
new file mode 100644
index 00000000..80d138c6
--- /dev/null
+++ b/python_moondb/tests/unit_python/helpers/meta_rule_helper.py
@@ -0,0 +1,48 @@
+# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
+# This software is distributed under the terms and conditions of the 'Apache-2.0'
+# license which can be found in the file 'LICENSE' in this package distribution
+# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
+
+from helpers import mock_data
+
+
+def set_meta_rule(meta_rule_id, value=None):
+ from python_moondb.core import ModelManager
+ if not value:
+ action_category_id = mock_data.create_action_category("action_category_id1")
+ subject_category_id = mock_data.create_subject_category("subject_category_id1")
+ object_category_id = mock_data.create_object_category("object_category_id1")
+ value = {
+ "name": "MLS_meta_rule",
+ "description": "test",
+ "subject_categories": [subject_category_id],
+ "object_categories": [object_category_id],
+ "action_categories": [action_category_id]
+ }
+ return ModelManager.set_meta_rule(user_id=None, meta_rule_id=meta_rule_id, value=value)
+
+
+def add_meta_rule(meta_rule_id=None, value=None):
+ from python_moondb.core import ModelManager
+ if not value:
+ action_category_id = mock_data.create_action_category("action_category_id1")
+ subject_category_id = mock_data.create_subject_category("subject_category_id1")
+ object_category_id = mock_data.create_object_category("object_category_id1")
+ value = {
+ "name": "MLS_meta_rule",
+ "description": "test",
+ "subject_categories": [subject_category_id],
+ "object_categories": [object_category_id],
+ "action_categories": [action_category_id]
+ }
+ return ModelManager.add_meta_rule(user_id=None, meta_rule_id=meta_rule_id, value=value)
+
+
+def get_meta_rules(meta_rule_id=None):
+ from python_moondb.core import ModelManager
+ return ModelManager.get_meta_rules(user_id=None, meta_rule_id=meta_rule_id)
+
+
+def delete_meta_rules(meta_rule_id=None):
+ from python_moondb.core import ModelManager
+ ModelManager.delete_meta_rule(user_id=None, meta_rule_id=meta_rule_id)
diff --git a/python_moondb/tests/unit_python/helpers/mock_data.py b/python_moondb/tests/unit_python/helpers/mock_data.py
new file mode 100644
index 00000000..82eebe88
--- /dev/null
+++ b/python_moondb/tests/unit_python/helpers/mock_data.py
@@ -0,0 +1,144 @@
+# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
+# This software is distributed under the terms and conditions of the 'Apache-2.0'
+# license which can be found in the file 'LICENSE' in this package distribution
+# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
+
+from .category_helper import *
+from .policy_helper import *
+from .data_helper import *
+from .model_helper import *
+from .meta_rule_helper import *
+
+
+def create_subject_category(name):
+ subject_category = add_subject_category(
+ value={"name": name, "description": "description 1"})
+ return list(subject_category.keys())[0]
+
+
+def create_object_category(name):
+ object_category = add_object_category(
+ value={"name": name, "description": "description 1"})
+ return list(object_category.keys())[0]
+
+
+def create_action_category(name):
+ action_category = add_action_category(
+ value={"name": name, "description": "description 1"})
+ return list(action_category.keys())[0]
+
+
+def create_model(meta_rule_id, model_name="test_model"):
+ value = {
+ "name": model_name,
+ "description": "test",
+ "meta_rules": [meta_rule_id]
+
+ }
+ return value
+
+
+def create_policy(model_id, policy_name="policy_1"):
+ value = {
+ "name": policy_name,
+ "model_id": model_id,
+ "genre": "authz",
+ "description": "test",
+ }
+ return value
+
+
+def create_pdp(policies_ids):
+ value = {
+ "name": "test_pdp",
+ "security_pipeline": policies_ids,
+ "keystone_project_id": "keystone_project_id1",
+ "description": "...",
+ }
+ return value
+
+
+def create_new_policy(subject_category_name=None, object_category_name=None, action_category_name=None,
+ model_name="test_model", policy_name="policy_1", meta_rule_name="meta_rule1"):
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = create_new_meta_rule(
+ subject_category_name=subject_category_name,
+ object_category_name=object_category_name,
+ action_category_name=action_category_name, meta_rule_name=meta_rule_name)
+ model = add_model(value=create_model(meta_rule_id, model_name))
+ model_id = list(model.keys())[0]
+ value = create_policy(model_id, policy_name)
+ policy = add_policies(value=value)
+ assert policy
+ policy_id = list(policy.keys())[0]
+ return subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id
+
+
+def create_new_meta_rule(subject_category_name=None, object_category_name=None, action_category_name=None,
+ meta_rule_name="meta_rule1"):
+ subject_category_id = create_subject_category(subject_category_name)
+ object_category_id = create_object_category(object_category_name)
+ action_category_id = create_action_category(action_category_name)
+ value = {"name": meta_rule_name,
+ "algorithm": "name of the meta rule algorithm",
+ "subject_categories": [subject_category_id],
+ "object_categories": [object_category_id],
+ "action_categories": [action_category_id]
+ }
+ meta_rule = add_meta_rule(value=value)
+ return subject_category_id, object_category_id, action_category_id, list(meta_rule.keys())[0]
+
+
+def create_subject(policy_id):
+ value = {
+ "name": "testuser",
+ "description": "test",
+ }
+ subject = add_subject(policy_id=policy_id, value=value)
+ return list(subject.keys())[0]
+
+
+def create_object(policy_id):
+ value = {
+ "name": "testobject",
+ "description": "test",
+ }
+ object = add_object(policy_id=policy_id, value=value)
+ return list(object.keys())[0]
+
+
+def create_action(policy_id):
+ value = {
+ "name": "testaction",
+ "description": "test",
+ }
+ action = add_action(policy_id=policy_id, value=value)
+ return list(action.keys())[0]
+
+
+def create_subject_data(policy_id, category_id):
+ value = {
+ "name": "subject-security-level",
+ "description": {"low": "", "medium": "", "high": ""},
+ }
+ subject_data = add_subject_data(policy_id=policy_id, category_id=category_id, value=value).get('data')
+ assert subject_data
+ return list(subject_data.keys())[0]
+
+
+def create_object_data(policy_id, category_id):
+ value = {
+ "name": "object-security-level",
+ "description": {"low": "", "medium": "", "high": ""},
+ }
+ object_data = add_object_data(policy_id=policy_id, category_id=category_id, value=value).get('data')
+ return list(object_data.keys())[0]
+
+
+def create_action_data(policy_id, category_id):
+ value = {
+ "name": "action-type",
+ "description": {"vm-action": "", "storage-action": "", },
+ }
+ action_data = add_action_data(policy_id=policy_id, category_id=category_id, value=value).get('data')
+ return list(action_data.keys())[0]
+
diff --git a/python_moondb/tests/unit_python/helpers/model_helper.py b/python_moondb/tests/unit_python/helpers/model_helper.py
new file mode 100644
index 00000000..58946a99
--- /dev/null
+++ b/python_moondb/tests/unit_python/helpers/model_helper.py
@@ -0,0 +1,50 @@
+# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
+# This software is distributed under the terms and conditions of the 'Apache-2.0'
+# license which can be found in the file 'LICENSE' in this package distribution
+# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
+
+from helpers import mock_data
+
+
+def get_models(model_id=None):
+ from python_moondb.core import ModelManager
+ return ModelManager.get_models(user_id=None, model_id=model_id)
+
+
+def add_model(model_id=None, value=None):
+ from python_moondb.core import ModelManager
+ if not value:
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = mock_data.create_new_meta_rule(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1")
+ name = "MLS" if model_id is None else "MLS " + model_id
+ value = {
+ "name": name,
+ "description": "test",
+ "meta_rules": [meta_rule_id]
+ }
+ return ModelManager.add_model(user_id=None, model_id=model_id, value=value)
+
+
+def delete_models(uuid=None, name=None):
+ from python_moondb.core import ModelManager
+ if not uuid:
+ for model_id, model_value in get_models():
+ if name == model_value['name']:
+ uuid = model_id
+ break
+ ModelManager.delete_model(user_id=None, model_id=uuid)
+
+
+def delete_all_models():
+ from python_moondb.core import ModelManager
+ models_values = get_models()
+ print(models_values)
+ for model_id, model_value in models_values.items():
+ ModelManager.delete_model(user_id=None, model_id=model_id)
+
+
+def update_model(model_id=None, value=None):
+ from python_moondb.core import ModelManager
+ return ModelManager.update_model(user_id=None, model_id=model_id, value=value)
diff --git a/python_moondb/tests/unit_python/helpers/pdp_helper.py b/python_moondb/tests/unit_python/helpers/pdp_helper.py
new file mode 100644
index 00000000..3d169b06
--- /dev/null
+++ b/python_moondb/tests/unit_python/helpers/pdp_helper.py
@@ -0,0 +1,23 @@
+# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
+# This software is distributed under the terms and conditions of the 'Apache-2.0'
+# license which can be found in the file 'LICENSE' in this package distribution
+# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
+
+def update_pdp(pdp_id, value):
+ from python_moondb.core import PDPManager
+ return PDPManager.update_pdp("", pdp_id, value)
+
+
+def delete_pdp(pdp_id):
+ from python_moondb.core import PDPManager
+ PDPManager.delete_pdp("", pdp_id)
+
+
+def add_pdp(pdp_id=None, value=None):
+ from python_moondb.core import PDPManager
+ return PDPManager.add_pdp("", pdp_id, value)
+
+
+def get_pdp(pdp_id=None):
+ from python_moondb.core import PDPManager
+ return PDPManager.get_pdp("", pdp_id)
diff --git a/python_moondb/tests/unit_python/helpers/policy_helper.py b/python_moondb/tests/unit_python/helpers/policy_helper.py
new file mode 100644
index 00000000..c932ee3a
--- /dev/null
+++ b/python_moondb/tests/unit_python/helpers/policy_helper.py
@@ -0,0 +1,61 @@
+# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
+# This software is distributed under the terms and conditions of the 'Apache-2.0'
+# license which can be found in the file 'LICENSE' in this package distribution
+# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
+
+def get_policies():
+ from python_moondb.core import PolicyManager
+ return PolicyManager.get_policies("admin")
+
+
+def add_policies(policy_id=None, value=None):
+ from python_moondb.core import PolicyManager
+ if not value:
+ value = {
+ "name": "test_policy",
+ "model_id": "",
+ "genre": "authz",
+ "description": "test",
+ }
+ return PolicyManager.add_policy("admin", policy_id=policy_id, value=value)
+
+
+def delete_policies(uuid=None, name=None):
+ from python_moondb.core import PolicyManager
+ if not uuid:
+ for policy_id, policy_value in get_policies():
+ if name == policy_value['name']:
+ uuid = policy_id
+ break
+ PolicyManager.delete_policy("admin", uuid)
+
+
+def update_policy(policy_id, value):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.update_policy("admin", policy_id, value)
+
+
+def get_policy_from_meta_rules(meta_rule_id):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.get_policy_from_meta_rules("admin", meta_rule_id)
+
+
+def get_rules(policy_id=None, meta_rule_id=None, rule_id=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.get_rules("", policy_id, meta_rule_id, rule_id)
+
+
+def add_rule(policy_id=None, meta_rule_id=None, value=None):
+ from python_moondb.core import PolicyManager
+ if not value:
+ value = {
+ "rule": ("high", "medium", "vm-action"),
+ "instructions": ({"decision": "grant"}),
+ "enabled": "",
+ }
+ return PolicyManager.add_rule("", policy_id, meta_rule_id, value)
+
+
+def delete_rule(policy_id=None, rule_id=None):
+ from python_moondb.core import PolicyManager
+ PolicyManager.delete_rule("", policy_id, rule_id)
diff --git a/python_moondb/tests/unit_python/models/test_categories.py b/python_moondb/tests/unit_python/models/test_categories.py
new file mode 100644
index 00000000..f87d0e12
--- /dev/null
+++ b/python_moondb/tests/unit_python/models/test_categories.py
@@ -0,0 +1,79 @@
+# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
+# This software is distributed under the terms and conditions of the 'Apache-2.0'
+# license which can be found in the file 'LICENSE' in this package distribution
+# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
+
+import pytest
+import logging
+from python_moonutilities.exceptions import *
+from helpers import category_helper
+
+logger = logging.getLogger("moon.db.tests.models.test_categories")
+
+def test_add_subject_category_twice():
+ category = category_helper.add_subject_category(value={"name": "category name", "description": "description 1"})
+ category_id = list(category.keys())[0]
+ assert category is not None
+ with pytest.raises(SubjectCategoryExisting):
+ category_helper.add_subject_category(category_id,
+ value={"name": "category name", "description": "description 2"})
+
+
+def test_get_subject_categories():
+ added_category = category_helper.add_subject_category(
+ value={"name": "category name", "description": "description 1"})
+ category_id = list(added_category.keys())[0]
+ subject_category = category_helper.get_subject_category(category_id)
+ assert subject_category == added_category
+
+
+def test_get_subject_categories_with_invalid_id():
+ category_id = "invalid_id"
+ subject_category = category_helper.get_subject_category(category_id)
+ assert len(subject_category) == 0
+
+
+def test_add_object_category_twice():
+ category = category_helper.add_object_category(value={"name": "category name", "description": "description 1"})
+ category_id = list(category.keys())[0]
+ assert category is not None
+ with pytest.raises(ObjectCategoryExisting):
+ category_helper.add_object_category(category_id,
+ value={"name": "category name", "description": "description 2"})
+
+
+def test_get_object_categories():
+ added_category = category_helper.add_object_category(
+ value={"name": "category name", "description": "description 1"})
+ category_id = list(added_category.keys())[0]
+ object_category = category_helper.get_object_category(category_id)
+ assert object_category == added_category
+
+
+def test_get_object_categories_with_invalid_id():
+ category_id = "invalid_id"
+ object_category = category_helper.get_object_category(category_id)
+ assert len(object_category) == 0
+
+
+def test_add_action_category_twice():
+ category = category_helper.add_action_category(value={"name": "category name", "description": "description 1"})
+ category_id = list(category.keys())[0]
+ assert category is not None
+ with pytest.raises(ActionCategoryExisting):
+ category_helper.add_action_category(category_id,
+ value={"name": "category name", "description": "description 2"})
+
+
+def test_get_action_categories():
+ added_category = category_helper.add_action_category(
+ value={"name": "category name", "description": "description 1"})
+ category_id = list(added_category.keys())[0]
+ action_category = category_helper.get_action_category(category_id)
+ assert action_category == added_category
+
+
+def test_get_action_categories_with_invalid_id():
+ category_id = "invalid_id"
+ action_category = category_helper.get_action_category(category_id)
+ assert len(action_category) == 0
diff --git a/python_moondb/tests/unit_python/models/test_meta_rules.py b/python_moondb/tests/unit_python/models/test_meta_rules.py
index d8b61365..102cd724 100644
--- a/python_moondb/tests/unit_python/models/test_meta_rules.py
+++ b/python_moondb/tests/unit_python/models/test_meta_rules.py
@@ -1,122 +1,115 @@
-import pytest
-
-
-def set_meta_rule(meta_rule_id, value=None):
- from python_moondb.core import ModelManager
- if not value:
- value = {
- "name": "MLS_meta_rule",
- "description": "test",
- "subject_categories": ["user_security_level_id_1"],
- "object_categories": ["vm_security_level_id_1"],
- "action_categories": ["action_type_id_1"]
- }
- return ModelManager.set_meta_rule(user_id=None, meta_rule_id=meta_rule_id, value=value)
-
-
-def add_meta_rule(meta_rule_id=None, value=None):
- from python_moondb.core import ModelManager
- if not value:
- value = {
- "name": "MLS_meta_rule",
- "description": "test",
- "subject_categories": ["user_security_level_id_1"],
- "object_categories": ["vm_security_level_id_1"],
- "action_categories": ["action_type_id_1"]
- }
- return ModelManager.add_meta_rule(user_id=None, meta_rule_id=meta_rule_id, value=value)
-
-
-def get_meta_rules(meta_rule_id=None):
- from python_moondb.core import ModelManager
- return ModelManager.get_meta_rules(user_id=None, meta_rule_id=meta_rule_id)
+# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
+# This software is distributed under the terms and conditions of the 'Apache-2.0'
+# license which can be found in the file 'LICENSE' in this package distribution
+# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
+import pytest
+from helpers import meta_rule_helper
+import helpers.mock_data as mock_data
-def delete_meta_rules(meta_rule_id=None):
- from python_moondb.core import ModelManager
- ModelManager.delete_meta_rule(user_id=None, meta_rule_id=meta_rule_id)
def test_set_not_exist_meta_rule_error(db):
# set not existing meta rule and expect to raise and error
with pytest.raises(Exception) as exception_info:
- set_meta_rule(meta_rule_id=None)
- assert str(exception_info.value) == '400: Sub Meta Rule Unknown'
+ meta_rule_helper.set_meta_rule(meta_rule_id=None)
+ assert str(exception_info.value) == '400: Meta Rule Unknown'
def test_add_new_meta_rule_success(db):
+ action_category_id = mock_data.create_action_category("action_category_id1")
+ subject_category_id = mock_data.create_subject_category("subject_category_id1")
+ object_category_id = mock_data.create_object_category("object_category_id1")
value = {
"name": "MLS_meta_rule",
"description": "test",
- "subject_categories": ["user_security_level_id_1"],
- "object_categories": ["vm_security_level_id_1"],
- "action_categories": ["action_type_id_1"]
+ "subject_categories": [subject_category_id],
+ "object_categories": [object_category_id],
+ "action_categories": [action_category_id]
}
- metaRules = add_meta_rule();
- assert isinstance(metaRules, dict)
- assert metaRules
- assert len(metaRules) is 1
- meta_rule_id = list(metaRules.keys())[0]
+ meta_rules = meta_rule_helper.add_meta_rule(value=value)
+ assert isinstance(meta_rules, dict)
+ assert meta_rules
+ assert len(meta_rules) is 1
+ meta_rule_id = list(meta_rules.keys())[0]
for key in ("name", "description", "subject_categories", "object_categories", "action_categories"):
- assert key in metaRules[meta_rule_id]
- assert metaRules[meta_rule_id][key] == value[key]
+ assert key in meta_rules[meta_rule_id]
+ assert meta_rules[meta_rule_id][key] == value[key]
-def test_set_meta_rule_succes(db):
+def test_set_meta_rule_success(db):
# arrange
- meta_rules = add_meta_rule()
+ meta_rules = meta_rule_helper.add_meta_rule()
meta_rule_id = list(meta_rules.keys())[0]
+ action_category_id = mock_data.create_action_category("action_category_id2")
+ subject_category_id = mock_data.create_subject_category("subject_category_id2")
+ object_category_id = mock_data.create_object_category("object_category_id2")
updated_value = {
"name": "MLS_meta_rule",
"description": "test",
- "subject_categories": ["user_role_id_1"],
- "object_categories": ["vm_security_level_id_1"],
- "action_categories": ["action_type_id_1"]
+ "subject_categories": [subject_category_id],
+ "object_categories": [object_category_id],
+ "action_categories": [action_category_id]
}
# action
- updated_meta_rule = set_meta_rule(meta_rule_id, updated_value)
+ updated_meta_rule = meta_rule_helper.set_meta_rule(meta_rule_id, updated_value)
# assert
updated_meta_rule_id = list(updated_meta_rule.keys())[0]
assert updated_meta_rule_id == meta_rule_id
- assert updated_meta_rule[updated_meta_rule_id]["subject_categories"] == \
- updated_value["subject_categories"]
+ assert updated_meta_rule[updated_meta_rule_id]["subject_categories"] == updated_value["subject_categories"]
def test_add_existing_meta_rule_error(db):
- meta_rules = add_meta_rule()
+ action_category_id = mock_data.create_action_category("action_category_id3")
+ subject_category_id = mock_data.create_subject_category("subject_category_id3")
+ object_category_id = mock_data.create_object_category("object_category_id3")
+ value = {
+ "name": "MLS_meta_rule",
+ "description": "test",
+ "subject_categories": [subject_category_id],
+ "object_categories": [object_category_id],
+ "action_categories": [action_category_id]
+ }
+ meta_rules = meta_rule_helper.add_meta_rule(value=value)
meta_rule_id = list(meta_rules.keys())[0]
with pytest.raises(Exception) as exception_info:
- add_meta_rule(meta_rule_id=meta_rule_id)
+ meta_rule_helper.add_meta_rule(meta_rule_id=meta_rule_id)
assert str(exception_info.value) == '400: Sub Meta Rule Existing'
def test_get_meta_rule_success(db):
# arrange
+ action_category_id = mock_data.create_action_category("action_type")
+ subject_category_id = mock_data.create_subject_category("user_security_level")
+ object_category_id = mock_data.create_object_category("vm_security_level")
values = {}
value1 = {
"name": "MLS_meta_rule",
"description": "test",
- "subject_categories": ["user_security_level_id_1"],
- "object_categories": ["vm_security_level_id_1"],
- "action_categories": ["action_type_id_1"]
+ "subject_categories": [subject_category_id],
+ "object_categories": [object_category_id],
+ "action_categories": [action_category_id]
}
- meta_rules1 = add_meta_rule(value=value1)
+ meta_rules1 = meta_rule_helper.add_meta_rule(value=value1)
meta_rule_id1 = list(meta_rules1.keys())[0]
values[meta_rule_id1] = value1
+ action_category_id = mock_data.create_action_category("action_type2")
+ subject_category_id = mock_data.create_subject_category("user_security_level2")
+ object_category_id = mock_data.create_object_category("vm_security_level2")
value2 = {
"name": "rbac_meta_rule",
"description": "test",
- "subject_categories": ["user_role_id_1"],
- "object_categories": ["vm_id_1"],
- "action_categories": ["action_type_id_1"]
+ "subject_categories": [subject_category_id],
+ "object_categories": [object_category_id],
+ "action_categories": [action_category_id]
}
- meta_rules2 = add_meta_rule(value=value2)
+ meta_rules2 = meta_rule_helper.add_meta_rule(value=value2)
meta_rule_id2 = list(meta_rules2.keys())[0]
values[meta_rule_id2] = value2
# action
- meta_rules = get_meta_rules()
+ meta_rules = meta_rule_helper.get_meta_rules()
# assert
- assert isinstance(meta_rules , dict)
+ assert isinstance(meta_rules, dict)
assert meta_rules
assert len(meta_rules) is 2
for meta_rule_id in meta_rules:
@@ -127,11 +120,10 @@ def test_get_meta_rule_success(db):
def test_get_specific_meta_rule_success(db):
# arrange
- add_meta_rule()
- added_meta_rules = add_meta_rule()
+ added_meta_rules = meta_rule_helper.add_meta_rule()
added_meta_rule_id = list(added_meta_rules.keys())[0]
# action
- meta_rules = get_meta_rules(meta_rule_id=added_meta_rule_id)
+ meta_rules = meta_rule_helper.get_meta_rules(meta_rule_id=added_meta_rule_id)
meta_rule_id = list(meta_rules.keys())[0]
# assert
assert meta_rule_id == added_meta_rule_id
@@ -141,35 +133,28 @@ def test_get_specific_meta_rule_success(db):
def test_delete_meta_rules_success(db):
+ action_category_id = mock_data.create_action_category("action_type")
+ subject_category_id = mock_data.create_subject_category("user_security_level")
+ object_category_id = mock_data.create_object_category("vm_security_level")
# arrange
value1 = {
"name": "MLS_meta_rule",
"description": "test",
- "subject_categories": ["user_security_level_id_1"],
- "object_categories": ["vm_security_level_id_1"],
- "action_categories": ["action_type_id_1"]
+ "subject_categories": [subject_category_id],
+ "object_categories": [object_category_id],
+ "action_categories": [action_category_id]
}
- meta_rules1 = add_meta_rule(value=value1)
+ meta_rules1 = meta_rule_helper.add_meta_rule(value=value1)
meta_rule_id1 = list(meta_rules1.keys())[0]
- value2 = {
- "name": "rbac_meta_rule",
- "description": "test",
- "subject_categories": ["user_role_id_1"],
- "object_categories": ["vm_id_1"],
- "action_categories": ["action_type_id_1"]
- }
- meta_rules2 = add_meta_rule(value=value2)
- meta_rule_id2 = list(meta_rules2.keys())[0]
-
# action
- delete_meta_rules(meta_rule_id1)
+ meta_rule_helper.delete_meta_rules(meta_rule_id1)
# assert
- meta_rules = get_meta_rules()
+ meta_rules = meta_rule_helper.get_meta_rules()
assert meta_rule_id1 not in meta_rules
def test_delete_invalid_meta_rules_error(db):
with pytest.raises(Exception) as exception_info:
- delete_meta_rules("INVALID_META_RULE_ID")
- assert str(exception_info.value) == '400: Sub Meta Rule Unknown'
+ meta_rule_helper.delete_meta_rules("INVALID_META_RULE_ID")
+ assert str(exception_info.value) == '400: Meta Rule Unknown'
diff --git a/python_moondb/tests/unit_python/models/test_models.py b/python_moondb/tests/unit_python/models/test_models.py
index e56fea6b..0026345c 100644
--- a/python_moondb/tests/unit_python/models/test_models.py
+++ b/python_moondb/tests/unit_python/models/test_models.py
@@ -1,40 +1,22 @@
-import pytest
-
-
-def get_models(model_id=None):
- from python_moondb.core import ModelManager
- return ModelManager.get_models(user_id= None , model_id= model_id)
-
-
-def add_model(model_id=None, value=None):
- from python_moondb.core import ModelManager
- if not value:
- value = {
- "name": "MLS",
- "description": "test",
- "meta_rules": "meta_rule_mls_1"
- }
- return ModelManager.add_model(user_id=None, model_id=model_id, value=value)
-
-
-def delete_models(uuid=None, name=None):
- from python_moondb.core import ModelManager
- if not uuid:
- for model_id, model_value in get_models():
- if name == model_value['name']:
- uuid = model_id
- break
- ModelManager.delete_model(user_id=None, model_id=uuid)
+# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
+# This software is distributed under the terms and conditions of the 'Apache-2.0'
+# license which can be found in the file 'LICENSE' in this package distribution
+# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
+import pytest
+from python_moonutilities.exceptions import *
+import logging
+import helpers.mock_data as mock_data
+import helpers.model_helper as model_helper
+import helpers.category_helper as category_helper
+import helpers.policy_helper as policy_helper
-def update_model(model_id=None, value=None):
- from python_moondb.core import ModelManager
- return ModelManager.update_model(user_id=None, model_id=model_id, value=value)
+logger = logging.getLogger("moon.db.tests.test_model")
def test_get_models_empty(db):
# act
- models = get_models()
+ models = model_helper.get_models()
# assert
assert isinstance(models, dict)
assert not models
@@ -42,70 +24,107 @@ def test_get_models_empty(db):
def test_get_model(db):
# prepare
- add_model(model_id="mls_model_id")
+ model_helper.add_model(model_id="mls_model_id")
# act
- models = get_models()
+ models = model_helper.get_models()
# assert
assert isinstance(models, dict)
assert models # assert model is not empty
assert len(models) is 1
+ model_helper.delete_all_models()
def test_get_specific_model(db):
# prepare
- add_model(model_id="mls_model_id")
- add_model(model_id="rbac_model_id")
+ model_helper.add_model(model_id="mls_model_id")
# act
- models = get_models(model_id="mls_model_id")
+ models = model_helper.get_models(model_id="mls_model_id")
# assert
assert isinstance(models, dict)
assert models # assert model is not empty
assert len(models) is 1
+ model_helper.delete_all_models()
def test_add_model(db):
# act
- model = add_model()
+ model = model_helper.add_model()
# assert
assert isinstance(model, dict)
assert model # assert model is not empty
assert len(model) is 1
+ model_helper.delete_all_models()
def test_add_same_model_twice(db):
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = mock_data.create_new_meta_rule(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1")
+ value = {
+ "name": "model1",
+ "description": "test",
+ "meta_rules": [meta_rule_id]
+ }
# prepare
- add_model(model_id="model_1") # add model twice
+ model_helper.add_model(model_id="model_1", value=value) # add model twice
# act
- with pytest.raises(Exception) as exception_info:
- add_model(model_id="model_1")
- assert str(exception_info.value) == '409: Model Error'
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = mock_data.create_new_meta_rule(
+ subject_category_name="subject_category2",
+ object_category_name="object_category2",
+ action_category_name="action_category2",
+ meta_rule_name="meta_rule_2")
+ value = {
+ "name": "model2",
+ "description": "test",
+ "meta_rules": [meta_rule_id]
+ }
+ with pytest.raises(ModelExisting) as exception_info:
+ model_helper.add_model(model_id="model_1", value=value)
+ model_helper.delete_all_models()
+ # assert str(exception_info.value) == '409: Model Error'
def test_add_model_generate_new_uuid(db):
+ subject_category_id, object_category_id, action_category_id, meta_rule_id1 = mock_data.create_new_meta_rule(
+ subject_category_name="subject_category3",
+ object_category_name="object_category3",
+ action_category_name="action_category3",
+ meta_rule_name="meta_rule_3")
model_value1 = {
"name": "MLS",
"description": "test",
- "meta_rules": "meta_rule_mls_1"
+ "meta_rules": [meta_rule_id1]
}
- model1 = add_model(value=model_value1)
-
+ model1 = model_helper.add_model(value=model_value1)
+ subject_category_id, object_category_id, action_category_id, meta_rule_id2 = mock_data.create_new_meta_rule(
+ subject_category_name="subject_category4",
+ object_category_name="object_category4",
+ action_category_name="action_category4",
+ meta_rule_name="meta_rule_4")
model_value2 = {
"name": "rbac",
"description": "test",
- "meta_rules": "meta_rule_mls_2"
+ "meta_rules": [meta_rule_id2]
}
- model2 = add_model(value=model_value2)
+ model2 = model_helper.add_model(value=model_value2)
assert list(model1)[0] != list(model2)[0]
+ model_helper.delete_all_models()
def test_add_models(db):
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = mock_data.create_new_meta_rule(
+ subject_category_name="subject_category5",
+ object_category_name="object_category5",
+ action_category_name="action_category5")
model_value1 = {
"name": "MLS",
"description": "test",
- "meta_rules": "meta_rule_mls_1"
+ "meta_rules": [meta_rule_id]
}
- models = add_model(value=model_value1)
+ models = model_helper.add_model(value=model_value1)
assert isinstance(models, dict)
assert models
assert len(models.keys()) == 1
@@ -113,49 +132,306 @@ def test_add_models(db):
for key in ("name", "meta_rules", "description"):
assert key in models[model_id]
assert models[model_id][key] == model_value1[key]
+ model_helper.delete_all_models()
-def test_delete_models(db):
+def test_add_models_with_same_name_twice(db):
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = mock_data.create_new_meta_rule(
+ subject_category_name="subject_category5",
+ object_category_name="object_category5",
+ action_category_name="action_category5")
model_value1 = {
"name": "MLS",
"description": "test",
- "meta_rules": "meta_rule_mls_1"
+ "meta_rules": [meta_rule_id]
}
- model1 = add_model(value=model_value1)
+ models = model_helper.add_model(value=model_value1)
+ assert isinstance(models, dict)
+ assert models
+ with pytest.raises(Exception) as exc_info:
+ model_helper.add_model(value=model_value1)
+ model_helper.delete_all_models()
+
+def test_delete_models(db):
+ subject_category_id, object_category_id, action_category_id, meta_rule_id1 = mock_data.create_new_meta_rule(
+ subject_category_name="subject_category6",
+ object_category_name="object_category6",
+ action_category_name="action_category6",
+ meta_rule_name="meta_rule_6")
+ model_value1 = {
+ "name": "MLS",
+ "description": "test",
+ "meta_rules": [meta_rule_id1]
+ }
+ model1 = model_helper.add_model(value=model_value1)
+ subject_category_id, object_category_id, action_category_id, meta_rule_id2 = mock_data.create_new_meta_rule(
+ subject_category_name="subject_category7",
+ object_category_name="object_category7",
+ action_category_name="action_category7",
+ meta_rule_name="meta_rule_7")
model_value2 = {
"name": "rbac",
"description": "test",
- "meta_rules": "meta_rule_mls_2"
+ "meta_rules": [meta_rule_id2]
}
- model2 = add_model(value=model_value2)
+ model_helper.add_model(value=model_value2)
id = list(model1)[0]
- delete_models(id)
+ model_helper.delete_models(id)
# assert
- models = get_models()
+ models = model_helper.get_models()
assert id not in models
+ model_helper.delete_all_models()
def test_update_model(db):
+ subject_category_id, object_category_id, action_category_id, meta_rule_id1 = mock_data.create_new_meta_rule(
+ subject_category_name="subject_category8",
+ object_category_name="object_category8",
+ action_category_name="action_category8",
+ meta_rule_name="meta_rule_8")
# prepare
model_value = {
"name": "MLS",
"description": "test",
- "meta_rules": "meta_rule_mls_1"
+ "meta_rules": [meta_rule_id1]
}
- model = add_model(value=model_value)
+ model = model_helper.add_model(value=model_value)
model_id = list(model)[0]
+ subject_category_id, object_category_id, action_category_id, meta_rule_id2 = mock_data.create_new_meta_rule(
+ subject_category_name="subject_category9",
+ object_category_name="object_category9",
+ action_category_name="action_category9",
+ meta_rule_name="meta_rule_9")
new_model_value = {
- "name": "MLS",
+ "name": "MLS2",
"description": "test",
- "meta_rules": "meta_rule_mls_2"
+ "meta_rules": [meta_rule_id2]
}
# act
- update_model(model_id=model_id, value=new_model_value)
+ model_helper.update_model(model_id=model_id, value=new_model_value)
# assert
- model = get_models(model_id)
+ model = model_helper.get_models(model_id)
for key in ("name", "meta_rules", "description"):
assert key in model[model_id]
- assert model[model_id][key] == new_model_value[key] \ No newline at end of file
+ assert model[model_id][key] == new_model_value[key]
+ model_helper.delete_all_models()
+
+
+def test_delete_model_assigned_to_policy(db):
+ model_value1 = {
+ "name": "MLS",
+ "description": "test",
+ "meta_rules": []
+ }
+ models = model_helper.add_model(value=model_value1)
+ assert isinstance(models, dict)
+ assert models
+ assert len(models.keys()) == 1
+ model_id = list(models.keys())[0]
+ value = {
+ "name": "test_policy",
+ "model_id": model_id,
+ "genre": "authz",
+ "description": "test",
+ }
+ policy_helper.add_policies(value=value)
+ with pytest.raises(DeleteModelWithPolicy) as exception_info:
+ model_helper.delete_models(uuid=model_id)
+
+
+def test_add_subject_category(db):
+ category_id = "category_id1"
+ value = {
+ "name": "subject_category",
+ "description": "description subject_category"
+ }
+ subject_category = category_helper.add_subject_category(category_id, value)
+ assert subject_category
+ assert len(subject_category) == 1
+
+
+def test_add_subject_category_with_empty_name(db):
+ category_id = "category_id1"
+ value = {
+ "name": "",
+ "description": "description subject_category"
+ }
+ with pytest.raises(Exception) as exception_info:
+ category_helper.add_subject_category(category_id, value)
+ assert str(exception_info.value) == '400: Category Name Invalid'
+
+
+def test_add_subject_category_with_same_category_id(db):
+ category_id = "category_id1"
+ value = {
+ "name": "subject_category",
+ "description": "description subject_category"
+ }
+ category_helper.add_subject_category(category_id, value)
+ with pytest.raises(Exception) as exception_info:
+ category_helper.add_subject_category(category_id, value)
+ assert str(exception_info.value) == '409: Subject Category Existing'
+
+
+def test_get_subject_category(db):
+ category_id = "category_id1"
+ value = {
+ "name": "subject_category",
+ "description": "description subject_category"
+ }
+ category_helper.add_subject_category(category_id, value)
+ subject_category = category_helper.get_subject_category(category_id)
+ assert subject_category
+ assert len(subject_category) == 1
+
+
+def test_delete_subject_category(db):
+ category_id = "category_id1"
+ value = {
+ "name": "subject_category",
+ "description": "description subject_category"
+ }
+ category_helper.add_subject_category(category_id, value)
+ subject_category = category_helper.delete_subject_category(category_id)
+ assert not subject_category
+
+
+def test_delete_subject_category_with_unkown_category_id(db):
+ category_id = "invalid_category_id"
+
+ with pytest.raises(Exception) as exception_info:
+ category_helper.delete_subject_category(category_id)
+ assert str(exception_info.value) == '400: Subject Category Unknown'
+
+
+def test_add_object_category(db):
+ category_id = "category_id1"
+ value = {
+ "name": "object_category",
+ "description": "description object_category"
+ }
+ object_category = category_helper.add_object_category(category_id, value)
+ assert object_category
+ assert len(object_category) == 1
+
+
+def test_add_object_category_with_same_category_id(db):
+ category_id = "category_id1"
+ value = {
+ "name": "object_category",
+ "description": "description object_category"
+ }
+ category_helper.add_object_category(category_id, value)
+ with pytest.raises(Exception) as exception_info:
+ category_helper.add_object_category(category_id, value)
+ assert str(exception_info.value) == '409: Object Category Existing'
+
+
+def test_add_object_category_with_empty_name(db):
+ category_id = "category_id1"
+ value = {
+ "name": "",
+ "description": "description object_category"
+ }
+ with pytest.raises(Exception) as exception_info:
+ category_helper.add_object_category(category_id, value)
+ assert str(exception_info.value) == '400: Category Name Invalid'
+
+
+def test_get_object_category(db):
+ category_id = "category_id1"
+ value = {
+ "name": "object_category",
+ "description": "description object_category"
+ }
+ category_helper.add_object_category(category_id, value)
+ object_category = category_helper.get_object_category(category_id)
+ assert object_category
+ assert len(object_category) == 1
+
+
+def test_delete_object_category(db):
+ category_id = "category_id1"
+ value = {
+ "name": "object_category",
+ "description": "description object_category"
+ }
+ category_helper.add_object_category(category_id, value)
+ object_category = category_helper.delete_object_category(category_id)
+ assert not object_category
+
+
+def test_delete_object_category_with_unkown_category_id(db):
+ category_id = "invalid_category_id"
+
+ with pytest.raises(Exception) as exception_info:
+ category_helper.delete_object_category(category_id)
+ assert str(exception_info.value) == '400: Object Category Unknown'
+
+
+def test_add_action_category(db):
+ category_id = "category_id1"
+ value = {
+ "name": "action_category",
+ "description": "description action_category"
+ }
+ action_category = category_helper.add_action_category(category_id, value)
+ assert action_category
+ assert len(action_category) == 1
+
+
+def test_add_action_category_with_same_category_id(db):
+ category_id = "category_id1"
+ value = {
+ "name": "action_category",
+ "description": "description action_category"
+ }
+ category_helper.add_action_category(category_id, value)
+ with pytest.raises(Exception) as exception_info:
+ category_helper.add_action_category(category_id, value)
+ assert str(exception_info.value) == '409: Action Category Existing'
+
+
+def test_add_action_category_with_empty_name(db):
+ category_id = "category_id1"
+ value = {
+ "name": "",
+ "description": "description action_category"
+ }
+ with pytest.raises(Exception) as exception_info:
+ category_helper.add_action_category(category_id, value)
+ assert str(exception_info.value) == '400: Category Name Invalid'
+
+
+def test_get_action_category(db):
+ category_id = "category_id1"
+ value = {
+ "name": "action_category",
+ "description": "description action_category"
+ }
+ category_helper.add_action_category(category_id, value)
+ action_category = category_helper.get_action_category(category_id)
+ assert action_category
+ assert len(action_category) == 1
+
+
+def test_delete_action_category(db):
+ category_id = "category_id1"
+ value = {
+ "name": "action_category",
+ "description": "description action_category"
+ }
+ category_helper.add_action_category(category_id, value)
+ action_category = category_helper.delete_action_category(category_id)
+ assert not action_category
+
+
+def test_delete_action_category_with_unkown_category_id(db):
+ category_id = "invalid_category_id"
+
+ with pytest.raises(Exception) as exception_info:
+ category_helper.delete_action_category(category_id)
+ assert str(exception_info.value) == '400: Action Category Unknown'
diff --git a/python_moondb/tests/unit_python/policies/mock_data.py b/python_moondb/tests/unit_python/policies/mock_data.py
index 23eeef64..47fc9f9e 100644
--- a/python_moondb/tests/unit_python/policies/mock_data.py
+++ b/python_moondb/tests/unit_python/policies/mock_data.py
@@ -1,18 +1,24 @@
-def create_meta_rule():
+import helpers.model_helper as model_helper
+import helpers.meta_rule_helper as meta_rule_helper
+import helpers.policy_helper as policy_helper
+import helpers.category_helper as category_helper
+
+
+def create_meta_rule(meta_rule_name="meta_rule1", category_prefix=""):
meta_rule_value = {
- "name": "meta_rule1",
+ "name": meta_rule_name,
"algorithm": "name of the meta rule algorithm",
- "subject_categories": ["subject_category_id1",
- "subject_category_id2"],
- "object_categories": ["object_category_id1"],
- "action_categories": ["action_category_id1"]
+ "subject_categories": [category_prefix + "subject_category_id1",
+ category_prefix + "subject_category_id2"],
+ "object_categories": [category_prefix + "object_category_id1"],
+ "action_categories": [category_prefix + "action_category_id1"]
}
return meta_rule_value
-def create_model(meta_rule_id):
+def create_model(meta_rule_id, model_name="test_model"):
value = {
- "name": "test_model",
+ "name": model_name,
"description": "test",
"meta_rules": [meta_rule_id]
@@ -20,9 +26,9 @@ def create_model(meta_rule_id):
return value
-def create_policy(model_id):
+def create_policy(model_id, policy_name="policy_1"):
value = {
- "name": "policy_1",
+ "name": policy_name,
"model_id": model_id,
"genre": "authz",
"description": "test",
@@ -40,16 +46,29 @@ def create_pdp(pdp_ids):
return value
-def get_policy_id():
- import policies.test_policies as test_policies
- import models.test_models as test_models
- import models.test_meta_rules as test_meta_rules
- meta_rule = test_meta_rules.add_meta_rule(value=create_meta_rule())
+def get_policy_id(model_name="test_model", policy_name="policy_1", meta_rule_name="meta_rule1", category_prefix=""):
+ category_helper.add_subject_category(
+ category_prefix + "subject_category_id1",
+ value={"name": category_prefix + "subject_category_id1",
+ "description": "description 1"})
+ category_helper.add_subject_category(
+ category_prefix + "subject_category_id2",
+ value={"name": category_prefix + "subject_category_id2",
+ "description": "description 1"})
+ category_helper.add_object_category(
+ category_prefix + "object_category_id1",
+ value={"name": category_prefix + "object_category_id1",
+ "description": "description 1"})
+ category_helper.add_action_category(
+ category_prefix + "action_category_id1",
+ value={"name": category_prefix + "action_category_id1",
+ "description": "description 1"})
+ meta_rule = meta_rule_helper.add_meta_rule(value=create_meta_rule(meta_rule_name, category_prefix))
meta_rule_id = list(meta_rule.keys())[0]
- model = test_models.add_model(value=create_model(meta_rule_id))
+ model = model_helper.add_model(value=create_model(meta_rule_id, model_name))
model_id = list(model.keys())[0]
- value = create_policy(model_id)
- policy = test_policies.add_policies(value=value)
+ value = create_policy(model_id, policy_name)
+ policy = policy_helper.add_policies(value=value)
assert policy
policy_id = list(policy.keys())[0]
return policy_id
diff --git a/python_moondb/tests/unit_python/policies/test_assignments.py b/python_moondb/tests/unit_python/policies/test_assignments.py
index 707632b0..675c2ff9 100755
--- a/python_moondb/tests/unit_python/policies/test_assignments.py
+++ b/python_moondb/tests/unit_python/policies/test_assignments.py
@@ -1,248 +1,220 @@
-import policies.mock_data as mock_data
+# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
+# This software is distributed under the terms and conditions of the 'Apache-2.0'
+# license which can be found in the file 'LICENSE' in this package distribution
+# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
-
-def get_action_assignments(policy_id, action_id=None, category_id=None):
- from python_moondb.core import PolicyManager
- return PolicyManager.get_action_assignments("", policy_id, action_id, category_id)
-
-
-def add_action_assignment(policy_id, action_id, category_id, data_id):
- from python_moondb.core import PolicyManager
- return PolicyManager.add_action_assignment("", policy_id, action_id, category_id, data_id)
-
-
-def delete_action_assignment(policy_id, action_id, category_id, data_id):
- from python_moondb.core import PolicyManager
- PolicyManager.delete_action_assignment("", policy_id, action_id, category_id, data_id)
-
-
-def get_object_assignments(policy_id, object_id=None, category_id=None):
- from python_moondb.core import PolicyManager
- return PolicyManager.get_object_assignments("", policy_id, object_id, category_id)
-
-
-def add_object_assignment(policy_id, object_id, category_id, data_id):
- from python_moondb.core import PolicyManager
- return PolicyManager.add_object_assignment("", policy_id, object_id, category_id, data_id)
-
-
-def delete_object_assignment(policy_id, object_id, category_id, data_id):
- from python_moondb.core import PolicyManager
- PolicyManager.delete_object_assignment("", policy_id, object_id, category_id, data_id)
-
-
-def get_subject_assignments(policy_id, subject_id=None, category_id=None):
- from python_moondb.core import PolicyManager
- return PolicyManager.get_subject_assignments("", policy_id, subject_id, category_id)
-
-
-def add_subject_assignment(policy_id, subject_id, category_id, data_id):
- from python_moondb.core import PolicyManager
- return PolicyManager.add_subject_assignment("", policy_id, subject_id, category_id, data_id)
-
-
-def delete_subject_assignment(policy_id, subject_id, category_id, data_id):
- from python_moondb.core import PolicyManager
- PolicyManager.delete_subject_assignment("", policy_id, subject_id, category_id, data_id)
+import helpers.mock_data as mock_data
+import helpers.assignment_helper as assignment_helper
+from python_moonutilities.exceptions import *
+import pytest
def test_get_action_assignments(db):
- policy_id = mock_data.get_policy_id()
- action_id = "action_id_1"
- category_id = "category_id_1"
- data_id = "data_id_1"
- add_action_assignment(policy_id, action_id, category_id, data_id)
- act_assignments = get_action_assignments(policy_id, action_id, category_id)
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1")
+ action_id = mock_data.create_action(policy_id)
+ data_id = mock_data.create_action_data(policy_id=policy_id, category_id=action_category_id)
+
+ assignment_helper.add_action_assignment(policy_id, action_id, action_category_id, data_id)
+ act_assignments = assignment_helper.get_action_assignments(policy_id, action_id, action_category_id)
action_id_1 = list(act_assignments.keys())[0]
assert act_assignments[action_id_1]["policy_id"] == policy_id
assert act_assignments[action_id_1]["action_id"] == action_id
- assert act_assignments[action_id_1]["category_id"] == category_id
+ assert act_assignments[action_id_1]["category_id"] == action_category_id
assert len(act_assignments[action_id_1].get("assignments")) == 1
assert data_id in act_assignments[action_id_1].get("assignments")
-def test_get_action_assignments_by_policy_id(db):
- policy_id = mock_data.get_policy_id()
- action_id = "action_id_1"
- category_id = "category_id_1"
- data_id = "data_id_1"
- add_action_assignment(policy_id, action_id, category_id, data_id)
- data_id = "data_id_2"
- add_action_assignment(policy_id, action_id, category_id, data_id)
- data_id = "data_id_3"
- add_action_assignment(policy_id, action_id, category_id, data_id)
- act_assignments = get_action_assignments(policy_id)
- action_id_1 = list(act_assignments.keys())[0]
- assert act_assignments[action_id_1]["policy_id"] == policy_id
- assert act_assignments[action_id_1]["action_id"] == action_id
- assert act_assignments[action_id_1]["category_id"] == category_id
- assert len(act_assignments[action_id_1].get("assignments")) == 3
-
-
def test_add_action_assignments(db):
- policy_id = mock_data.get_policy_id()
- action_id = "action_id_1"
- category_id = "category_id_1"
- data_id = "data_id_1"
- action_assignments = add_action_assignment(policy_id, action_id, category_id, data_id)
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1")
+ action_id = mock_data.create_action(policy_id)
+ data_id = mock_data.create_action_data(policy_id=policy_id, category_id=action_category_id)
+ action_assignments = assignment_helper.add_action_assignment(policy_id, action_id, action_category_id, data_id)
assert action_assignments
action_id_1 = list(action_assignments.keys())[0]
assert action_assignments[action_id_1]["policy_id"] == policy_id
assert action_assignments[action_id_1]["action_id"] == action_id
- assert action_assignments[action_id_1]["category_id"] == category_id
+ assert action_assignments[action_id_1]["category_id"] == action_category_id
assert len(action_assignments[action_id_1].get("assignments")) == 1
assert data_id in action_assignments[action_id_1].get("assignments")
+ with pytest.raises(ActionAssignmentExisting) as exception_info:
+ assignment_helper.add_action_assignment(policy_id, action_id, action_category_id, data_id)
+
def test_delete_action_assignment(db):
- policy_id = mock_data.get_policy_id()
- add_action_assignment(policy_id, "", "", "")
- policy_id = mock_data.get_policy_id()
- action_id = "action_id_2"
- category_id = "category_id_2"
- data_id = "data_id_2"
- add_action_assignment(policy_id, action_id, category_id, data_id)
- delete_action_assignment(policy_id, "", "", "")
- assignments = get_action_assignments(policy_id, )
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1")
+ action_id = mock_data.create_action(policy_id)
+ data_id = mock_data.create_action_data(policy_id=policy_id, category_id=action_category_id)
+ assignment_helper.add_action_assignment(policy_id, action_id, action_category_id, data_id)
+ assignment_helper.delete_action_assignment(policy_id, "", "", "")
+ assignments = assignment_helper.get_action_assignments(policy_id, )
assert len(assignments) == 1
def test_delete_action_assignment_with_invalid_policy_id(db):
policy_id = "invalid_id"
- delete_action_assignment(policy_id, "", "", "")
- assignments = get_action_assignments(policy_id, )
+ assignment_helper.delete_action_assignment(policy_id, "", "", "")
+ assignments = assignment_helper.get_action_assignments(policy_id, )
assert len(assignments) == 0
def test_get_object_assignments(db):
- policy_id = mock_data.get_policy_id()
- object_id = "object_id_1"
- category_id = "category_id_1"
- data_id = "data_id_1"
- add_object_assignment(policy_id, object_id, category_id, data_id)
- obj_assignments = get_object_assignments(policy_id, object_id, category_id)
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1")
+ object_id = mock_data.create_object(policy_id)
+ data_id = mock_data.create_object_data(policy_id=policy_id, category_id=object_category_id)
+ assignment_helper.add_object_assignment(policy_id, object_id, object_category_id, data_id)
+ obj_assignments = assignment_helper.get_object_assignments(policy_id, object_id, object_category_id)
object_id_1 = list(obj_assignments.keys())[0]
assert obj_assignments[object_id_1]["policy_id"] == policy_id
assert obj_assignments[object_id_1]["object_id"] == object_id
- assert obj_assignments[object_id_1]["category_id"] == category_id
+ assert obj_assignments[object_id_1]["category_id"] == object_category_id
assert len(obj_assignments[object_id_1].get("assignments")) == 1
assert data_id in obj_assignments[object_id_1].get("assignments")
def test_get_object_assignments_by_policy_id(db):
- policy_id = mock_data.get_policy_id()
- object_id_1 = "object_id_1"
- category_id_1 = "category_id_1"
- data_id = "data_id_1"
- add_action_assignment(policy_id, object_id_1, category_id_1, data_id)
- object_id_2 = "object_id_2"
- category_id_2 = "category_id_2"
- data_id = "data_id_2"
- add_action_assignment(policy_id, object_id_2, category_id_2, data_id)
- object_id_3 = "object_id_3"
- category_id_3 = "category_id_3"
- data_id = "data_id_3"
- add_action_assignment(policy_id, object_id_3, category_id_3, data_id)
- act_assignments = get_action_assignments(policy_id)
- assert len(act_assignments) == 3
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1")
+ object_id = mock_data.create_object(policy_id)
+ data_id = mock_data.create_object_data(policy_id=policy_id, category_id=object_category_id)
+ assignment_helper.add_object_assignment(policy_id, object_id, object_category_id, data_id)
+ obj_assignments = assignment_helper.get_object_assignments(policy_id)
+ assert len(obj_assignments) == 1
def test_add_object_assignments(db):
- policy_id = mock_data.get_policy_id()
- object_id = "object_id_1"
- category_id = "category_id_1"
- data_id = "data_id_1"
- object_assignments = add_object_assignment(policy_id, object_id, category_id, data_id)
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1")
+ object_id = mock_data.create_object(policy_id)
+ data_id = mock_data.create_object_data(policy_id=policy_id, category_id=object_category_id)
+ object_assignments = assignment_helper.add_object_assignment(policy_id, object_id, object_category_id, data_id)
assert object_assignments
object_id_1 = list(object_assignments.keys())[0]
assert object_assignments[object_id_1]["policy_id"] == policy_id
assert object_assignments[object_id_1]["object_id"] == object_id
- assert object_assignments[object_id_1]["category_id"] == category_id
+ assert object_assignments[object_id_1]["category_id"] == object_category_id
assert len(object_assignments[object_id_1].get("assignments")) == 1
assert data_id in object_assignments[object_id_1].get("assignments")
+ with pytest.raises(ObjectAssignmentExisting):
+ assignment_helper.add_object_assignment(policy_id, object_id, object_category_id, data_id)
+
def test_delete_object_assignment(db):
- policy_id = mock_data.get_policy_id()
- add_object_assignment(policy_id, "", "", "")
- object_id = "action_id_2"
- category_id = "category_id_2"
- data_id = "data_id_2"
- add_object_assignment(policy_id, object_id, category_id, data_id)
- delete_object_assignment(policy_id, "", "", "")
- assignments = get_object_assignments(policy_id, )
- assert len(assignments) == 1
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1")
+ object_id = mock_data.create_object(policy_id)
+ data_id = mock_data.create_object_data(policy_id=policy_id, category_id=object_category_id)
+ assignment_helper.add_object_assignment(policy_id, object_id, object_category_id, data_id)
+
+ assignment_helper.delete_object_assignment(policy_id, object_id, object_category_id, data_id=data_id)
+ assignments = assignment_helper.get_object_assignments(policy_id)
+ assert len(assignments) == 0
def test_delete_object_assignment_with_invalid_policy_id(db):
policy_id = "invalid_id"
- delete_object_assignment(policy_id, "", "", "")
- assignments = get_object_assignments(policy_id, )
+ assignment_helper.delete_object_assignment(policy_id, "", "", "")
+ assignments = assignment_helper.get_object_assignments(policy_id, )
assert len(assignments) == 0
def test_get_subject_assignments(db):
- policy_id = mock_data.get_policy_id()
- subject_id = "object_id_1"
- category_id = "category_id_1"
- data_id = "data_id_1"
- add_subject_assignment(policy_id, subject_id, category_id, data_id)
- subj_assignments = get_subject_assignments(policy_id, subject_id, category_id)
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1")
+ subject_id = mock_data.create_subject(policy_id)
+ data_id = mock_data.create_subject_data(policy_id=policy_id, category_id=subject_category_id)
+
+ assignment_helper.add_subject_assignment(policy_id, subject_id, subject_category_id, data_id)
+ subj_assignments = assignment_helper.get_subject_assignments(policy_id, subject_id, subject_category_id)
subject_id_1 = list(subj_assignments.keys())[0]
assert subj_assignments[subject_id_1]["policy_id"] == policy_id
assert subj_assignments[subject_id_1]["subject_id"] == subject_id
- assert subj_assignments[subject_id_1]["category_id"] == category_id
+ assert subj_assignments[subject_id_1]["category_id"] == subject_category_id
assert len(subj_assignments[subject_id_1].get("assignments")) == 1
assert data_id in subj_assignments[subject_id_1].get("assignments")
def test_get_subject_assignments_by_policy_id(db):
- policy_id = mock_data.get_policy_id()
- subject_id_1 = "subject_id_1"
- category_id_1 = "category_id_1"
- data_id = "data_id_1"
- add_subject_assignment(policy_id, subject_id_1, category_id_1, data_id)
- subject_id_2 = "subject_id_2"
- category_id_2 = "category_id_2"
- data_id = "data_id_2"
- add_subject_assignment(policy_id, subject_id_2, category_id_2, data_id)
- subject_id_3 = "subject_id_3"
- category_id_3 = "category_id_3"
- data_id = "data_id_3"
- add_subject_assignment(policy_id, subject_id_3, category_id_3, data_id)
- subj_assignments = get_subject_assignments(policy_id)
- assert len(subj_assignments) == 3
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1")
+ subject_id = mock_data.create_subject(policy_id)
+ data_id = mock_data.create_subject_data(policy_id=policy_id, category_id=subject_category_id)
+
+ assignment_helper.add_subject_assignment(policy_id, subject_id, subject_category_id, data_id)
+ subj_assignments = assignment_helper.get_subject_assignments(policy_id)
+ assert len(subj_assignments) == 1
def test_add_subject_assignments(db):
- policy_id = mock_data.get_policy_id()
- subject_id = "subject_id_1"
- category_id = "category_id_1"
- data_id = "data_id_1"
- subject_assignments = add_subject_assignment(policy_id, subject_id, category_id, data_id)
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1")
+ subject_id = mock_data.create_subject(policy_id)
+ data_id = mock_data.create_subject_data(policy_id=policy_id, category_id=subject_category_id)
+
+ subject_assignments = assignment_helper.add_subject_assignment(policy_id, subject_id, subject_category_id, data_id)
assert subject_assignments
subject_id_1 = list(subject_assignments.keys())[0]
assert subject_assignments[subject_id_1]["policy_id"] == policy_id
assert subject_assignments[subject_id_1]["subject_id"] == subject_id
- assert subject_assignments[subject_id_1]["category_id"] == category_id
+ assert subject_assignments[subject_id_1]["category_id"] == subject_category_id
assert len(subject_assignments[subject_id_1].get("assignments")) == 1
assert data_id in subject_assignments[subject_id_1].get("assignments")
+ with pytest.raises(SubjectAssignmentExisting):
+ assignment_helper.add_subject_assignment(policy_id, subject_id, subject_category_id, data_id)
+
def test_delete_subject_assignment(db):
- policy_id = mock_data.get_policy_id()
- add_subject_assignment(policy_id, "", "", "")
- subject_id = "subject_id_2"
- category_id = "category_id_2"
- data_id = "data_id_2"
- add_subject_assignment(policy_id, subject_id, category_id, data_id)
- delete_subject_assignment(policy_id, "", "", "")
- assignments = get_subject_assignments(policy_id, )
- assert len(assignments) == 1
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1")
+ subject_id = mock_data.create_subject(policy_id)
+ data_id = mock_data.create_subject_data(policy_id=policy_id, category_id=subject_category_id)
+ assignment_helper.add_subject_assignment(policy_id, subject_id, subject_category_id, data_id)
+ assignment_helper.delete_subject_assignment(policy_id, subject_id, subject_category_id, data_id)
+ assignments = assignment_helper.get_subject_assignments(policy_id)
+ assert len(assignments) == 0
def test_delete_subject_assignment_with_invalid_policy_id(db):
policy_id = "invalid_id"
- delete_subject_assignment(policy_id, "", "", "")
- assignments = get_subject_assignments(policy_id, )
+ assignment_helper.delete_subject_assignment(policy_id, "", "", "")
+ assignments = assignment_helper.get_subject_assignments(policy_id, )
assert len(assignments) == 0
diff --git a/python_moondb/tests/unit_python/policies/test_data.py b/python_moondb/tests/unit_python/policies/test_data.py
index 67fa44fb..fa3f8c06 100755
--- a/python_moondb/tests/unit_python/policies/test_data.py
+++ b/python_moondb/tests/unit_python/policies/test_data.py
@@ -1,328 +1,269 @@
-import policies.mock_data as mock_data
+# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
+# This software is distributed under the terms and conditions of the 'Apache-2.0'
+# license which can be found in the file 'LICENSE' in this package distribution
+# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
+
+import helpers.mock_data as mock_data
+import policies.mock_data
+import helpers.data_helper as data_helper
import pytest
+import logging
+from python_moonutilities.exceptions import *
-
-def get_action_data(policy_id, data_id=None, category_id=None):
- from python_moondb.core import PolicyManager
- return PolicyManager.get_action_data("", policy_id, data_id, category_id)
-
-
-def add_action_data(policy_id, data_id=None, category_id=None, value=None):
- from python_moondb.core import PolicyManager
- return PolicyManager.add_action_data("", policy_id, data_id, category_id, value)
-
-
-def delete_action_data(policy_id, data_id):
- from python_moondb.core import PolicyManager
- PolicyManager.delete_action_data("", policy_id, data_id)
-
-
-def get_object_data(policy_id, data_id=None, category_id=None):
- from python_moondb.core import PolicyManager
- return PolicyManager.get_object_data("", policy_id, data_id, category_id)
-
-
-def add_object_data(policy_id, data_id=None, category_id=None, value=None):
- from python_moondb.core import PolicyManager
- return PolicyManager.add_object_data("", policy_id, data_id, category_id, value)
-
-
-def delete_object_data(policy_id, data_id):
- from python_moondb.core import PolicyManager
- PolicyManager.delete_object_data("", policy_id, data_id)
-
-
-def get_subject_data(policy_id, data_id=None, category_id=None):
- from python_moondb.core import PolicyManager
- return PolicyManager.get_subject_data("", policy_id, data_id, category_id)
-
-
-def add_subject_data(policy_id, data_id=None, category_id=None, value=None):
- from python_moondb.core import PolicyManager
- return PolicyManager.set_subject_data("", policy_id, data_id, category_id, value)
-
-
-def delete_subject_data(policy_id, data_id):
- from python_moondb.core import PolicyManager
- PolicyManager.delete_subject_data("", policy_id, data_id)
-
-
-def get_actions(policy_id, perimeter_id=None):
- from python_moondb.core import PolicyManager
- return PolicyManager.get_actions("", policy_id, perimeter_id)
-
-
-def add_action(policy_id, perimeter_id=None, value=None):
- from python_moondb.core import PolicyManager
- return PolicyManager.add_action("", policy_id, perimeter_id, value)
-
-
-def delete_action(policy_id, perimeter_id):
- from python_moondb.core import PolicyManager
- PolicyManager.delete_action("", policy_id, perimeter_id)
-
-
-def get_objects(policy_id, perimeter_id=None):
- from python_moondb.core import PolicyManager
- return PolicyManager.get_objects("", policy_id, perimeter_id)
-
-
-def add_object(policy_id, perimeter_id=None, value=None):
- from python_moondb.core import PolicyManager
- return PolicyManager.add_object("", policy_id, perimeter_id, value)
-
-
-def delete_object(policy_id, perimeter_id):
- from python_moondb.core import PolicyManager
- PolicyManager.delete_object("", policy_id, perimeter_id)
-
-
-def get_subjects(policy_id, perimeter_id=None):
- from python_moondb.core import PolicyManager
- return PolicyManager.get_subjects("", policy_id, perimeter_id)
-
-
-def add_subject(policy_id, perimeter_id=None, value=None):
- from python_moondb.core import PolicyManager
- return PolicyManager.add_subject("", policy_id, perimeter_id, value)
-
-
-def delete_subject(policy_id, perimeter_id):
- from python_moondb.core import PolicyManager
- PolicyManager.delete_subject("", policy_id, perimeter_id)
-
-
-def get_available_metadata(policy_id):
- from python_moondb.core import PolicyManager
- return PolicyManager.get_available_metadata("", policy_id)
+logger = logging.getLogger("python_moondb.tests.api.test_data")
def test_get_action_data(db):
- policy_id = mock_data.get_policy_id()
- get_available_metadata(policy_id)
-
- policy_id = policy_id
- data_id = "data_id_1"
- category_id = "action_category_id1"
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1")
value = {
"name": "action-type",
"description": {"vm-action": "", "storage-action": "", },
}
- add_action_data(policy_id, data_id, category_id, value)
- action_data = get_action_data(policy_id, data_id, category_id)
- assert action_data
- assert len(action_data[0]['data']) == 1
+ action_data = data_helper.add_action_data(policy_id=policy_id, category_id=action_category_id, value=value)
+ data_id = list(action_data["data"])[0]
+ found_action_data = data_helper.get_action_data(policy_id=policy_id, data_id=data_id,
+ category_id=action_category_id)
+ assert found_action_data
+ assert len(found_action_data[0]["data"]) == 1
def test_get_action_data_with_invalid_category_id(db):
- policy_id = mock_data.get_policy_id()
- get_available_metadata(policy_id)
- data_id = "data_id_1"
- category_id = "action_category_id1"
- value = {
- "name": "action-type",
- "description": {"vm-action": "", "storage-action": "", },
- }
- add_action_data(policy_id, data_id, category_id, value)
- action_data = get_action_data(policy_id)
- assert action_data
- assert len(action_data[0]['data']) == 1
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1")
+ action_data = data_helper.get_action_data(policy_id=policy_id, category_id="invalid")
+ assert len(action_data) == 0
def test_add_action_data(db):
- policy_id = mock_data.get_policy_id()
- data_id = "data_id_1"
- category_id = "category_id_1"
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1")
value = {
"name": "action-type",
"description": {"vm-action": "", "storage-action": "", },
}
- action_data = add_action_data(policy_id, data_id, category_id, value).get('data')
+ action_data = data_helper.add_action_data(policy_id=policy_id, category_id=action_category_id, value=value)
assert action_data
- action_data_id = list(action_data.keys())[0]
- assert action_data[action_data_id].get('policy_id') == policy_id
+ assert len(action_data['data']) == 1
def test_add_action_data_with_invalid_category_id(db):
- policy_id = mock_data.get_policy_id()
- data_id = "data_id_1"
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1")
value = {
"name": "action-type",
"description": {"vm-action": "", "storage-action": "", },
}
with pytest.raises(Exception) as exception_info:
- add_action_data(policy_id=policy_id, data_id=data_id, value=value).get('data')
+ data_helper.add_action_data(policy_id=policy_id, value=value).get('data')
assert str(exception_info.value) == 'Invalid category id'
def test_delete_action_data(db):
- policy_id = mock_data.get_policy_id()
- get_available_metadata(policy_id)
- data_id = "data_id_1"
- category_id = "category_id_1"
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1")
+ data_helper.get_available_metadata(policy_id)
value = {
"name": "action-type",
"description": {"vm-action": "", "storage-action": "", },
}
- action_data = add_action_data(policy_id, data_id, category_id, value).get('data')
- action_data_id = list(action_data.keys())[0]
- delete_action_data(action_data[action_data_id].get('policy_id'), None)
- new_action_data = get_action_data(policy_id)
+ action_data = data_helper.add_action_data(policy_id=policy_id, category_id=action_category_id, value=value)
+ data_id = list(action_data["data"])[0]
+ data_helper.delete_action_data(policy_id, data_id)
+ new_action_data = data_helper.get_action_data(policy_id)
assert len(new_action_data[0]['data']) == 0
def test_get_object_data(db):
- policy_id = mock_data.get_policy_id()
- get_available_metadata(policy_id)
- data_id = "data_id_1"
- category_id = "object_category_id1"
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1")
value = {
"name": "object-security-level",
"description": {"low": "", "medium": "", "high": ""},
}
- add_object_data(policy_id, data_id, category_id, value)
- object_data = get_object_data(policy_id, data_id, category_id)
- assert object_data
- assert len(object_data[0]['data']) == 1
+ object_data = data_helper.add_object_data(policy_id=policy_id, category_id=object_category_id, value=value)
+ data_id = list(object_data["data"])[0]
+ found_object_data = data_helper.get_object_data(policy_id=policy_id, data_id=data_id,
+ category_id=object_category_id)
+ assert found_object_data
+ assert len(found_object_data[0]['data']) == 1
def test_get_object_data_with_invalid_category_id(db):
- policy_id = mock_data.get_policy_id()
- get_available_metadata(policy_id)
- data_id = "data_id_1"
- category_id = "object_category_id1"
- value = {
- "name": "object-security-level",
- "description": {"low": "", "medium": "", "high": ""},
- }
- add_object_data(policy_id, data_id, category_id, value)
- object_data = get_object_data(policy_id)
- assert object_data
- assert len(object_data[0]['data']) == 1
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1")
+ object_data = data_helper.get_object_data(policy_id=policy_id, category_id="invalid")
+ assert len(object_data) == 0
def test_add_object_data(db):
- policy_id = mock_data.get_policy_id()
- data_id = "data_id_1"
- category_id = "object_category_id1"
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1")
value = {
"name": "object-security-level",
"description": {"low": "", "medium": "", "high": ""},
}
- object_data = add_object_data(policy_id, data_id, category_id, value).get('data')
+ object_data = data_helper.add_object_data(policy_id=policy_id, category_id=object_category_id, value=value).get(
+ 'data')
assert object_data
object_data_id = list(object_data.keys())[0]
assert object_data[object_data_id].get('policy_id') == policy_id
def test_add_object_data_with_invalid_category_id(db):
- policy_id = mock_data.get_policy_id()
- data_id = "data_id_1"
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1")
value = {
"name": "object-security-level",
"description": {"low": "", "medium": "", "high": ""},
}
- with pytest.raises(Exception) as exception_info:
- add_object_data(policy_id=policy_id, data_id=data_id, value=value).get('data')
- assert str(exception_info.value) == 'Invalid category id'
+ with pytest.raises(MetaDataUnknown) as exception_info:
+ data_helper.add_object_data(policy_id=policy_id, category_id="invalid", value=value).get('data')
+ assert str(exception_info.value) == '400: Meta data Unknown'
def test_delete_object_data(db):
- policy_id = mock_data.get_policy_id()
- get_available_metadata(policy_id)
- data_id = "data_id_1"
- category_id = "object_category_id1"
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1")
value = {
"name": "object-security-level",
"description": {"low": "", "medium": "", "high": ""},
}
- object_data = add_object_data(policy_id, data_id, category_id, value).get('data')
+ object_data = data_helper.add_object_data(policy_id=policy_id, category_id=object_category_id, value=value).get(
+ 'data')
object_data_id = list(object_data.keys())[0]
- delete_object_data(object_data[object_data_id].get('policy_id'), data_id)
- new_object_data = get_object_data(policy_id)
+ data_helper.delete_object_data(policy_id=object_data[object_data_id].get('policy_id'), data_id=object_data_id)
+ new_object_data = data_helper.get_object_data(policy_id)
assert len(new_object_data[0]['data']) == 0
def test_get_subject_data(db):
- policy_id = mock_data.get_policy_id()
- get_available_metadata(policy_id)
- data_id = "data_id_1"
- category_id = "subject_category_id1"
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1")
value = {
"name": "subject-security-level",
"description": {"low": "", "medium": "", "high": ""},
}
- add_subject_data(policy_id, data_id, category_id, value)
- subject_data = get_subject_data(policy_id, data_id, category_id)
+ subject_data = data_helper.add_subject_data(policy_id=policy_id, category_id=subject_category_id, value=value).get(
+ 'data')
+ subject_data_id = list(subject_data.keys())[0]
+ subject_data = data_helper.get_subject_data(policy_id, subject_data_id, subject_category_id)
assert subject_data
assert len(subject_data[0]['data']) == 1
def test_get_subject_data_with_invalid_category_id(db):
- policy_id = mock_data.get_policy_id()
- get_available_metadata(policy_id)
- data_id = "data_id_1"
- category_id = "subject_category_id1"
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1")
value = {
"name": "subject-security-level",
"description": {"low": "", "medium": "", "high": ""},
}
- add_subject_data(policy_id, data_id, category_id, value)
- subject_data = get_subject_data(policy_id)
- assert subject_data
- assert len(subject_data[0]['data']) == 1
+ subject_data = data_helper.add_subject_data(policy_id=policy_id, category_id=subject_category_id, value=value).get(
+ 'data')
+ subject_data_id = list(subject_data.keys())[0]
+ found_subject_data = data_helper.get_subject_data(policy_id, subject_data_id, "invalid")
+ assert len(found_subject_data) == 0
def test_add_subject_data(db):
- policy_id = mock_data.get_policy_id()
- data_id = "data_id_1"
- category_id = "subject_category_id1"
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1")
value = {
"name": "subject-security-level",
"description": {"low": "", "medium": "", "high": ""},
}
- subject_data = add_subject_data(policy_id, data_id, category_id, value).get('data')
+ subject_data = data_helper.add_subject_data(policy_id=policy_id, category_id=subject_category_id, value=value).get(
+ 'data')
assert subject_data
subject_data_id = list(subject_data.keys())[0]
assert subject_data[subject_data_id].get('policy_id') == policy_id
def test_add_subject_data_with_no_category_id(db):
- policy_id = mock_data.get_policy_id()
- data_id = "data_id_1"
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1")
value = {
"name": "subject-security-level",
"description": {"low": "", "medium": "", "high": ""},
}
with pytest.raises(Exception) as exception_info:
- add_subject_data(policy_id=policy_id, data_id=data_id, value=value).get('data')
+ data_helper.add_subject_data(policy_id=policy_id, data_id=subject_category_id, value=value).get('data')
assert str(exception_info.value) == 'Invalid category id'
def test_delete_subject_data(db):
- policy_id = mock_data.get_policy_id()
- get_available_metadata(policy_id)
- data_id = "data_id_1"
- category_id = "subject_category_id1"
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1")
value = {
"name": "subject-security-level",
"description": {"low": "", "medium": "", "high": ""},
}
- subject_data = add_subject_data(policy_id, data_id, category_id, value).get('data')
+ subject_data = data_helper.add_subject_data(policy_id=policy_id, category_id=subject_category_id, value=value).get(
+ 'data')
subject_data_id = list(subject_data.keys())[0]
- delete_subject_data(subject_data[subject_data_id].get('policy_id'), data_id)
- new_subject_data = get_subject_data(policy_id)
+ data_helper.delete_subject_data(subject_data[subject_data_id].get('policy_id'), subject_data_id)
+ new_subject_data = data_helper.get_subject_data(policy_id)
assert len(new_subject_data[0]['data']) == 0
def test_get_actions(db):
- policy_id = mock_data.get_policy_id()
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1")
value = {
"name": "test_action",
"description": "test",
}
- add_action(policy_id=policy_id, value=value)
- actions = get_actions(policy_id, )
+ data_helper.add_action(policy_id=policy_id, value=value)
+ actions = data_helper.get_actions(policy_id, )
assert actions
assert len(actions) == 1
action_id = list(actions.keys())[0]
@@ -330,24 +271,71 @@ def test_get_actions(db):
def test_add_action(db):
- policy_id = mock_data.get_policy_id()
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1")
value = {
"name": "test_action",
"description": "test",
}
- action = add_action(policy_id=policy_id, value=value)
+ action = data_helper.add_action(policy_id=policy_id, value=value)
assert action
action_id = list(action.keys())[0]
assert len(action[action_id].get('policy_list')) == 1
+def test_add_action_twice(db):
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1")
+ value = {
+ "name": "test_action",
+ "description": "test",
+ }
+ data_helper.add_action(policy_id=policy_id, value=value)
+ with pytest.raises(ActionExisting):
+ data_helper.add_action(policy_id=policy_id, value=value)
+
+
+def test_add_action_blank_name(db):
+ policy_id = policies.mock_data.get_policy_id()
+ value = {
+ "name": "",
+ "description": "test",
+ }
+ with pytest.raises(Exception) as exception_info:
+ data_helper.add_action(policy_id=policy_id, value=value)
+ assert str(exception_info.value) == '400: Perimeter Name is Invalid'
+
+
+def test_add_action_with_name_space(db):
+ policy_id = policies.mock_data.get_policy_id()
+ value = {
+ "name": " ",
+ "description": "test",
+ }
+ with pytest.raises(Exception) as exception_info:
+ data_helper.add_action(policy_id=policy_id, value=value)
+ assert str(exception_info.value) == '400: Perimeter Name is Invalid'
+
+
def test_add_action_multiple_times(db):
- policy_id = mock_data.get_policy_id()
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id1 = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1",
+ model_name="model1")
value = {
"name": "test_action",
"description": "test",
}
- action = add_action(policy_id=policy_id, value=value)
+ action = data_helper.add_action(policy_id=policy_id1, value=value)
+ logger.info("action : {}".format(action))
action_id = list(action.keys())[0]
perimeter_id = action[action_id].get('id')
assert action
@@ -356,22 +344,33 @@ def test_add_action_multiple_times(db):
"description": "test",
"policy_list": ['policy_id_3', 'policy_id_4']
}
- action = add_action(mock_data.get_policy_id(), perimeter_id, value)
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id2 = mock_data.create_new_policy(
+ subject_category_name="subject_category2",
+ object_category_name="object_category2",
+ action_category_name="action_category2",
+ meta_rule_name="meta_rule_2",
+ model_name="model2")
+ action = data_helper.add_action(policy_id=policy_id2, perimeter_id=perimeter_id, value=value)
+ logger.info("action : {}".format(action))
assert action
action_id = list(action.keys())[0]
assert len(action[action_id].get('policy_list')) == 2
def test_delete_action(db):
- policy_id = mock_data.get_policy_id()
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1")
value = {
"name": "test_action",
"description": "test",
}
- action = add_action(policy_id=policy_id, value=value)
+ action = data_helper.add_action(policy_id=policy_id, value=value)
action_id = list(action.keys())[0]
- delete_action(policy_id, action_id)
- actions = get_actions(policy_id, )
+ data_helper.delete_action(policy_id, action_id)
+ actions = data_helper.get_actions(policy_id, )
assert not actions
@@ -379,18 +378,22 @@ def test_delete_action_with_invalid_perimeter_id(db):
policy_id = "invalid"
perimeter_id = "invalid"
with pytest.raises(Exception) as exception_info:
- delete_action(policy_id, perimeter_id)
- assert str(exception_info.value) == '400: Action Unknown'
+ data_helper.delete_action(policy_id, perimeter_id)
+ assert str(exception_info.value) == '400: Policy Unknown'
def test_get_objects(db):
- policy_id = mock_data.get_policy_id()
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1")
value = {
"name": "test_object",
"description": "test",
}
- add_object(policy_id=policy_id, value=value)
- objects = get_objects(policy_id, )
+ data_helper.add_object(policy_id=policy_id, value=value)
+ objects = data_helper.get_objects(policy_id, )
assert objects
assert len(objects) == 1
object_id = list(objects.keys())[0]
@@ -398,48 +401,69 @@ def test_get_objects(db):
def test_add_object(db):
- policy_id = mock_data.get_policy_id()
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1")
value = {
"name": "test_object",
"description": "test",
}
- added_object = add_object(policy_id=policy_id, value=value)
+ added_object = data_helper.add_object(policy_id=policy_id, value=value)
assert added_object
object_id = list(added_object.keys())[0]
assert len(added_object[object_id].get('policy_list')) == 1
+ with pytest.raises(ObjectExisting):
+ data_helper.add_object(policy_id=policy_id, value=value)
+
def test_add_objects_multiple_times(db):
- policy_id = mock_data.get_policy_id()
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1",
+ model_name="model1")
value = {
"name": "test_object",
"description": "test",
}
- added_object = add_object(policy_id=policy_id, value=value)
+ added_object = data_helper.add_object(policy_id=policy_id, value=value)
object_id = list(added_object.keys())[0]
perimeter_id = added_object[object_id].get('id')
assert added_object
value = {
"name": "test_object",
"description": "test",
- "policy_list": ['policy_id_3', 'policy_id_4']
}
- added_object = add_object(mock_data.get_policy_id(), perimeter_id, value)
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category2",
+ object_category_name="object_category2",
+ action_category_name="action_category2",
+ meta_rule_name="meta_rule_2",
+ model_name="model2")
+ added_object = data_helper.add_object(policy_id=policy_id, perimeter_id=perimeter_id, value=value)
assert added_object
object_id = list(added_object.keys())[0]
assert len(added_object[object_id].get('policy_list')) == 2
def test_delete_object(db):
- policy_id = mock_data.get_policy_id()
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1")
value = {
"name": "test_object",
"description": "test",
}
- added_object = add_object(policy_id=policy_id, value=value)
+ added_object = data_helper.add_object(policy_id=policy_id, value=value)
object_id = list(added_object.keys())[0]
- delete_object(policy_id, object_id)
- objects = get_objects(policy_id, )
+ data_helper.delete_object(policy_id, object_id)
+ objects = data_helper.get_objects(policy_id, )
assert not objects
@@ -447,67 +471,107 @@ def test_delete_object_with_invalid_perimeter_id(db):
policy_id = "invalid"
perimeter_id = "invalid"
with pytest.raises(Exception) as exception_info:
- delete_object(policy_id, perimeter_id)
- assert str(exception_info.value) == '400: Object Unknown'
+ data_helper.delete_object(policy_id, perimeter_id)
+ assert str(exception_info.value) == '400: Policy Unknown'
def test_get_subjects(db):
- policy_id = mock_data.get_policy_id()
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1")
value = {
"name": "testuser",
"description": "test",
}
- add_subject(policy_id=policy_id, value=value)
- subjects = get_subjects(policy_id, )
+ data_helper.add_subject(policy_id=policy_id, value=value)
+ subjects = data_helper.get_subjects(policy_id=policy_id)
assert subjects
assert len(subjects) == 1
subject_id = list(subjects.keys())[0]
assert subjects[subject_id].get('policy_list')[0] == policy_id
+def test_get_subjects_with_invalid_policy_id(db):
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1")
+ value = {
+ "name": "testuser",
+ "description": "test",
+ }
+ data_helper.add_subject(policy_id=policy_id, value=value)
+ with pytest.raises(PolicyUnknown):
+ data_helper.get_subjects(policy_id="invalid")
+
+
def test_add_subject(db):
- policy_id = mock_data.get_policy_id()
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1")
value = {
"name": "testuser",
"description": "test",
}
- subject = add_subject(policy_id=policy_id, value=value)
+ subject = data_helper.add_subject(policy_id=policy_id, value=value)
assert subject
subject_id = list(subject.keys())[0]
assert len(subject[subject_id].get('policy_list')) == 1
+ with pytest.raises(SubjectExisting) as exception_info:
+ data_helper.add_subject(policy_id=policy_id, value=value)
+ assert str(exception_info.value) == '409: Subject Existing'
def test_add_subjects_multiple_times(db):
- policy_id = mock_data.get_policy_id()
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1",
+ model_name="model1")
value = {
"name": "testuser",
"description": "test",
}
- subject = add_subject(policy_id=policy_id, value=value)
+ subject = data_helper.add_subject(policy_id=policy_id, value=value)
subject_id = list(subject.keys())[0]
perimeter_id = subject[subject_id].get('id')
assert subject
value = {
"name": "testuser",
"description": "test",
- "policy_list": ['policy_id_3', 'policy_id_4']
}
- subject = add_subject(mock_data.get_policy_id(), perimeter_id, value)
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category2",
+ object_category_name="object_category2",
+ action_category_name="action_category2",
+ meta_rule_name="meta_rule_2",
+ model_name="model2")
+ subject = data_helper.add_subject(policy_id=policy_id, perimeter_id=perimeter_id, value=value)
assert subject
subject_id = list(subject.keys())[0]
assert len(subject[subject_id].get('policy_list')) == 2
def test_delete_subject(db):
- policy_id = mock_data.get_policy_id()
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1")
value = {
"name": "testuser",
"description": "test",
}
- subject = add_subject(policy_id=policy_id, value=value)
+ subject = data_helper.add_subject(policy_id=policy_id, value=value)
subject_id = list(subject.keys())[0]
- delete_subject(policy_id, subject_id)
- subjects = get_subjects(policy_id, )
+ data_helper.delete_subject(policy_id, subject_id)
+ subjects = data_helper.get_subjects(policy_id, )
assert not subjects
@@ -515,30 +579,24 @@ def test_delete_subject_with_invalid_perimeter_id(db):
policy_id = "invalid"
perimeter_id = "invalid"
with pytest.raises(Exception) as exception_info:
- delete_subject(policy_id, perimeter_id)
- assert str(exception_info.value) == '400: Subject Unknown'
+ data_helper.delete_subject(policy_id, perimeter_id)
+ assert str(exception_info.value) == '400: Policy Unknown'
def test_get_available_metadata(db):
- policy_id = mock_data.get_policy_id()
- metadata = get_available_metadata(policy_id=policy_id)
- assert metadata
- assert metadata['object'][0] == "object_category_id1"
- assert metadata['subject'][0] == "subject_category_id1"
- assert metadata['subject'][1] == "subject_category_id2"
-
-
-def test_get_available_metadata_empty_model(db):
- import policies.test_policies as test_policies
- value = mock_data.create_policy("invalid")
- policy = test_policies.add_policies(value=value)
- assert policy
- policy_id = list(policy.keys())[0]
- metadata = get_available_metadata(policy_id=policy_id)
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1")
+ metadata = data_helper.get_available_metadata(policy_id=policy_id)
assert metadata
+ assert metadata['object'][0] == object_category_id
+ assert metadata['subject'][0] == subject_category_id
+ assert metadata['action'][0] == action_category_id
def test_get_available_metadata_with_invalid_policy_id(db):
with pytest.raises(Exception) as exception_info:
- get_available_metadata(policy_id='invalid')
+ data_helper.get_available_metadata(policy_id='invalid')
assert '400: Policy Unknown' == str(exception_info.value)
diff --git a/python_moondb/tests/unit_python/policies/test_policies.py b/python_moondb/tests/unit_python/policies/test_policies.py
index 148034ef..07ee87fd 100755
--- a/python_moondb/tests/unit_python/policies/test_policies.py
+++ b/python_moondb/tests/unit_python/policies/test_policies.py
@@ -4,69 +4,14 @@
# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
import pytest
-import policies.mock_data as mock_data
-
-
-def get_policies():
- from python_moondb.core import PolicyManager
- return PolicyManager.get_policies("admin")
-
-
-def add_policies(policy_id=None, value=None):
- from python_moondb.core import PolicyManager
- if not value:
- value = {
- "name": "test_policiy",
- "model_id": "",
- "genre": "authz",
- "description": "test",
- }
- return PolicyManager.add_policy("admin", policy_id=policy_id, value=value)
-
-
-def delete_policies(uuid=None, name=None):
- from python_moondb.core import PolicyManager
- if not uuid:
- for policy_id, policy_value in get_policies():
- if name == policy_value['name']:
- uuid = policy_id
- break
- PolicyManager.delete_policy("admin", uuid)
-
-
-def update_policy(policy_id, value):
- from python_moondb.core import PolicyManager
- return PolicyManager.update_policy("admin", policy_id, value)
-
-
-def get_policy_from_meta_rules(meta_rule_id):
- from python_moondb.core import PolicyManager
- return PolicyManager.get_policy_from_meta_rules("admin", meta_rule_id)
-
-
-def get_rules(policy_id=None, meta_rule_id=None, rule_id=None):
- from python_moondb.core import PolicyManager
- return PolicyManager.get_rules("", policy_id, meta_rule_id, rule_id)
-
-
-def add_rule(policy_id=None, meta_rule_id=None, value=None):
- from python_moondb.core import PolicyManager
- if not value:
- value = {
- "rule": ("high", "medium", "vm-action"),
- "instructions": ({"decision": "grant"}),
- "enabled": "",
- }
- return PolicyManager.add_rule("", policy_id, meta_rule_id, value)
-
-
-def delete_rule(policy_id=None, rule_id=None):
- from python_moondb.core import PolicyManager
- PolicyManager.delete_rule("", policy_id, rule_id)
+import helpers.mock_data as mock_data
+import helpers.policy_helper as policy_helper
+from python_moonutilities.exceptions import *
+import helpers.pdp_helper as pdp_helper
def test_get_policies(db):
- policies = get_policies()
+ policies = policy_helper.get_policies()
assert isinstance(policies, dict)
assert not policies
@@ -78,7 +23,7 @@ def test_add_policies(db):
"genre": "authz",
"description": "test",
}
- policies = add_policies(value=value)
+ policies = policy_helper.add_policies(value=value)
assert isinstance(policies, dict)
assert policies
assert len(policies.keys()) == 1
@@ -96,10 +41,23 @@ def test_add_policies_twice_with_same_id(db):
"genre": "authz",
"description": "test",
}
- add_policies(policy_id, value)
+ policy_helper.add_policies(policy_id, value)
+ with pytest.raises(PolicyExisting) as exception_info:
+ policy_helper.add_policies(policy_id, value)
+ # assert str(exception_info.value) == '409: Policy Error'
+
+
+def test_add_policies_twice_with_same_name(db):
+ value = {
+ "name": "test_policy",
+ "model_id": "",
+ "genre": "authz",
+ "description": "test",
+ }
+ policy_helper.add_policies(value=value)
with pytest.raises(Exception) as exception_info:
- add_policies(policy_id, value)
- assert str(exception_info.value) == '409: Policy Error'
+ policy_helper.add_policies(value=value)
+ # assert str(exception_info.value) == '409: Policy Error'
def test_delete_policies(db):
@@ -109,7 +67,7 @@ def test_delete_policies(db):
"genre": "authz",
"description": "test",
}
- policies = add_policies(value=value)
+ policies = policy_helper.add_policies(value=value)
policy_id1 = list(policies.keys())[0]
value = {
"name": "test_policy2",
@@ -117,23 +75,23 @@ def test_delete_policies(db):
"genre": "authz",
"description": "test",
}
- policies = add_policies(value=value)
+ policies = policy_helper.add_policies(value=value)
policy_id2 = list(policies.keys())[0]
assert policy_id1 != policy_id2
- delete_policies(policy_id1)
- policies = get_policies()
+ policy_helper.delete_policies(policy_id1)
+ policies = policy_helper.get_policies()
assert policy_id1 not in policies
def test_delete_policies_with_invalid_id(db):
policy_id = 'policy_id_1'
- with pytest.raises(Exception) as exception_info:
- delete_policies(policy_id)
- assert str(exception_info.value) == '400: Policy Unknown'
+ with pytest.raises(PolicyUnknown) as exception_info:
+ policy_helper.delete_policies(policy_id)
+ # assert str(exception_info.value) == '400: Policy Unknown'
def test_update_policy(db):
- policies = add_policies()
+ policies = policy_helper.add_policies()
policy_id = list(policies.keys())[0]
value = {
"name": "test_policy4",
@@ -141,7 +99,7 @@ def test_update_policy(db):
"genre": "authz",
"description": "test-3",
}
- updated_policy = update_policy(policy_id, value)
+ updated_policy = policy_helper.update_policy(policy_id, value)
assert updated_policy
for key in ("genre", "name", "model_id", "description"):
assert key in updated_policy[policy_id]
@@ -156,33 +114,27 @@ def test_update_policy_with_invalid_id(db):
"genre": "authz",
"description": "test-3",
}
- with pytest.raises(Exception) as exception_info:
- update_policy(policy_id, value)
- assert str(exception_info.value) == '400: Policy Unknown'
+ with pytest.raises(PolicyUnknown) as exception_info:
+ policy_helper.update_policy(policy_id, value)
+ # assert str(exception_info.value) == '400: Policy Unknown'
def test_get_policy_from_meta_rules(db):
- import models.test_models as test_models
- import models.test_meta_rules as test_meta_rules
- import test_pdp as test_pdp
- meta_rule = test_meta_rules.add_meta_rule(value=mock_data.create_meta_rule())
- meta_rule_id = list(meta_rule.keys())[0]
- model = test_models.add_model(value=mock_data.create_model(meta_rule_id))
- model_id = list(model.keys())[0]
- value = mock_data.create_policy(model_id)
- policy = add_policies(value=value)
- assert policy
- policy_id = list(policy.keys())[0]
- pdp_ids = [policy_id,]
- pdp_obj = mock_data.create_pdp(pdp_ids)
- test_pdp.add_pdp(value=pdp_obj)
- matched_policy_id = get_policy_from_meta_rules(meta_rule_id)
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1",
+ model_name="model1")
+ security_pipeline = [policy_id]
+ pdp_obj = mock_data.create_pdp(security_pipeline)
+ pdp_helper.add_pdp(value=pdp_obj)
+ matched_policy_id = policy_helper.get_policy_from_meta_rules(meta_rule_id)
assert matched_policy_id
assert policy_id == matched_policy_id
def test_get_policy_from_meta_rules_with_no_policy_ids(db):
- import test_pdp as test_pdp
meta_rule_id = 'meta_rule_id'
value = {
"name": "test_pdp",
@@ -190,58 +142,31 @@ def test_get_policy_from_meta_rules_with_no_policy_ids(db):
"keystone_project_id": "keystone_project_id1",
"description": "...",
}
- test_pdp.add_pdp(value=value)
- matched_policy_id = get_policy_from_meta_rules(meta_rule_id)
+ pdp_helper.add_pdp(value=value)
+ matched_policy_id = policy_helper.get_policy_from_meta_rules(meta_rule_id)
assert not matched_policy_id
-def test_get_policy_from_meta_rules_with_no_policies(db):
- import test_pdp as test_pdp
- meta_rule_id = 'meta_rule_id'
- policy_id = 'invalid'
- pdp_ids = [policy_id,]
- pdp_obj = mock_data.create_pdp(pdp_ids)
- test_pdp.add_pdp(value=pdp_obj)
- with pytest.raises(Exception) as exception_info:
- get_policy_from_meta_rules(meta_rule_id)
- assert str(exception_info.value) == '400: Policy Unknown'
-
-
-def test_get_policy_from_meta_rules_with_no_models(db):
- import models.test_meta_rules as test_meta_rules
- import test_pdp as test_pdp
- meta_rule = test_meta_rules.add_meta_rule(value=mock_data.create_meta_rule())
- meta_rule_id = list(meta_rule.keys())[0]
- model_id = 'invalid'
- value = mock_data.create_policy(model_id)
- policy = add_policies(value=value)
- assert policy
- policy_id = list(policy.keys())[0]
- pdp_ids = [policy_id,]
- pdp_obj = mock_data.create_pdp(pdp_ids)
- test_pdp.add_pdp(value=pdp_obj)
- with pytest.raises(Exception) as exception_info:
- get_policy_from_meta_rules(meta_rule_id)
- assert str(exception_info.value) == '400: Model Unknown'
-
-
def test_get_rules(db):
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category12",
+ object_category_name="object_category12",
+ action_category_name="action_category12",
+ meta_rule_name="meta_rule_12",
+ model_name="model12")
value = {
"rule": ("low", "medium", "vm-action"),
"instructions": ({"decision": "grant"}),
"enabled": "",
}
- policy_id = mock_data.get_policy_id()
- meta_rule_id = "1"
- add_rule(policy_id, meta_rule_id, value)
+ policy_helper.add_rule(policy_id=policy_id, meta_rule_id=meta_rule_id, value=value)
value = {
"rule": ("low", "low", "vm-action"),
"instructions": ({"decision": "grant"}),
"enabled": "",
}
- meta_rule_id = "1"
- add_rule(policy_id, meta_rule_id, value)
- rules = get_rules(policy_id, meta_rule_id)
+ policy_helper.add_rule(policy_id=policy_id, meta_rule_id=meta_rule_id, value=value)
+ rules = policy_helper.get_rules(policy_id=policy_id, meta_rule_id=meta_rule_id)
assert isinstance(rules, dict)
assert rules
obj = rules.get('rules')
@@ -249,20 +174,25 @@ def test_get_rules(db):
def test_get_rules_with_invalid_policy_id_failure(db):
- rules = get_rules("invalid_policy_id", "meta_rule_id")
+ rules = policy_helper.get_rules("invalid_policy_id", "meta_rule_id")
assert not rules.get('meta_rule-id')
assert len(rules.get('rules')) == 0
def test_add_rule(db):
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1",
+ model_name="model1")
value = {
"rule": ("high", "medium", "vm-action"),
"instructions": ({"decision": "grant"}),
"enabled": "",
}
- policy_id = mock_data.get_policy_id()
- meta_rule_id = "1"
- rules = add_rule(policy_id, meta_rule_id, value)
+
+ rules = policy_helper.add_rule(policy_id=policy_id, meta_rule_id=meta_rule_id, value=value)
assert rules
assert len(rules) == 1
assert isinstance(rules, dict)
@@ -271,17 +201,45 @@ def test_add_rule(db):
assert key in rules[rule_id]
assert rules[rule_id][key] == value[key]
+ with pytest.raises(RuleExisting):
+ policy_helper.add_rule(policy_id=policy_id, meta_rule_id=meta_rule_id, value=value)
+
def test_delete_rule(db):
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category14",
+ object_category_name="object_category14",
+ action_category_name="action_category14",
+ meta_rule_name="meta_rule_14",
+ model_name="model14")
value = {
"rule": ("low", "low", "vm-action"),
"instructions": ({"decision": "grant"}),
"enabled": "",
}
- policy_id = mock_data.get_policy_id()
- meta_rule_id = "2"
- rules = add_rule(policy_id, meta_rule_id, value)
+ rules = policy_helper.add_rule(policy_id, meta_rule_id, value)
rule_id = list(rules.keys())[0]
- delete_rule(policy_id, rule_id)
- rules = get_rules(policy_id, meta_rule_id)
+ policy_helper.delete_rule(policy_id, rule_id)
+ rules = policy_helper.get_rules(policy_id, meta_rule_id)
assert not rules.get('rules')
+
+
+def test_delete_policies_with_pdp(db):
+ value = {
+ "name": "test_policy1",
+ "model_id": "",
+ "genre": "authz",
+ "description": "test",
+ }
+ policies = policy_helper.add_policies(value=value)
+ policy_id1 = list(policies.keys())[0]
+ pdp_id = "pdp_id1"
+ value = {
+ "name": "test_pdp",
+ "security_pipeline": [policy_id1],
+ "keystone_project_id": "keystone_project_id1",
+ "description": "...",
+ }
+ pdp_helper.add_pdp(pdp_id=pdp_id, value=value)
+ with pytest.raises(DeletePolicyWithPdp) as exception_info:
+ policy_helper.delete_policies(policy_id1)
diff --git a/python_moondb/tests/unit_python/requirements.txt b/python_moondb/tests/unit_python/requirements.txt
index 5f507ff7..ff727723 100644
--- a/python_moondb/tests/unit_python/requirements.txt
+++ b/python_moondb/tests/unit_python/requirements.txt
@@ -1,5 +1,4 @@
sqlalchemy
pymysql
-pytest
requests_mock
python_moonutilities \ No newline at end of file
diff --git a/python_moondb/tests/unit_python/test_pdp.py b/python_moondb/tests/unit_python/test_pdp.py
index 5134c0fb..4d245e4d 100755
--- a/python_moondb/tests/unit_python/test_pdp.py
+++ b/python_moondb/tests/unit_python/test_pdp.py
@@ -1,112 +1,149 @@
import pytest
-
-
-def update_pdp(pdp_id, value):
- from python_moondb.core import PDPManager
- return PDPManager.update_pdp("", pdp_id, value)
-
-
-def delete_pdp(pdp_id):
- from python_moondb.core import PDPManager
- PDPManager.delete_pdp("", pdp_id)
-
-
-def add_pdp(pdp_id=None, value=None):
- from python_moondb.core import PDPManager
- return PDPManager.add_pdp("", pdp_id, value)
-
-
-def get_pdp(pdp_id=None):
- from python_moondb.core import PDPManager
- return PDPManager.get_pdp("", pdp_id)
+import helpers.mock_data as mock_data
+import helpers.pdp_helper as pdp_helper
def test_update_pdp(db):
pdp_id = "pdp_id1"
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1",
+ model_name="model1")
value = {
"name": "test_pdp",
- "security_pipeline": ["policy_id_1", "policy_id_2"],
+ "security_pipeline": [policy_id],
"keystone_project_id": "keystone_project_id1",
"description": "...",
}
- add_pdp(pdp_id, value)
- pdp = update_pdp(pdp_id, value)
+ pdp_helper.add_pdp(pdp_id, value)
+ pdp = pdp_helper.update_pdp(pdp_id, value)
assert pdp
def test_update_pdp_with_invalid_id(db):
pdp_id = "pdp_id1"
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1",
+ model_name="model1")
value = {
"name": "test_pdp",
- "security_pipeline": ["policy_id_1", "policy_id_2"],
+ "security_pipeline": [policy_id],
"keystone_project_id": "keystone_project_id1",
"description": "...",
}
with pytest.raises(Exception) as exception_info:
- update_pdp(pdp_id, value)
+ pdp_helper.update_pdp(pdp_id, value)
assert str(exception_info.value) == '400: Pdp Unknown'
def test_delete_pdp(db):
pdp_id = "pdp_id1"
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1",
+ model_name="model1")
value = {
"name": "test_pdp",
- "security_pipeline": ["policy_id_1", "policy_id_2"],
+ "security_pipeline": [policy_id],
"keystone_project_id": "keystone_project_id1",
"description": "...",
}
- add_pdp(pdp_id, value)
- delete_pdp(pdp_id)
- assert len(get_pdp(pdp_id)) == 0
+ pdp_helper.add_pdp(pdp_id, value)
+ pdp_helper.delete_pdp(pdp_id)
+ assert len(pdp_helper.get_pdp(pdp_id)) == 0
def test_delete_pdp_with_invalid_id(db):
pdp_id = "pdp_id1"
with pytest.raises(Exception) as exception_info:
- delete_pdp(pdp_id)
+ pdp_helper.delete_pdp(pdp_id)
assert str(exception_info.value) == '400: Pdp Unknown'
def test_add_pdp(db):
pdp_id = "pdp_id1"
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1",
+ model_name="model1")
value = {
"name": "test_pdp",
- "security_pipeline": ["policy_id_1", "policy_id_2"],
+ "security_pipeline": [policy_id],
"keystone_project_id": "keystone_project_id1",
"description": "...",
}
- pdp = add_pdp(pdp_id, value)
+ pdp = pdp_helper.add_pdp(pdp_id, value)
assert pdp
def test_add_pdp_twice_with_same_id(db):
pdp_id = "pdp_id1"
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1",
+ model_name="model1")
+ value = {
+ "name": "test_pdp",
+ "security_pipeline": [policy_id],
+ "keystone_project_id": "keystone_project_id1",
+ "description": "...",
+ }
+ pdp_helper.add_pdp(pdp_id, value)
+ with pytest.raises(Exception) as exception_info:
+ pdp_helper.add_pdp(pdp_id, value)
+ assert str(exception_info.value) == '409: Pdp Error'
+
+
+def test_add_pdp_twice_with_same_name(db):
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1",
+ model_name="model1")
value = {
"name": "test_pdp",
- "security_pipeline": ["policy_id_1", "policy_id_2"],
+ "security_pipeline": [policy_id],
"keystone_project_id": "keystone_project_id1",
"description": "...",
}
- add_pdp(pdp_id, value)
+ pdp_helper.add_pdp(value=value)
with pytest.raises(Exception) as exception_info:
- add_pdp(pdp_id, value)
+ pdp_helper.add_pdp(value=value)
assert str(exception_info.value) == '409: Pdp Error'
def test_get_pdp(db):
pdp_id = "pdp_id1"
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy(
+ subject_category_name="subject_category1",
+ object_category_name="object_category1",
+ action_category_name="action_category1",
+ meta_rule_name="meta_rule_1",
+ model_name="model1")
value = {
"name": "test_pdp",
- "security_pipeline": ["policy_id_1", "policy_id_2"],
+ "security_pipeline": [policy_id],
"keystone_project_id": "keystone_project_id1",
"description": "...",
}
- add_pdp(pdp_id, value)
- pdp = get_pdp(pdp_id)
+ pdp_helper.add_pdp(pdp_id, value)
+ pdp = pdp_helper.get_pdp(pdp_id)
assert len(pdp) == 1
def test_get_pdp_with_invalid_id(db):
pdp_id = "invalid"
- pdp = get_pdp(pdp_id)
+ pdp = pdp_helper.get_pdp(pdp_id)
assert len(pdp) == 0