From 7bb53c64da2dcf88894bfd31503accdd81498f3d Mon Sep 17 00:00:00 2001 From: Thomas Duval Date: Wed, 3 Jun 2020 10:06:52 +0200 Subject: Update to new version 5.4 Signed-off-by: Thomas Duval Change-Id: Idcd868133d75928a1ffd74d749ce98503e0555ea --- moon_utilities/tests/unit_python/api/__init__.py | 12 + .../tests/unit_python/api/test_auth_functions.py | 83 +++ .../tests/unit_python/api/test_import_to_cache.py | 775 +++++++++++++++++++++ .../tests/unit_python/api/test_import_to_db.py | 772 ++++++++++++++++++++ .../unit_python/api/test_invalidate_function.py | 64 ++ .../unit_python/api/test_security_functions.py | 18 + moon_utilities/tests/unit_python/conftest.py | 172 +++++ .../tests/unit_python/helpers/__init__.py | 13 + .../helpers/import_export_cache_helper.py | 234 +++++++ .../unit_python/helpers/import_export_db_helper.py | 178 +++++ .../tests/unit_python/helpers/slaves_helpers.py | 32 + moon_utilities/tests/unit_python/mock_slaves.py | 50 ++ moon_utilities/tests/unit_python/requirements.txt | 10 + 13 files changed, 2413 insertions(+) create mode 100644 moon_utilities/tests/unit_python/api/__init__.py create mode 100644 moon_utilities/tests/unit_python/api/test_auth_functions.py create mode 100644 moon_utilities/tests/unit_python/api/test_import_to_cache.py create mode 100644 moon_utilities/tests/unit_python/api/test_import_to_db.py create mode 100644 moon_utilities/tests/unit_python/api/test_invalidate_function.py create mode 100644 moon_utilities/tests/unit_python/api/test_security_functions.py create mode 100644 moon_utilities/tests/unit_python/conftest.py create mode 100644 moon_utilities/tests/unit_python/helpers/__init__.py create mode 100644 moon_utilities/tests/unit_python/helpers/import_export_cache_helper.py create mode 100644 moon_utilities/tests/unit_python/helpers/import_export_db_helper.py create mode 100644 moon_utilities/tests/unit_python/helpers/slaves_helpers.py create mode 100644 moon_utilities/tests/unit_python/mock_slaves.py create mode 100644 moon_utilities/tests/unit_python/requirements.txt (limited to 'moon_utilities/tests') diff --git a/moon_utilities/tests/unit_python/api/__init__.py b/moon_utilities/tests/unit_python/api/__init__.py new file mode 100644 index 00000000..1856aa2c --- /dev/null +++ b/moon_utilities/tests/unit_python/api/__init__.py @@ -0,0 +1,12 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + diff --git a/moon_utilities/tests/unit_python/api/test_auth_functions.py b/moon_utilities/tests/unit_python/api/test_auth_functions.py new file mode 100644 index 00000000..70af19c1 --- /dev/null +++ b/moon_utilities/tests/unit_python/api/test_auth_functions.py @@ -0,0 +1,83 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +import os +from uuid import uuid4 +import pytest +from moon_utilities.auth_functions import xor_encode, xor_decode +from moon_utilities import exceptions +from moon_utilities.auth_functions import init_db, add_user, authenticate_user +from moon_utilities.auth_functions import authenticate_key, get_api_key, del_api_key_for_user + + +def test_xor(): + uuid1 = uuid4().hex + my_key = uuid4().hex + crypted_data = xor_encode(uuid1, my_key) + assert uuid1 != crypted_data + decrypted_data = xor_decode(crypted_data, my_key) + assert uuid1 == decrypted_data + + +def test_decrypt_exceptions(): + with pytest.raises(exceptions.DecryptError): + uuid1 = uuid4().hex + my_key = uuid4().hex + crypted_data = xor_encode(uuid1, my_key) + assert uuid1 != crypted_data + my_key = False + xor_decode(crypted_data, my_key) + with pytest.raises(exceptions.DecryptError): + uuid1 = uuid4().hex + my_key = uuid4().hex + crypted_data = xor_encode(uuid1, my_key) + assert uuid1 != crypted_data + my_key = "" + xor_decode(crypted_data, my_key) + + +def test_encrypt_exceptions(): + with pytest.raises(exceptions.EncryptError): + uuid1 = uuid4().hex + my_key = False + xor_encode(uuid1, my_key) + with pytest.raises(exceptions.EncryptError): + uuid1 = uuid4().hex + my_key = "" + xor_encode(uuid1, my_key) + + +def test_auth_api(): + try: + os.remove("/tmp/test.db") + except FileNotFoundError: + pass + init_db("/tmp/test.db") + # create the user + result = add_user("test_user", "1234567890") + assert result + # trying to auth the user + assert authenticate_user("test_user", "1234567890") + assert not authenticate_user("bad_test_user", "1234567890") + assert not authenticate_key(None) + assert not authenticate_key("") + assert authenticate_key(result['api_key']) == "test_user" + assert get_api_key("test_user", "1234567890") == result['api_key'] + # logout the user + assert del_api_key_for_user("test_user") + assert get_api_key("test_user", "1234567890") != result['api_key'] + assert get_api_key("test_user", "1234567890") is None + # re-authent user + assert authenticate_user("test_user", "1234567890") + # check that the previous api_key is not valid again + assert get_api_key("test_user", "1234567890") != result['api_key'] + assert get_api_key("test_user", "1234567890") diff --git a/moon_utilities/tests/unit_python/api/test_import_to_cache.py b/moon_utilities/tests/unit_python/api/test_import_to_cache.py new file mode 100644 index 00000000..01e717fb --- /dev/null +++ b/moon_utilities/tests/unit_python/api/test_import_to_cache.py @@ -0,0 +1,775 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +import helpers.import_export_cache_helper as import_export_helper +import pytest +from moon_utilities import exceptions + + +MODEL_WITHOUT_META_RULES = [ + {"models": [{"name": "test model", "description": "", "meta_rules": []}]}, + {"models": [{"name": "test model", "description": "new description", "meta_rules": [], + "override": True}]}, + {"models": [{"name": "test model", "description": "description not taken into account", + "meta_rules": [], "override": False}]} +] + +POLICIES = [ + {"policies": [ + {"name": "test policy", "genre": "authz", "description": "description", "model": {}, + "mandatory": False}]}, + {"policies": [{"name": "test policy", "genre": "authz", + "description": "new description not taken into account", + "model": {"name": "test model"}, "mandatory": True}]}, + {"policies": [ + {"name": "test policy", "genre": "not authz ?", "description": "generates an exception", + "model": {"name": "test model"}, "override": True}]}, + {"models": [{"name": "test model", "description": "", "meta_rules": []}], "policies": [ + {"name": "test policy", "genre": "not authz ?", "description": "changes taken into account", + "model": {"name": "test model"}, "override": True}]}, +] + +SUBJECTS = [ + {"subjects": [ + {"name": "testuser", "description": "description of the subject", "extra": {}, + "policies": []}]}, + {"policies": [ + {"name": "test policy", "genre": "authz", "description": "description", "model": {}, + "mandatory": False}], "subjects": [ + {"name": "testuser", "description": "description of the subject", "extra": {}, + "policies": []}]}, + {"policies": [ + {"name": "test other policy", "genre": "authz", "description": "description", + "model": {}, "mandatory": True}], "subjects": [ + {"name": "testuser", "description": "description of the subject", "extra": {}, + "policies": []}]}, + {"subjects": [{"name": "testuser", "description": "new description of the subject", + "extra": {"email": "new-email@test.com"}, + "policies": [{"name": "test other policy"}]}]}, + {"policies": [ + {"name": "test policy", "genre": "authz", "description": "description", "model": {}, + "mandatory": False}], "subjects": [ + {"name": "testuser", "description": "description of the subject", "extra": {}, + "policies": [{"name": "test policy"}]}]}] + +OBJECTS = [ + {"objects": [{"name": "test object", "description": "description of the object", "extra": {}, + "policies": []}]}, + {"policies": [ + {"name": "test policy", "genre": "authz", "description": "description", "model": {}, + "mandatory": False}], + "objects": [{"name": "test object", "description": "description of the object", "extra": {}, + "policies": []}]}, + {"policies": [ + {"name": "test other policy", "genre": "authz", "description": "description", "model": {}, + "mandatory": True}], + "objects": [{"name": "test object", "description": "description of the object", "extra": {}, + "policies": []}]}, + {"objects": [{"name": "test object", "description": "new description of the object", + "extra": {"test": "test extra"}, + "policies": [{"name": "test other policy"}]}]}, + {"policies": [ + {"name": "test policy", "genre": "authz", "description": "description", "model": {}, + "mandatory": False}], + "objects": [{"name": "test object", "description": "description of the object", "extra": {}, + "policies": [{"name": "test policy"}]}]}, +] + +ACTIONS = [ + {"actions": [ + {"name": "test action", "description": "description of the action", "extra": {}, + "policies": []}]}, + {"policies": [ + {"name": "test policy", "genre": "authz", "description": "description", "model": {}, + "mandatory": False}], "actions": [ + {"name": "test action", "description": "description of the action", "extra": {}, + "policies": []}]}, + {"policies": [ + {"name": "test other policy", "genre": "authz", "description": "description", + "model": {}, "mandatory": True}], "actions": [ + {"name": "test action", "description": "description of the action", "extra": {}, + "policies": []}]}, + {"actions": [{"name": "test action", "description": "new description of the action", + "extra": {"test": "test extra"}, + "policies": [{"name": "test other policy"}]}]}, + {"policies": [ + {"name": "test policy", "genre": "authz", "description": "description", "model": {}, + "mandatory": False}], "actions": [ + {"name": "test action", "description": "description of the action", "extra": {}, + "policies": [{"name": "test policy"}]}]}] + +SUBJECT_CATEGORIES = [{"subject_categories": [ + {"name": "test subject categories", "description": "subject category description"}]}, + {"subject_categories": [{"name": "test subject categories", + "description": "new subject category description"}]}] + +OBJECT_CATEGORIES = [{"object_categories": [ + {"name": "test object categories", "description": "object category description"}]}, + {"object_categories": [{"name": "test object categories", + "description": "new object category description"}]}] + +ACTION_CATEGORIES = [{"action_categories": [ + {"name": "test action categories", "description": "action category description"}]}, + {"action_categories": [{"name": "test action categories", + "description": "new action category description"}]}] + +# meta_rules import is needed otherwise the search for data do not work !!! +PRE_DATA = {"models": [ + { + "name": "test model", "description": "", + "meta_rules": [{"name": "good meta rule"}, + {"name": "other good meta rule"}]}], + "policies": [ + {"name": "test other policy", "genre": "authz", "description": "description", + "model": {"name": "test model"}, "mandatory": True}], + "subject_categories": [ + {"name": "test subject categories", "description": "subject category description"}, + {"name": "other test subject categories", + "description": "subject category description"}], + "object_categories": [ + {"name": "test object categories", "description": "object category description"}, + {"name": "other test object categories", + "description": "object category description"}], + "action_categories": [ + {"name": "test action categories", "description": "action category description"}, + {"name": "other test action categories", + "description": "action category description"}], + "meta_rules": [ + { + "name": "good meta rule", "description": "valid meta rule", + "subject_categories": [{"name": "test subject categories"}], + "object_categories": [{"name": "test object categories"}], + "action_categories": [{"name": "test action categories"}] + }, + { + "name": "other good meta rule", "description": "valid meta rule", + "subject_categories": [{"name": "other test subject categories"}], + "object_categories": [{"name": "other test object categories"}], + "action_categories": [{"name": "other test action categories"}] + }]} + +SUBJECT_DATA = [{"subject_data": [ + {"name": "not valid subject data", "description": "", "policies": [{}], "category": {}}]}, + {"subject_data": [ + {"name": "not valid subject data", "description": "", "policies": [{}], + "category": {"name": "test subject categories"}}]}, + {"policies": [ + {"name": "test policy", "genre": "authz", "description": "description", + "model": {"name": "test model"}, "mandatory": True}], "subject_data": [ + {"name": "one valid subject data", "description": "description", + "policies": [{}], "category": {"name": "test subject categories"}}]}, + {"subject_data": [{"name": "valid subject data", "description": "description", + "policies": [{"name": "test policy"}], + "category": {"name": "test subject categories"}}]}, + {"subject_data": [{"name": "valid subject data", "description": "new description", + "policies": [{"name": "test other policy"}], + "category": {"name": "test subject categories"}}]}] + +OBJECT_DATA = [{"object_data": [ + {"name": "not valid object data", "description": "", "policies": [{}], "category": {}}]}, + {"object_data": [ + {"name": "not valid object data", "description": "", "policies": [{}], + "category": {"name": "test object categories"}}]}, + {"policies": [{"name": "test policy", "genre": "authz", "description": "description", + "model": {"name": "test model"}, "mandatory": True}], "object_data": [ + {"name": "one valid object data", "description": "description", "policies": [{}], + "category": {"name": "test object categories"}}]}, + {"object_data": [{"name": "valid object data", "description": "description", + "policies": [{"name": "test policy"}], + "category": {"name": "test object categories"}}]}, + {"object_data": [{"name": "valid object data", "description": "new description", + "policies": [{"name": "test other policy"}], + "category": {"name": "test object categories"}}]}] + +ACTION_DATA = [{"action_data": [ + {"name": "not valid action data", "description": "", "policies": [{}], "category": {}}]}, + {"action_data": [ + {"name": "not valid action data", "description": "", "policies": [{}], + "category": {"name": "test action categories"}}]}, + {"policies": [{"name": "test policy", "genre": "authz", "description": "description", + "model": {"name": "test model"}, "mandatory": True}], "action_data": [ + {"name": "one valid action data", "description": "description", "policies": [{}], + "category": {"name": "test action categories"}}]}, + {"action_data": [{"name": "valid action data", "description": "description", + "policies": [{"name": "test policy"}], + "category": {"name": "test action categories"}}]}, + {"action_data": [{"name": "valid action data", "description": "new description", + "policies": [{"name": "test other policy"}], + "category": {"name": "test action categories"}}]}] + +PRE_META_RULES = {"subject_categories": [ + {"name": "test subject categories", "description": "subject category description"}], + "object_categories": [{"name": "test object categories", + "description": "object category description"}], + "action_categories": [{"name": "test action categories", + "description": "object action description"}]} + +META_RULES = [{"meta_rules": [{"name": "bad meta rule", "description": "not valid meta rule", + "subject_categories": [{"name": "not valid category"}], + "object_categories": [{"name": "test object categories"}], + "action_categories": [{"name": "test action categories"}]}]}, + {"meta_rules": [{"name": "bad meta rule", "description": "not valid meta rule", + "subject_categories": [{"name": "test subject categories"}], + "object_categories": [{"name": "not valid category"}], + "action_categories": [{"name": "test action categories"}]}]}, + {"meta_rules": [{"name": "bad meta rule", "description": "not valid meta rule", + "subject_categories": [{"name": "test subject categories"}], + "object_categories": [{"name": "test object categories"}], + "action_categories": [{"name": "not valid category"}]}]}, + {"meta_rules": [{"name": "good meta rule", "description": "valid meta rule", + "subject_categories": [{"name": "test subject categories"}], + "object_categories": [{"name": "test object categories"}], + "action_categories": [{"name": "test action categories"}]}]}] + +PRE_ASSIGNMENTS = {"models": [ + {"name": "test model", "description": "", "meta_rules": [{"name": "good meta rule"}]}], + "policies": [ + {"name": "test policy", "genre": "authz", "description": "description", + "model": {"name": "test model"}, "mandatory": True}], + "subject_categories": [{"name": "test subject categories", + "description": "subject category description"}], + "object_categories": [{"name": "test object categories", + "description": "object category description"}], + "action_categories": [{"name": "test action categories", + "description": "object action description"}], + "subjects": [{"name": "testuser", "description": "description of the subject", + "extra": {}, "policies": [{"name": "test policy"}]}], + "objects": [{"name": "test object", "description": "description of the object", + "extra": {}, "policies": [{"name": "test policy"}]}], + "actions": [{"name": "test action", "description": "description of the action", + "extra": {}, "policies": [{"name": "test policy"}]}], + "meta_rules": [{"name": "good meta rule", "description": "valid meta rule", + "subject_categories": [{"name": "test subject categories"}], + "object_categories": [{"name": "test object categories"}], + "action_categories": [{"name": "test action categories"}]}], + "subject_data": [{"name": "subject data", "description": "test subject data", + "policies": [{"name": "test policy"}], + "category": {"name": "test subject categories"}}], + "object_data": [{"name": "object data", "description": "test object data", + "policies": [{"name": "test policy"}], + "category": {"name": "test object categories"}}], + "action_data": [{"name": "action data", "description": "test action data", + "policies": [{"name": "test policy"}], + "category": {"name": "test action categories"}}]} + +SUBJECT_ASSIGNMENTS = [ + {"subject_assignments": [ + {"subject": {"name": "unknown"}, + "category": {"name": "test subject categories"}, + "assignments": [{"name": "subject data"}]}], + "exception": exceptions.InvalidJson + }, + {"subject_assignments": [ + {"subject": {"name": "testuser"}, + "category": {"name": "unknown"}, + "assignments": [{"name": "subject data"}]}], + "exception": exceptions.UnknownName + }, + {"subject_assignments": [ + {"subject": {"name": "testuser"}, + "category": {"name": "test subject categories"}, + "assignments": [{"name": "unknown"}]}], + "exception": exceptions.InvalidJson + }, + {"subject_assignments": [ + {"subject": {"name": "testuser"}, + "category": {"name": "test subject categories"}, + "assignments": [{"name": "subject data"}]}], + "exception": None + }] + +OBJECT_ASSIGNMENTS = [ + {"object_assignments": [ + {"object": {"name": "unknown"}, + "category": {"name": "test object categories"}, + "assignments": [{"name": "object data"}]}], + "exception": exceptions.InvalidJson + }, + {"object_assignments": [ + {"object": {"name": "test object"}, + "category": {"name": "unknown"}, + "assignments": [{"name": "object data"}]}], + "exception": exceptions.UnknownName + }, + {"object_assignments": [ + {"object": {"name": "test object"}, + "category": {"name": "test object categories"}, + "assignments": [{"name": "unknown"}]}], + "exception": exceptions.InvalidJson + }, + {"object_assignments": [ + {"object": {"name": "test object"}, + "category": {"name": "test object categories"}, + "assignments": [{"name": "object data"}]}], + "exception": None + }] + +ACTION_ASSIGNMENTS = [ + {"action_assignments": [ + {"action": {"name": "unknown"}, + "category": {"name": "test action categories"}, + "assignments": [{"name": "action data"}]}], + "exception": exceptions.InvalidJson + }, + {"action_assignments": [ + {"action": {"name": "test action"}, + "category": {"name": "unknown"}, + "assignments": [{"name": "action data"}]}], + "exception": exceptions.UnknownName + }, + {"action_assignments": [ + {"action": {"name": "test action"}, + "category": {"name": "test action categories"}, + "assignments": [{"name": "unknown"}]}], + "exception": exceptions.InvalidJson + }, + {"action_assignments": [ + {"action": {"name": "test action"}, + "category": {"name": "test action categories"}, + "assignments": [{"name": "action data"}]}], + "exception": None + }] + +RULES = [{"rules": [{"meta_rule": {"name": "unknown meta rule"}, "policy": {"name": "test " + "policy"}, + "instructions": ({"decision": "grant"},), "enabled": True, "rule": { + "subject_data": [{"name": "subject data"}], "object_data": [{"name": "object data"}], + "action_data": [{"name": "action data"}]}}]}, + {"rules": [{"meta_rule": {"name": "good meta rule"}, "policy": {"name": "unknown " + "policy"}, + "instructions": ({"decision": "grant"},), "enabled": True, "rule": { + "subject_data": [{"name": "subject data"}], + "object_data": [{"name": "object data"}], + "action_data": [{"name": "action data"}]}}]}, + {"rules": [{"meta_rule": {"name": "good meta rule"}, "policy": {"name": "test policy"}, + "instructions": ({"decision": "grant"},), "enabled": True, "rule": { + "subject_data": [{"name": "unknown subject data"}], + "object_data": [{"name": "object data"}], + "action_data": [{"name": "action data"}]}}]}, + {"rules": [{"meta_rule": {"name": "good meta rule"}, "policy": {"name": "test policy"}, + "instructions": ({"decision": "grant"},), "enabled": True, "rule": { + "subject_data": [{"name": "subject data"}], + "object_data": [{"name": "unknown object data"}], + "action_data": [{"name": "action data"}]}}]}, + {"rules": [{"meta_rule": {"name": "good meta rule"}, "policy": {"name": "test policy"}, + "instructions": ({"decision": "grant"},), "enabled": True, "rule": { + "subject_data": [{"name": "subject data"}], + "object_data": [{"name": "object data"}], + "action_data": [{"name": "unknown action data"}]}}]}, + {"rules": [{"meta_rule": {"name": "good meta rule"}, "policy": {"name": "test policy"}, + "instructions": ({"decision": "grant"},), "enabled": True, "rule": { + "subject_data": [{"name": "subject data"}], + "object_data": [{"name": "object data"}], + "action_data": [{"name": "action data"}]}}]}] + + +def test_import_models_without_new_meta_rules(): + from moon_utilities import json_utils + from moon_cache.cache import Cache + _cache = Cache.getInstance() + import_to_cache = json_utils.JsonImport(driver_name="cache", driver=_cache) + + import_export_helper.clean_all() + counter = 0 + for models_description in MODEL_WITHOUT_META_RULES: + data = import_to_cache.import_json(body=models_description) + assert "models" in data + models = _cache.models + assert len(list(models.keys())) == 1 + values = list(models.values()) + assert values[0]["name"] == "test model" + if counter == 0: + assert len(values[0]["description"]) == 0 + if counter == 1 or counter == 2: + assert values[0]["description"] == "new description" + counter = counter + 1 + import_export_helper.clean_all() + + +def test_import_policies(): + from moon_utilities import json_utils + from moon_cache.cache import Cache + _cache = Cache.getInstance() + import_to_cache = json_utils.JsonImport(driver_name="cache", driver=_cache) + + counter = -1 + for policy_description in POLICIES: + counter = counter + 1 + if counter == 2: + with pytest.raises(exceptions.UnknownName): + import_to_cache.import_json(body=policy_description) + continue + else: + data = import_to_cache.import_json(body=policy_description) + assert "policies" in data + if counter == 2: + assert "models" in data + + from moon_cache.cache import Cache + _cache = Cache.getInstance() + policies = _cache.policies + assert len(list(policies.keys())) == 1 + values = list(policies.values()) + assert values[0]["name"] == "test policy" + if counter < 3: + assert values[0]["genre"] == "authz" + assert values[0]["description"] == "description" + else: + assert values[0]["genre"] == "not authz ?" + assert values[0]["description"] == "changes taken into account" + assert len(values[0]["model_id"]) > 0 + + +def test_import_subject_object_action(): + from moon_utilities import json_utils + from moon_cache.cache import Cache + _cache = Cache.getInstance() + import_to_cache = json_utils.JsonImport(driver_name="cache", driver=_cache) + + type_elements = ["subject", "object", "action"] + + for type_element in type_elements: + import_export_helper.clean_all() + counter = -1 + # set the getters and the comparison values + if type_element == "subject": + elements = SUBJECTS + clean_method = import_export_helper.clean_subjects + name = "testuser" + key_extra = "email" + value_extra = "new-email@test.com" + elif type_element == "object": + elements = OBJECTS + clean_method = import_export_helper.clean_objects + name = "test object" + key_extra = "test" + value_extra = "test extra" + else: + elements = ACTIONS + clean_method = import_export_helper.clean_actions + name = "test action" + key_extra = "test" + value_extra = "test extra" + + for element in elements: + counter = counter + 1 + if counter == 2 or counter == 4: + clean_method() + if counter == 3: + clean_method() + data = import_to_cache.import_json(body=element) + elif counter < 2: + with pytest.raises(exceptions.PolicyUnknown) as exception_info: + import_to_cache.import_json(body=element) + assert '400: Policy Unknown' == str(exception_info.value) + continue + else: + data = import_to_cache.import_json(body=element) + assert data + + if counter != 3: + assert type_element+"s" in data + assert "policies" in data + + get_elements = getattr(_cache, type_element + "s") + + policy_key = list(get_elements.keys())[0] + assert len(list(get_elements[policy_key].keys())) == 1 + values = list(get_elements[policy_key].values()) + assert values[0]["name"] == name + if counter == 2 or counter == 4: + assert values[0]["description"] == "description of the " + type_element + if counter == 3: + assert values[0]["description"] == "new description of the " + type_element + assert values[0]["extra"][key_extra] == value_extra + + assert len(values[0]["policy_list"]) == 1 + import_export_helper.clean_all() + + +def test_import_subject_object_action_categories(): + from moon_utilities import json_utils + from moon_cache.cache import Cache + _cache = Cache.getInstance() + import_to_cache = json_utils.JsonImport(driver_name="cache", driver=_cache) + + type_elements = ["subject", "object", "action"] + + for type_element in type_elements: + import_export_helper.clean_all() + counter = -1 + # set the getters and the comparison values + if type_element == "subject": + elements = SUBJECT_CATEGORIES + elif type_element == "object": + elements = OBJECT_CATEGORIES + else: + elements = ACTION_CATEGORIES + + for element in elements: + data = import_to_cache.import_json(body=element) + counter = counter + 1 + assert type_element + "_categories" in data + get_elements = getattr(_cache, type_element + "_categories") + assert len(list(get_elements.keys())) == 1 + values = list(get_elements.values()) + assert values[0]["name"] == "test " + type_element + " categories" + assert values[0]["description"] == type_element + " category description" + + +def test_import_meta_rules(): + from moon_utilities import json_utils + from moon_cache.cache import Cache + _cache = Cache.getInstance() + import_to_cache = json_utils.JsonImport(driver_name="cache", driver=_cache) + + import_export_helper.clean_all() + # import some categories + data = import_to_cache.import_json(body=PRE_META_RULES) + for cat in ['subject_categories', 'object_categories', 'action_categories']: + assert cat in data + + counter = -1 + for meta_rule in META_RULES: + counter = counter + 1 + if counter != 3: + with pytest.raises(exceptions.UnknownName) as exception_info: + import_to_cache.import_json(body=meta_rule) + assert '400: Unknown Name.' == str(exception_info.value) + continue + else: + data = import_to_cache.import_json(body=meta_rule) + assert "meta_rules" in data + assert _cache.meta_rules + + meta_rules = _cache.meta_rules + key = list(meta_rules.keys())[0] + assert isinstance(meta_rules, dict) + assert meta_rules[key]["name"] == "good meta rule" + assert meta_rules[key]["description"] == "valid meta rule" + assert len(meta_rules[key]["subject_categories"]) == 1 + assert len(meta_rules[key]["object_categories"]) == 1 + assert len(meta_rules[key]["action_categories"]) == 1 + + subject_category_key = meta_rules[key]["subject_categories"][0] + object_category_key = meta_rules[key]["object_categories"][0] + action_category_key = meta_rules[key]["action_categories"][0] + + sub_cat = _cache.subject_categories + assert sub_cat[subject_category_key]["name"] == "test subject categories" + + ob_cat = _cache.object_categories + assert ob_cat[object_category_key]["name"] == "test object categories" + + ac_cat = _cache.action_categories + assert ac_cat[action_category_key]["name"] == "test action categories" + + import_export_helper.clean_all() + + +def test_import_subject_object_action_assignments(): + from moon_utilities import json_utils + from moon_cache.cache import Cache + _cache = Cache.getInstance() + import_to_cache = json_utils.JsonImport(driver_name="cache", driver=_cache) + + import_export_helper.clean_all() + + data = import_to_cache.import_json(body=PRE_ASSIGNMENTS) + for item in PRE_ASSIGNMENTS.keys(): + assert item in data + + type_elements = ["subject", "object", "action"] + + for type_element in type_elements: + counter = -1 + if type_element == "subject": + datas = SUBJECT_ASSIGNMENTS + elif type_element == "object": + datas = OBJECT_ASSIGNMENTS + else: + datas = ACTION_ASSIGNMENTS + + for assignments in datas: + counter = counter + 1 + my_exception = assignments.pop("exception") + if my_exception: + with pytest.raises(my_exception) as exception_info: + import_to_cache.import_json(body=assignments) + assert '400:' in str(exception_info.value) + else: + data = import_to_cache.import_json(body=assignments) + assert type_element+"_assignments" in data + assert getattr(_cache, type_element+"_assignments") + + assert len(getattr(_cache, type_element+"_assignments")) == 1 + + +def test_import_rules(): + from moon_utilities import json_utils + from moon_cache.cache import Cache + _cache = Cache.getInstance() + import_to_cache = json_utils.JsonImport(driver_name="cache", driver=_cache) + + import_export_helper.clean_all() + + data = import_to_cache.import_json(body=PRE_ASSIGNMENTS) + for item in PRE_ASSIGNMENTS.keys(): + assert item in data + + counter = -1 + for rule in RULES: + counter = counter + 1 + if counter < 5: + with pytest.raises(exceptions.UnknownName) as exception_info: + import_to_cache.import_json(body=rule) + + assert '400: Unknown Name.' == str(exception_info.value) + continue + data = import_to_cache.import_json(body=rule) + assert "rules" in data + policies = _cache.policies + policy_id = None + for policy in policies: + if policies[policy]['name'] == rule['rules'][0]['policy']['name']: + policy_id = policy + break + + assert policy_id + rules = _cache.rules + assert len(rules) == 1 + rule_content = list(rules.values())[0] + assert policy_id == rule_content["policy_id"] + assert rule_content["rules"] + assert len(rule_content["rules"]) == 1 + assert rule_content["rules"][0]["enabled"] + assert rule_content["rules"][0]["instructions"][0]["decision"] == "grant" + + meta_rules = _cache.meta_rules + assert meta_rules[list(meta_rules.keys())[0]]["name"] == "good meta rule" + + +def test_import_subject_object_action_data(): + from moon_utilities import json_utils + from moon_cache.cache import Cache + _cache = Cache.getInstance() + import_to_cache = json_utils.JsonImport(driver_name="cache", driver=_cache) + + type_elements = ["subject", "object", "action"] + + for type_element in type_elements: + import_export_helper.clean_all() + data = import_to_cache.import_json(body=PRE_DATA) + for key in PRE_DATA.keys(): + assert key in data + counter = -1 + if type_element == "subject": + elements = SUBJECT_DATA + get_method = _cache.subject_data + get_categories = _cache.subject_categories + elif type_element == "object": + elements = OBJECT_DATA + get_method = _cache.object_data + get_categories = _cache.object_categories + else: + elements = ACTION_DATA + get_method = _cache.action_data + get_categories = _cache.action_categories + + for element in elements: + counter = counter + 1 + if counter == 0 or counter == 1: + with pytest.raises(exceptions.MissingIdOrName) as exception_info: + import_to_cache.import_json(body=element) + assert '400: Missing ID or Name.' == str(exception_info.value) + continue + else: + data = import_to_cache.import_json(body=element) + for key in element.keys(): + assert key in data + + policies = _cache.policies + categories = get_categories + case_tested = False + for policy_key in policies.keys(): + policy = policies[policy_key] + for category_key in categories: + get_elements = get_method + _all_data = [] + for _data in get_elements: + if category_key == _data["category_id"] and policy_key == _data["policy_id"]: + _all_data.append(_data) + get_elements = _all_data + if not get_elements: + continue + if policy["name"] == "test policy": + assert len(get_elements) == 1 + el = get_elements[0] + assert isinstance(el["data"], dict) + if counter == 2: + assert len(el["data"].keys()) == 1 + el = el["data"][list(el["data"].keys())[0]] + if "value" in el: + el = el["value"] + assert el["name"] == "one valid " + type_element + " data" + if counter == 3: + assert len(el["data"].keys()) == 2 + el1 = el["data"][list(el["data"].keys())[0]] + el2 = el["data"][list(el["data"].keys())[1]] + if "value" in el1: + el1 = el1["value"] + el2 = el2["value"] + assert (el1["name"] == "one valid " + type_element + " data" and el2[ + "name"] == "valid " + type_element + " data") or (el2[ + "name"] == "one valid " + type_element + " data" and + el1[ + "name"] == "valid " + type_element + " data") + assert el1["description"] == "description" + assert el2["description"] == "description" + + case_tested = True + + if policy["name"] == "test other policy": + if counter == 4: + assert len(get_elements) == 1 + el = get_elements[0] + assert isinstance(el["data"], dict) + assert len(el["data"].keys()) == 1 + el = el["data"][list(el["data"].keys())[0]] + if "value" in el: + el = el["value"] + assert el["name"] == "valid " + type_element + " data" + assert el["description"] == "new description" + case_tested = True + + assert case_tested is True + + +def test_clean(): + from moon_cache.cache import Cache + _cache = Cache.getInstance() + import_export_helper.clean_all() + assert not _cache.subject_categories + assert not _cache.object_categories + assert not _cache.action_categories + assert not _cache.subjects + assert not _cache.objects + assert not _cache.actions + assert not _cache.subject_data + assert not _cache.object_data + assert not _cache.action_data + assert not _cache.subject_assignments + assert not _cache.object_assignments + assert not _cache.action_assignments + assert not _cache.models + assert not _cache.pdp + assert not _cache.policies diff --git a/moon_utilities/tests/unit_python/api/test_import_to_db.py b/moon_utilities/tests/unit_python/api/test_import_to_db.py new file mode 100644 index 00000000..34479b8c --- /dev/null +++ b/moon_utilities/tests/unit_python/api/test_import_to_db.py @@ -0,0 +1,772 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +import pytest +from moon_utilities import exceptions +import helpers.import_export_db_helper as import_export_helper + + +MODEL_WITHOUT_META_RULES = [ + {"models": [{"name": "test model", "description": "", "meta_rules": []}]}, + {"models": [{"name": "test model", "description": "new description", "meta_rules": [], + "override": True}]}, + {"models": [{"name": "test model", "description": "description not taken into account", + "meta_rules": [], "override": False}]} +] + +POLICIES = [ + {"policies": [ + {"name": "test policy", "genre": "authz", "description": "description", "model": {}, + "mandatory": False}]}, + {"policies": [{"name": "test policy", "genre": "authz", + "description": "new description not taken into account", + "model": {"name": "test model"}, "mandatory": True}]}, + {"policies": [ + {"name": "test policy", "genre": "not authz ?", "description": "generates an exception", + "model": {"name": "test model"}, "override": True}]}, + {"models": [{"name": "test model", "description": "", "meta_rules": []}], "policies": [ + {"name": "test policy", "genre": "not authz ?", "description": "changes taken into account", + "model": {"name": "test model"}, "override": True}]}, +] + +SUBJECTS = [ + {"subjects": [ + {"name": "testuser", "description": "description of the subject", "extra": {}, + "policies": []}]}, + {"policies": [ + {"name": "test policy", "genre": "authz", "description": "description", "model": {}, + "mandatory": False}], "subjects": [ + {"name": "testuser", "description": "description of the subject", "extra": {}, + "policies": []}]}, + {"policies": [ + {"name": "test other policy", "genre": "authz", "description": "description", + "model": {}, "mandatory": True}], "subjects": [ + {"name": "testuser", "description": "description of the subject", "extra": {}, + "policies": []}]}, + {"subjects": [{"name": "testuser", "description": "new description of the subject", + "extra": {"email": "new-email@test.com"}, + "policies": [{"name": "test other policy"}]}]}, + {"policies": [ + {"name": "test policy", "genre": "authz", "description": "description", "model": {}, + "mandatory": False}], "subjects": [ + {"name": "testuser", "description": "description of the subject", "extra": {}, + "policies": [{"name": "test policy"}]}]}] + +OBJECTS = [ + {"objects": [{"name": "test object", "description": "description of the object", "extra": {}, + "policies": []}]}, + {"policies": [ + {"name": "test policy", "genre": "authz", "description": "description", "model": {}, + "mandatory": False}], + "objects": [{"name": "test object", "description": "description of the object", "extra": {}, + "policies": []}]}, + {"policies": [ + {"name": "test other policy", "genre": "authz", "description": "description", "model": {}, + "mandatory": True}], + "objects": [{"name": "test object", "description": "description of the object", "extra": {}, + "policies": []}]}, + {"objects": [{"name": "test object", "description": "new description of the object", + "extra": {"test": "test extra"}, + "policies": [{"name": "test other policy"}]}]}, + {"policies": [ + {"name": "test policy", "genre": "authz", "description": "description", "model": {}, + "mandatory": False}], + "objects": [{"name": "test object", "description": "description of the object", "extra": {}, + "policies": [{"name": "test policy"}]}]}, +] + +ACTIONS = [{"actions": [ + {"name": "test action", "description": "description of the action", "extra": {}, + "policies": []}]}, + {"policies": [ + {"name": "test policy", "genre": "authz", "description": "description", "model": {}, + "mandatory": False}], "actions": [ + {"name": "test action", "description": "description of the action", "extra": {}, + "policies": []}]}, + {"policies": [ + {"name": "test other policy", "genre": "authz", "description": "description", + "model": {}, "mandatory": True}], "actions": [ + {"name": "test action", "description": "description of the action", "extra": {}, + "policies": []}]}, + {"actions": [{"name": "test action", "description": "new description of the action", + "extra": {"test": "test extra"}, + "policies": [{"name": "test other policy"}]}]}, + {"policies": [ + {"name": "test policy", "genre": "authz", "description": "description", "model": {}, + "mandatory": False}], "actions": [ + {"name": "test action", "description": "description of the action", "extra": {}, + "policies": [{"name": "test policy"}]}]}] + +SUBJECT_CATEGORIES = [{"subject_categories": [ + {"name": "test subject categories", "description": "subject category description"}]}, + {"subject_categories": [{"name": "test subject categories", + "description": "new subject category description"}]}] + +OBJECT_CATEGORIES = [{"object_categories": [ + {"name": "test object categories", "description": "object category description"}]}, + {"object_categories": [{"name": "test object categories", + "description": "new object category description"}]}] + +ACTION_CATEGORIES = [{"action_categories": [ + {"name": "test action categories", "description": "action category description"}]}, + {"action_categories": [{"name": "test action categories", + "description": "new action category description"}]}] + +# meta_rules import is needed otherwise the search for data do not work !!! +PRE_DATA = {"models": [{"name": "test model", "description": "", + "meta_rules": [{"name": "good meta rule"}, + {"name": "other good meta rule"}]}], + "policies": [ + {"name": "test other policy", "genre": "authz", "description": "description", + "model": {"name": "test model"}, "mandatory": True}], + "subject_categories": [ + {"name": "test subject categories", "description": "subject category description"}, + {"name": "other test subject categories", + "description": "subject category description"}], + "object_categories": [ + {"name": "test object categories", "description": "object category description"}, + {"name": "other test object categories", + "description": "object category description"}], + "action_categories": [ + {"name": "test action categories", "description": "action category description"}, + {"name": "other test action categories", + "description": "action category description"}], + "meta_rules": [{"name": "good meta rule", "description": "valid meta rule", + "subject_categories": [{"name": "test subject categories"}], + "object_categories": [{"name": "test object categories"}], + "action_categories": [{"name": "test action categories"}]}, + {"name": "other good meta rule", "description": "valid meta rule", + "subject_categories": [{"name": "other test subject categories"}], + "object_categories": [{"name": "other test object categories"}], + "action_categories": [{"name": "other test action categories"}]}]} + +SUBJECT_DATA = [{"subject_data": [ + {"name": "not valid subject data", "description": "", "policies": [{}], "category": {}}]}, + {"subject_data": [ + {"name": "not valid subject data", "description": "", "policies": [{}], + "category": {"name": "test subject categories"}}]}, + {"policies": [ + {"name": "test policy", "genre": "authz", "description": "description", + "model": {"name": "test model"}, "mandatory": True}], "subject_data": [ + {"name": "one valid subject data", "description": "description", + "policies": [{}], "category": {"name": "test subject categories"}}]}, + {"subject_data": [{"name": "valid subject data", "description": "description", + "policies": [{"name": "test policy"}], + "category": {"name": "test subject categories"}}]}, + {"subject_data": [{"name": "valid subject data", "description": "new description", + "policies": [{"name": "test other policy"}], + "category": {"name": "test subject categories"}}]}] + +OBJECT_DATA = [{"object_data": [ + {"name": "not valid object data", "description": "", "policies": [{}], "category": {}}]}, + {"object_data": [ + {"name": "not valid object data", "description": "", "policies": [{}], + "category": {"name": "test object categories"}}]}, + {"policies": [{"name": "test policy", "genre": "authz", "description": "description", + "model": {"name": "test model"}, "mandatory": True}], "object_data": [ + {"name": "one valid object data", "description": "description", "policies": [{}], + "category": {"name": "test object categories"}}]}, + {"object_data": [{"name": "valid object data", "description": "description", + "policies": [{"name": "test policy"}], + "category": {"name": "test object categories"}}]}, + {"object_data": [{"name": "valid object data", "description": "new description", + "policies": [{"name": "test other policy"}], + "category": {"name": "test object categories"}}]}] + +ACTION_DATA = [{"action_data": [ + {"name": "not valid action data", "description": "", "policies": [{}], "category": {}}]}, + {"action_data": [ + {"name": "not valid action data", "description": "", "policies": [{}], + "category": {"name": "test action categories"}}]}, + {"policies": [{"name": "test policy", "genre": "authz", "description": "description", + "model": {"name": "test model"}, "mandatory": True}], "action_data": [ + {"name": "one valid action data", "description": "description", "policies": [{}], + "category": {"name": "test action categories"}}]}, + {"action_data": [{"name": "valid action data", "description": "description", + "policies": [{"name": "test policy"}], + "category": {"name": "test action categories"}}]}, + {"action_data": [{"name": "valid action data", "description": "new description", + "policies": [{"name": "test other policy"}], + "category": {"name": "test action categories"}}]}] + +PRE_META_RULES = {"subject_categories": [ + {"name": "test subject categories", "description": "subject category description"}], + "object_categories": [{"name": "test object categories", + "description": "object category description"}], + "action_categories": [{"name": "test action categories", + "description": "object action description"}]} + +META_RULES = [{"meta_rules": [{"name": "bad meta rule", "description": "not valid meta rule", + "subject_categories": [{"name": "not valid category"}], + "object_categories": [{"name": "test object categories"}], + "action_categories": [{"name": "test action categories"}]}]}, + {"meta_rules": [{"name": "bad meta rule", "description": "not valid meta rule", + "subject_categories": [{"name": "test subject categories"}], + "object_categories": [{"name": "not valid category"}], + "action_categories": [{"name": "test action categories"}]}]}, + {"meta_rules": [{"name": "bad meta rule", "description": "not valid meta rule", + "subject_categories": [{"name": "test subject categories"}], + "object_categories": [{"name": "test object categories"}], + "action_categories": [{"name": "not valid category"}]}]}, + {"meta_rules": [{"name": "good meta rule", "description": "valid meta rule", + "subject_categories": [{"name": "test subject categories"}], + "object_categories": [{"name": "test object categories"}], + "action_categories": [{"name": "test action categories"}]}]}] + +PRE_ASSIGNMENTS = {"models": [ + {"name": "test model", "description": "", "meta_rules": [{"name": "good meta rule"}]}], + "policies": [ + {"name": "test policy", "genre": "authz", "description": "description", + "model": {"name": "test model"}, "mandatory": True}], + "subject_categories": [{"name": "test subject categories", + "description": "subject category description"}], + "object_categories": [{"name": "test object categories", + "description": "object category description"}], + "action_categories": [{"name": "test action categories", + "description": "object action description"}], + "subjects": [{"name": "testuser", "description": "description of the subject", + "extra": {}, "policies": [{"name": "test policy"}]}], + "objects": [{"name": "test object", "description": "description of the object", + "extra": {}, "policies": [{"name": "test policy"}]}], + "actions": [{"name": "test action", "description": "description of the action", + "extra": {}, "policies": [{"name": "test policy"}]}], + "meta_rules": [{"name": "good meta rule", "description": "valid meta rule", + "subject_categories": [{"name": "test subject categories"}], + "object_categories": [{"name": "test object categories"}], + "action_categories": [{"name": "test action categories"}]}], + "subject_data": [{"name": "subject data", "description": "test subject data", + "policies": [{"name": "test policy"}], + "category": {"name": "test subject categories"}}], + "object_data": [{"name": "object data", "description": "test object data", + "policies": [{"name": "test policy"}], + "category": {"name": "test object categories"}}], + "action_data": [{"name": "action data", "description": "test action data", + "policies": [{"name": "test policy"}], + "category": {"name": "test action categories"}}]} + +SUBJECT_ASSIGNMENTS = [ + {"subject_assignments": [ + {"subject": {"name": "unknown"}, + "category": {"name": "test subject categories"}, + "assignments": [{"name": "subject data"}]}], + "exception": exceptions.InvalidJson + }, + {"subject_assignments": [ + {"subject": {"name": "testuser"}, + "category": {"name": "unknown"}, + "assignments": [{"name": "subject data"}]}], + "exception": exceptions.UnknownName + }, + {"subject_assignments": [ + {"subject": {"name": "testuser"}, + "category": {"name": "test subject categories"}, + "assignments": [{"name": "unknown"}]}], + "exception": exceptions.InvalidJson + }, + {"subject_assignments": [ + {"subject": {"name": "testuser"}, + "category": {"name": "test subject categories"}, + "assignments": [{"name": "subject data"}]}], + "exception": None + }] + +OBJECT_ASSIGNMENTS = [ + {"object_assignments": [ + {"object": {"name": "unknown"}, + "category": {"name": "test object categories"}, + "assignments": [{"name": "object data"}]}], + "exception": exceptions.InvalidJson + }, + {"object_assignments": [ + {"object": {"name": "test object"}, + "category": {"name": "unknown"}, + "assignments": [{"name": "object data"}]}], + "exception": exceptions.UnknownName + }, + {"object_assignments": [ + {"object": {"name": "test object"}, + "category": {"name": "test object categories"}, + "assignments": [{"name": "unknown"}]}], + "exception": exceptions.InvalidJson + }, + {"object_assignments": [ + {"object": {"name": "test object"}, + "category": {"name": "test object categories"}, + "assignments": [{"name": "object data"}]}], + "exception": None + }] + +ACTION_ASSIGNMENTS = [ + {"action_assignments": [ + {"action": {"name": "unknown"}, + "category": {"name": "test action categories"}, + "assignments": [{"name": "action data"}]}], + "exception": exceptions.InvalidJson + }, + {"action_assignments": [ + {"action": {"name": "test action"}, + "category": {"name": "unknown"}, + "assignments": [{"name": "action data"}]}], + "exception": exceptions.UnknownName + }, + {"action_assignments": [ + {"action": {"name": "test action"}, + "category": {"name": "test action categories"}, + "assignments": [{"name": "unknown"}]}], + "exception": exceptions.InvalidJson + }, + {"action_assignments": [ + {"action": {"name": "test action"}, + "category": {"name": "test action categories"}, + "assignments": [{"name": "action data"}]}], + "exception": None + }] + +RULES = [{"rules": [{"meta_rule": {"name": "unknown meta rule"}, "policy": {"name": "test " + "policy"}, + "instructions": ({"decision": "grant"},), "enabled": True, "rule": { + "subject_data": [{"name": "subject data"}], "object_data": [{"name": "object data"}], + "action_data": [{"name": "action data"}]}}]}, + {"rules": [{"meta_rule": {"name": "good meta rule"}, "policy": {"name": "unknown " + "policy"}, + "instructions": ({"decision": "grant"},), "enabled": True, "rule": { + "subject_data": [{"name": "subject data"}], + "object_data": [{"name": "object data"}], + "action_data": [{"name": "action data"}]}}]}, + {"rules": [{"meta_rule": {"name": "good meta rule"}, "policy": {"name": "test policy"}, + "instructions": ({"decision": "grant"},), "enabled": True, "rule": { + "subject_data": [{"name": "unknown subject data"}], + "object_data": [{"name": "object data"}], + "action_data": [{"name": "action data"}]}}]}, + {"rules": [{"meta_rule": {"name": "good meta rule"}, "policy": {"name": "test policy"}, + "instructions": ({"decision": "grant"},), "enabled": True, "rule": { + "subject_data": [{"name": "subject data"}], + "object_data": [{"name": "unknown object data"}], + "action_data": [{"name": "action data"}]}}]}, + {"rules": [{"meta_rule": {"name": "good meta rule"}, "policy": {"name": "test policy"}, + "instructions": ({"decision": "grant"},), "enabled": True, "rule": { + "subject_data": [{"name": "subject data"}], + "object_data": [{"name": "object data"}], + "action_data": [{"name": "unknown action data"}]}}]}, + {"rules": [{"meta_rule": {"name": "good meta rule"}, "policy": {"name": "test policy"}, + "instructions": ({"decision": "grant"},), "enabled": True, "rule": { + "subject_data": [{"name": "subject data"}], + "object_data": [{"name": "object data"}], + "action_data": [{"name": "action data"}]}}]}] + + +def test_import_models_without_new_meta_rules(): + from moon_utilities import json_utils + from moon_manager import db_driver + import_to_db = json_utils.JsonImport(driver_name="db", driver=db_driver) + _cache = import_to_db.driver + + import_export_helper.clean_all() + counter = 0 + for models_description in MODEL_WITHOUT_META_RULES: + data = import_to_db.import_json(body=models_description) + assert "models" in data + models = _cache.get_models() + assert len(list(models.keys())) == 1 + values = list(models.values()) + assert values[0]["name"] == "test model" + if counter == 0: + assert len(values[0]["description"]) == 0 + if counter == 1 or counter == 2: + assert values[0]["description"] == "new description" + counter = counter + 1 + import_export_helper.clean_all() + + +def test_import_policies(): + from moon_utilities import json_utils + from moon_manager import db_driver + import_to_db = json_utils.JsonImport(driver_name="db", driver=db_driver) + _cache = import_to_db.driver + + counter = -1 + for policy_description in POLICIES: + counter = counter + 1 + if counter == 2: + with pytest.raises(exceptions.UnknownName): + import_to_db.import_json(body=policy_description) + continue + else: + data = import_to_db.import_json(body=policy_description) + assert "policies" in data + if counter == 2: + assert "models" in data + + policies = db_driver.PolicyManager.get_policies(moon_user_id="admin") + assert len(list(policies.keys())) == 1 + values = list(policies.values()) + assert values[0]["name"] == "test policy" + if counter < 3: + assert values[0]["genre"] == "authz" + assert values[0]["description"] == "description" + else: + assert values[0]["genre"] == "not authz ?" + assert values[0]["description"] == "changes taken into account" + assert len(values[0]["model_id"]) > 0 + + +def test_import_subject_object_action(): + from moon_utilities import json_utils + from moon_manager import db_driver + import_to_db = json_utils.JsonImport(driver_name="db", driver=db_driver) + _cache = import_to_db.driver + + type_elements = ["subject", "object", "action"] + + for type_element in type_elements: + import_export_helper.clean_all() + counter = -1 + # set the getters and the comparison values + if type_element == "subject": + elements = SUBJECTS + clean_method = import_export_helper.clean_subjects + name = "testuser" + key_extra = "email" + value_extra = "new-email@test.com" + elif type_element == "object": + elements = OBJECTS + clean_method = import_export_helper.clean_objects + name = "test object" + key_extra = "test" + value_extra = "test extra" + else: + elements = ACTIONS + clean_method = import_export_helper.clean_actions + name = "test action" + key_extra = "test" + value_extra = "test extra" + + for element in elements: + counter = counter + 1 + if counter == 2 or counter == 4: + clean_method() + if counter == 3: + clean_method() + data = import_to_db.import_json(body=element) + elif counter < 2: + with pytest.raises(exceptions.PolicyUnknown) as exception_info: + import_to_db.import_json(body=element) + assert '400: Policy Unknown' == str(exception_info.value) + continue + else: + data = import_to_db.import_json(body=element) + assert data + + if counter != 3: + assert type_element+"s" in data + assert "policies" in data + + get_elements = getattr(_cache, type_element + "s") + + assert len(list(get_elements.keys())) == 1 + values = list(get_elements.values()) + assert values[0]["name"] == name + if counter == 2 or counter == 4: + assert values[0]["description"] == "description of the " + type_element + if counter == 3: + assert values[0]["description"] == "new description of the " + type_element + assert values[0]["extra"][key_extra] == value_extra + + assert len(values[0]["policy_list"]) == 1 + import_export_helper.clean_all() + + +def test_import_subject_object_action_categories(): + from moon_utilities import json_utils + from moon_manager import db_driver + import_to_db = json_utils.JsonImport(driver_name="db", driver=db_driver) + _cache = import_to_db.driver + + type_elements = ["subject", "object", "action"] + + for type_element in type_elements: + import_export_helper.clean_all() + counter = -1 + # set the getters and the comparison values + if type_element == "subject": + elements = SUBJECT_CATEGORIES + elif type_element == "object": + elements = OBJECT_CATEGORIES + else: + elements = ACTION_CATEGORIES + + for element in elements: + data = import_to_db.import_json(body=element) + counter = counter + 1 + assert type_element + "_categories" in data + get_elements = getattr(_cache, type_element + "_categories") + assert len(list(get_elements.keys())) == 1 + values = list(get_elements.values()) + assert values[0]["name"] == "test " + type_element + " categories" + assert values[0]["description"] == type_element + " category description" + + +def test_import_meta_rules(): + from moon_utilities import json_utils + from moon_manager import db_driver + import_to_db = json_utils.JsonImport(driver_name="db", driver=db_driver) + _cache = import_to_db.driver + + import_export_helper.clean_all() + # import some categories + data = import_to_db.import_json(body=PRE_META_RULES) + for cat in ['subject_categories', 'object_categories', 'action_categories']: + assert cat in data + + counter = -1 + for meta_rule in META_RULES: + counter = counter + 1 + if counter != 3: + with pytest.raises(exceptions.UnknownName) as exception_info: + import_to_db.import_json(body=meta_rule) + assert '400: Unknown Name.' == str(exception_info.value) + continue + else: + data = import_to_db.import_json(body=meta_rule) + assert "meta_rules" in data + assert _cache.meta_rules + + meta_rules = _cache.meta_rules + key = list(meta_rules.keys())[0] + assert isinstance(meta_rules, dict) + assert meta_rules[key]["name"] == "good meta rule" + assert meta_rules[key]["description"] == "valid meta rule" + assert len(meta_rules[key]["subject_categories"]) == 1 + assert len(meta_rules[key]["object_categories"]) == 1 + assert len(meta_rules[key]["action_categories"]) == 1 + + subject_category_key = meta_rules[key]["subject_categories"][0] + object_category_key = meta_rules[key]["object_categories"][0] + action_category_key = meta_rules[key]["action_categories"][0] + + sub_cat = _cache.subject_categories + assert sub_cat[subject_category_key]["name"] == "test subject categories" + + ob_cat = _cache.object_categories + assert ob_cat[object_category_key]["name"] == "test object categories" + + ac_cat = _cache.action_categories + assert ac_cat[action_category_key]["name"] == "test action categories" + + import_export_helper.clean_all() + + +def test_import_subject_object_action_assignments(): + from moon_utilities import json_utils + from moon_manager import db_driver + import_to_db = json_utils.JsonImport(driver_name="db", driver=db_driver) + _cache = import_to_db.driver + + import_export_helper.clean_all() + + data = import_to_db.import_json(body=PRE_ASSIGNMENTS) + for item in PRE_ASSIGNMENTS.keys(): + assert item in data + + type_elements = ["subject", "object", "action"] + + for type_element in type_elements: + counter = -1 + if type_element == "subject": + datas = SUBJECT_ASSIGNMENTS + elif type_element == "object": + datas = OBJECT_ASSIGNMENTS + else: + datas = ACTION_ASSIGNMENTS + + for assignments in datas: + counter = counter + 1 + my_exception = assignments.pop("exception") + if my_exception: + with pytest.raises(my_exception) as exception_info: + import_to_db.import_json(body=assignments) + assert '400:' in str(exception_info.value) + else: + data = import_to_db.import_json(body=assignments) + assert type_element+"_assignments" in data + assert getattr(_cache, type_element+"_assignments") + + assert len(getattr(_cache, type_element+"_assignments")) == 1 + + +def test_import_rules(): + from moon_utilities import json_utils + from moon_manager import db_driver + import_to_db = json_utils.JsonImport(driver_name="db", driver=db_driver) + _cache = import_to_db.driver + + import_export_helper.clean_all() + + data = import_to_db.import_json(body=PRE_ASSIGNMENTS) + for item in PRE_ASSIGNMENTS.keys(): + assert item in data + + counter = -1 + for rule in RULES: + counter = counter + 1 + if counter < 5: + with pytest.raises(exceptions.UnknownName) as exception_info: + import_to_db.import_json(body=rule) + + assert '400: Unknown Name.' == str(exception_info.value) + continue + data = import_to_db.import_json(body=rule) + assert "rules" in data + policies = _cache.policies + policy_id = None + for policy in policies: + if policies[policy]['name'] == rule['rules'][0]['policy']['name']: + policy_id = policy + break + + assert policy_id + rules = [] + for _rule in _cache.rules: + if policy_id == _rule['policy_id']: + rules.append(_rule) + assert len(rules) == 1 + rule_content = rules[0] + assert policy_id == rule_content["policy_id"] + assert rule_content["rules"] + assert len(rule_content["rules"]) == 1 + # FIXME: this is different from cache tests (where we have a "value" attribute) + assert rule_content["rules"][0]["enabled"] + assert rule_content["rules"][0]["instructions"][0]["decision"] == "grant" + + meta_rules = _cache.meta_rules + assert meta_rules[list(meta_rules.keys())[0]]["name"] == "good meta rule" + + +def test_import_subject_object_action_data(): + from moon_utilities import json_utils + from moon_manager import db_driver + import_to_db = json_utils.JsonImport(driver_name="db", driver=db_driver) + _cache = import_to_db.driver + + type_elements = ["subject", "object", "action"] + + for type_element in type_elements: + import_export_helper.clean_all() + data = import_to_db.import_json(body=PRE_DATA) + for key in PRE_DATA.keys(): + assert key in data + counter = -1 + if type_element == "subject": + elements = SUBJECT_DATA + get_method = _cache.get_subject_data + get_categories = _cache.subject_categories + elif type_element == "object": + elements = OBJECT_DATA + get_method = _cache.get_object_data + get_categories = _cache.object_categories + else: + elements = ACTION_DATA + get_method = _cache.get_action_data + get_categories = _cache.action_categories + + for element in elements: + counter = counter + 1 + if counter == 0 or counter == 1: + with pytest.raises(exceptions.MissingIdOrName) as exception_info: + import_to_db.import_json(body=element) + assert '400: Missing ID or Name.' == str(exception_info.value) + continue + else: + data = import_to_db.import_json(body=element) + for key in element.keys(): + assert key in data + + policies = _cache.policies + categories = get_categories + case_tested = False + for policy_key in policies.keys(): + policy = policies[policy_key] + for category_key in categories: + get_elements = get_method(policy_id=policy_key, category_id=category_key) + _all_data = [] + for _data in get_elements: + if category_key == _data["category_id"] and \ + policy_key == _data["policy_id"]: + _all_data.append(_data) + get_elements = _all_data + if not get_elements: + continue + if policy["name"] == "test policy": + assert len(get_elements) == 1 + el = get_elements[0] + assert isinstance(el["data"], dict) + if counter == 2: + assert len(el["data"].keys()) == 1 + el = el["data"][list(el["data"].keys())[0]] + if "value" in el: + el = el["value"] + assert el["name"] == "one valid " + type_element + " data" + if counter == 3: + assert len(el["data"].keys()) == 2 + el1 = el["data"][list(el["data"].keys())[0]] + el2 = el["data"][list(el["data"].keys())[1]] + if "value" in el1: + el1 = el1["value"] + el2 = el2["value"] + assert (el1["name"] == "one valid " + type_element + " data" and + el2["name"] == "valid " + type_element + " data") or \ + (el2["name"] == "one valid " + type_element + " data" and + el1["name"] == "valid " + type_element + " data") + assert el1["description"] == "description" + assert el2["description"] == "description" + + case_tested = True + break + + if policy["name"] == "test other policy": + if counter == 4: + assert len(get_elements) == 1 + el = get_elements[0] + assert isinstance(el["data"], dict) + assert len(el["data"].keys()) == 1 + el = el["data"][list(el["data"].keys())[0]] + if "value" in el: + el = el["value"] + assert el["name"] == "valid " + type_element + " data" + assert el["description"] == "new description" + case_tested = True + break + + assert case_tested is True + + +def test_clean(): + from moon_utilities import json_utils + from moon_manager import db_driver + import_to_db = json_utils.JsonImport(driver_name="db", driver=db_driver) + _cache = import_to_db.driver + import_export_helper.clean_all() + assert not _cache.subject_categories + assert not _cache.object_categories + assert not _cache.action_categories + assert not _cache.subjects + assert not _cache.objects + assert not _cache.actions + assert not _cache.subject_data + assert not _cache.object_data + assert not _cache.action_data + assert not _cache.subject_assignments + assert not _cache.object_assignments + assert not _cache.action_assignments + assert not _cache.models + assert not _cache.pdp + assert not _cache.policies diff --git a/moon_utilities/tests/unit_python/api/test_invalidate_function.py b/moon_utilities/tests/unit_python/api/test_invalidate_function.py new file mode 100644 index 00000000..ed0357dc --- /dev/null +++ b/moon_utilities/tests/unit_python/api/test_invalidate_function.py @@ -0,0 +1,64 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +import moon_utilities.invalided_functions + + +def test_invalidate_assignment_in_slaves(slaves): + result = moon_utilities.invalided_functions.invalidate_assignment_in_slaves( + slaves, "098764321", "098764321", "098764321", "098764321", 'subject') + assert result + assert "slave_test" in list(result) + + +def test_invalidate_data_in_slaves(slaves): + result = moon_utilities.invalided_functions.invalidate_data_in_slaves( + slaves, "__policy_id__", "__category_id__", "098764321", "subject") + assert result + assert "slave_test" in list(result) + + +def test_invalidate_perimeter_in_slaves(slaves): + result = moon_utilities.invalided_functions.invalidate_perimeter_in_slaves( + slaves, "098764321", "098764321", "subject", is_delete=False) + assert result + assert "slave_test" in list(result) + + +def test_invalidate_pdp_in_slaves(slaves): + result = moon_utilities.invalided_functions.invalidate_pdp_in_slaves( + slaves, "098764321", is_delete=False) + assert result + assert "slave_test" in list(result) + + +def test_invalidate_policy_in_slaves(slaves): + result = moon_utilities.invalided_functions.invalidate_policy_in_slaves( + slaves, "098764321", is_delete=False) + assert result + assert "slave_test" in list(result) + + +def test_invalidate_rules_in_slaves(): + pass + + +def test_invalidate_model_in_slaves(): + pass + + +def test_invalidate_meta_data_in_slaves(): + pass + + +def test_invalidate_meta_rule_in_slaves(): + pass diff --git a/moon_utilities/tests/unit_python/api/test_security_functions.py b/moon_utilities/tests/unit_python/api/test_security_functions.py new file mode 100644 index 00000000..469a70a2 --- /dev/null +++ b/moon_utilities/tests/unit_python/api/test_security_functions.py @@ -0,0 +1,18 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +from moon_utilities.security_functions import enforce + + +def test_enforce(): + # TODO: implement the test + assert True diff --git a/moon_utilities/tests/unit_python/conftest.py b/moon_utilities/tests/unit_python/conftest.py new file mode 100644 index 00000000..da55d3ff --- /dev/null +++ b/moon_utilities/tests/unit_python/conftest.py @@ -0,0 +1,172 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +import mock_slaves +import os +import pytest +import requests_mock +import yaml + + +__CONF = """ +debug: true + +database: + url: sqlite:////tmp/database_test.db + driver: moon_manager.plugins.sql + migration_dir: moon_manager.api.db.migrations + +dashboard: + root: ../dashboard/www/ + +management: + url: http://127.0.0.1:8000 + user: admin + password: admin + token_file: db.json + +orchestration: + driver: moon_manager.plugins.pyorchestrator + connection: local + port: 10000...10100 + config_dir: /tmp + +information: + openstack: + driver: moon_manager.plugins.moon_openstack_plugin + url: http://keystone:5000/v3 + user: admin + password: p4ssw0rd + domain: default + project: admin + check_token: false + certificate: false + global_attrs: + driver: moon_manager.plugins.global_attrs + attributes: + mode: + values: + - build + - run + default: build + url: file:/etc/moon/mode + #url: https://127.0.0.1:8080/mode + #url: mysql+pymysql://moon:p4sswOrd1@db/moon_mode + #url: sqlite:////tmp/database.db + #url: driver://moon_manager.plugins.my_plugin + +plugins: + directory: /var/moon/plugins + +components: + manager: + port: 8080 + bind: 0.0.0.0 + hostname: manager + +logging: + version: 1 + + formatters: + brief: + format: "%(levelname)s %(name)s %(message)-30s" + custom: + format: "%(asctime)-15s %(levelname)s %(name)s %(message)s" + + handlers: + console: + class : logging.StreamHandler + formatter: custom + level : INFO + stream : ext://sys.stdout + file: + class : logging.handlers.RotatingFileHandler + formatter: custom + level : DEBUG + filename: /tmp/moon.log + maxBytes: 1048576 + backupCount: 3 + + loggers: + moon: + level: DEBUG + handlers: [console, file] + propagate: no + + root: + level: ERROR + handlers: [console] +""" + + +@pytest.fixture +def slaves(): + return { + "slaves": { + "d464cc58a0cd46dea3191ba70f4e7df8": { + "name": "slave_test", + "address": "", + "description": "...", + "extra": { + "description": "...", + "starttime": 1543851265.76279, + "port": 10000, + "server_ip": "127.0.0.1", + "status": "up", + "api_key": "e58a882a6b658a22660f00a0c273e7f6b4c4eb5abe54eccba2cae307905d67e374" + "6537bd790c41887e11840c2d186b6d6eeec0e426bcfa7a872cc3417a35124a" + } + } + } + } + + +@pytest.fixture(autouse=True) +def no_requests(monkeypatch): + """ Modify the response from Requests module + """ + with requests_mock.Mocker(real_http=True) as m: + try: + os.remove("/tmp/database_test.db") + except FileNotFoundError: + pass + try: + os.remove("/tmp/moon.pwd") + except FileNotFoundError: + pass + print("Configure...") + from moon_manager.api.configuration import init_database, set_configuration + set_configuration(yaml.safe_load(__CONF)) + print("Create a new user") + from moon_utilities.auth_functions import add_user, init_db, get_api_key_for_user + init_db() + # try: + # user = add_user("admin", "admin") + # manager_api_key = user["api_key"] + # except KeyError: + # print("User already exists") + # manager_api_key = get_api_key_for_user("admin") + print("Initialize the database") + init_database() + from moon_manager import db_driver + db_driver.init() + + mock_slaves.register_slaves(m) + + print("End registering URI") + + # from moon_manager.pip_driver import InformationManager + # InformationManager.set_auth() + + yield m + + # InformationManager.unset_auth() diff --git a/moon_utilities/tests/unit_python/helpers/__init__.py b/moon_utilities/tests/unit_python/helpers/__init__.py new file mode 100644 index 00000000..2a5459c2 --- /dev/null +++ b/moon_utilities/tests/unit_python/helpers/__init__.py @@ -0,0 +1,13 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + + diff --git a/moon_utilities/tests/unit_python/helpers/import_export_cache_helper.py b/moon_utilities/tests/unit_python/helpers/import_export_cache_helper.py new file mode 100644 index 00000000..ff22abe2 --- /dev/null +++ b/moon_utilities/tests/unit_python/helpers/import_export_cache_helper.py @@ -0,0 +1,234 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +import logging + +logger = logging.getLogger("moon.manager.test.api." + __name__) + + +def clean_models(): + from moon_cache.cache import Cache + _cache = Cache.getInstance() + keys = list(_cache.models.keys()) + for key in keys: + _cache.delete_model(key) + + +def clean_policies(): + from moon_cache.cache import Cache + _cache = Cache.getInstance() + keys = list(_cache.policies.keys()) + for key in keys: + _cache.delete_policy(key) + + +def clean_subjects(): + from moon_cache.cache import Cache + _cache = Cache.getInstance() + + policy_keys = list(_cache.policies.keys()) + subjects = _cache.subjects + for policy_key in policy_keys: + for key in subjects: + try: + _cache.delete_subject(policy_key, key) + except AttributeError: + pass + _cache.delete_subject() + + +def clean_objects(): + from moon_cache.cache import Cache + _cache = Cache.getInstance() + + policy_keys = list(_cache.policies.keys()) + objects = _cache.objects + for policy_key in policy_keys: + for key in objects: + try: + _cache.delete_object(policy_key, key) + except AttributeError: + pass + _cache.delete_object() + + +def clean_actions(): + from moon_cache.cache import Cache + _cache = Cache.getInstance() + + policy_keys = list(_cache.policies.keys()) + actions = _cache.actions + for policy_key in policy_keys: + for key in actions: + try: + _cache.delete_action(policy_key, key) + except AttributeError: + pass + _cache.delete_action() + + +def clean_subject_categories(): + from moon_cache.cache import Cache + _cache = Cache.getInstance() + + categories = list(_cache.subject_categories.keys()) + for key in categories: + _cache.delete_subject_category(key) + + +def clean_object_categories(): + from moon_cache.cache import Cache + _cache = Cache.getInstance() + + categories = list(_cache.object_categories.keys()) + for key in categories: + _cache.delete_object_category(key) + + +def clean_action_categories(): + from moon_cache.cache import Cache + _cache = Cache.getInstance() + + categories = list(_cache.action_categories.keys()) + for key in categories: + _cache.delete_action_category(key) + + +def clean_subject_data(): + from moon_cache.cache import Cache + _cache = Cache.getInstance() + _cache.delete_subject_data() + + +def clean_object_data(): + from moon_cache.cache import Cache + _cache = Cache.getInstance() + _cache.delete_object_data() + + +def clean_action_data(): + from moon_cache.cache import Cache + _cache = Cache.getInstance() + _cache.delete_action_data() + + +def clean_meta_rule(): + from moon_cache.cache import Cache + _cache = Cache.getInstance() + + categories = list(_cache.meta_rules.keys()) + for key in categories: + _cache.delete_meta_rule(key) + + +def clean_subject_assignments(): + from moon_cache.cache import Cache + _cache = Cache.getInstance() + + if not _cache.subject_assignments: + return + for policy_key in _cache.policies.keys(): + if policy_key not in _cache.subject_assignments: + continue + for key in _cache.subject_assignments[policy_key]: + # policy_key = _cache.subject_assignments[policy_key][key]["policy_id"] + subject_key = _cache.subject_assignments[policy_key][key]["subject_id"] + cat_key = _cache.subject_assignments[policy_key][key]["category_id"] + data_keys = _cache.subject_assignments[policy_key][key]["assignments"] + for data_key in data_keys: + _cache.delete_subject_assignment( + policy_id=policy_key, + perimeter_id=subject_key, + category_id=cat_key, + data_id=data_key + ) + _cache.delete_subject_assignment() + + +def clean_object_assignments(): + from moon_cache.cache import Cache + _cache = Cache.getInstance() + + if not _cache.object_assignments: + return + for policy_key in _cache.policies.keys(): + if policy_key not in _cache.object_assignments: + continue + for key in _cache.object_assignments[policy_key]: + # policy_key = _cache.object_assignments[policy_key][key]["policy_id"] + object_key = _cache.object_assignments[policy_key][key]["object_id"] + cat_key = _cache.object_assignments[policy_key][key]["category_id"] + data_keys = _cache.object_assignments[policy_key][key]["assignments"] + for data_key in data_keys: + _cache.delete_object_assignment( + policy_id=policy_key, + perimeter_id=object_key, + category_id=cat_key, + data_id=data_key + ) + _cache.delete_object_assignment() + + +def clean_action_assignments(): + from moon_cache.cache import Cache + _cache = Cache.getInstance() + + if not _cache.action_assignments: + return + for policy_key in _cache.policies.keys(): + if policy_key not in _cache.action_assignments: + continue + for key in _cache.action_assignments[policy_key]: + action_key = _cache.action_assignments[policy_key][key]["action_id"] + cat_key = _cache.action_assignments[policy_key][key]["category_id"] + data_keys = _cache.action_assignments[policy_key][key]["assignments"] + for data_key in data_keys: + _cache.delete_action_assignment( + policy_id=policy_key, + perimeter_id=action_key, + category_id=cat_key, + data_id=data_key + ) + _cache.delete_action_assignment() + + +def clean_rules(): + from moon_cache.cache import Cache + _cache = Cache.getInstance() + + rules = list(_cache.rules.keys()) + for key in rules: + _cache.delete_rule(key) + + +def clean_all(): + clean_rules() + + clean_subject_assignments() + clean_object_assignments() + clean_action_assignments() + + clean_subject_data() + clean_object_data() + clean_action_data() + + clean_actions() + clean_objects() + clean_subjects() + + clean_policies() + clean_models() + clean_meta_rule() + + clean_subject_categories() + clean_object_categories() + clean_action_categories() diff --git a/moon_utilities/tests/unit_python/helpers/import_export_db_helper.py b/moon_utilities/tests/unit_python/helpers/import_export_db_helper.py new file mode 100644 index 00000000..ecb9fa26 --- /dev/null +++ b/moon_utilities/tests/unit_python/helpers/import_export_db_helper.py @@ -0,0 +1,178 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +import logging + +logger = logging.getLogger("moon.manager.test.api." + __name__) + + +def clean_models(): + from moon_manager import db_driver as driver + keys = driver.ModelManager.get_models(moon_user_id="admin") + for key in keys: + driver.ModelManager.delete_model(moon_user_id="admin", model_id=key) + + +def clean_policies(): + from moon_manager import db_driver as driver + keys = driver.PolicyManager.get_policies(moon_user_id="admin") + for key in keys: + driver.PolicyManager.delete_policy(moon_user_id="admin", policy_id=key) + + +def clean_subjects(): + from moon_manager import db_driver as driver + for policy in driver.PolicyManager.get_policies(moon_user_id="admin"): + subjects = driver.PolicyManager.get_subjects(moon_user_id="admin", policy_id=policy) + for key in subjects: + driver.PolicyManager.delete_subject(moon_user_id="admin", policy_id=policy, + perimeter_id=key) + + +def clean_objects(): + from moon_manager import db_driver as driver + for policy in driver.PolicyManager.get_policies(moon_user_id="admin"): + objects = driver.PolicyManager.get_objects(moon_user_id="admin", policy_id=policy) + for key in objects: + driver.PolicyManager.delete_object(moon_user_id="admin", policy_id=policy, + perimeter_id=key) + + +def clean_actions(): + from moon_manager import db_driver as driver + for policy in driver.PolicyManager.get_policies(moon_user_id="admin"): + actions = driver.PolicyManager.get_actions(moon_user_id="admin", policy_id=policy) + for key in actions: + driver.PolicyManager.delete_action(moon_user_id="admin", policy_id=policy, + perimeter_id=key) + + +def clean_subject_categories(): + from moon_manager import db_driver as driver + categories = driver.ModelManager.get_subject_categories(moon_user_id="admin") + + for key in categories: + driver.ModelManager.delete_subject_category(moon_user_id="admin", category_id=key) + + +def clean_object_categories(): + from moon_manager import db_driver as driver + categories = driver.ModelManager.get_object_categories(moon_user_id="admin") + + for key in categories: + driver.ModelManager.delete_object_category(moon_user_id="admin", category_id=key) + + +def clean_action_categories(): + from moon_manager import db_driver as driver + categories = driver.ModelManager.get_action_categories(moon_user_id="admin") + + for key in categories: + driver.ModelManager.delete_action_category(moon_user_id="admin", category_id=key) + + +def clean_subject_data(): + from moon_manager import db_driver as driver + policies = driver.PolicyManager.get_policies(moon_user_id="admin") + categories = driver.ModelManager.get_subject_categories(moon_user_id="admin") + for policy in policies: + for category in categories: + data_object = driver.PolicyManager.get_subject_data( + moon_user_id="admin", policy_id=policy, category_id=category) + for data_item in data_object: + for data in data_item.get("data", {}): + driver.PolicyManager.delete_subject_data(moon_user_id="admin", policy_id=policy, + category_id=category, data_id=data) + + +def clean_object_data(): + from moon_manager import db_driver as driver + policies = driver.PolicyManager.get_policies(moon_user_id="admin") + categories = driver.ModelManager.get_object_categories(moon_user_id="admin") + for policy in policies: + for category in categories: + data_object = driver.PolicyManager.get_object_data( + moon_user_id="admin", policy_id=policy, category_id=category) + for data_item in data_object: + for data in data_item.get("data", {}): + driver.PolicyManager.delete_object_data(moon_user_id="admin", policy_id=policy, + category_id=category, data_id=data) + + +def clean_action_data(): + from moon_manager import db_driver as driver + policies = driver.PolicyManager.get_policies(moon_user_id="admin") + categories = driver.ModelManager.get_action_categories(moon_user_id="admin") + for policy in policies: + for category in categories: + data_object = driver.PolicyManager.get_action_data( + moon_user_id="admin", policy_id=policy, category_id=category) + for data_item in data_object: + for data in data_item.get("data", {}): + driver.PolicyManager.delete_action_data(moon_user_id="admin", policy_id=policy, + category_id=category, data_id=data) + + +def clean_meta_rule(): + from moon_manager import db_driver as driver + keys = driver.ModelManager.get_meta_rules(moon_user_id="admin") + + for key in keys: + driver.ModelManager.delete_meta_rule(moon_user_id="admin", meta_rule_id=key) + + +def clean_subject_assignments(): + pass + + +def clean_object_assignments(): + pass + + +def clean_action_assignments(): + pass + + +def clean_rules(): + from moon_manager import db_driver as driver + + policies = driver.PolicyManager.get_policies(moon_user_id="admin") + + for policy in policies: + rules = driver.PolicyManager.get_rules(moon_user_id="admin", policy_id=policy) + + for rule in rules: + driver.PolicyManager.delete_rule(moon_user_id="admin", policy_id=policy, rule_id=rule) + + +def clean_all(): + clean_rules() + + clean_subject_assignments() + clean_object_assignments() + clean_action_assignments() + + clean_subject_data() + clean_object_data() + clean_action_data() + + clean_actions() + clean_objects() + clean_subjects() + + clean_policies() + clean_models() + clean_meta_rule() + + clean_subject_categories() + clean_object_categories() + clean_action_categories() diff --git a/moon_utilities/tests/unit_python/helpers/slaves_helpers.py b/moon_utilities/tests/unit_python/helpers/slaves_helpers.py new file mode 100644 index 00000000..1b2764d2 --- /dev/null +++ b/moon_utilities/tests/unit_python/helpers/slaves_helpers.py @@ -0,0 +1,32 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + + +SLAVES = { + "slaves": { + "d464cc58a0cd46dea3191ba70f4e7df8": { + "name": "slave_test", + "address": "", + "description": "...", + "extra": { + "description": "...", + "starttime": 1543851265.76279, + "port": 10000, + "server_ip": "127.0.0.1", + "status": "down", + "api_key": "e58a882a6b658a22660f00a0c273e7f6b4c4eb5abe54eccba2cae307905d67e3746537" + "bd790c41887e11840c2d186b6d6eeec0e426bcfa7a872cc3417a35124a" + } + } + } +} + diff --git a/moon_utilities/tests/unit_python/mock_slaves.py b/moon_utilities/tests/unit_python/mock_slaves.py new file mode 100644 index 00000000..f3ac04d8 --- /dev/null +++ b/moon_utilities/tests/unit_python/mock_slaves.py @@ -0,0 +1,50 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + + +def register_slaves(m): + m.register_uri( + 'DELETE', 'http://127.0.0.1:10000/update/assignment/098764321/subject/098764321/098764321/098764321', + json={} + ) + m.register_uri( + 'DELETE', 'http://127.0.0.1:10000/update/data/098764321/subject', + json={} + ) + m.register_uri( + 'PUT', 'http://127.0.0.1:10000/update/perimeter/098764321/098764321/subject', + json={} + ) + m.register_uri( + 'PUT', 'http://127.0.0.1:10000/update/pdp/098764321', + json={} + ) + m.register_uri( + 'PUT', 'http://127.0.0.1:10000/update/policy/098764321', + json={} + ) + m.register_uri( + 'PUT', 'http://127.0.0.1:10000/update/rules/1234567890', + json={} + ) + m.register_uri( + 'PUT', 'http://127.0.0.1:10000/update/model/1234567890', + json={} + ) + m.register_uri( + 'PUT', 'http://127.0.0.1:10000/update/meta_data/1234567890', + json={} + ) + m.register_uri( + 'PUT', 'http://127.0.0.1:10000/update/meta_rule/1234567890', + json={} + ) diff --git a/moon_utilities/tests/unit_python/requirements.txt b/moon_utilities/tests/unit_python/requirements.txt new file mode 100644 index 00000000..91cbc2c0 --- /dev/null +++ b/moon_utilities/tests/unit_python/requirements.txt @@ -0,0 +1,10 @@ +pytest +pbr +pytest-cov +cliff +requests_mock +tinydb +moon_manager +moon_cache +sqlalchemy +pymysql \ No newline at end of file -- cgit 1.2.3-korg