aboutsummaryrefslogtreecommitdiffstats
path: root/moon_utilities/tests/unit_python
diff options
context:
space:
mode:
Diffstat (limited to 'moon_utilities/tests/unit_python')
-rw-r--r--moon_utilities/tests/unit_python/api/__init__.py12
-rw-r--r--moon_utilities/tests/unit_python/api/test_auth_functions.py83
-rw-r--r--moon_utilities/tests/unit_python/api/test_import_to_cache.py775
-rw-r--r--moon_utilities/tests/unit_python/api/test_import_to_db.py772
-rw-r--r--moon_utilities/tests/unit_python/api/test_invalidate_function.py64
-rw-r--r--moon_utilities/tests/unit_python/api/test_security_functions.py18
-rw-r--r--moon_utilities/tests/unit_python/conftest.py172
-rw-r--r--moon_utilities/tests/unit_python/helpers/__init__.py13
-rw-r--r--moon_utilities/tests/unit_python/helpers/import_export_cache_helper.py234
-rw-r--r--moon_utilities/tests/unit_python/helpers/import_export_db_helper.py178
-rw-r--r--moon_utilities/tests/unit_python/helpers/slaves_helpers.py32
-rw-r--r--moon_utilities/tests/unit_python/mock_slaves.py50
-rw-r--r--moon_utilities/tests/unit_python/requirements.txt10
13 files changed, 2413 insertions, 0 deletions
diff --git a/moon_utilities/tests/unit_python/api/__init__.py b/moon_utilities/tests/unit_python/api/__init__.py
new file mode 100644
index 00000000..1856aa2c
--- /dev/null
+++ b/moon_utilities/tests/unit_python/api/__init__.py
@@ -0,0 +1,12 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
diff --git a/moon_utilities/tests/unit_python/api/test_auth_functions.py b/moon_utilities/tests/unit_python/api/test_auth_functions.py
new file mode 100644
index 00000000..70af19c1
--- /dev/null
+++ b/moon_utilities/tests/unit_python/api/test_auth_functions.py
@@ -0,0 +1,83 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+import os
+from uuid import uuid4
+import pytest
+from moon_utilities.auth_functions import xor_encode, xor_decode
+from moon_utilities import exceptions
+from moon_utilities.auth_functions import init_db, add_user, authenticate_user
+from moon_utilities.auth_functions import authenticate_key, get_api_key, del_api_key_for_user
+
+
+def test_xor():
+ uuid1 = uuid4().hex
+ my_key = uuid4().hex
+ crypted_data = xor_encode(uuid1, my_key)
+ assert uuid1 != crypted_data
+ decrypted_data = xor_decode(crypted_data, my_key)
+ assert uuid1 == decrypted_data
+
+
+def test_decrypt_exceptions():
+ with pytest.raises(exceptions.DecryptError):
+ uuid1 = uuid4().hex
+ my_key = uuid4().hex
+ crypted_data = xor_encode(uuid1, my_key)
+ assert uuid1 != crypted_data
+ my_key = False
+ xor_decode(crypted_data, my_key)
+ with pytest.raises(exceptions.DecryptError):
+ uuid1 = uuid4().hex
+ my_key = uuid4().hex
+ crypted_data = xor_encode(uuid1, my_key)
+ assert uuid1 != crypted_data
+ my_key = ""
+ xor_decode(crypted_data, my_key)
+
+
+def test_encrypt_exceptions():
+ with pytest.raises(exceptions.EncryptError):
+ uuid1 = uuid4().hex
+ my_key = False
+ xor_encode(uuid1, my_key)
+ with pytest.raises(exceptions.EncryptError):
+ uuid1 = uuid4().hex
+ my_key = ""
+ xor_encode(uuid1, my_key)
+
+
+def test_auth_api():
+ try:
+ os.remove("/tmp/test.db")
+ except FileNotFoundError:
+ pass
+ init_db("/tmp/test.db")
+ # create the user
+ result = add_user("test_user", "1234567890")
+ assert result
+ # trying to auth the user
+ assert authenticate_user("test_user", "1234567890")
+ assert not authenticate_user("bad_test_user", "1234567890")
+ assert not authenticate_key(None)
+ assert not authenticate_key("")
+ assert authenticate_key(result['api_key']) == "test_user"
+ assert get_api_key("test_user", "1234567890") == result['api_key']
+ # logout the user
+ assert del_api_key_for_user("test_user")
+ assert get_api_key("test_user", "1234567890") != result['api_key']
+ assert get_api_key("test_user", "1234567890") is None
+ # re-authent user
+ assert authenticate_user("test_user", "1234567890")
+ # check that the previous api_key is not valid again
+ assert get_api_key("test_user", "1234567890") != result['api_key']
+ assert get_api_key("test_user", "1234567890")
diff --git a/moon_utilities/tests/unit_python/api/test_import_to_cache.py b/moon_utilities/tests/unit_python/api/test_import_to_cache.py
new file mode 100644
index 00000000..01e717fb
--- /dev/null
+++ b/moon_utilities/tests/unit_python/api/test_import_to_cache.py
@@ -0,0 +1,775 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+import helpers.import_export_cache_helper as import_export_helper
+import pytest
+from moon_utilities import exceptions
+
+
+MODEL_WITHOUT_META_RULES = [
+ {"models": [{"name": "test model", "description": "", "meta_rules": []}]},
+ {"models": [{"name": "test model", "description": "new description", "meta_rules": [],
+ "override": True}]},
+ {"models": [{"name": "test model", "description": "description not taken into account",
+ "meta_rules": [], "override": False}]}
+]
+
+POLICIES = [
+ {"policies": [
+ {"name": "test policy", "genre": "authz", "description": "description", "model": {},
+ "mandatory": False}]},
+ {"policies": [{"name": "test policy", "genre": "authz",
+ "description": "new description not taken into account",
+ "model": {"name": "test model"}, "mandatory": True}]},
+ {"policies": [
+ {"name": "test policy", "genre": "not authz ?", "description": "generates an exception",
+ "model": {"name": "test model"}, "override": True}]},
+ {"models": [{"name": "test model", "description": "", "meta_rules": []}], "policies": [
+ {"name": "test policy", "genre": "not authz ?", "description": "changes taken into account",
+ "model": {"name": "test model"}, "override": True}]},
+]
+
+SUBJECTS = [
+ {"subjects": [
+ {"name": "testuser", "description": "description of the subject", "extra": {},
+ "policies": []}]},
+ {"policies": [
+ {"name": "test policy", "genre": "authz", "description": "description", "model": {},
+ "mandatory": False}], "subjects": [
+ {"name": "testuser", "description": "description of the subject", "extra": {},
+ "policies": []}]},
+ {"policies": [
+ {"name": "test other policy", "genre": "authz", "description": "description",
+ "model": {}, "mandatory": True}], "subjects": [
+ {"name": "testuser", "description": "description of the subject", "extra": {},
+ "policies": []}]},
+ {"subjects": [{"name": "testuser", "description": "new description of the subject",
+ "extra": {"email": "new-email@test.com"},
+ "policies": [{"name": "test other policy"}]}]},
+ {"policies": [
+ {"name": "test policy", "genre": "authz", "description": "description", "model": {},
+ "mandatory": False}], "subjects": [
+ {"name": "testuser", "description": "description of the subject", "extra": {},
+ "policies": [{"name": "test policy"}]}]}]
+
+OBJECTS = [
+ {"objects": [{"name": "test object", "description": "description of the object", "extra": {},
+ "policies": []}]},
+ {"policies": [
+ {"name": "test policy", "genre": "authz", "description": "description", "model": {},
+ "mandatory": False}],
+ "objects": [{"name": "test object", "description": "description of the object", "extra": {},
+ "policies": []}]},
+ {"policies": [
+ {"name": "test other policy", "genre": "authz", "description": "description", "model": {},
+ "mandatory": True}],
+ "objects": [{"name": "test object", "description": "description of the object", "extra": {},
+ "policies": []}]},
+ {"objects": [{"name": "test object", "description": "new description of the object",
+ "extra": {"test": "test extra"},
+ "policies": [{"name": "test other policy"}]}]},
+ {"policies": [
+ {"name": "test policy", "genre": "authz", "description": "description", "model": {},
+ "mandatory": False}],
+ "objects": [{"name": "test object", "description": "description of the object", "extra": {},
+ "policies": [{"name": "test policy"}]}]},
+]
+
+ACTIONS = [
+ {"actions": [
+ {"name": "test action", "description": "description of the action", "extra": {},
+ "policies": []}]},
+ {"policies": [
+ {"name": "test policy", "genre": "authz", "description": "description", "model": {},
+ "mandatory": False}], "actions": [
+ {"name": "test action", "description": "description of the action", "extra": {},
+ "policies": []}]},
+ {"policies": [
+ {"name": "test other policy", "genre": "authz", "description": "description",
+ "model": {}, "mandatory": True}], "actions": [
+ {"name": "test action", "description": "description of the action", "extra": {},
+ "policies": []}]},
+ {"actions": [{"name": "test action", "description": "new description of the action",
+ "extra": {"test": "test extra"},
+ "policies": [{"name": "test other policy"}]}]},
+ {"policies": [
+ {"name": "test policy", "genre": "authz", "description": "description", "model": {},
+ "mandatory": False}], "actions": [
+ {"name": "test action", "description": "description of the action", "extra": {},
+ "policies": [{"name": "test policy"}]}]}]
+
+SUBJECT_CATEGORIES = [{"subject_categories": [
+ {"name": "test subject categories", "description": "subject category description"}]},
+ {"subject_categories": [{"name": "test subject categories",
+ "description": "new subject category description"}]}]
+
+OBJECT_CATEGORIES = [{"object_categories": [
+ {"name": "test object categories", "description": "object category description"}]},
+ {"object_categories": [{"name": "test object categories",
+ "description": "new object category description"}]}]
+
+ACTION_CATEGORIES = [{"action_categories": [
+ {"name": "test action categories", "description": "action category description"}]},
+ {"action_categories": [{"name": "test action categories",
+ "description": "new action category description"}]}]
+
+# meta_rules import is needed otherwise the search for data do not work !!!
+PRE_DATA = {"models": [
+ {
+ "name": "test model", "description": "",
+ "meta_rules": [{"name": "good meta rule"},
+ {"name": "other good meta rule"}]}],
+ "policies": [
+ {"name": "test other policy", "genre": "authz", "description": "description",
+ "model": {"name": "test model"}, "mandatory": True}],
+ "subject_categories": [
+ {"name": "test subject categories", "description": "subject category description"},
+ {"name": "other test subject categories",
+ "description": "subject category description"}],
+ "object_categories": [
+ {"name": "test object categories", "description": "object category description"},
+ {"name": "other test object categories",
+ "description": "object category description"}],
+ "action_categories": [
+ {"name": "test action categories", "description": "action category description"},
+ {"name": "other test action categories",
+ "description": "action category description"}],
+ "meta_rules": [
+ {
+ "name": "good meta rule", "description": "valid meta rule",
+ "subject_categories": [{"name": "test subject categories"}],
+ "object_categories": [{"name": "test object categories"}],
+ "action_categories": [{"name": "test action categories"}]
+ },
+ {
+ "name": "other good meta rule", "description": "valid meta rule",
+ "subject_categories": [{"name": "other test subject categories"}],
+ "object_categories": [{"name": "other test object categories"}],
+ "action_categories": [{"name": "other test action categories"}]
+ }]}
+
+SUBJECT_DATA = [{"subject_data": [
+ {"name": "not valid subject data", "description": "", "policies": [{}], "category": {}}]},
+ {"subject_data": [
+ {"name": "not valid subject data", "description": "", "policies": [{}],
+ "category": {"name": "test subject categories"}}]},
+ {"policies": [
+ {"name": "test policy", "genre": "authz", "description": "description",
+ "model": {"name": "test model"}, "mandatory": True}], "subject_data": [
+ {"name": "one valid subject data", "description": "description",
+ "policies": [{}], "category": {"name": "test subject categories"}}]},
+ {"subject_data": [{"name": "valid subject data", "description": "description",
+ "policies": [{"name": "test policy"}],
+ "category": {"name": "test subject categories"}}]},
+ {"subject_data": [{"name": "valid subject data", "description": "new description",
+ "policies": [{"name": "test other policy"}],
+ "category": {"name": "test subject categories"}}]}]
+
+OBJECT_DATA = [{"object_data": [
+ {"name": "not valid object data", "description": "", "policies": [{}], "category": {}}]},
+ {"object_data": [
+ {"name": "not valid object data", "description": "", "policies": [{}],
+ "category": {"name": "test object categories"}}]},
+ {"policies": [{"name": "test policy", "genre": "authz", "description": "description",
+ "model": {"name": "test model"}, "mandatory": True}], "object_data": [
+ {"name": "one valid object data", "description": "description", "policies": [{}],
+ "category": {"name": "test object categories"}}]},
+ {"object_data": [{"name": "valid object data", "description": "description",
+ "policies": [{"name": "test policy"}],
+ "category": {"name": "test object categories"}}]},
+ {"object_data": [{"name": "valid object data", "description": "new description",
+ "policies": [{"name": "test other policy"}],
+ "category": {"name": "test object categories"}}]}]
+
+ACTION_DATA = [{"action_data": [
+ {"name": "not valid action data", "description": "", "policies": [{}], "category": {}}]},
+ {"action_data": [
+ {"name": "not valid action data", "description": "", "policies": [{}],
+ "category": {"name": "test action categories"}}]},
+ {"policies": [{"name": "test policy", "genre": "authz", "description": "description",
+ "model": {"name": "test model"}, "mandatory": True}], "action_data": [
+ {"name": "one valid action data", "description": "description", "policies": [{}],
+ "category": {"name": "test action categories"}}]},
+ {"action_data": [{"name": "valid action data", "description": "description",
+ "policies": [{"name": "test policy"}],
+ "category": {"name": "test action categories"}}]},
+ {"action_data": [{"name": "valid action data", "description": "new description",
+ "policies": [{"name": "test other policy"}],
+ "category": {"name": "test action categories"}}]}]
+
+PRE_META_RULES = {"subject_categories": [
+ {"name": "test subject categories", "description": "subject category description"}],
+ "object_categories": [{"name": "test object categories",
+ "description": "object category description"}],
+ "action_categories": [{"name": "test action categories",
+ "description": "object action description"}]}
+
+META_RULES = [{"meta_rules": [{"name": "bad meta rule", "description": "not valid meta rule",
+ "subject_categories": [{"name": "not valid category"}],
+ "object_categories": [{"name": "test object categories"}],
+ "action_categories": [{"name": "test action categories"}]}]},
+ {"meta_rules": [{"name": "bad meta rule", "description": "not valid meta rule",
+ "subject_categories": [{"name": "test subject categories"}],
+ "object_categories": [{"name": "not valid category"}],
+ "action_categories": [{"name": "test action categories"}]}]},
+ {"meta_rules": [{"name": "bad meta rule", "description": "not valid meta rule",
+ "subject_categories": [{"name": "test subject categories"}],
+ "object_categories": [{"name": "test object categories"}],
+ "action_categories": [{"name": "not valid category"}]}]},
+ {"meta_rules": [{"name": "good meta rule", "description": "valid meta rule",
+ "subject_categories": [{"name": "test subject categories"}],
+ "object_categories": [{"name": "test object categories"}],
+ "action_categories": [{"name": "test action categories"}]}]}]
+
+PRE_ASSIGNMENTS = {"models": [
+ {"name": "test model", "description": "", "meta_rules": [{"name": "good meta rule"}]}],
+ "policies": [
+ {"name": "test policy", "genre": "authz", "description": "description",
+ "model": {"name": "test model"}, "mandatory": True}],
+ "subject_categories": [{"name": "test subject categories",
+ "description": "subject category description"}],
+ "object_categories": [{"name": "test object categories",
+ "description": "object category description"}],
+ "action_categories": [{"name": "test action categories",
+ "description": "object action description"}],
+ "subjects": [{"name": "testuser", "description": "description of the subject",
+ "extra": {}, "policies": [{"name": "test policy"}]}],
+ "objects": [{"name": "test object", "description": "description of the object",
+ "extra": {}, "policies": [{"name": "test policy"}]}],
+ "actions": [{"name": "test action", "description": "description of the action",
+ "extra": {}, "policies": [{"name": "test policy"}]}],
+ "meta_rules": [{"name": "good meta rule", "description": "valid meta rule",
+ "subject_categories": [{"name": "test subject categories"}],
+ "object_categories": [{"name": "test object categories"}],
+ "action_categories": [{"name": "test action categories"}]}],
+ "subject_data": [{"name": "subject data", "description": "test subject data",
+ "policies": [{"name": "test policy"}],
+ "category": {"name": "test subject categories"}}],
+ "object_data": [{"name": "object data", "description": "test object data",
+ "policies": [{"name": "test policy"}],
+ "category": {"name": "test object categories"}}],
+ "action_data": [{"name": "action data", "description": "test action data",
+ "policies": [{"name": "test policy"}],
+ "category": {"name": "test action categories"}}]}
+
+SUBJECT_ASSIGNMENTS = [
+ {"subject_assignments": [
+ {"subject": {"name": "unknown"},
+ "category": {"name": "test subject categories"},
+ "assignments": [{"name": "subject data"}]}],
+ "exception": exceptions.InvalidJson
+ },
+ {"subject_assignments": [
+ {"subject": {"name": "testuser"},
+ "category": {"name": "unknown"},
+ "assignments": [{"name": "subject data"}]}],
+ "exception": exceptions.UnknownName
+ },
+ {"subject_assignments": [
+ {"subject": {"name": "testuser"},
+ "category": {"name": "test subject categories"},
+ "assignments": [{"name": "unknown"}]}],
+ "exception": exceptions.InvalidJson
+ },
+ {"subject_assignments": [
+ {"subject": {"name": "testuser"},
+ "category": {"name": "test subject categories"},
+ "assignments": [{"name": "subject data"}]}],
+ "exception": None
+ }]
+
+OBJECT_ASSIGNMENTS = [
+ {"object_assignments": [
+ {"object": {"name": "unknown"},
+ "category": {"name": "test object categories"},
+ "assignments": [{"name": "object data"}]}],
+ "exception": exceptions.InvalidJson
+ },
+ {"object_assignments": [
+ {"object": {"name": "test object"},
+ "category": {"name": "unknown"},
+ "assignments": [{"name": "object data"}]}],
+ "exception": exceptions.UnknownName
+ },
+ {"object_assignments": [
+ {"object": {"name": "test object"},
+ "category": {"name": "test object categories"},
+ "assignments": [{"name": "unknown"}]}],
+ "exception": exceptions.InvalidJson
+ },
+ {"object_assignments": [
+ {"object": {"name": "test object"},
+ "category": {"name": "test object categories"},
+ "assignments": [{"name": "object data"}]}],
+ "exception": None
+ }]
+
+ACTION_ASSIGNMENTS = [
+ {"action_assignments": [
+ {"action": {"name": "unknown"},
+ "category": {"name": "test action categories"},
+ "assignments": [{"name": "action data"}]}],
+ "exception": exceptions.InvalidJson
+ },
+ {"action_assignments": [
+ {"action": {"name": "test action"},
+ "category": {"name": "unknown"},
+ "assignments": [{"name": "action data"}]}],
+ "exception": exceptions.UnknownName
+ },
+ {"action_assignments": [
+ {"action": {"name": "test action"},
+ "category": {"name": "test action categories"},
+ "assignments": [{"name": "unknown"}]}],
+ "exception": exceptions.InvalidJson
+ },
+ {"action_assignments": [
+ {"action": {"name": "test action"},
+ "category": {"name": "test action categories"},
+ "assignments": [{"name": "action data"}]}],
+ "exception": None
+ }]
+
+RULES = [{"rules": [{"meta_rule": {"name": "unknown meta rule"}, "policy": {"name": "test "
+ "policy"},
+ "instructions": ({"decision": "grant"},), "enabled": True, "rule": {
+ "subject_data": [{"name": "subject data"}], "object_data": [{"name": "object data"}],
+ "action_data": [{"name": "action data"}]}}]},
+ {"rules": [{"meta_rule": {"name": "good meta rule"}, "policy": {"name": "unknown "
+ "policy"},
+ "instructions": ({"decision": "grant"},), "enabled": True, "rule": {
+ "subject_data": [{"name": "subject data"}],
+ "object_data": [{"name": "object data"}],
+ "action_data": [{"name": "action data"}]}}]},
+ {"rules": [{"meta_rule": {"name": "good meta rule"}, "policy": {"name": "test policy"},
+ "instructions": ({"decision": "grant"},), "enabled": True, "rule": {
+ "subject_data": [{"name": "unknown subject data"}],
+ "object_data": [{"name": "object data"}],
+ "action_data": [{"name": "action data"}]}}]},
+ {"rules": [{"meta_rule": {"name": "good meta rule"}, "policy": {"name": "test policy"},
+ "instructions": ({"decision": "grant"},), "enabled": True, "rule": {
+ "subject_data": [{"name": "subject data"}],
+ "object_data": [{"name": "unknown object data"}],
+ "action_data": [{"name": "action data"}]}}]},
+ {"rules": [{"meta_rule": {"name": "good meta rule"}, "policy": {"name": "test policy"},
+ "instructions": ({"decision": "grant"},), "enabled": True, "rule": {
+ "subject_data": [{"name": "subject data"}],
+ "object_data": [{"name": "object data"}],
+ "action_data": [{"name": "unknown action data"}]}}]},
+ {"rules": [{"meta_rule": {"name": "good meta rule"}, "policy": {"name": "test policy"},
+ "instructions": ({"decision": "grant"},), "enabled": True, "rule": {
+ "subject_data": [{"name": "subject data"}],
+ "object_data": [{"name": "object data"}],
+ "action_data": [{"name": "action data"}]}}]}]
+
+
+def test_import_models_without_new_meta_rules():
+ from moon_utilities import json_utils
+ from moon_cache.cache import Cache
+ _cache = Cache.getInstance()
+ import_to_cache = json_utils.JsonImport(driver_name="cache", driver=_cache)
+
+ import_export_helper.clean_all()
+ counter = 0
+ for models_description in MODEL_WITHOUT_META_RULES:
+ data = import_to_cache.import_json(body=models_description)
+ assert "models" in data
+ models = _cache.models
+ assert len(list(models.keys())) == 1
+ values = list(models.values())
+ assert values[0]["name"] == "test model"
+ if counter == 0:
+ assert len(values[0]["description"]) == 0
+ if counter == 1 or counter == 2:
+ assert values[0]["description"] == "new description"
+ counter = counter + 1
+ import_export_helper.clean_all()
+
+
+def test_import_policies():
+ from moon_utilities import json_utils
+ from moon_cache.cache import Cache
+ _cache = Cache.getInstance()
+ import_to_cache = json_utils.JsonImport(driver_name="cache", driver=_cache)
+
+ counter = -1
+ for policy_description in POLICIES:
+ counter = counter + 1
+ if counter == 2:
+ with pytest.raises(exceptions.UnknownName):
+ import_to_cache.import_json(body=policy_description)
+ continue
+ else:
+ data = import_to_cache.import_json(body=policy_description)
+ assert "policies" in data
+ if counter == 2:
+ assert "models" in data
+
+ from moon_cache.cache import Cache
+ _cache = Cache.getInstance()
+ policies = _cache.policies
+ assert len(list(policies.keys())) == 1
+ values = list(policies.values())
+ assert values[0]["name"] == "test policy"
+ if counter < 3:
+ assert values[0]["genre"] == "authz"
+ assert values[0]["description"] == "description"
+ else:
+ assert values[0]["genre"] == "not authz ?"
+ assert values[0]["description"] == "changes taken into account"
+ assert len(values[0]["model_id"]) > 0
+
+
+def test_import_subject_object_action():
+ from moon_utilities import json_utils
+ from moon_cache.cache import Cache
+ _cache = Cache.getInstance()
+ import_to_cache = json_utils.JsonImport(driver_name="cache", driver=_cache)
+
+ type_elements = ["subject", "object", "action"]
+
+ for type_element in type_elements:
+ import_export_helper.clean_all()
+ counter = -1
+ # set the getters and the comparison values
+ if type_element == "subject":
+ elements = SUBJECTS
+ clean_method = import_export_helper.clean_subjects
+ name = "testuser"
+ key_extra = "email"
+ value_extra = "new-email@test.com"
+ elif type_element == "object":
+ elements = OBJECTS
+ clean_method = import_export_helper.clean_objects
+ name = "test object"
+ key_extra = "test"
+ value_extra = "test extra"
+ else:
+ elements = ACTIONS
+ clean_method = import_export_helper.clean_actions
+ name = "test action"
+ key_extra = "test"
+ value_extra = "test extra"
+
+ for element in elements:
+ counter = counter + 1
+ if counter == 2 or counter == 4:
+ clean_method()
+ if counter == 3:
+ clean_method()
+ data = import_to_cache.import_json(body=element)
+ elif counter < 2:
+ with pytest.raises(exceptions.PolicyUnknown) as exception_info:
+ import_to_cache.import_json(body=element)
+ assert '400: Policy Unknown' == str(exception_info.value)
+ continue
+ else:
+ data = import_to_cache.import_json(body=element)
+ assert data
+
+ if counter != 3:
+ assert type_element+"s" in data
+ assert "policies" in data
+
+ get_elements = getattr(_cache, type_element + "s")
+
+ policy_key = list(get_elements.keys())[0]
+ assert len(list(get_elements[policy_key].keys())) == 1
+ values = list(get_elements[policy_key].values())
+ assert values[0]["name"] == name
+ if counter == 2 or counter == 4:
+ assert values[0]["description"] == "description of the " + type_element
+ if counter == 3:
+ assert values[0]["description"] == "new description of the " + type_element
+ assert values[0]["extra"][key_extra] == value_extra
+
+ assert len(values[0]["policy_list"]) == 1
+ import_export_helper.clean_all()
+
+
+def test_import_subject_object_action_categories():
+ from moon_utilities import json_utils
+ from moon_cache.cache import Cache
+ _cache = Cache.getInstance()
+ import_to_cache = json_utils.JsonImport(driver_name="cache", driver=_cache)
+
+ type_elements = ["subject", "object", "action"]
+
+ for type_element in type_elements:
+ import_export_helper.clean_all()
+ counter = -1
+ # set the getters and the comparison values
+ if type_element == "subject":
+ elements = SUBJECT_CATEGORIES
+ elif type_element == "object":
+ elements = OBJECT_CATEGORIES
+ else:
+ elements = ACTION_CATEGORIES
+
+ for element in elements:
+ data = import_to_cache.import_json(body=element)
+ counter = counter + 1
+ assert type_element + "_categories" in data
+ get_elements = getattr(_cache, type_element + "_categories")
+ assert len(list(get_elements.keys())) == 1
+ values = list(get_elements.values())
+ assert values[0]["name"] == "test " + type_element + " categories"
+ assert values[0]["description"] == type_element + " category description"
+
+
+def test_import_meta_rules():
+ from moon_utilities import json_utils
+ from moon_cache.cache import Cache
+ _cache = Cache.getInstance()
+ import_to_cache = json_utils.JsonImport(driver_name="cache", driver=_cache)
+
+ import_export_helper.clean_all()
+ # import some categories
+ data = import_to_cache.import_json(body=PRE_META_RULES)
+ for cat in ['subject_categories', 'object_categories', 'action_categories']:
+ assert cat in data
+
+ counter = -1
+ for meta_rule in META_RULES:
+ counter = counter + 1
+ if counter != 3:
+ with pytest.raises(exceptions.UnknownName) as exception_info:
+ import_to_cache.import_json(body=meta_rule)
+ assert '400: Unknown Name.' == str(exception_info.value)
+ continue
+ else:
+ data = import_to_cache.import_json(body=meta_rule)
+ assert "meta_rules" in data
+ assert _cache.meta_rules
+
+ meta_rules = _cache.meta_rules
+ key = list(meta_rules.keys())[0]
+ assert isinstance(meta_rules, dict)
+ assert meta_rules[key]["name"] == "good meta rule"
+ assert meta_rules[key]["description"] == "valid meta rule"
+ assert len(meta_rules[key]["subject_categories"]) == 1
+ assert len(meta_rules[key]["object_categories"]) == 1
+ assert len(meta_rules[key]["action_categories"]) == 1
+
+ subject_category_key = meta_rules[key]["subject_categories"][0]
+ object_category_key = meta_rules[key]["object_categories"][0]
+ action_category_key = meta_rules[key]["action_categories"][0]
+
+ sub_cat = _cache.subject_categories
+ assert sub_cat[subject_category_key]["name"] == "test subject categories"
+
+ ob_cat = _cache.object_categories
+ assert ob_cat[object_category_key]["name"] == "test object categories"
+
+ ac_cat = _cache.action_categories
+ assert ac_cat[action_category_key]["name"] == "test action categories"
+
+ import_export_helper.clean_all()
+
+
+def test_import_subject_object_action_assignments():
+ from moon_utilities import json_utils
+ from moon_cache.cache import Cache
+ _cache = Cache.getInstance()
+ import_to_cache = json_utils.JsonImport(driver_name="cache", driver=_cache)
+
+ import_export_helper.clean_all()
+
+ data = import_to_cache.import_json(body=PRE_ASSIGNMENTS)
+ for item in PRE_ASSIGNMENTS.keys():
+ assert item in data
+
+ type_elements = ["subject", "object", "action"]
+
+ for type_element in type_elements:
+ counter = -1
+ if type_element == "subject":
+ datas = SUBJECT_ASSIGNMENTS
+ elif type_element == "object":
+ datas = OBJECT_ASSIGNMENTS
+ else:
+ datas = ACTION_ASSIGNMENTS
+
+ for assignments in datas:
+ counter = counter + 1
+ my_exception = assignments.pop("exception")
+ if my_exception:
+ with pytest.raises(my_exception) as exception_info:
+ import_to_cache.import_json(body=assignments)
+ assert '400:' in str(exception_info.value)
+ else:
+ data = import_to_cache.import_json(body=assignments)
+ assert type_element+"_assignments" in data
+ assert getattr(_cache, type_element+"_assignments")
+
+ assert len(getattr(_cache, type_element+"_assignments")) == 1
+
+
+def test_import_rules():
+ from moon_utilities import json_utils
+ from moon_cache.cache import Cache
+ _cache = Cache.getInstance()
+ import_to_cache = json_utils.JsonImport(driver_name="cache", driver=_cache)
+
+ import_export_helper.clean_all()
+
+ data = import_to_cache.import_json(body=PRE_ASSIGNMENTS)
+ for item in PRE_ASSIGNMENTS.keys():
+ assert item in data
+
+ counter = -1
+ for rule in RULES:
+ counter = counter + 1
+ if counter < 5:
+ with pytest.raises(exceptions.UnknownName) as exception_info:
+ import_to_cache.import_json(body=rule)
+
+ assert '400: Unknown Name.' == str(exception_info.value)
+ continue
+ data = import_to_cache.import_json(body=rule)
+ assert "rules" in data
+ policies = _cache.policies
+ policy_id = None
+ for policy in policies:
+ if policies[policy]['name'] == rule['rules'][0]['policy']['name']:
+ policy_id = policy
+ break
+
+ assert policy_id
+ rules = _cache.rules
+ assert len(rules) == 1
+ rule_content = list(rules.values())[0]
+ assert policy_id == rule_content["policy_id"]
+ assert rule_content["rules"]
+ assert len(rule_content["rules"]) == 1
+ assert rule_content["rules"][0]["enabled"]
+ assert rule_content["rules"][0]["instructions"][0]["decision"] == "grant"
+
+ meta_rules = _cache.meta_rules
+ assert meta_rules[list(meta_rules.keys())[0]]["name"] == "good meta rule"
+
+
+def test_import_subject_object_action_data():
+ from moon_utilities import json_utils
+ from moon_cache.cache import Cache
+ _cache = Cache.getInstance()
+ import_to_cache = json_utils.JsonImport(driver_name="cache", driver=_cache)
+
+ type_elements = ["subject", "object", "action"]
+
+ for type_element in type_elements:
+ import_export_helper.clean_all()
+ data = import_to_cache.import_json(body=PRE_DATA)
+ for key in PRE_DATA.keys():
+ assert key in data
+ counter = -1
+ if type_element == "subject":
+ elements = SUBJECT_DATA
+ get_method = _cache.subject_data
+ get_categories = _cache.subject_categories
+ elif type_element == "object":
+ elements = OBJECT_DATA
+ get_method = _cache.object_data
+ get_categories = _cache.object_categories
+ else:
+ elements = ACTION_DATA
+ get_method = _cache.action_data
+ get_categories = _cache.action_categories
+
+ for element in elements:
+ counter = counter + 1
+ if counter == 0 or counter == 1:
+ with pytest.raises(exceptions.MissingIdOrName) as exception_info:
+ import_to_cache.import_json(body=element)
+ assert '400: Missing ID or Name.' == str(exception_info.value)
+ continue
+ else:
+ data = import_to_cache.import_json(body=element)
+ for key in element.keys():
+ assert key in data
+
+ policies = _cache.policies
+ categories = get_categories
+ case_tested = False
+ for policy_key in policies.keys():
+ policy = policies[policy_key]
+ for category_key in categories:
+ get_elements = get_method
+ _all_data = []
+ for _data in get_elements:
+ if category_key == _data["category_id"] and policy_key == _data["policy_id"]:
+ _all_data.append(_data)
+ get_elements = _all_data
+ if not get_elements:
+ continue
+ if policy["name"] == "test policy":
+ assert len(get_elements) == 1
+ el = get_elements[0]
+ assert isinstance(el["data"], dict)
+ if counter == 2:
+ assert len(el["data"].keys()) == 1
+ el = el["data"][list(el["data"].keys())[0]]
+ if "value" in el:
+ el = el["value"]
+ assert el["name"] == "one valid " + type_element + " data"
+ if counter == 3:
+ assert len(el["data"].keys()) == 2
+ el1 = el["data"][list(el["data"].keys())[0]]
+ el2 = el["data"][list(el["data"].keys())[1]]
+ if "value" in el1:
+ el1 = el1["value"]
+ el2 = el2["value"]
+ assert (el1["name"] == "one valid " + type_element + " data" and el2[
+ "name"] == "valid " + type_element + " data") or (el2[
+ "name"] == "one valid " + type_element + " data" and
+ el1[
+ "name"] == "valid " + type_element + " data")
+ assert el1["description"] == "description"
+ assert el2["description"] == "description"
+
+ case_tested = True
+
+ if policy["name"] == "test other policy":
+ if counter == 4:
+ assert len(get_elements) == 1
+ el = get_elements[0]
+ assert isinstance(el["data"], dict)
+ assert len(el["data"].keys()) == 1
+ el = el["data"][list(el["data"].keys())[0]]
+ if "value" in el:
+ el = el["value"]
+ assert el["name"] == "valid " + type_element + " data"
+ assert el["description"] == "new description"
+ case_tested = True
+
+ assert case_tested is True
+
+
+def test_clean():
+ from moon_cache.cache import Cache
+ _cache = Cache.getInstance()
+ import_export_helper.clean_all()
+ assert not _cache.subject_categories
+ assert not _cache.object_categories
+ assert not _cache.action_categories
+ assert not _cache.subjects
+ assert not _cache.objects
+ assert not _cache.actions
+ assert not _cache.subject_data
+ assert not _cache.object_data
+ assert not _cache.action_data
+ assert not _cache.subject_assignments
+ assert not _cache.object_assignments
+ assert not _cache.action_assignments
+ assert not _cache.models
+ assert not _cache.pdp
+ assert not _cache.policies
diff --git a/moon_utilities/tests/unit_python/api/test_import_to_db.py b/moon_utilities/tests/unit_python/api/test_import_to_db.py
new file mode 100644
index 00000000..34479b8c
--- /dev/null
+++ b/moon_utilities/tests/unit_python/api/test_import_to_db.py
@@ -0,0 +1,772 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+import pytest
+from moon_utilities import exceptions
+import helpers.import_export_db_helper as import_export_helper
+
+
+MODEL_WITHOUT_META_RULES = [
+ {"models": [{"name": "test model", "description": "", "meta_rules": []}]},
+ {"models": [{"name": "test model", "description": "new description", "meta_rules": [],
+ "override": True}]},
+ {"models": [{"name": "test model", "description": "description not taken into account",
+ "meta_rules": [], "override": False}]}
+]
+
+POLICIES = [
+ {"policies": [
+ {"name": "test policy", "genre": "authz", "description": "description", "model": {},
+ "mandatory": False}]},
+ {"policies": [{"name": "test policy", "genre": "authz",
+ "description": "new description not taken into account",
+ "model": {"name": "test model"}, "mandatory": True}]},
+ {"policies": [
+ {"name": "test policy", "genre": "not authz ?", "description": "generates an exception",
+ "model": {"name": "test model"}, "override": True}]},
+ {"models": [{"name": "test model", "description": "", "meta_rules": []}], "policies": [
+ {"name": "test policy", "genre": "not authz ?", "description": "changes taken into account",
+ "model": {"name": "test model"}, "override": True}]},
+]
+
+SUBJECTS = [
+ {"subjects": [
+ {"name": "testuser", "description": "description of the subject", "extra": {},
+ "policies": []}]},
+ {"policies": [
+ {"name": "test policy", "genre": "authz", "description": "description", "model": {},
+ "mandatory": False}], "subjects": [
+ {"name": "testuser", "description": "description of the subject", "extra": {},
+ "policies": []}]},
+ {"policies": [
+ {"name": "test other policy", "genre": "authz", "description": "description",
+ "model": {}, "mandatory": True}], "subjects": [
+ {"name": "testuser", "description": "description of the subject", "extra": {},
+ "policies": []}]},
+ {"subjects": [{"name": "testuser", "description": "new description of the subject",
+ "extra": {"email": "new-email@test.com"},
+ "policies": [{"name": "test other policy"}]}]},
+ {"policies": [
+ {"name": "test policy", "genre": "authz", "description": "description", "model": {},
+ "mandatory": False}], "subjects": [
+ {"name": "testuser", "description": "description of the subject", "extra": {},
+ "policies": [{"name": "test policy"}]}]}]
+
+OBJECTS = [
+ {"objects": [{"name": "test object", "description": "description of the object", "extra": {},
+ "policies": []}]},
+ {"policies": [
+ {"name": "test policy", "genre": "authz", "description": "description", "model": {},
+ "mandatory": False}],
+ "objects": [{"name": "test object", "description": "description of the object", "extra": {},
+ "policies": []}]},
+ {"policies": [
+ {"name": "test other policy", "genre": "authz", "description": "description", "model": {},
+ "mandatory": True}],
+ "objects": [{"name": "test object", "description": "description of the object", "extra": {},
+ "policies": []}]},
+ {"objects": [{"name": "test object", "description": "new description of the object",
+ "extra": {"test": "test extra"},
+ "policies": [{"name": "test other policy"}]}]},
+ {"policies": [
+ {"name": "test policy", "genre": "authz", "description": "description", "model": {},
+ "mandatory": False}],
+ "objects": [{"name": "test object", "description": "description of the object", "extra": {},
+ "policies": [{"name": "test policy"}]}]},
+]
+
+ACTIONS = [{"actions": [
+ {"name": "test action", "description": "description of the action", "extra": {},
+ "policies": []}]},
+ {"policies": [
+ {"name": "test policy", "genre": "authz", "description": "description", "model": {},
+ "mandatory": False}], "actions": [
+ {"name": "test action", "description": "description of the action", "extra": {},
+ "policies": []}]},
+ {"policies": [
+ {"name": "test other policy", "genre": "authz", "description": "description",
+ "model": {}, "mandatory": True}], "actions": [
+ {"name": "test action", "description": "description of the action", "extra": {},
+ "policies": []}]},
+ {"actions": [{"name": "test action", "description": "new description of the action",
+ "extra": {"test": "test extra"},
+ "policies": [{"name": "test other policy"}]}]},
+ {"policies": [
+ {"name": "test policy", "genre": "authz", "description": "description", "model": {},
+ "mandatory": False}], "actions": [
+ {"name": "test action", "description": "description of the action", "extra": {},
+ "policies": [{"name": "test policy"}]}]}]
+
+SUBJECT_CATEGORIES = [{"subject_categories": [
+ {"name": "test subject categories", "description": "subject category description"}]},
+ {"subject_categories": [{"name": "test subject categories",
+ "description": "new subject category description"}]}]
+
+OBJECT_CATEGORIES = [{"object_categories": [
+ {"name": "test object categories", "description": "object category description"}]},
+ {"object_categories": [{"name": "test object categories",
+ "description": "new object category description"}]}]
+
+ACTION_CATEGORIES = [{"action_categories": [
+ {"name": "test action categories", "description": "action category description"}]},
+ {"action_categories": [{"name": "test action categories",
+ "description": "new action category description"}]}]
+
+# meta_rules import is needed otherwise the search for data do not work !!!
+PRE_DATA = {"models": [{"name": "test model", "description": "",
+ "meta_rules": [{"name": "good meta rule"},
+ {"name": "other good meta rule"}]}],
+ "policies": [
+ {"name": "test other policy", "genre": "authz", "description": "description",
+ "model": {"name": "test model"}, "mandatory": True}],
+ "subject_categories": [
+ {"name": "test subject categories", "description": "subject category description"},
+ {"name": "other test subject categories",
+ "description": "subject category description"}],
+ "object_categories": [
+ {"name": "test object categories", "description": "object category description"},
+ {"name": "other test object categories",
+ "description": "object category description"}],
+ "action_categories": [
+ {"name": "test action categories", "description": "action category description"},
+ {"name": "other test action categories",
+ "description": "action category description"}],
+ "meta_rules": [{"name": "good meta rule", "description": "valid meta rule",
+ "subject_categories": [{"name": "test subject categories"}],
+ "object_categories": [{"name": "test object categories"}],
+ "action_categories": [{"name": "test action categories"}]},
+ {"name": "other good meta rule", "description": "valid meta rule",
+ "subject_categories": [{"name": "other test subject categories"}],
+ "object_categories": [{"name": "other test object categories"}],
+ "action_categories": [{"name": "other test action categories"}]}]}
+
+SUBJECT_DATA = [{"subject_data": [
+ {"name": "not valid subject data", "description": "", "policies": [{}], "category": {}}]},
+ {"subject_data": [
+ {"name": "not valid subject data", "description": "", "policies": [{}],
+ "category": {"name": "test subject categories"}}]},
+ {"policies": [
+ {"name": "test policy", "genre": "authz", "description": "description",
+ "model": {"name": "test model"}, "mandatory": True}], "subject_data": [
+ {"name": "one valid subject data", "description": "description",
+ "policies": [{}], "category": {"name": "test subject categories"}}]},
+ {"subject_data": [{"name": "valid subject data", "description": "description",
+ "policies": [{"name": "test policy"}],
+ "category": {"name": "test subject categories"}}]},
+ {"subject_data": [{"name": "valid subject data", "description": "new description",
+ "policies": [{"name": "test other policy"}],
+ "category": {"name": "test subject categories"}}]}]
+
+OBJECT_DATA = [{"object_data": [
+ {"name": "not valid object data", "description": "", "policies": [{}], "category": {}}]},
+ {"object_data": [
+ {"name": "not valid object data", "description": "", "policies": [{}],
+ "category": {"name": "test object categories"}}]},
+ {"policies": [{"name": "test policy", "genre": "authz", "description": "description",
+ "model": {"name": "test model"}, "mandatory": True}], "object_data": [
+ {"name": "one valid object data", "description": "description", "policies": [{}],
+ "category": {"name": "test object categories"}}]},
+ {"object_data": [{"name": "valid object data", "description": "description",
+ "policies": [{"name": "test policy"}],
+ "category": {"name": "test object categories"}}]},
+ {"object_data": [{"name": "valid object data", "description": "new description",
+ "policies": [{"name": "test other policy"}],
+ "category": {"name": "test object categories"}}]}]
+
+ACTION_DATA = [{"action_data": [
+ {"name": "not valid action data", "description": "", "policies": [{}], "category": {}}]},
+ {"action_data": [
+ {"name": "not valid action data", "description": "", "policies": [{}],
+ "category": {"name": "test action categories"}}]},
+ {"policies": [{"name": "test policy", "genre": "authz", "description": "description",
+ "model": {"name": "test model"}, "mandatory": True}], "action_data": [
+ {"name": "one valid action data", "description": "description", "policies": [{}],
+ "category": {"name": "test action categories"}}]},
+ {"action_data": [{"name": "valid action data", "description": "description",
+ "policies": [{"name": "test policy"}],
+ "category": {"name": "test action categories"}}]},
+ {"action_data": [{"name": "valid action data", "description": "new description",
+ "policies": [{"name": "test other policy"}],
+ "category": {"name": "test action categories"}}]}]
+
+PRE_META_RULES = {"subject_categories": [
+ {"name": "test subject categories", "description": "subject category description"}],
+ "object_categories": [{"name": "test object categories",
+ "description": "object category description"}],
+ "action_categories": [{"name": "test action categories",
+ "description": "object action description"}]}
+
+META_RULES = [{"meta_rules": [{"name": "bad meta rule", "description": "not valid meta rule",
+ "subject_categories": [{"name": "not valid category"}],
+ "object_categories": [{"name": "test object categories"}],
+ "action_categories": [{"name": "test action categories"}]}]},
+ {"meta_rules": [{"name": "bad meta rule", "description": "not valid meta rule",
+ "subject_categories": [{"name": "test subject categories"}],
+ "object_categories": [{"name": "not valid category"}],
+ "action_categories": [{"name": "test action categories"}]}]},
+ {"meta_rules": [{"name": "bad meta rule", "description": "not valid meta rule",
+ "subject_categories": [{"name": "test subject categories"}],
+ "object_categories": [{"name": "test object categories"}],
+ "action_categories": [{"name": "not valid category"}]}]},
+ {"meta_rules": [{"name": "good meta rule", "description": "valid meta rule",
+ "subject_categories": [{"name": "test subject categories"}],
+ "object_categories": [{"name": "test object categories"}],
+ "action_categories": [{"name": "test action categories"}]}]}]
+
+PRE_ASSIGNMENTS = {"models": [
+ {"name": "test model", "description": "", "meta_rules": [{"name": "good meta rule"}]}],
+ "policies": [
+ {"name": "test policy", "genre": "authz", "description": "description",
+ "model": {"name": "test model"}, "mandatory": True}],
+ "subject_categories": [{"name": "test subject categories",
+ "description": "subject category description"}],
+ "object_categories": [{"name": "test object categories",
+ "description": "object category description"}],
+ "action_categories": [{"name": "test action categories",
+ "description": "object action description"}],
+ "subjects": [{"name": "testuser", "description": "description of the subject",
+ "extra": {}, "policies": [{"name": "test policy"}]}],
+ "objects": [{"name": "test object", "description": "description of the object",
+ "extra": {}, "policies": [{"name": "test policy"}]}],
+ "actions": [{"name": "test action", "description": "description of the action",
+ "extra": {}, "policies": [{"name": "test policy"}]}],
+ "meta_rules": [{"name": "good meta rule", "description": "valid meta rule",
+ "subject_categories": [{"name": "test subject categories"}],
+ "object_categories": [{"name": "test object categories"}],
+ "action_categories": [{"name": "test action categories"}]}],
+ "subject_data": [{"name": "subject data", "description": "test subject data",
+ "policies": [{"name": "test policy"}],
+ "category": {"name": "test subject categories"}}],
+ "object_data": [{"name": "object data", "description": "test object data",
+ "policies": [{"name": "test policy"}],
+ "category": {"name": "test object categories"}}],
+ "action_data": [{"name": "action data", "description": "test action data",
+ "policies": [{"name": "test policy"}],
+ "category": {"name": "test action categories"}}]}
+
+SUBJECT_ASSIGNMENTS = [
+ {"subject_assignments": [
+ {"subject": {"name": "unknown"},
+ "category": {"name": "test subject categories"},
+ "assignments": [{"name": "subject data"}]}],
+ "exception": exceptions.InvalidJson
+ },
+ {"subject_assignments": [
+ {"subject": {"name": "testuser"},
+ "category": {"name": "unknown"},
+ "assignments": [{"name": "subject data"}]}],
+ "exception": exceptions.UnknownName
+ },
+ {"subject_assignments": [
+ {"subject": {"name": "testuser"},
+ "category": {"name": "test subject categories"},
+ "assignments": [{"name": "unknown"}]}],
+ "exception": exceptions.InvalidJson
+ },
+ {"subject_assignments": [
+ {"subject": {"name": "testuser"},
+ "category": {"name": "test subject categories"},
+ "assignments": [{"name": "subject data"}]}],
+ "exception": None
+ }]
+
+OBJECT_ASSIGNMENTS = [
+ {"object_assignments": [
+ {"object": {"name": "unknown"},
+ "category": {"name": "test object categories"},
+ "assignments": [{"name": "object data"}]}],
+ "exception": exceptions.InvalidJson
+ },
+ {"object_assignments": [
+ {"object": {"name": "test object"},
+ "category": {"name": "unknown"},
+ "assignments": [{"name": "object data"}]}],
+ "exception": exceptions.UnknownName
+ },
+ {"object_assignments": [
+ {"object": {"name": "test object"},
+ "category": {"name": "test object categories"},
+ "assignments": [{"name": "unknown"}]}],
+ "exception": exceptions.InvalidJson
+ },
+ {"object_assignments": [
+ {"object": {"name": "test object"},
+ "category": {"name": "test object categories"},
+ "assignments": [{"name": "object data"}]}],
+ "exception": None
+ }]
+
+ACTION_ASSIGNMENTS = [
+ {"action_assignments": [
+ {"action": {"name": "unknown"},
+ "category": {"name": "test action categories"},
+ "assignments": [{"name": "action data"}]}],
+ "exception": exceptions.InvalidJson
+ },
+ {"action_assignments": [
+ {"action": {"name": "test action"},
+ "category": {"name": "unknown"},
+ "assignments": [{"name": "action data"}]}],
+ "exception": exceptions.UnknownName
+ },
+ {"action_assignments": [
+ {"action": {"name": "test action"},
+ "category": {"name": "test action categories"},
+ "assignments": [{"name": "unknown"}]}],
+ "exception": exceptions.InvalidJson
+ },
+ {"action_assignments": [
+ {"action": {"name": "test action"},
+ "category": {"name": "test action categories"},
+ "assignments": [{"name": "action data"}]}],
+ "exception": None
+ }]
+
+RULES = [{"rules": [{"meta_rule": {"name": "unknown meta rule"}, "policy": {"name": "test "
+ "policy"},
+ "instructions": ({"decision": "grant"},), "enabled": True, "rule": {
+ "subject_data": [{"name": "subject data"}], "object_data": [{"name": "object data"}],
+ "action_data": [{"name": "action data"}]}}]},
+ {"rules": [{"meta_rule": {"name": "good meta rule"}, "policy": {"name": "unknown "
+ "policy"},
+ "instructions": ({"decision": "grant"},), "enabled": True, "rule": {
+ "subject_data": [{"name": "subject data"}],
+ "object_data": [{"name": "object data"}],
+ "action_data": [{"name": "action data"}]}}]},
+ {"rules": [{"meta_rule": {"name": "good meta rule"}, "policy": {"name": "test policy"},
+ "instructions": ({"decision": "grant"},), "enabled": True, "rule": {
+ "subject_data": [{"name": "unknown subject data"}],
+ "object_data": [{"name": "object data"}],
+ "action_data": [{"name": "action data"}]}}]},
+ {"rules": [{"meta_rule": {"name": "good meta rule"}, "policy": {"name": "test policy"},
+ "instructions": ({"decision": "grant"},), "enabled": True, "rule": {
+ "subject_data": [{"name": "subject data"}],
+ "object_data": [{"name": "unknown object data"}],
+ "action_data": [{"name": "action data"}]}}]},
+ {"rules": [{"meta_rule": {"name": "good meta rule"}, "policy": {"name": "test policy"},
+ "instructions": ({"decision": "grant"},), "enabled": True, "rule": {
+ "subject_data": [{"name": "subject data"}],
+ "object_data": [{"name": "object data"}],
+ "action_data": [{"name": "unknown action data"}]}}]},
+ {"rules": [{"meta_rule": {"name": "good meta rule"}, "policy": {"name": "test policy"},
+ "instructions": ({"decision": "grant"},), "enabled": True, "rule": {
+ "subject_data": [{"name": "subject data"}],
+ "object_data": [{"name": "object data"}],
+ "action_data": [{"name": "action data"}]}}]}]
+
+
+def test_import_models_without_new_meta_rules():
+ from moon_utilities import json_utils
+ from moon_manager import db_driver
+ import_to_db = json_utils.JsonImport(driver_name="db", driver=db_driver)
+ _cache = import_to_db.driver
+
+ import_export_helper.clean_all()
+ counter = 0
+ for models_description in MODEL_WITHOUT_META_RULES:
+ data = import_to_db.import_json(body=models_description)
+ assert "models" in data
+ models = _cache.get_models()
+ assert len(list(models.keys())) == 1
+ values = list(models.values())
+ assert values[0]["name"] == "test model"
+ if counter == 0:
+ assert len(values[0]["description"]) == 0
+ if counter == 1 or counter == 2:
+ assert values[0]["description"] == "new description"
+ counter = counter + 1
+ import_export_helper.clean_all()
+
+
+def test_import_policies():
+ from moon_utilities import json_utils
+ from moon_manager import db_driver
+ import_to_db = json_utils.JsonImport(driver_name="db", driver=db_driver)
+ _cache = import_to_db.driver
+
+ counter = -1
+ for policy_description in POLICIES:
+ counter = counter + 1
+ if counter == 2:
+ with pytest.raises(exceptions.UnknownName):
+ import_to_db.import_json(body=policy_description)
+ continue
+ else:
+ data = import_to_db.import_json(body=policy_description)
+ assert "policies" in data
+ if counter == 2:
+ assert "models" in data
+
+ policies = db_driver.PolicyManager.get_policies(moon_user_id="admin")
+ assert len(list(policies.keys())) == 1
+ values = list(policies.values())
+ assert values[0]["name"] == "test policy"
+ if counter < 3:
+ assert values[0]["genre"] == "authz"
+ assert values[0]["description"] == "description"
+ else:
+ assert values[0]["genre"] == "not authz ?"
+ assert values[0]["description"] == "changes taken into account"
+ assert len(values[0]["model_id"]) > 0
+
+
+def test_import_subject_object_action():
+ from moon_utilities import json_utils
+ from moon_manager import db_driver
+ import_to_db = json_utils.JsonImport(driver_name="db", driver=db_driver)
+ _cache = import_to_db.driver
+
+ type_elements = ["subject", "object", "action"]
+
+ for type_element in type_elements:
+ import_export_helper.clean_all()
+ counter = -1
+ # set the getters and the comparison values
+ if type_element == "subject":
+ elements = SUBJECTS
+ clean_method = import_export_helper.clean_subjects
+ name = "testuser"
+ key_extra = "email"
+ value_extra = "new-email@test.com"
+ elif type_element == "object":
+ elements = OBJECTS
+ clean_method = import_export_helper.clean_objects
+ name = "test object"
+ key_extra = "test"
+ value_extra = "test extra"
+ else:
+ elements = ACTIONS
+ clean_method = import_export_helper.clean_actions
+ name = "test action"
+ key_extra = "test"
+ value_extra = "test extra"
+
+ for element in elements:
+ counter = counter + 1
+ if counter == 2 or counter == 4:
+ clean_method()
+ if counter == 3:
+ clean_method()
+ data = import_to_db.import_json(body=element)
+ elif counter < 2:
+ with pytest.raises(exceptions.PolicyUnknown) as exception_info:
+ import_to_db.import_json(body=element)
+ assert '400: Policy Unknown' == str(exception_info.value)
+ continue
+ else:
+ data = import_to_db.import_json(body=element)
+ assert data
+
+ if counter != 3:
+ assert type_element+"s" in data
+ assert "policies" in data
+
+ get_elements = getattr(_cache, type_element + "s")
+
+ assert len(list(get_elements.keys())) == 1
+ values = list(get_elements.values())
+ assert values[0]["name"] == name
+ if counter == 2 or counter == 4:
+ assert values[0]["description"] == "description of the " + type_element
+ if counter == 3:
+ assert values[0]["description"] == "new description of the " + type_element
+ assert values[0]["extra"][key_extra] == value_extra
+
+ assert len(values[0]["policy_list"]) == 1
+ import_export_helper.clean_all()
+
+
+def test_import_subject_object_action_categories():
+ from moon_utilities import json_utils
+ from moon_manager import db_driver
+ import_to_db = json_utils.JsonImport(driver_name="db", driver=db_driver)
+ _cache = import_to_db.driver
+
+ type_elements = ["subject", "object", "action"]
+
+ for type_element in type_elements:
+ import_export_helper.clean_all()
+ counter = -1
+ # set the getters and the comparison values
+ if type_element == "subject":
+ elements = SUBJECT_CATEGORIES
+ elif type_element == "object":
+ elements = OBJECT_CATEGORIES
+ else:
+ elements = ACTION_CATEGORIES
+
+ for element in elements:
+ data = import_to_db.import_json(body=element)
+ counter = counter + 1
+ assert type_element + "_categories" in data
+ get_elements = getattr(_cache, type_element + "_categories")
+ assert len(list(get_elements.keys())) == 1
+ values = list(get_elements.values())
+ assert values[0]["name"] == "test " + type_element + " categories"
+ assert values[0]["description"] == type_element + " category description"
+
+
+def test_import_meta_rules():
+ from moon_utilities import json_utils
+ from moon_manager import db_driver
+ import_to_db = json_utils.JsonImport(driver_name="db", driver=db_driver)
+ _cache = import_to_db.driver
+
+ import_export_helper.clean_all()
+ # import some categories
+ data = import_to_db.import_json(body=PRE_META_RULES)
+ for cat in ['subject_categories', 'object_categories', 'action_categories']:
+ assert cat in data
+
+ counter = -1
+ for meta_rule in META_RULES:
+ counter = counter + 1
+ if counter != 3:
+ with pytest.raises(exceptions.UnknownName) as exception_info:
+ import_to_db.import_json(body=meta_rule)
+ assert '400: Unknown Name.' == str(exception_info.value)
+ continue
+ else:
+ data = import_to_db.import_json(body=meta_rule)
+ assert "meta_rules" in data
+ assert _cache.meta_rules
+
+ meta_rules = _cache.meta_rules
+ key = list(meta_rules.keys())[0]
+ assert isinstance(meta_rules, dict)
+ assert meta_rules[key]["name"] == "good meta rule"
+ assert meta_rules[key]["description"] == "valid meta rule"
+ assert len(meta_rules[key]["subject_categories"]) == 1
+ assert len(meta_rules[key]["object_categories"]) == 1
+ assert len(meta_rules[key]["action_categories"]) == 1
+
+ subject_category_key = meta_rules[key]["subject_categories"][0]
+ object_category_key = meta_rules[key]["object_categories"][0]
+ action_category_key = meta_rules[key]["action_categories"][0]
+
+ sub_cat = _cache.subject_categories
+ assert sub_cat[subject_category_key]["name"] == "test subject categories"
+
+ ob_cat = _cache.object_categories
+ assert ob_cat[object_category_key]["name"] == "test object categories"
+
+ ac_cat = _cache.action_categories
+ assert ac_cat[action_category_key]["name"] == "test action categories"
+
+ import_export_helper.clean_all()
+
+
+def test_import_subject_object_action_assignments():
+ from moon_utilities import json_utils
+ from moon_manager import db_driver
+ import_to_db = json_utils.JsonImport(driver_name="db", driver=db_driver)
+ _cache = import_to_db.driver
+
+ import_export_helper.clean_all()
+
+ data = import_to_db.import_json(body=PRE_ASSIGNMENTS)
+ for item in PRE_ASSIGNMENTS.keys():
+ assert item in data
+
+ type_elements = ["subject", "object", "action"]
+
+ for type_element in type_elements:
+ counter = -1
+ if type_element == "subject":
+ datas = SUBJECT_ASSIGNMENTS
+ elif type_element == "object":
+ datas = OBJECT_ASSIGNMENTS
+ else:
+ datas = ACTION_ASSIGNMENTS
+
+ for assignments in datas:
+ counter = counter + 1
+ my_exception = assignments.pop("exception")
+ if my_exception:
+ with pytest.raises(my_exception) as exception_info:
+ import_to_db.import_json(body=assignments)
+ assert '400:' in str(exception_info.value)
+ else:
+ data = import_to_db.import_json(body=assignments)
+ assert type_element+"_assignments" in data
+ assert getattr(_cache, type_element+"_assignments")
+
+ assert len(getattr(_cache, type_element+"_assignments")) == 1
+
+
+def test_import_rules():
+ from moon_utilities import json_utils
+ from moon_manager import db_driver
+ import_to_db = json_utils.JsonImport(driver_name="db", driver=db_driver)
+ _cache = import_to_db.driver
+
+ import_export_helper.clean_all()
+
+ data = import_to_db.import_json(body=PRE_ASSIGNMENTS)
+ for item in PRE_ASSIGNMENTS.keys():
+ assert item in data
+
+ counter = -1
+ for rule in RULES:
+ counter = counter + 1
+ if counter < 5:
+ with pytest.raises(exceptions.UnknownName) as exception_info:
+ import_to_db.import_json(body=rule)
+
+ assert '400: Unknown Name.' == str(exception_info.value)
+ continue
+ data = import_to_db.import_json(body=rule)
+ assert "rules" in data
+ policies = _cache.policies
+ policy_id = None
+ for policy in policies:
+ if policies[policy]['name'] == rule['rules'][0]['policy']['name']:
+ policy_id = policy
+ break
+
+ assert policy_id
+ rules = []
+ for _rule in _cache.rules:
+ if policy_id == _rule['policy_id']:
+ rules.append(_rule)
+ assert len(rules) == 1
+ rule_content = rules[0]
+ assert policy_id == rule_content["policy_id"]
+ assert rule_content["rules"]
+ assert len(rule_content["rules"]) == 1
+ # FIXME: this is different from cache tests (where we have a "value" attribute)
+ assert rule_content["rules"][0]["enabled"]
+ assert rule_content["rules"][0]["instructions"][0]["decision"] == "grant"
+
+ meta_rules = _cache.meta_rules
+ assert meta_rules[list(meta_rules.keys())[0]]["name"] == "good meta rule"
+
+
+def test_import_subject_object_action_data():
+ from moon_utilities import json_utils
+ from moon_manager import db_driver
+ import_to_db = json_utils.JsonImport(driver_name="db", driver=db_driver)
+ _cache = import_to_db.driver
+
+ type_elements = ["subject", "object", "action"]
+
+ for type_element in type_elements:
+ import_export_helper.clean_all()
+ data = import_to_db.import_json(body=PRE_DATA)
+ for key in PRE_DATA.keys():
+ assert key in data
+ counter = -1
+ if type_element == "subject":
+ elements = SUBJECT_DATA
+ get_method = _cache.get_subject_data
+ get_categories = _cache.subject_categories
+ elif type_element == "object":
+ elements = OBJECT_DATA
+ get_method = _cache.get_object_data
+ get_categories = _cache.object_categories
+ else:
+ elements = ACTION_DATA
+ get_method = _cache.get_action_data
+ get_categories = _cache.action_categories
+
+ for element in elements:
+ counter = counter + 1
+ if counter == 0 or counter == 1:
+ with pytest.raises(exceptions.MissingIdOrName) as exception_info:
+ import_to_db.import_json(body=element)
+ assert '400: Missing ID or Name.' == str(exception_info.value)
+ continue
+ else:
+ data = import_to_db.import_json(body=element)
+ for key in element.keys():
+ assert key in data
+
+ policies = _cache.policies
+ categories = get_categories
+ case_tested = False
+ for policy_key in policies.keys():
+ policy = policies[policy_key]
+ for category_key in categories:
+ get_elements = get_method(policy_id=policy_key, category_id=category_key)
+ _all_data = []
+ for _data in get_elements:
+ if category_key == _data["category_id"] and \
+ policy_key == _data["policy_id"]:
+ _all_data.append(_data)
+ get_elements = _all_data
+ if not get_elements:
+ continue
+ if policy["name"] == "test policy":
+ assert len(get_elements) == 1
+ el = get_elements[0]
+ assert isinstance(el["data"], dict)
+ if counter == 2:
+ assert len(el["data"].keys()) == 1
+ el = el["data"][list(el["data"].keys())[0]]
+ if "value" in el:
+ el = el["value"]
+ assert el["name"] == "one valid " + type_element + " data"
+ if counter == 3:
+ assert len(el["data"].keys()) == 2
+ el1 = el["data"][list(el["data"].keys())[0]]
+ el2 = el["data"][list(el["data"].keys())[1]]
+ if "value" in el1:
+ el1 = el1["value"]
+ el2 = el2["value"]
+ assert (el1["name"] == "one valid " + type_element + " data" and
+ el2["name"] == "valid " + type_element + " data") or \
+ (el2["name"] == "one valid " + type_element + " data" and
+ el1["name"] == "valid " + type_element + " data")
+ assert el1["description"] == "description"
+ assert el2["description"] == "description"
+
+ case_tested = True
+ break
+
+ if policy["name"] == "test other policy":
+ if counter == 4:
+ assert len(get_elements) == 1
+ el = get_elements[0]
+ assert isinstance(el["data"], dict)
+ assert len(el["data"].keys()) == 1
+ el = el["data"][list(el["data"].keys())[0]]
+ if "value" in el:
+ el = el["value"]
+ assert el["name"] == "valid " + type_element + " data"
+ assert el["description"] == "new description"
+ case_tested = True
+ break
+
+ assert case_tested is True
+
+
+def test_clean():
+ from moon_utilities import json_utils
+ from moon_manager import db_driver
+ import_to_db = json_utils.JsonImport(driver_name="db", driver=db_driver)
+ _cache = import_to_db.driver
+ import_export_helper.clean_all()
+ assert not _cache.subject_categories
+ assert not _cache.object_categories
+ assert not _cache.action_categories
+ assert not _cache.subjects
+ assert not _cache.objects
+ assert not _cache.actions
+ assert not _cache.subject_data
+ assert not _cache.object_data
+ assert not _cache.action_data
+ assert not _cache.subject_assignments
+ assert not _cache.object_assignments
+ assert not _cache.action_assignments
+ assert not _cache.models
+ assert not _cache.pdp
+ assert not _cache.policies
diff --git a/moon_utilities/tests/unit_python/api/test_invalidate_function.py b/moon_utilities/tests/unit_python/api/test_invalidate_function.py
new file mode 100644
index 00000000..ed0357dc
--- /dev/null
+++ b/moon_utilities/tests/unit_python/api/test_invalidate_function.py
@@ -0,0 +1,64 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+import moon_utilities.invalided_functions
+
+
+def test_invalidate_assignment_in_slaves(slaves):
+ result = moon_utilities.invalided_functions.invalidate_assignment_in_slaves(
+ slaves, "098764321", "098764321", "098764321", "098764321", 'subject')
+ assert result
+ assert "slave_test" in list(result)
+
+
+def test_invalidate_data_in_slaves(slaves):
+ result = moon_utilities.invalided_functions.invalidate_data_in_slaves(
+ slaves, "__policy_id__", "__category_id__", "098764321", "subject")
+ assert result
+ assert "slave_test" in list(result)
+
+
+def test_invalidate_perimeter_in_slaves(slaves):
+ result = moon_utilities.invalided_functions.invalidate_perimeter_in_slaves(
+ slaves, "098764321", "098764321", "subject", is_delete=False)
+ assert result
+ assert "slave_test" in list(result)
+
+
+def test_invalidate_pdp_in_slaves(slaves):
+ result = moon_utilities.invalided_functions.invalidate_pdp_in_slaves(
+ slaves, "098764321", is_delete=False)
+ assert result
+ assert "slave_test" in list(result)
+
+
+def test_invalidate_policy_in_slaves(slaves):
+ result = moon_utilities.invalided_functions.invalidate_policy_in_slaves(
+ slaves, "098764321", is_delete=False)
+ assert result
+ assert "slave_test" in list(result)
+
+
+def test_invalidate_rules_in_slaves():
+ pass
+
+
+def test_invalidate_model_in_slaves():
+ pass
+
+
+def test_invalidate_meta_data_in_slaves():
+ pass
+
+
+def test_invalidate_meta_rule_in_slaves():
+ pass
diff --git a/moon_utilities/tests/unit_python/api/test_security_functions.py b/moon_utilities/tests/unit_python/api/test_security_functions.py
new file mode 100644
index 00000000..469a70a2
--- /dev/null
+++ b/moon_utilities/tests/unit_python/api/test_security_functions.py
@@ -0,0 +1,18 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+from moon_utilities.security_functions import enforce
+
+
+def test_enforce():
+ # TODO: implement the test
+ assert True
diff --git a/moon_utilities/tests/unit_python/conftest.py b/moon_utilities/tests/unit_python/conftest.py
new file mode 100644
index 00000000..da55d3ff
--- /dev/null
+++ b/moon_utilities/tests/unit_python/conftest.py
@@ -0,0 +1,172 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+import mock_slaves
+import os
+import pytest
+import requests_mock
+import yaml
+
+
+__CONF = """
+debug: true
+
+database:
+ url: sqlite:////tmp/database_test.db
+ driver: moon_manager.plugins.sql
+ migration_dir: moon_manager.api.db.migrations
+
+dashboard:
+ root: ../dashboard/www/
+
+management:
+ url: http://127.0.0.1:8000
+ user: admin
+ password: admin
+ token_file: db.json
+
+orchestration:
+ driver: moon_manager.plugins.pyorchestrator
+ connection: local
+ port: 10000...10100
+ config_dir: /tmp
+
+information:
+ openstack:
+ driver: moon_manager.plugins.moon_openstack_plugin
+ url: http://keystone:5000/v3
+ user: admin
+ password: p4ssw0rd
+ domain: default
+ project: admin
+ check_token: false
+ certificate: false
+ global_attrs:
+ driver: moon_manager.plugins.global_attrs
+ attributes:
+ mode:
+ values:
+ - build
+ - run
+ default: build
+ url: file:/etc/moon/mode
+ #url: https://127.0.0.1:8080/mode
+ #url: mysql+pymysql://moon:p4sswOrd1@db/moon_mode
+ #url: sqlite:////tmp/database.db
+ #url: driver://moon_manager.plugins.my_plugin
+
+plugins:
+ directory: /var/moon/plugins
+
+components:
+ manager:
+ port: 8080
+ bind: 0.0.0.0
+ hostname: manager
+
+logging:
+ version: 1
+
+ formatters:
+ brief:
+ format: "%(levelname)s %(name)s %(message)-30s"
+ custom:
+ format: "%(asctime)-15s %(levelname)s %(name)s %(message)s"
+
+ handlers:
+ console:
+ class : logging.StreamHandler
+ formatter: custom
+ level : INFO
+ stream : ext://sys.stdout
+ file:
+ class : logging.handlers.RotatingFileHandler
+ formatter: custom
+ level : DEBUG
+ filename: /tmp/moon.log
+ maxBytes: 1048576
+ backupCount: 3
+
+ loggers:
+ moon:
+ level: DEBUG
+ handlers: [console, file]
+ propagate: no
+
+ root:
+ level: ERROR
+ handlers: [console]
+"""
+
+
+@pytest.fixture
+def slaves():
+ return {
+ "slaves": {
+ "d464cc58a0cd46dea3191ba70f4e7df8": {
+ "name": "slave_test",
+ "address": "",
+ "description": "...",
+ "extra": {
+ "description": "...",
+ "starttime": 1543851265.76279,
+ "port": 10000,
+ "server_ip": "127.0.0.1",
+ "status": "up",
+ "api_key": "e58a882a6b658a22660f00a0c273e7f6b4c4eb5abe54eccba2cae307905d67e374"
+ "6537bd790c41887e11840c2d186b6d6eeec0e426bcfa7a872cc3417a35124a"
+ }
+ }
+ }
+ }
+
+
+@pytest.fixture(autouse=True)
+def no_requests(monkeypatch):
+ """ Modify the response from Requests module
+ """
+ with requests_mock.Mocker(real_http=True) as m:
+ try:
+ os.remove("/tmp/database_test.db")
+ except FileNotFoundError:
+ pass
+ try:
+ os.remove("/tmp/moon.pwd")
+ except FileNotFoundError:
+ pass
+ print("Configure...")
+ from moon_manager.api.configuration import init_database, set_configuration
+ set_configuration(yaml.safe_load(__CONF))
+ print("Create a new user")
+ from moon_utilities.auth_functions import add_user, init_db, get_api_key_for_user
+ init_db()
+ # try:
+ # user = add_user("admin", "admin")
+ # manager_api_key = user["api_key"]
+ # except KeyError:
+ # print("User already exists")
+ # manager_api_key = get_api_key_for_user("admin")
+ print("Initialize the database")
+ init_database()
+ from moon_manager import db_driver
+ db_driver.init()
+
+ mock_slaves.register_slaves(m)
+
+ print("End registering URI")
+
+ # from moon_manager.pip_driver import InformationManager
+ # InformationManager.set_auth()
+
+ yield m
+
+ # InformationManager.unset_auth()
diff --git a/moon_utilities/tests/unit_python/helpers/__init__.py b/moon_utilities/tests/unit_python/helpers/__init__.py
new file mode 100644
index 00000000..2a5459c2
--- /dev/null
+++ b/moon_utilities/tests/unit_python/helpers/__init__.py
@@ -0,0 +1,13 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+
diff --git a/moon_utilities/tests/unit_python/helpers/import_export_cache_helper.py b/moon_utilities/tests/unit_python/helpers/import_export_cache_helper.py
new file mode 100644
index 00000000..ff22abe2
--- /dev/null
+++ b/moon_utilities/tests/unit_python/helpers/import_export_cache_helper.py
@@ -0,0 +1,234 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+import logging
+
+logger = logging.getLogger("moon.manager.test.api." + __name__)
+
+
+def clean_models():
+ from moon_cache.cache import Cache
+ _cache = Cache.getInstance()
+ keys = list(_cache.models.keys())
+ for key in keys:
+ _cache.delete_model(key)
+
+
+def clean_policies():
+ from moon_cache.cache import Cache
+ _cache = Cache.getInstance()
+ keys = list(_cache.policies.keys())
+ for key in keys:
+ _cache.delete_policy(key)
+
+
+def clean_subjects():
+ from moon_cache.cache import Cache
+ _cache = Cache.getInstance()
+
+ policy_keys = list(_cache.policies.keys())
+ subjects = _cache.subjects
+ for policy_key in policy_keys:
+ for key in subjects:
+ try:
+ _cache.delete_subject(policy_key, key)
+ except AttributeError:
+ pass
+ _cache.delete_subject()
+
+
+def clean_objects():
+ from moon_cache.cache import Cache
+ _cache = Cache.getInstance()
+
+ policy_keys = list(_cache.policies.keys())
+ objects = _cache.objects
+ for policy_key in policy_keys:
+ for key in objects:
+ try:
+ _cache.delete_object(policy_key, key)
+ except AttributeError:
+ pass
+ _cache.delete_object()
+
+
+def clean_actions():
+ from moon_cache.cache import Cache
+ _cache = Cache.getInstance()
+
+ policy_keys = list(_cache.policies.keys())
+ actions = _cache.actions
+ for policy_key in policy_keys:
+ for key in actions:
+ try:
+ _cache.delete_action(policy_key, key)
+ except AttributeError:
+ pass
+ _cache.delete_action()
+
+
+def clean_subject_categories():
+ from moon_cache.cache import Cache
+ _cache = Cache.getInstance()
+
+ categories = list(_cache.subject_categories.keys())
+ for key in categories:
+ _cache.delete_subject_category(key)
+
+
+def clean_object_categories():
+ from moon_cache.cache import Cache
+ _cache = Cache.getInstance()
+
+ categories = list(_cache.object_categories.keys())
+ for key in categories:
+ _cache.delete_object_category(key)
+
+
+def clean_action_categories():
+ from moon_cache.cache import Cache
+ _cache = Cache.getInstance()
+
+ categories = list(_cache.action_categories.keys())
+ for key in categories:
+ _cache.delete_action_category(key)
+
+
+def clean_subject_data():
+ from moon_cache.cache import Cache
+ _cache = Cache.getInstance()
+ _cache.delete_subject_data()
+
+
+def clean_object_data():
+ from moon_cache.cache import Cache
+ _cache = Cache.getInstance()
+ _cache.delete_object_data()
+
+
+def clean_action_data():
+ from moon_cache.cache import Cache
+ _cache = Cache.getInstance()
+ _cache.delete_action_data()
+
+
+def clean_meta_rule():
+ from moon_cache.cache import Cache
+ _cache = Cache.getInstance()
+
+ categories = list(_cache.meta_rules.keys())
+ for key in categories:
+ _cache.delete_meta_rule(key)
+
+
+def clean_subject_assignments():
+ from moon_cache.cache import Cache
+ _cache = Cache.getInstance()
+
+ if not _cache.subject_assignments:
+ return
+ for policy_key in _cache.policies.keys():
+ if policy_key not in _cache.subject_assignments:
+ continue
+ for key in _cache.subject_assignments[policy_key]:
+ # policy_key = _cache.subject_assignments[policy_key][key]["policy_id"]
+ subject_key = _cache.subject_assignments[policy_key][key]["subject_id"]
+ cat_key = _cache.subject_assignments[policy_key][key]["category_id"]
+ data_keys = _cache.subject_assignments[policy_key][key]["assignments"]
+ for data_key in data_keys:
+ _cache.delete_subject_assignment(
+ policy_id=policy_key,
+ perimeter_id=subject_key,
+ category_id=cat_key,
+ data_id=data_key
+ )
+ _cache.delete_subject_assignment()
+
+
+def clean_object_assignments():
+ from moon_cache.cache import Cache
+ _cache = Cache.getInstance()
+
+ if not _cache.object_assignments:
+ return
+ for policy_key in _cache.policies.keys():
+ if policy_key not in _cache.object_assignments:
+ continue
+ for key in _cache.object_assignments[policy_key]:
+ # policy_key = _cache.object_assignments[policy_key][key]["policy_id"]
+ object_key = _cache.object_assignments[policy_key][key]["object_id"]
+ cat_key = _cache.object_assignments[policy_key][key]["category_id"]
+ data_keys = _cache.object_assignments[policy_key][key]["assignments"]
+ for data_key in data_keys:
+ _cache.delete_object_assignment(
+ policy_id=policy_key,
+ perimeter_id=object_key,
+ category_id=cat_key,
+ data_id=data_key
+ )
+ _cache.delete_object_assignment()
+
+
+def clean_action_assignments():
+ from moon_cache.cache import Cache
+ _cache = Cache.getInstance()
+
+ if not _cache.action_assignments:
+ return
+ for policy_key in _cache.policies.keys():
+ if policy_key not in _cache.action_assignments:
+ continue
+ for key in _cache.action_assignments[policy_key]:
+ action_key = _cache.action_assignments[policy_key][key]["action_id"]
+ cat_key = _cache.action_assignments[policy_key][key]["category_id"]
+ data_keys = _cache.action_assignments[policy_key][key]["assignments"]
+ for data_key in data_keys:
+ _cache.delete_action_assignment(
+ policy_id=policy_key,
+ perimeter_id=action_key,
+ category_id=cat_key,
+ data_id=data_key
+ )
+ _cache.delete_action_assignment()
+
+
+def clean_rules():
+ from moon_cache.cache import Cache
+ _cache = Cache.getInstance()
+
+ rules = list(_cache.rules.keys())
+ for key in rules:
+ _cache.delete_rule(key)
+
+
+def clean_all():
+ clean_rules()
+
+ clean_subject_assignments()
+ clean_object_assignments()
+ clean_action_assignments()
+
+ clean_subject_data()
+ clean_object_data()
+ clean_action_data()
+
+ clean_actions()
+ clean_objects()
+ clean_subjects()
+
+ clean_policies()
+ clean_models()
+ clean_meta_rule()
+
+ clean_subject_categories()
+ clean_object_categories()
+ clean_action_categories()
diff --git a/moon_utilities/tests/unit_python/helpers/import_export_db_helper.py b/moon_utilities/tests/unit_python/helpers/import_export_db_helper.py
new file mode 100644
index 00000000..ecb9fa26
--- /dev/null
+++ b/moon_utilities/tests/unit_python/helpers/import_export_db_helper.py
@@ -0,0 +1,178 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+import logging
+
+logger = logging.getLogger("moon.manager.test.api." + __name__)
+
+
+def clean_models():
+ from moon_manager import db_driver as driver
+ keys = driver.ModelManager.get_models(moon_user_id="admin")
+ for key in keys:
+ driver.ModelManager.delete_model(moon_user_id="admin", model_id=key)
+
+
+def clean_policies():
+ from moon_manager import db_driver as driver
+ keys = driver.PolicyManager.get_policies(moon_user_id="admin")
+ for key in keys:
+ driver.PolicyManager.delete_policy(moon_user_id="admin", policy_id=key)
+
+
+def clean_subjects():
+ from moon_manager import db_driver as driver
+ for policy in driver.PolicyManager.get_policies(moon_user_id="admin"):
+ subjects = driver.PolicyManager.get_subjects(moon_user_id="admin", policy_id=policy)
+ for key in subjects:
+ driver.PolicyManager.delete_subject(moon_user_id="admin", policy_id=policy,
+ perimeter_id=key)
+
+
+def clean_objects():
+ from moon_manager import db_driver as driver
+ for policy in driver.PolicyManager.get_policies(moon_user_id="admin"):
+ objects = driver.PolicyManager.get_objects(moon_user_id="admin", policy_id=policy)
+ for key in objects:
+ driver.PolicyManager.delete_object(moon_user_id="admin", policy_id=policy,
+ perimeter_id=key)
+
+
+def clean_actions():
+ from moon_manager import db_driver as driver
+ for policy in driver.PolicyManager.get_policies(moon_user_id="admin"):
+ actions = driver.PolicyManager.get_actions(moon_user_id="admin", policy_id=policy)
+ for key in actions:
+ driver.PolicyManager.delete_action(moon_user_id="admin", policy_id=policy,
+ perimeter_id=key)
+
+
+def clean_subject_categories():
+ from moon_manager import db_driver as driver
+ categories = driver.ModelManager.get_subject_categories(moon_user_id="admin")
+
+ for key in categories:
+ driver.ModelManager.delete_subject_category(moon_user_id="admin", category_id=key)
+
+
+def clean_object_categories():
+ from moon_manager import db_driver as driver
+ categories = driver.ModelManager.get_object_categories(moon_user_id="admin")
+
+ for key in categories:
+ driver.ModelManager.delete_object_category(moon_user_id="admin", category_id=key)
+
+
+def clean_action_categories():
+ from moon_manager import db_driver as driver
+ categories = driver.ModelManager.get_action_categories(moon_user_id="admin")
+
+ for key in categories:
+ driver.ModelManager.delete_action_category(moon_user_id="admin", category_id=key)
+
+
+def clean_subject_data():
+ from moon_manager import db_driver as driver
+ policies = driver.PolicyManager.get_policies(moon_user_id="admin")
+ categories = driver.ModelManager.get_subject_categories(moon_user_id="admin")
+ for policy in policies:
+ for category in categories:
+ data_object = driver.PolicyManager.get_subject_data(
+ moon_user_id="admin", policy_id=policy, category_id=category)
+ for data_item in data_object:
+ for data in data_item.get("data", {}):
+ driver.PolicyManager.delete_subject_data(moon_user_id="admin", policy_id=policy,
+ category_id=category, data_id=data)
+
+
+def clean_object_data():
+ from moon_manager import db_driver as driver
+ policies = driver.PolicyManager.get_policies(moon_user_id="admin")
+ categories = driver.ModelManager.get_object_categories(moon_user_id="admin")
+ for policy in policies:
+ for category in categories:
+ data_object = driver.PolicyManager.get_object_data(
+ moon_user_id="admin", policy_id=policy, category_id=category)
+ for data_item in data_object:
+ for data in data_item.get("data", {}):
+ driver.PolicyManager.delete_object_data(moon_user_id="admin", policy_id=policy,
+ category_id=category, data_id=data)
+
+
+def clean_action_data():
+ from moon_manager import db_driver as driver
+ policies = driver.PolicyManager.get_policies(moon_user_id="admin")
+ categories = driver.ModelManager.get_action_categories(moon_user_id="admin")
+ for policy in policies:
+ for category in categories:
+ data_object = driver.PolicyManager.get_action_data(
+ moon_user_id="admin", policy_id=policy, category_id=category)
+ for data_item in data_object:
+ for data in data_item.get("data", {}):
+ driver.PolicyManager.delete_action_data(moon_user_id="admin", policy_id=policy,
+ category_id=category, data_id=data)
+
+
+def clean_meta_rule():
+ from moon_manager import db_driver as driver
+ keys = driver.ModelManager.get_meta_rules(moon_user_id="admin")
+
+ for key in keys:
+ driver.ModelManager.delete_meta_rule(moon_user_id="admin", meta_rule_id=key)
+
+
+def clean_subject_assignments():
+ pass
+
+
+def clean_object_assignments():
+ pass
+
+
+def clean_action_assignments():
+ pass
+
+
+def clean_rules():
+ from moon_manager import db_driver as driver
+
+ policies = driver.PolicyManager.get_policies(moon_user_id="admin")
+
+ for policy in policies:
+ rules = driver.PolicyManager.get_rules(moon_user_id="admin", policy_id=policy)
+
+ for rule in rules:
+ driver.PolicyManager.delete_rule(moon_user_id="admin", policy_id=policy, rule_id=rule)
+
+
+def clean_all():
+ clean_rules()
+
+ clean_subject_assignments()
+ clean_object_assignments()
+ clean_action_assignments()
+
+ clean_subject_data()
+ clean_object_data()
+ clean_action_data()
+
+ clean_actions()
+ clean_objects()
+ clean_subjects()
+
+ clean_policies()
+ clean_models()
+ clean_meta_rule()
+
+ clean_subject_categories()
+ clean_object_categories()
+ clean_action_categories()
diff --git a/moon_utilities/tests/unit_python/helpers/slaves_helpers.py b/moon_utilities/tests/unit_python/helpers/slaves_helpers.py
new file mode 100644
index 00000000..1b2764d2
--- /dev/null
+++ b/moon_utilities/tests/unit_python/helpers/slaves_helpers.py
@@ -0,0 +1,32 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+
+SLAVES = {
+ "slaves": {
+ "d464cc58a0cd46dea3191ba70f4e7df8": {
+ "name": "slave_test",
+ "address": "",
+ "description": "...",
+ "extra": {
+ "description": "...",
+ "starttime": 1543851265.76279,
+ "port": 10000,
+ "server_ip": "127.0.0.1",
+ "status": "down",
+ "api_key": "e58a882a6b658a22660f00a0c273e7f6b4c4eb5abe54eccba2cae307905d67e3746537"
+ "bd790c41887e11840c2d186b6d6eeec0e426bcfa7a872cc3417a35124a"
+ }
+ }
+ }
+}
+
diff --git a/moon_utilities/tests/unit_python/mock_slaves.py b/moon_utilities/tests/unit_python/mock_slaves.py
new file mode 100644
index 00000000..f3ac04d8
--- /dev/null
+++ b/moon_utilities/tests/unit_python/mock_slaves.py
@@ -0,0 +1,50 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+
+def register_slaves(m):
+ m.register_uri(
+ 'DELETE', 'http://127.0.0.1:10000/update/assignment/098764321/subject/098764321/098764321/098764321',
+ json={}
+ )
+ m.register_uri(
+ 'DELETE', 'http://127.0.0.1:10000/update/data/098764321/subject',
+ json={}
+ )
+ m.register_uri(
+ 'PUT', 'http://127.0.0.1:10000/update/perimeter/098764321/098764321/subject',
+ json={}
+ )
+ m.register_uri(
+ 'PUT', 'http://127.0.0.1:10000/update/pdp/098764321',
+ json={}
+ )
+ m.register_uri(
+ 'PUT', 'http://127.0.0.1:10000/update/policy/098764321',
+ json={}
+ )
+ m.register_uri(
+ 'PUT', 'http://127.0.0.1:10000/update/rules/1234567890',
+ json={}
+ )
+ m.register_uri(
+ 'PUT', 'http://127.0.0.1:10000/update/model/1234567890',
+ json={}
+ )
+ m.register_uri(
+ 'PUT', 'http://127.0.0.1:10000/update/meta_data/1234567890',
+ json={}
+ )
+ m.register_uri(
+ 'PUT', 'http://127.0.0.1:10000/update/meta_rule/1234567890',
+ json={}
+ )
diff --git a/moon_utilities/tests/unit_python/requirements.txt b/moon_utilities/tests/unit_python/requirements.txt
new file mode 100644
index 00000000..91cbc2c0
--- /dev/null
+++ b/moon_utilities/tests/unit_python/requirements.txt
@@ -0,0 +1,10 @@
+pytest
+pbr
+pytest-cov
+cliff
+requests_mock
+tinydb
+moon_manager
+moon_cache
+sqlalchemy
+pymysql \ No newline at end of file