diff options
Diffstat (limited to 'moon_manager/tests/unit_python/api')
21 files changed, 6123 insertions, 789 deletions
diff --git a/moon_manager/tests/unit_python/api/__init__.py b/moon_manager/tests/unit_python/api/__init__.py index e69de29b..1856aa2c 100644 --- a/moon_manager/tests/unit_python/api/__init__.py +++ b/moon_manager/tests/unit_python/api/__init__.py @@ -0,0 +1,12 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + diff --git a/moon_manager/tests/unit_python/api/meta_data_test.py b/moon_manager/tests/unit_python/api/meta_data_test.py deleted file mode 100644 index 8fb39ae1..00000000 --- a/moon_manager/tests/unit_python/api/meta_data_test.py +++ /dev/null @@ -1,196 +0,0 @@ -import json -import api.utilities as utilities - -#subject_categories_test - - -def get_subject_categories(client): - req = client.get("/subject_categories") - subject_categories = utilities.get_json(req.data) - return req, subject_categories - - -def add_subject_categories(client, name): - data = { - "name": name, - "description": "description of {}".format(name) - } - req = client.post("/subject_categories", data=json.dumps(data), - headers={'Content-Type': 'application/json'}) - subject_categories = utilities.get_json(req.data) - return req, subject_categories - - -def delete_subject_categories(client, name): - request, subject_categories = get_subject_categories(client) - for key, value in subject_categories['subject_categories'].items(): - if value['name'] == name: - req = client.delete("/subject_categories/{}".format(key)) - break - return req - - -def delete_subject_categories_without_id(client): - req = client.delete("/subject_categories/{}".format("")) - return req - - -def test_get_subject_categories(): - client = utilities.register_client() - req, subject_categories = get_subject_categories(client) - assert req.status_code == 200 - assert isinstance(subject_categories, dict) - assert "subject_categories" in subject_categories - - -def test_add_subject_categories(): - client = utilities.register_client() - req, subject_categories = add_subject_categories(client, "testuser") - assert req.status_code == 200 - assert isinstance(subject_categories, dict) - value = list(subject_categories["subject_categories"].values())[0] - assert "subject_categories" in subject_categories - assert value['name'] == "testuser" - assert value['description'] == "description of {}".format("testuser") - - -def test_delete_subject_categories(): - client = utilities.register_client() - req = delete_subject_categories(client, "testuser") - assert req.status_code == 200 - - -def test_delete_subject_categories_without_id(): - client = utilities.register_client() - req = delete_subject_categories_without_id(client) - assert req.status_code == 500 - - -#--------------------------------------------------------------------------- -#object_categories_test - -def get_object_categories(client): - req = client.get("/object_categories") - object_categories = utilities.get_json(req.data) - return req, object_categories - - -def add_object_categories(client, name): - data = { - "name": name, - "description": "description of {}".format(name) - } - req = client.post("/object_categories", data=json.dumps(data), - headers={'Content-Type': 'application/json'}) - object_categories = utilities.get_json(req.data) - return req, object_categories - - -def delete_object_categories(client, name): - request, object_categories = get_object_categories(client) - for key, value in object_categories['object_categories'].items(): - if value['name'] == name: - req = client.delete("/object_categories/{}".format(key)) - break - return req - - -def delete_object_categories_without_id(client): - req = client.delete("/object_categories/{}".format("")) - return req - - -def test_get_object_categories(): - client = utilities.register_client() - req, object_categories = get_object_categories(client) - assert req.status_code == 200 - assert isinstance(object_categories, dict) - assert "object_categories" in object_categories - - -def test_add_object_categories(): - client = utilities.register_client() - req, object_categories = add_object_categories(client, "testuser") - assert req.status_code == 200 - assert isinstance(object_categories, dict) - value = list(object_categories["object_categories"].values())[0] - assert "object_categories" in object_categories - assert value['name'] == "testuser" - assert value['description'] == "description of {}".format("testuser") - - -def test_delete_object_categories(): - client = utilities.register_client() - req = delete_object_categories(client, "testuser") - assert req.status_code == 200 - - -def test_delete_object_categories_without_id(): - client = utilities.register_client() - req = delete_object_categories_without_id(client) - assert req.status_code == 500 - - -#--------------------------------------------------------------------------- -#action_categories_test - -def get_action_categories(client): - req = client.get("/action_categories") - action_categories = utilities.get_json(req.data) - return req, action_categories - - -def add_action_categories(client, name): - data = { - "name": name, - "description": "description of {}".format(name) - } - req = client.post("/action_categories", data=json.dumps(data), - headers={'Content-Type': 'application/json'}) - action_categories = utilities.get_json(req.data) - return req, action_categories - - -def delete_action_categories(client, name): - request, action_categories = get_action_categories(client) - for key, value in action_categories['action_categories'].items(): - if value['name'] == name: - req = client.delete("/action_categories/{}".format(key)) - break - return req - - -def delete_action_categories_without_id(client): - req = client.delete("/action_categories/{}".format("")) - return req - - -def test_get_action_categories(): - client = utilities.register_client() - req, action_categories = get_action_categories(client) - assert req.status_code == 200 - assert isinstance(action_categories, dict) - assert "action_categories" in action_categories - - -def test_add_action_categories(): - client = utilities.register_client() - req, action_categories = add_action_categories(client, "testuser") - assert req.status_code == 200 - assert isinstance(action_categories, dict) - value = list(action_categories["action_categories"].values())[0] - assert "action_categories" in action_categories - assert value['name'] == "testuser" - assert value['description'] == "description of {}".format("testuser") - - -def test_delete_action_categories(): - client = utilities.register_client() - req = delete_action_categories(client, "testuser") - assert req.status_code == 200 - - -def test_delete_action_categories_without_id(): - client = utilities.register_client() - req = delete_action_categories_without_id(client) - assert req.status_code == 500
\ No newline at end of file diff --git a/moon_manager/tests/unit_python/api/meta_rules_test.py b/moon_manager/tests/unit_python/api/meta_rules_test.py deleted file mode 100644 index b5b1ecf8..00000000 --- a/moon_manager/tests/unit_python/api/meta_rules_test.py +++ /dev/null @@ -1,69 +0,0 @@ -import json -import api.utilities as utilities - - -def get_meta_rules(client): - req = client.get("/meta_rules") - meta_rules = utilities.get_json(req.data) - return req, meta_rules - - -def add_meta_rules(client, name): - data = { - "name": name, - "subject_categories": ["subject_category_id1", - "subject_category_id2"], - "object_categories": ["object_category_id1"], - "action_categories": ["action_category_id1"] - } - req = client.post("/meta_rules", data=json.dumps(data), - headers={'Content-Type': 'application/json'}) - meta_rules = utilities.get_json(req.data) - return req, meta_rules - - -def delete_meta_rules(client, name): - request, meta_rules = get_meta_rules(client) - for key, value in meta_rules['meta_rules'].items(): - if value['name'] == name: - req = client.delete("/meta_rules/{}".format(key)) - break - return req - - -def delete_meta_rules_without_id(client): - req = client.delete("/meta_rules/{}".format("")) - return req - - -def test_get_meta_rules(): - client = utilities.register_client() - req, meta_rules = get_meta_rules(client) - assert req.status_code == 200 - assert isinstance(meta_rules, dict) - assert "meta_rules" in meta_rules - - -def test_add_meta_rules(): - client = utilities.register_client() - req, meta_rules = add_meta_rules(client, "testuser") - assert req.status_code == 200 - assert isinstance(meta_rules, dict) - value = list(meta_rules["meta_rules"].values())[0] - assert "meta_rules" in meta_rules - assert value['name'] == "testuser" - assert value["subject_categories"][0] == "subject_category_id1" - assert value["object_categories"][0] == "object_category_id1" - assert value["action_categories"][0] == "action_category_id1" - - -def test_delete_meta_rules(): - client = utilities.register_client() - req = delete_meta_rules(client, "testuser") - assert req.status_code == 200 - - -def test_delete_meta_rules_without_id(): - client = utilities.register_client() - req = delete_meta_rules_without_id(client) - assert req.status_code == 500 diff --git a/moon_manager/tests/unit_python/api/test_assignement.py b/moon_manager/tests/unit_python/api/test_assignement.py new file mode 100644 index 00000000..3a127477 --- /dev/null +++ b/moon_manager/tests/unit_python/api/test_assignement.py @@ -0,0 +1,469 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +import hug +from uuid import uuid4 +import pytest +import api.utilities as utilities +from helpers import data_builder as builder +from moon_utilities import exceptions + + +def delete_assignment_based_on_parameters(type, policy_id, pre_id=None, cat_id=None, data_id=None): + if type in ["subject_assignments", "object_assignments", "action_assignments"] and policy_id: + url = "/policies/" + policy_id + "/" + type + if pre_id: + url += "/" + pre_id + if cat_id: + url += "/" + cat_id + if data_id: + url += "/" + data_id + else: + return "" + from moon_manager.api import assignments + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + req = hug.test.delete(assignments, url, headers=auth_headers) + return req + + +# subject_categories_test + + +def get_subject_assignment(policy_id): + from moon_manager.api import assignments + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + req = hug.test.get(assignments, "/policies/{}/subject_assignments".format(policy_id), headers=auth_headers) + subject_assignment = utilities.get_json(req.data) + return req, subject_assignment + + +def add_subject_assignment(): + from moon_manager.api import assignments + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex) + subject_id = builder.create_subject(policy_id) + data_id = builder.create_subject_data(policy_id=policy_id, category_id=subject_category_id) + + data = { + "id": subject_id, + "category_id": subject_category_id, + "data_id": data_id + } + req = hug.test.post(assignments, "/policies/{}/subject_assignments/".format(policy_id), + body=data, headers=auth_headers) + subject_assignment = utilities.get_json(req.data) + return req, subject_assignment + + +def add_subject_assignment_without_cat_id(): + from moon_manager.api import assignments + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + data = { + "id": "subject_id", + "category_id": "", + "data_id": "data_id" + } + req = hug.test.post(assignments, "/policies/{}/subject_assignments".format("1111"), body=data, + headers=auth_headers) + subject_assignment = utilities.get_json(req.data) + return req, subject_assignment + + +def delete_subject_assignment(policy_id, sub_id, cat_id, data_id): + from moon_manager.api import assignments + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + req = hug.test.delete(assignments, "/policies/{}/subject_assignments/{}/{}/{}".format( + policy_id, sub_id, cat_id, data_id), headers=auth_headers) + return req + + +def test_add_subject_assignment(): + req, subject_assignment = add_subject_assignment() + assert req.status == hug.HTTP_200 + assert isinstance(subject_assignment, dict) + assert "subject_assignments" in subject_assignment + + +# def test_add_subject_assignment_without_cat_id(): +# client = utilities.register_client() +# req, subject_assignment = add_subject_assignment_without_cat_id(client) +# assert req.status == hug.HTTP_400 +# assert json.loads(req.data)["message"] == "Key: 'category_id', [Empty String]" + + +def test_get_subject_assignment(): + policy_id = builder.get_policy_id_with_subject_assignment() + req, subject_assignment = get_subject_assignment(policy_id) + assert req.status == hug.HTTP_200 + assert isinstance(subject_assignment, dict) + assert "subject_assignments" in subject_assignment + + +def test_delete_subject_assignment(): + policy_id = builder.get_policy_id_with_subject_assignment() + req, subject_assignment = get_subject_assignment(policy_id) + value = subject_assignment["subject_assignments"] + _id = list(value.keys())[0] + success_req = delete_subject_assignment( + policy_id, + value[_id]['subject_id'], + value[_id]['category_id'], + value[_id]['assignments'][0]) + assert success_req.status == hug.HTTP_200 + + +def test_delete_subject_assignment_using_policy(): + policy_id = builder.get_policy_id_with_subject_assignment() + req, subject_assignment = get_subject_assignment(policy_id) + value = subject_assignment["subject_assignments"] + _id = list(value.keys())[0] + success_req = delete_assignment_based_on_parameters( + "subject_assignments", + policy_id) + assert success_req.status == hug.HTTP_200 + + +def test_delete_subject_assignment_using_policy_perimeter_id(): + policy_id = builder.get_policy_id_with_subject_assignment() + req, subject_assignment = get_subject_assignment(policy_id) + value = subject_assignment["subject_assignments"] + _id = list(value.keys())[0] + success_req = delete_assignment_based_on_parameters( + "subject_assignments", + policy_id, + value[_id]['subject_id']) + assert success_req.status == hug.HTTP_200 + + +def test_delete_subject_assignment_using_policy_perimeter_id_category_id(): + policy_id = builder.get_policy_id_with_subject_assignment() + req, subject_assignment = get_subject_assignment(policy_id) + value = subject_assignment["subject_assignments"] + _id = list(value.keys())[0] + success_req = delete_assignment_based_on_parameters( + "subject_assignments", + policy_id, + value[_id]['subject_id'], + value[_id]['category_id']) + assert success_req.status == hug.HTTP_200 + + +def test_delete_subject_assignment_without_policy_id(): + + with pytest.raises(exceptions.PolicyUnknown) as exception_info: + success_req = delete_subject_assignment("", "id1", "111", "data_id1") + + assert '400: Policy Unknown' == str(exception_info.value) + + # assert success_req.status == hug.HTTP_400 + # assert success_req.data["message"] == "400: Policy Unknown" + + +# --------------------------------------------------------------------------- +# object_categories_test + + +def get_object_assignment(policy_id): + from moon_manager.api import assignments + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + req = hug.test.get(assignments, "/policies/{}/object_assignments".format(policy_id), headers=auth_headers) + object_assignment = utilities.get_json(req.data) + return req, object_assignment + + +def add_object_assignment(): + from moon_manager.api import assignments + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex) + object_id = builder.create_object(policy_id) + data_id = builder.create_object_data(policy_id=policy_id, category_id=object_category_id) + + data = { + "id": object_id, + "category_id": object_category_id, + "data_id": data_id + } + + req = hug.test.post(assignments, "/policies/{}/object_assignments".format(policy_id), + body=data, headers=auth_headers) + object_assignment = utilities.get_json(req.data) + return req, object_assignment + + +def add_object_assignment_without_cat_id(): + from moon_manager.api import assignments + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + data = { + "id": "object_id", + "category_id": "", + "data_id": "data_id" + } + req = hug.test.post(assignments, "/policies/{}/object_assignments".format("1111"), + body=data, headers=auth_headers) + object_assignment = utilities.get_json(req.data) + return req, object_assignment + + +def delete_object_assignment(policy_id, obj_id, cat_id, data_id): + from moon_manager.api import assignments + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + req = hug.test.delete(assignments, "/policies/{}/object_assignments/{}/{}/{}".format( + policy_id, obj_id, cat_id, data_id), headers=auth_headers) + return req + + +def test_get_object_assignment(): + policy_id = builder.get_policy_id_with_object_assignment() + + req, object_assignment = get_object_assignment(policy_id) + assert req.status == hug.HTTP_200 + assert isinstance(object_assignment, dict) + assert "object_assignments" in object_assignment + + +def test_add_object_assignment(): + req, object_assignment = add_object_assignment() + assert req.status == hug.HTTP_200 + assert "object_assignments" in object_assignment + + +# def test_add_object_assignment_without_cat_id(): +# client = utilities.register_client() +# req, object_assignment = add_object_assignment_without_cat_id(client) +# assert req.status == hug.HTTP_400 +# assert json.loads(req.data)["message"] == "Key: 'category_id', [Empty String]" + + +def test_delete_object_assignment(): + policy_id = builder.get_policy_id_with_object_assignment() + req, object_assignment = get_object_assignment(policy_id) + value = object_assignment["object_assignments"] + _id = list(value.keys())[0] + success_req = delete_object_assignment(policy_id, + value[_id]['object_id'], + value[_id]['category_id'], + value[_id]['assignments'][0]) + assert success_req.status == hug.HTTP_200 + + +def test_delete_object_assignment_using_policy(): + policy_id = builder.get_policy_id_with_object_assignment() + req, object_assignment = get_object_assignment(policy_id) + value = object_assignment["object_assignments"] + _id = list(value.keys())[0] + success_req = delete_assignment_based_on_parameters( + "object_assignments", + policy_id) + assert success_req.status == hug.HTTP_200 + + +def test_delete_object_assignment_using_policy_perimeter_id(): + policy_id = builder.get_policy_id_with_object_assignment() + req, object_assignment = get_object_assignment(policy_id) + value = object_assignment["object_assignments"] + _id = list(value.keys())[0] + success_req = delete_assignment_based_on_parameters( + "object_assignments", + policy_id, + value[_id]['object_id']) + assert success_req.status == hug.HTTP_200 + + +def test_delete_object_assignment_using_policy_perimeter_id_category_id(): + policy_id = builder.get_policy_id_with_object_assignment() + req, object_assignment = get_object_assignment(policy_id) + value = object_assignment["object_assignments"] + _id = list(value.keys())[0] + success_req = delete_assignment_based_on_parameters( + "object_assignments", + policy_id, + value[_id]['object_id'], + value[_id]['category_id']) + assert success_req.status == hug.HTTP_200 + + +def test_delete_object_assignment_without_policy_id(): + with pytest.raises(exceptions.PolicyUnknown) as exception_info: + success_req = delete_object_assignment("", "id1", "111", "data_id1") + # assert success_req.status == hug.HTTP_400 + # assert success_req.data["message"] == "400: Policy Unknown" + assert '400: Policy Unknown' == str(exception_info.value) + + +# --------------------------------------------------------------------------- +# action_categories_test + + +def get_action_assignment(policy_id): + from moon_manager.api import assignments + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + req = hug.test.get(assignments, "/policies/{}/action_assignments".format(policy_id), headers=auth_headers) + action_assignment = utilities.get_json(req.data) + return req, action_assignment + + +def add_action_assignment(): + from moon_manager.api import assignments + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex) + action_id = builder.create_action(policy_id) + data_id = builder.create_action_data(policy_id=policy_id, category_id=action_category_id) + + data = { + "id": action_id, + "category_id": action_category_id, + "data_id": data_id + } + req = hug.test.post(assignments, "/policies/{}/action_assignments".format(policy_id), + body=data, + headers=auth_headers) + action_assignment = utilities.get_json(req.data) + return req, action_assignment + + +def add_action_assignment_without_cat_id(): + from moon_manager.api import assignments + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + data = { + "id": "action_id", + "category_id": "", + "data_id": "data_id" + } + req = hug.test.post(assignments, "/policies/{}/action_assignments".format("1111"), + body=data, headers=auth_headers) + action_assignment = utilities.get_json(req.data) + return req, action_assignment + + +def delete_action_assignment(policy_id, action_id, cat_id, data_id): + from moon_manager.api import assignments + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + req = hug.test.delete(assignments, "/policies/{}/action_assignments/{}/{}/{}".format( + policy_id, action_id, cat_id, data_id), headers=auth_headers) + return req + + +def test_get_action_assignment(): + policy_id = builder.get_policy_id_with_action_assignment() + req, action_assignment = get_action_assignment(policy_id) + assert req.status == hug.HTTP_200 + assert isinstance(action_assignment, dict) + assert "action_assignments" in action_assignment + + +def test_add_action_assignment(): + req, action_assignment = add_action_assignment() + assert req.status == hug.HTTP_200 + assert "action_assignments" in action_assignment + + +# def test_add_action_assignment_without_cat_id(): +# client = utilities.register_client() +# req, action_assignment = add_action_assignment_without_cat_id(client) +# assert req.status == hug.HTTP_400 +# assert json.loads(req.data)["message"] == "Key: 'category_id', [Empty String]" + + +def test_delete_action_assignment(): + policy_id = builder.get_policy_id_with_action_assignment() + req, action_assignment = get_action_assignment(policy_id) + value = action_assignment["action_assignments"] + id = list(value.keys())[0] + success_req = delete_action_assignment(policy_id, + value[id]['action_id'], + value[id]['category_id'], + value[id]['assignments'][0]) + assert success_req.status == hug.HTTP_200 + + +def test_delete_action_assignment_policy(): + policy_id = builder.get_policy_id_with_action_assignment() + req, action_assignment = get_action_assignment(policy_id) + value = action_assignment["action_assignments"] + id = list(value.keys())[0] + success_req = delete_assignment_based_on_parameters( + "action_assignments", + policy_id) + assert success_req.status == hug.HTTP_200 + + +def test_delete_action_assignment_policy_perimeter_id(): + policy_id = builder.get_policy_id_with_action_assignment() + req, action_assignment = get_action_assignment(policy_id) + value = action_assignment["action_assignments"] + id = list(value.keys())[0] + success_req = delete_assignment_based_on_parameters( + "action_assignments", + policy_id, + value[id]['action_id']) + assert success_req.status == hug.HTTP_200 + + +def test_delete_action_assignment_policy_perimeter_id_category_id(): + policy_id = builder.get_policy_id_with_action_assignment() + req, action_assignment = get_action_assignment(policy_id) + value = action_assignment["action_assignments"] + id = list(value.keys())[0] + success_req = delete_assignment_based_on_parameters( + "action_assignments", + policy_id, + value[id]['action_id'], + value[id]['category_id']) + assert success_req.status == hug.HTTP_200 + + +def test_delete_action_assignment_without_policy_id(): + with pytest.raises(exceptions.PolicyUnknown) as exception_info: + success_req = delete_action_assignment("", "id1", "111", "data_id1") + # assert success_req.status == hug.HTTP_400 + # assert success_req.data["message"] == "400: Policy Unknown" + assert '400: Policy Unknown' == str(exception_info.value) + +# --------------------------------------------------------------------------- diff --git a/moon_manager/tests/unit_python/api/test_assignemnt.py b/moon_manager/tests/unit_python/api/test_assignemnt.py deleted file mode 100644 index 08688e04..00000000 --- a/moon_manager/tests/unit_python/api/test_assignemnt.py +++ /dev/null @@ -1,174 +0,0 @@ -import api.utilities as utilities -import json - - -# subject_categories_test - - -def get_subject_assignment(client, policy_id): - req = client.get("/policies/{}/subject_assignments".format(policy_id)) - subject_assignment = utilities.get_json(req.data) - return req, subject_assignment - - -def add_subject_assignment(client, policy_id, category_id): - data = { - "id": "id1", - "category_id": category_id, - "data_id": "data_id1" - } - req = client.post("/policies/{}/subject_assignments/{}".format(policy_id, category_id), data=json.dumps(data), - headers={'Content-Type': 'application/json'}) - subject_assignment = utilities.get_json(req.data) - return req, subject_assignment - - -def delete_subject_assignment(client, policy_id): - req = client.delete("/policies/{}/subject_assignments".format(policy_id)) - return req - - -def test_get_subject_assignment(): - policy_id = utilities.get_policy_id() - client = utilities.register_client() - req, subject_assignment = get_subject_assignment(client, policy_id) - assert req.status_code == 200 - assert isinstance(subject_assignment, dict) - assert "subject_assignments" in subject_assignment - - -def test_add_subject_assignment(): - policy_id = utilities.get_policy_id() - client = utilities.register_client() - req, subject_assignment = add_subject_assignment(client, policy_id, "111") - assert req.status_code == 200 - assert isinstance(subject_assignment, dict) - value = subject_assignment["subject_assignments"] - assert "subject_assignments" in subject_assignment - id = list(value.keys())[0] - assert value[id]['policy_id'] == policy_id - assert value[id]['category_id'] == "111" - assert value[id]['subject_id'] == "id1" - - -def test_delete_subject_assignment(): - client = utilities.register_client() - policy_id = utilities.get_policy_id() - success_req = delete_subject_assignment(client, policy_id) - assert success_req.status_code == 200 - -# --------------------------------------------------------------------------- - -# object_categories_test - - -def get_object_assignment(client, policy_id): - req = client.get("/policies/{}/object_assignments".format(policy_id)) - object_assignment = utilities.get_json(req.data) - return req, object_assignment - - -def add_object_assignment(client, policy_id, category_id): - data = { - "id": "id1", - "category_id": category_id, - "data_id": "data_id1" - } - req = client.post("/policies/{}/object_assignments/{}".format(policy_id, category_id), data=json.dumps(data), - headers={'Content-Type': 'application/json'}) - object_assignment = utilities.get_json(req.data) - return req, object_assignment - - -def delete_object_assignment(client, policy_id): - req = client.delete("/policies/{}/object_assignments".format(policy_id)) - return req - - -def test_get_object_assignment(): - policy_id = utilities.get_policy_id() - client = utilities.register_client() - req, object_assignment = get_object_assignment(client, policy_id) - assert req.status_code == 200 - assert isinstance(object_assignment, dict) - assert "object_assignments" in object_assignment - - -def test_add_object_assignment(): - policy_id = utilities.get_policy_id() - client = utilities.register_client() - req, object_assignment = add_object_assignment(client, policy_id, "111") - assert req.status_code == 200 - assert isinstance(object_assignment, dict) - value = object_assignment["object_assignments"] - assert "object_assignments" in object_assignment - id = list(value.keys())[0] - assert value[id]['policy_id'] == policy_id - assert value[id]['category_id'] == "111" - assert value[id]['object_id'] == "id1" - - -def test_delete_object_assignment(): - client = utilities.register_client() - policy_id = utilities.get_policy_id() - success_req = delete_object_assignment(client, policy_id) - assert success_req.status_code == 200 - -# --------------------------------------------------------------------------- - -# action_categories_test - - -def get_action_assignment(client, policy_id): - req = client.get("/policies/{}/action_assignments".format(policy_id)) - action_assignment = utilities.get_json(req.data) - return req, action_assignment - - -def add_action_assignment(client, policy_id, category_id): - data = { - "id": "id1", - "category_id": category_id, - "data_id": "data_id1" - } - req = client.post("/policies/{}/action_assignments/{}".format(policy_id, category_id), data=json.dumps(data), - headers={'Content-Type': 'application/json'}) - action_assignment = utilities.get_json(req.data) - return req, action_assignment - - -def delete_action_assignment(client, policy_id): - req = client.delete("/policies/{}/action_assignments".format(policy_id)) - return req - - -def test_get_action_assignment(): - policy_id = utilities.get_policy_id() - client = utilities.register_client() - req, action_assignment = get_action_assignment(client, policy_id) - assert req.status_code == 200 - assert isinstance(action_assignment, dict) - assert "action_assignments" in action_assignment - - -def test_add_action_assignment(): - policy_id = utilities.get_policy_id() - client = utilities.register_client() - req, action_assignment = add_action_assignment(client, policy_id, "111") - assert req.status_code == 200 - assert isinstance(action_assignment, dict) - value = action_assignment["action_assignments"] - assert "action_assignments" in action_assignment - id = list(value.keys())[0] - assert value[id]['policy_id'] == policy_id - assert value[id]['category_id'] == "111" - assert value[id]['action_id'] == "id1" - - -def test_delete_action_assignment(): - client = utilities.register_client() - policy_id = utilities.get_policy_id() - success_req = delete_action_assignment(client, policy_id) - assert success_req.status_code == 200 - -# ---------------------------------------------------------------------------
\ No newline at end of file diff --git a/moon_manager/tests/unit_python/api/test_auth.py b/moon_manager/tests/unit_python/api/test_auth.py new file mode 100644 index 00000000..ee59bf5e --- /dev/null +++ b/moon_manager/tests/unit_python/api/test_auth.py @@ -0,0 +1,71 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +from falcon import HTTP_200, HTTP_204, HTTP_401 +import hug +import base64 +from uuid import uuid4 +from helpers import data_builder as builder + + +def test_get_auth(): + from moon_utilities.auth_functions import get_api_key_for_user + from moon_manager.api import auth + from moon_manager.api import policy + headers = {"Authorization": "Basic {}".format(base64.b64encode(b"admin:admin").decode("utf-8"))} + req = hug.test.get(auth, 'auth/', headers=headers) + assert req.status == HTTP_200 + key = req.data + assert get_api_key_for_user("admin") == req.data + headers = {"x-api-key": key} + req = hug.test.get(policy, 'policies/', headers=headers) + assert req.status == HTTP_200 + + +def test_del_auth(): + from moon_utilities.auth_functions import get_api_key_for_user + from moon_manager.api import auth + from moon_manager.api import policy + headers = {"Authorization": "Basic {}".format(base64.b64encode(b"admin:admin").decode("utf-8"))} + req = hug.test.get(auth, 'auth/', headers=headers) + assert req.status == HTTP_200 + key = req.data + headers = {"x-api-key": key} + req = hug.test.delete(auth, 'auth/', headers=headers) + assert req.status == HTTP_204 + req = hug.test.get(policy, 'policies/', headers=headers) + assert req.status == HTTP_401 + assert not get_api_key_for_user("admin") + + +def test_readd_auth(): + from moon_utilities.auth_functions import get_api_key_for_user + from moon_manager.api import auth + from moon_manager.api import policy + headers = {"Authorization": "Basic {}".format(base64.b64encode(b"admin:admin").decode("utf-8"))} + req = hug.test.get(auth, 'auth/', headers=headers) + assert req.status == HTTP_200 + key = req.data + headers = {"x-api-key": key} + req = hug.test.delete(auth, 'auth/', headers=headers) + assert req.status == HTTP_204 + headers = {"Authorization": "Basic {}".format(base64.b64encode(b"admin:admin").decode("utf-8"))} + req = hug.test.get(auth, 'auth/', headers=headers) + assert req.status == HTTP_200 + new_key = req.data + headers = {"x-api-key": new_key} + req = hug.test.get(policy, 'policies/', headers=headers) + assert req.status == HTTP_200 + assert get_api_key_for_user("admin") + assert get_api_key_for_user("admin") == new_key + assert get_api_key_for_user("admin") != key + diff --git a/moon_manager/tests/unit_python/api/test_data.py b/moon_manager/tests/unit_python/api/test_data.py index 87a80c69..019a8b45 100644 --- a/moon_manager/tests/unit_python/api/test_data.py +++ b/moon_manager/tests/unit_python/api/test_data.py @@ -1,46 +1,83 @@ -import api.utilities as utilities +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + import json +from uuid import uuid4 + +import hug +import pytest +from helpers import data_builder as builder +from helpers import policy_helper +from moon_utilities import exceptions # subject_categories_test -def get_subject_data(client, policy_id): - req = client.get("/policies/{}/subject_data".format(policy_id)) - subject_data = utilities.get_json(req.data) +def get_subject_data(policy_id, category_id=None): + from moon_manager.api import data + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + if category_id is None: + req = hug.test.get(data, "/policies/{}/subject_data".format(policy_id), headers=auth_headers) + else: + req = hug.test.get(data, "/policies/{}/subject_data/{}".format(policy_id, category_id), headers=auth_headers) + subject_data = req.data return req, subject_data -def add_subject_data(client, name, policy_id, category_id): - data = { +def add_subject_data(name): + from moon_manager.api import data + from moon_utilities.auth_functions import get_api_key_for_user + + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex) + body = { "name": name, "description": "description of {}".format(name) } - req = client.post("/policies/{}/subject_data/{}".format(policy_id, category_id), data=json.dumps(data), - headers={'Content-Type': 'application/json'}) - subject_data = utilities.get_json(req.data) + req = hug.test.post(data, "/policies/{}/subject_data/{}".format(policy_id, subject_category_id), + body=json.dumps(body), + headers={'Content-Type': 'application/json', "X-Api-Key": get_api_key_for_user("admin")}) + subject_data = req.data return req, subject_data -def delete_subject_data(client, policy_id): - req = client.delete("/policies/{}/subject_data".format(policy_id)) +def delete_subject_data(policy_id, category_id, data_id): + from moon_manager.api import data + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + req = hug.test.delete(data, "/policies/{}/subject_data/{}/{}".format(policy_id, category_id, + data_id), headers=auth_headers) return req def test_get_subject_data(): - policy_id = utilities.get_policy_id() - client = utilities.register_client() - req, subject_data = get_subject_data(client, policy_id) - assert req.status_code == 200 + policy = policy_helper.add_policies() + policy_id = next(iter(policy)) + req, subject_data = get_subject_data(policy_id) + assert req.status == hug.HTTP_200 assert isinstance(subject_data, dict) assert "subject_data" in subject_data def test_add_subject_data(): - policy_id = utilities.get_policy_id() - client = utilities.register_client() - req, subject_data = add_subject_data(client, "testuser", policy_id, "111") - assert req.status_code == 200 + req, subject_data = add_subject_data("testuser") + assert req.status == hug.HTTP_200 assert isinstance(subject_data, dict) value = subject_data["subject_data"]['data'] assert "subject_data" in subject_data @@ -49,120 +86,236 @@ def test_add_subject_data(): assert value[id]['description'] == "description of {}".format("testuser") +def test_add_subject_data_invalid_name(): + with pytest.raises(exceptions.DataContentError) as exception_info: + req, subject_data = add_subject_data(" ") + # assert req.status == hug.HTTP_400 + assert '400: Data Content Error' == str(exception_info.value) + with pytest.raises(exceptions.DataContentError) as exception_info: + req, subject_data = add_subject_data("") + # assert req.status == hug.HTTP_400 + assert '400: Data Content Error' == str(exception_info.value) + + def test_delete_subject_data(): - client = utilities.register_client() - policy_id = utilities.get_policy_id() - success_req = delete_subject_data(client, policy_id) - assert success_req.status_code == 200 + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy() + data_id = builder.create_subject_data(policy_id, subject_category_id) + success_req = delete_subject_data(policy_id, subject_category_id, data_id) + assert success_req.status == hug.HTTP_200 + + +def test_add_subject_data_with_forbidden_char_in_user(): + with pytest.raises(exceptions.ValidationContentError) as exception_info: + req, subject_data = add_subject_data("<a>") + # assert '400: Invalid Content' == str(exception_info.value) + assert "Key: 'name', [Forbidden characters in string]" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data["message"] == "Key: 'name', [Forbidden characters in string]" + + +def test_delete_subject_data_without_policy_id(): + success_req = delete_subject_data("", "", "") + assert success_req.status == hug.HTTP_405 -# --------------------------------------------------------------------------- +# --------------------------------------------------------------------------- # object_categories_test -def get_object_data(client, policy_id): - req = client.get("/policies/{}/object_data".format(policy_id)) - object_data = utilities.get_json(req.data) +def get_object_data(policy_id, category_id=None): + from moon_manager.api import data + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + if category_id is None: + req = hug.test.get(data, "/policies/{}/object_data".format(policy_id), headers=auth_headers) + else: + req = hug.test.get(data, "/policies/{}/object_data/{}".format(policy_id, category_id), headers=auth_headers) + object_data = req.data return req, object_data -def add_object_data(client, name, policy_id, category_id): - data = { +def add_object_data(name): + from moon_manager.api import data + from moon_utilities.auth_functions import get_api_key_for_user + + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex) + body = { "name": name, "description": "description of {}".format(name) } - req = client.post("/policies/{}/object_data/{}".format(policy_id, category_id), data=json.dumps(data), - headers={'Content-Type': 'application/json'}) - object_data = utilities.get_json(req.data) + req = hug.test.post(data, "/policies/{}/object_data/{}".format(policy_id, object_category_id), + body=json.dumps(body), headers={'Content-Type': 'application/json', + "X-Api-Key": get_api_key_for_user("admin")}) + object_data = req.data return req, object_data -def delete_object_data(client, policy_id): - req = client.delete("/policies/{}/object_data".format(policy_id)) +def delete_object_data(policy_id, category_id, data_id): + from moon_manager.api import data + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + req = hug.test.delete(data, "/policies/{}/object_data/{}/{}".format(policy_id, category_id, + data_id), headers=auth_headers) return req def test_get_object_data(): - policy_id = utilities.get_policy_id() - client = utilities.register_client() - req, object_data = get_object_data(client, policy_id) - assert req.status_code == 200 + policy = policy_helper.add_policies() + policy_id = next(iter(policy)) + req, object_data = get_object_data(policy_id) + assert req.status == hug.HTTP_200 assert isinstance(object_data, dict) assert "object_data" in object_data def test_add_object_data(): - policy_id = utilities.get_policy_id() - client = utilities.register_client() - req, object_data = add_object_data(client, "testuser", policy_id, "111") - assert req.status_code == 200 + req, object_data = add_object_data("testuser") + assert req.status == hug.HTTP_200 assert isinstance(object_data, dict) value = object_data["object_data"]['data'] assert "object_data" in object_data - id = list(value.keys())[0] - assert value[id]['value']['name'] == "testuser" - assert value[id]['value']['description'] == "description of {}".format("testuser") + _id = list(value.keys())[0] + assert value[_id]['name'] == "testuser" + assert value[_id]['description'] == "description of {}".format("testuser") + + +def test_add_object_data_invalid_name(): + with pytest.raises(exceptions.DataContentError) as exception_info: + req, object_data = add_object_data(" ") + # assert req.status == hug.HTTP_400 + assert '400: Data Content Error' == str(exception_info.value) + with pytest.raises(exceptions.DataContentError): + req, object_data = add_object_data("") + # assert req.status == hug.HTTP_400 + assert '400: Data Content Error' == str(exception_info.value) def test_delete_object_data(): - client = utilities.register_client() - policy_id = utilities.get_policy_id() - success_req = delete_object_data(client, policy_id) - assert success_req.status_code == 200 + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy() + data_id = builder.create_object_data(policy_id, object_category_id) + success_req = delete_object_data(policy_id, data_id, object_category_id) + assert success_req.status == hug.HTTP_200 + + +def test_add_object_data_with_forbidden_char_in_user(): + with pytest.raises(exceptions.ValidationContentError) as exception_info: + req, subject_data = add_object_data("<a>") + # assert req.status == hug.HTTP_400 + # assert req.data["message"] == "Key: 'name', [Forbidden characters in string]" + # assert '400: Invalid Content' == str(exception_info.value) + assert "Key: 'name', [Forbidden characters in string]" == str(exception_info.value) -# --------------------------------------------------------------------------- +def test_delete_object_data_without_policy_id(): + success_req = delete_object_data("", "", "") + assert success_req.status == hug.HTTP_405 + + +# --------------------------------------------------------------------------- # action_categories_test -def get_action_data(client, policy_id): - req = client.get("/policies/{}/action_data".format(policy_id)) - action_data = utilities.get_json(req.data) +def get_action_data(policy_id, category_id=None): + from moon_manager.api import data + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + if category_id is None: + req = hug.test.get(data, "/policies/{}/action_data".format(policy_id), + headers=auth_headers) + else: + req = hug.test.get(data, "/policies/{}/action_data/{}".format(policy_id, category_id), + headers=auth_headers) + action_data = req.data return req, action_data -def add_action_data(client, name, policy_id, category_id): - data = { +def add_action_data(name): + from moon_manager.api import data + from moon_utilities.auth_functions import get_api_key_for_user + + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex) + body = { "name": name, "description": "description of {}".format(name) } - req = client.post("/policies/{}/action_data/{}".format(policy_id, category_id), data=json.dumps(data), - headers={'Content-Type': 'application/json'}) - action_data = utilities.get_json(req.data) + req = hug.test.post(data, "/policies/{}/action_data/{}".format(policy_id, action_category_id), + body=json.dumps(body), + headers={'Content-Type': 'application/json', + "X-Api-Key": get_api_key_for_user("admin")}) + action_data = req.data return req, action_data -def delete_action_data(client, policy_id): - req = client.delete("/policies/{}/action_data".format(policy_id)) +def delete_action_data(policy_id, categorgy_id, data_id): + from moon_manager.api import data + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + req = hug.test.delete(data, "/policies/{}/action_data/{}/{}".format(policy_id, categorgy_id, + data_id), headers=auth_headers) return req def test_get_action_data(): - policy_id = utilities.get_policy_id() - client = utilities.register_client() - req, action_data = get_action_data(client, policy_id) - assert req.status_code == 200 + policy = policy_helper.add_policies() + policy_id = next(iter(policy)) + req, action_data = get_action_data(policy_id) + assert req.status == hug.HTTP_200 assert isinstance(action_data, dict) assert "action_data" in action_data def test_add_action_data(): - policy_id = utilities.get_policy_id() - client = utilities.register_client() - req, action_data = add_action_data(client, "testuser", policy_id, "111") - assert req.status_code == 200 + req, action_data = add_action_data("testuser") + assert req.status == hug.HTTP_200 assert isinstance(action_data, dict) value = action_data["action_data"]['data'] assert "action_data" in action_data id = list(value.keys())[0] - assert value[id]['value']['name'] == "testuser" - assert value[id]['value']['description'] == "description of {}".format("testuser") + assert value[id]['name'] == "testuser" + assert value[id]['description'] == "description of {}".format("testuser") + + +def test_add_action_data_invalid_name(): + + with pytest.raises(exceptions.DataContentError)as exception_info: + req, action_data = add_action_data(" ") + # assert req.status == hug.HTTP_400 + assert '400: Data Content Error' == str(exception_info.value) + with pytest.raises(exceptions.DataContentError) as exception_info: + req, action_data = add_action_data("") + # assert req.status == hug.HTTP_400 + assert '400: Data Content Error' == str(exception_info.value) def test_delete_action_data(): - client = utilities.register_client() - policy_id = utilities.get_policy_id() - success_req = delete_action_data(client, policy_id) - assert success_req.status_code == 200 + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy() + data_id = builder.create_action_data(policy_id, action_category_id) + success_req = delete_action_data(policy_id, data_id, action_category_id) + assert success_req.status == hug.HTTP_200 + + +def test_add_action_data_with_forbidden_char_in_user(): + with pytest.raises(exceptions.ValidationContentError) as exception_info: + req, action_data = add_action_data("<a>") + # assert req.status == hug.HTTP_400 + # assert req.data["message"] == "Key: 'name', [Forbidden characters in string]" + # assert '400: Invalid Content' == str(exception_info.value) + assert "Key: 'name', [Forbidden characters in string]" == str(exception_info.value) -# ---------------------------------------------------------------------------
\ No newline at end of file + +def test_delete_action_data_without_policy_id(): + success_req = delete_action_data("", "", "") + assert success_req.status == hug.HTTP_405 +# --------------------------------------------------------------------------- diff --git a/moon_manager/tests/unit_python/api/test_json_export.py b/moon_manager/tests/unit_python/api/test_json_export.py new file mode 100644 index 00000000..8de394c9 --- /dev/null +++ b/moon_manager/tests/unit_python/api/test_json_export.py @@ -0,0 +1,321 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +import json +import api.utilities as utilities +import helpers.import_export_helper as import_export_helper +import hug + +MODEL_WITHOUT_META_RULES = {"models": [{"name": "test model", "description": "model description", "meta_rules": []}]} + +DATA = {"subject_data": [{"name": "test subject data", "description": "subject data description", "policies": [{"name": "test policy"}], "category": {"name": "test subject categories"}}], + "object_data": [{"name": "test object data", "description": "object data description", "policies": [{"name": "test policy"}], "category": {"name": "test object categories"}}], + "action_data": [{"name": "test action data", "description": "action data description", "policies": [{"name": "test policy"}], "category": {"name": "test action categories"}}] + } + +META_RULES = {"subject_categories": [{"name": "test subject categories", "description": "subject category description"}], + "object_categories": [{"name": "test object categories", "description": "object category description"}], + "action_categories": [{"name": "test action categories", "description": "action action description"}], + "meta_rules": [{"name": "meta rule", "description": "valid meta rule", "subject_categories": [{"name": "test subject categories"}], "object_categories": [{"name": "test object categories"}], "action_categories": [{"name": "test action categories"}]}]} + + +SUBJECTS_OBJECTS_ACTIONS = {"models": [{"name": "test model", "description": "", "meta_rules": [{"name":"meta rule"}]}], + "policies": [{"name": "test policy", "genre": "authz", "description": "policy description", "model": {"name" : "test model"}}], + "subjects": [{"name": "testuser", "description": "description of the subject", "extra": {"field_extra_subject": "value extra subject"}, "policies": [{"name": "test policy"}]}], + "objects": [{"name": "test object", "description": "description of the object", "extra": {"field_extra_object": "value extra object"}, "policies": [{"name": "test policy"}]}], + "actions": [{"name": "test action", "description": "description of the action", "extra": {"field_extra_action": "value extra action"}, "policies": [{"name": "test policy"}]}], + **META_RULES + } + +SUBJECT_OBJECT_ACTION_CATEGORIES = {"subject_categories": [{"name": "test subject categories", "description": "subject category description"}], + "object_categories": [{"name": "test object categories", "description": "object category description"}], + "action_categories": [{"name": "test action categories", "description": "action category description"}]} + +SUBJECT_OBJECT_ACTION_DATA = {**SUBJECTS_OBJECTS_ACTIONS, + **DATA + } +POLICIES = {"models": [{"name": "test model", "description": "", "meta_rules": [{"name": "meta rule"}]}], + "policies": [{"name": "test policy", "genre": "authz", "description": "policy description", "model": {"name" : "test model"}}], + **META_RULES, + } + +ASSIGNMENTS = {**POLICIES, + **DATA, + "subjects": [{"name": "testuser", "description": "description of the subject", "extra": {"field_extra_subject": "value extra subject"}, "policies": [{"name": "test policy"}]}], + "objects": [{"name": "test object e0", "description": "description of the object", "extra": {"field_extra_object": "value extra object"}, "policies": [{"name": "test policy"}]}], + "actions": [{"name": "test action e0", "description": "description of the action", "extra": {"field_extra_action": "value extra action"}, "policies": [{"name": "test policy"}]}], + "subject_assignments": [{"subject": {"name": "testuser"}, "category": {"name": "test subject categories"}, "assignments": [{"name": "test subject data"}]}], + "object_assignments": [{"object": {"name": "test object e0"}, "category": {"name": "test object categories"}, "assignments": [{"name": "test object data"}]}], + "action_assignments": [{"action": {"name": "test action e0"}, "category": {"name": "test action categories"}, "assignments": [{"name": "test action data"}]}]} + +RULES = {**POLICIES, + **DATA, + "subjects": [{"name": "testuser", "description": "description of the subject", "extra": {"field_extra_subject": "value extra subject"}, "policies": [{"name": "test policy"}]}], + "objects": [{"name": "test object e1", "description": "description of the object", "extra": {"field_extra_object": "value extra object"}, "policies": [{"name": "test policy"}]}], + "actions": [{"name": "test action e1", "description": "description of the action", "extra": {"field_extra_action": "value extra action"}, "policies": [{"name": "test policy"}]}], + "subject_assignments": [{"subject": {"name": "testuser"}, "category": {"name": "test subject categories"}, "assignments": [{"name": "test subject data"}]}], + "object_assignments": [{"object": {"name": "test object e1"}, "category": {"name": "test object categories"}, "assignments": [{"name": "test object data"}]}], + "action_assignments": [{"action": {"name": "test action e1"}, "category": {"name": "test action categories"}, "assignments": [{"name": "test action data"}]}], + "rules": [{"meta_rule": {"name": "meta rule"}, "rule": {"subject_data": [{"name": "test " + "subject data"}], + "object_data": [{"name": "test object data"}], + "action_data": [{"name": "test action data"}]}, "policy": {"name":"test policy"}, "instructions": [{"decision": "grant"}], "enabled": True}] + } + + +def test_export_models(): + from moon_manager.api import json_import + from moon_manager.api import json_export + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + # import_export_helper.clean_all() + + req = hug.test.post(json_import, "/import", body=json.dumps( + MODEL_WITHOUT_META_RULES), headers={'Content-Type': 'application/json', "X-Api-Key": + get_api_key_for_user("admin")} ) + data = utilities.get_json(req.data) + assert all(e in data for e in MODEL_WITHOUT_META_RULES.keys()) + + req = hug.test.get(json_export, "/export", headers=auth_headers) + assert req.status == hug.HTTP_200 + data = utilities.get_json(req.data) + + assert "content" in data + assert "models" in data["content"] + assert isinstance(data["content"]["models"], list) + assert len(data["content"]["models"]) == 1 + model = data["content"]["models"][0] + assert model["name"] == "test model" + assert model["description"] == "model description" + assert isinstance(model["meta_rules"], list) + assert len(model["meta_rules"]) == 0 + + +def test_export_policies(): + from moon_manager.api import json_import + from moon_manager.api import json_export + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + import_export_helper.clean_all() + req = hug.test.post(json_import, "/import", body=json.dumps( + POLICIES), headers={'Content-Type': 'application/json', "X-Api-Key": + get_api_key_for_user("admin")}) + data = utilities.get_json(req.data) + assert all(e in data for e in POLICIES.keys()) + + req = hug.test.get(json_export, "/export", headers=auth_headers) + assert req.status == hug.HTTP_200 + data = utilities.get_json(req.data) + + assert "content" in data + assert "policies" in data["content"] + assert isinstance(data["content"]["policies"], list) + assert len(data["content"]["policies"]) == 1 + policy = data["content"]["policies"][0] + assert policy["name"] == "test policy" + assert policy["genre"] == "authz" + assert policy["description"] == "policy description" + assert "model" in policy + assert "name" in policy["model"] + model = policy["model"] + assert model["name"] == "test model" + + +def test_export_subject_object_action(): + from moon_manager.api import json_import + from moon_manager.api import json_export + from moon_utilities.auth_functions import get_api_key_for_user + + import_export_helper.clean_all() + req = hug.test.post(json_import, "/import", body=json.dumps( + SUBJECTS_OBJECTS_ACTIONS) ,headers={'Content-Type': 'application/json', "X-Api-Key": + get_api_key_for_user("admin")}) + data = utilities.get_json(req.data) + assert all(e in data for e in SUBJECTS_OBJECTS_ACTIONS.keys()) + + req = hug.test.get(json_export, "/export", headers={'Content-Type': 'application/json', "X-Api-Key": + get_api_key_for_user("admin")}) + assert req.status == hug.HTTP_200 + data = utilities.get_json(req.data) + + assert "content" in data + type_elements = ["subject", "object", "action"] + for type_element in type_elements: + key = type_element + "s" + assert key in data["content"] + assert isinstance(data["content"][key], list) + assert len(data["content"][key]) == 1 + element = data["content"][key][0] + if type_element == "subject": + assert element["name"] == "testuser" + else: + assert element["name"] == "test "+ type_element + assert element["description"] == "description of the " + type_element + assert "policies" in element + assert isinstance(element["policies"], list) + assert len(element["policies"]) == 1 + assert isinstance(element["policies"][0], dict) + assert element["policies"][0]["name"] == "test policy" + assert isinstance(element["extra"], dict) + key_dict = "field_extra_" + type_element + value_dict = "value extra " + type_element + assert key_dict in element["extra"] + assert element["extra"][key_dict] == value_dict + + +def test_export_subject_object_action_categories(): + from moon_manager.api import json_import + from moon_manager.api import json_export + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + import_export_helper.clean_all() + req = hug.test.post(json_import, "/import", body=json.dumps( + SUBJECT_OBJECT_ACTION_CATEGORIES), headers={'Content-Type': 'application/json', "X-Api-Key": + get_api_key_for_user("admin")}) + data = utilities.get_json(req.data) + assert all(e in data for e in SUBJECT_OBJECT_ACTION_CATEGORIES.keys()) + + req = hug.test.get(json_export, "/export", headers=auth_headers) + assert req.status == hug.HTTP_200 + data = utilities.get_json(req.data) + assert "content" in data + type_elements = ["subject", "object", "action"] + for type_element in type_elements: + key = type_element + "_categories" + assert key in data["content"] + assert isinstance(data["content"][key], list) + assert len(data["content"][key]) == 1 + category = data["content"][key][0] + assert category["name"] == "test " + type_element + " categories" + assert category["description"] == type_element + " category description" + + +def test_export_subject_object_action_data(): + from moon_manager.api import json_import + from moon_manager.api import json_export + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + import_export_helper.clean_all() + req = hug.test.post(json_import, "/import", body=json.dumps( + SUBJECT_OBJECT_ACTION_DATA), headers={'Content-Type': 'application/json', "X-Api-Key": + get_api_key_for_user("admin")}) + data = utilities.get_json(req.data) + assert all(e in data for e in SUBJECT_OBJECT_ACTION_DATA.keys()) + + req = hug.test.get(json_export, "/export", headers=auth_headers) + assert req.status == hug.HTTP_200 + data = utilities.get_json(req.data) + assert "content" in data + type_elements = ["subject", "object", "action"] + for type_element in type_elements: + key = type_element + "_data" + assert key in data["content"] + assert isinstance(data["content"][key], list) + assert len(data["content"][key]) == 1 + data_elt = data["content"][key][0] + assert data_elt["name"] == "test " + type_element + " data" + assert data_elt["description"] == type_element + " data description" + assert isinstance(data_elt["policy"], dict) + assert data_elt["policy"]["name"] == "test policy" + assert isinstance(data_elt["category"], dict) + assert data_elt["category"]["name"] == "test " + type_element + " categories" + + +def test_export_assignments(): + from moon_manager.api import json_import + from moon_manager.api import json_export + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + import_export_helper.clean_all() + req = hug.test.post(json_import, "/import", body=json.dumps( + ASSIGNMENTS), headers={'Content-Type': 'application/json', "X-Api-Key": + get_api_key_for_user("admin")}) + data = utilities.get_json(req.data) + assert all(e in data for e in ASSIGNMENTS.keys()) + + req = hug.test.get(json_export, "/export", headers=auth_headers) + assert req.status == hug.HTTP_200 + data = utilities.get_json(req.data) + assert "content" in data + type_elements = ["subject", "object", "action"] + for type_element in type_elements: + key = type_element + "_assignments" + assert key in data["content"] + assert isinstance(data["content"][key], list) + assert len(data["content"][key]) == 1 + assignment_elt = data["content"][key][0] + assert type_element in assignment_elt + assert isinstance(assignment_elt[type_element], dict) + if type_element == "subject": + assert assignment_elt[type_element]["name"] == "testuser" + else: + assert assignment_elt[type_element]["name"] == "test " + type_element + " e0" + assert "category" in assignment_elt + assert isinstance(assignment_elt["category"], dict) + assert assignment_elt["category"]["name"] == "test " + type_element + " categories" + assert "assignments" in assignment_elt + assert isinstance(assignment_elt["assignments"], list) + assert len(assignment_elt["assignments"]) == 1 + assert assignment_elt["assignments"][0]["name"] == "test " + type_element + " data" + + import_export_helper.clean_all() + + +def test_export_rules(): + from moon_manager.api import json_import + from moon_manager.api import json_export + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + import_export_helper.clean_all() + req = hug.test.post(json_import, "/import", body=json.dumps( + RULES), headers={'Content-Type': 'application/json', "X-Api-Key": + get_api_key_for_user("admin")}) + data = utilities.get_json(req.data) + assert all(e in data for e in RULES.keys()) + + req = hug.test.get(json_export, "/export", headers=auth_headers) + assert req.status == hug.HTTP_200 + data = utilities.get_json(req.data) + assert "content" in data + assert "rules" in data["content"] + assert isinstance(data["content"]["rules"], list) + assert len(data["content"]["rules"]) == 1 + rule = data["content"]["rules"][0] + assert "instructions" in rule + assert "decision" in rule["instructions"][0] + assert rule["instructions"][0]["decision"] == "grant" + assert "enabled" in rule + assert rule["enabled"] + assert "meta_rule" in rule + assert rule["meta_rule"]["name"] == "meta rule" + assert "policy" in rule + assert rule["policy"]["name"] == "test policy" + assert "rule" in rule + rule = rule["rule"] + assert "subject_data" in rule + assert isinstance(rule["subject_data"], list) + assert len(rule["subject_data"]) == 1 + assert rule["subject_data"][0]["name"] == "test subject data" + assert "object_data" in rule + assert isinstance(rule["object_data"], list) + assert len(rule["object_data"]) == 1 + assert rule["object_data"][0]["name"] == "test object data" + assert "action_data" in rule + assert isinstance(rule["action_data"], list) + assert len(rule["action_data"]) == 1 + assert rule["action_data"][0]["name"] == "test action data" diff --git a/moon_manager/tests/unit_python/api/test_json_import.py b/moon_manager/tests/unit_python/api/test_json_import.py new file mode 100644 index 00000000..3195eca3 --- /dev/null +++ b/moon_manager/tests/unit_python/api/test_json_import.py @@ -0,0 +1,832 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +import api.test_models as test_models +import api.test_policies as test_policies +import api.test_meta_data as test_categories +import api.test_data as test_data +import api.test_meta_rules as test_meta_rules +import api.test_assignement as test_assignments +import api.test_rules as test_rules +import helpers.import_export_helper as import_export_helper +import helpers.policy_helper as policy_helper +import hug +import json +import pytest +from moon_utilities import exceptions + + +MODEL_WITHOUT_META_RULES = [ + {"models": [{"name": "test model", "description": "", "meta_rules": []}]}, + {"models": [{"name": "test model", "description": "new description", "meta_rules": [], + "override": True}]}, + {"models": [{"name": "test model", "description": "description not taken into account", + "meta_rules": [], "override": False}]} +] + +POLICIES = [ + {"policies": [ + {"name": "test policy", "genre": "authz", "description": "description", "model": {}, + "mandatory": False}]}, + {"policies": [{"name": "test policy", "genre": "authz", + "description": "new description not taken into account", + "model": {"name": "test model"}, "mandatory": True}]}, + {"policies": [ + {"name": "test policy", "genre": "not authz ?", "description": "generates an exception", + "model": {"name": "test model"}, "override": True}]}, + {"models": [{"name": "test model", "description": "", "meta_rules": []}], "policies": [ + {"name": "test policy", "genre": "not authz ?", "description": "changes taken into account", + "model": {"name": "test model"}, "override": True}]}, +] + +SUBJECTS = [{"subjects": [ + {"name": "testuser", "description": "description of the subject", "extra": {}, + "policies": []}]}, + {"policies": [ + {"name": "test policy", "genre": "authz", "description": "description", "model": {}, + "mandatory": False}], "subjects": [ + {"name": "testuser", "description": "description of the subject", "extra": {}, + "policies": []}]}, + {"policies": [ + {"name": "test other policy", "genre": "authz", "description": "description", + "model": {}, "mandatory": True}], "subjects": [ + {"name": "testuser", "description": "description of the subject", "extra": {}, + "policies": []}]}, + {"subjects": [{"name": "testuser", "description": "new description of the subject", + "extra": {"email": "new-email@test.com"}, + "policies": [{"name": "test other policy"}]}]}, + {"policies": [ + {"name": "test policy", "genre": "authz", "description": "description", "model": {}, + "mandatory": False}], "subjects": [ + {"name": "testuser", "description": "description of the subject", "extra": {}, + "policies": [{"name": "test policy"}]}]}] + +OBJECTS = [ + {"objects": [{"name": "test object", "description": "description of the object", "extra": {}, + "policies": []}]}, + {"policies": [ + {"name": "test policy", "genre": "authz", "description": "description", "model": {}, + "mandatory": False}], + "objects": [{"name": "test object", "description": "description of the object", "extra": {}, + "policies": []}]}, + {"policies": [ + {"name": "test other policy", "genre": "authz", "description": "description", "model": {}, + "mandatory": True}], + "objects": [{"name": "test object", "description": "description of the object", "extra": {}, + "policies": []}]}, + {"objects": [{"name": "test object", "description": "new description of the object", + "extra": {"test": "test extra"}, + "policies": [{"name": "test other policy"}]}]}, + {"policies": [ + {"name": "test policy", "genre": "authz", "description": "description", "model": {}, + "mandatory": False}], + "objects": [{"name": "test object", "description": "description of the object", "extra": {}, + "policies": [{"name": "test policy"}]}]}, +] + +ACTIONS = [{"actions": [ + {"name": "test action", "description": "description of the action", "extra": {}, + "policies": []}]}, + {"policies": [ + {"name": "test policy", "genre": "authz", "description": "description", "model": {}, + "mandatory": False}], "actions": [ + {"name": "test action", "description": "description of the action", "extra": {}, + "policies": []}]}, + {"policies": [ + {"name": "test other policy", "genre": "authz", "description": "description", + "model": {}, "mandatory": True}], "actions": [ + {"name": "test action", "description": "description of the action", "extra": {}, + "policies": []}]}, + {"actions": [{"name": "test action", "description": "new description of the action", + "extra": {"test": "test extra"}, + "policies": [{"name": "test other policy"}]}]}, + {"policies": [ + {"name": "test policy", "genre": "authz", "description": "description", "model": {}, + "mandatory": False}], "actions": [ + {"name": "test action", "description": "description of the action", "extra": {}, + "policies": [{"name": "test policy"}]}]}] + +SUBJECT_CATEGORIES = [{"subject_categories": [ + {"name": "test subject categories", "description": "subject category description"}]}, + {"subject_categories": [{"name": "test subject categories", + "description": "new subject category description"}]}] + +OBJECT_CATEGORIES = [{"object_categories": [ + {"name": "test object categories", "description": "object category description"}]}, + {"object_categories": [{"name": "test object categories", + "description": "new object category description"}]}] + +ACTION_CATEGORIES = [{"action_categories": [ + {"name": "test action categories", "description": "action category description"}]}, + {"action_categories": [{"name": "test action categories", + "description": "new action category description"}]}] + +# meta_rules import is needed otherwise the search for data do not work !!! +PRE_DATA = {"models": [{"name": "test model", "description": "", + "meta_rules": [{"name": "good meta rule"}, + {"name": "other good meta rule"}]}], + "policies": [ + {"name": "test other policy", "genre": "authz", "description": "description", + "model": {"name": "test model"}, "mandatory": True}], + "subject_categories": [ + {"name": "test subject categories", "description": "subject category description"}, + {"name": "other test subject categories", + "description": "subject category description"}], + "object_categories": [ + {"name": "test object categories", "description": "object category description"}, + {"name": "other test object categories", + "description": "object category description"}], + "action_categories": [ + {"name": "test action categories", "description": "action category description"}, + {"name": "other test action categories", + "description": "action category description"}], + "meta_rules": [{"name": "good meta rule", "description": "valid meta rule", + "subject_categories": [{"name": "test subject categories"}], + "object_categories": [{"name": "test object categories"}], + "action_categories": [{"name": "test action categories"}]}, + {"name": "other good meta rule", "description": "valid meta rule", + "subject_categories": [{"name": "other test subject categories"}], + "object_categories": [{"name": "other test object categories"}], + "action_categories": [{"name": "other test action categories"}]}]} + +SUBJECT_DATA = [{"subject_data": [ + {"name": "not valid subject data", "description": "", "policies": [{}], "category": {}}]}, + {"subject_data": [ + {"name": "not valid subject data", "description": "", "policies": [{}], + "category": {"name": "test subject categories"}}]}, + {"policies": [ + {"name": "test policy", "genre": "authz", "description": "description", + "model": {"name": "test model"}, "mandatory": True}], "subject_data": [ + {"name": "one valid subject data", "description": "description", + "policies": [{}], "category": {"name": "test subject categories"}}]}, + {"subject_data": [{"name": "valid subject data", "description": "description", + "policies": [{"name": "test policy"}], + "category": {"name": "test subject categories"}}]}, + {"subject_data": [{"name": "valid subject data", "description": "new description", + "policies": [{"name": "test other policy"}], + "category": {"name": "test subject categories"}}]}] + +OBJECT_DATA = [{"object_data": [ + {"name": "not valid object data", "description": "", "policies": [{}], "category": {}}]}, + {"object_data": [ + {"name": "not valid object data", "description": "", "policies": [{}], + "category": {"name": "test object categories"}}]}, + {"policies": [{"name": "test policy", "genre": "authz", "description": "description", + "model": {"name": "test model"}, "mandatory": True}], "object_data": [ + {"name": "one valid object data", "description": "description", "policies": [{}], + "category": {"name": "test object categories"}}]}, + {"object_data": [{"name": "valid object data", "description": "description", + "policies": [{"name": "test policy"}], + "category": {"name": "test object categories"}}]}, + {"object_data": [{"name": "valid object data", "description": "new description", + "policies": [{"name": "test other policy"}], + "category": {"name": "test object categories"}}]}] + +ACTION_DATA = [{"action_data": [ + {"name": "not valid action data", "description": "", "policies": [{}], "category": {}}]}, + {"action_data": [ + {"name": "not valid action data", "description": "", "policies": [{}], + "category": {"name": "test action categories"}}]}, + {"policies": [{"name": "test policy", "genre": "authz", "description": "description", + "model": {"name": "test model"}, "mandatory": True}], "action_data": [ + {"name": "one valid action data", "description": "description", "policies": [{}], + "category": {"name": "test action categories"}}]}, + {"action_data": [{"name": "valid action data", "description": "description", + "policies": [{"name": "test policy"}], + "category": {"name": "test action categories"}}]}, + {"action_data": [{"name": "valid action data", "description": "new description", + "policies": [{"name": "test other policy"}], + "category": {"name": "test action categories"}}]}] + +PRE_META_RULES = {"subject_categories": [ + {"name": "test subject categories", "description": "subject category description"}], + "object_categories": [{"name": "test object categories", + "description": "object category description"}], + "action_categories": [{"name": "test action categories", + "description": "object action description"}]} + +META_RULES = [{"meta_rules": [{"name": "bad meta rule", "description": "not valid meta rule", + "subject_categories": [{"name": "not valid category"}], + "object_categories": [{"name": "test object categories"}], + "action_categories": [{"name": "test action categories"}]}]}, + {"meta_rules": [{"name": "bad meta rule", "description": "not valid meta rule", + "subject_categories": [{"name": "test subject categories"}], + "object_categories": [{"name": "not valid category"}], + "action_categories": [{"name": "test action categories"}]}]}, + {"meta_rules": [{"name": "bad meta rule", "description": "not valid meta rule", + "subject_categories": [{"name": "test subject categories"}], + "object_categories": [{"name": "test object categories"}], + "action_categories": [{"name": "not valid category"}]}]}, + {"meta_rules": [{"name": "good meta rule", "description": "valid meta rule", + "subject_categories": [{"name": "test subject categories"}], + "object_categories": [{"name": "test object categories"}], + "action_categories": [{"name": "test action categories"}]}]}] + +PRE_ASSIGNMENTS = {"models": [ + {"name": "test model", "description": "", "meta_rules": [{"name": "good meta rule"}]}], + "policies": [ + {"name": "test policy", "genre": "authz", "description": "description", + "model": {"name": "test model"}, "mandatory": True}], + "subject_categories": [{"name": "test subject categories", + "description": "subject category description"}], + "object_categories": [{"name": "test object categories", + "description": "object category description"}], + "action_categories": [{"name": "test action categories", + "description": "object action description"}], + "subjects": [{"name": "testuser", "description": "description of the subject", + "extra": {}, "policies": [{"name": "test policy"}]}], + "objects": [{"name": "test object", "description": "description of the object", + "extra": {}, "policies": [{"name": "test policy"}]}], + "actions": [{"name": "test action", "description": "description of the action", + "extra": {}, "policies": [{"name": "test policy"}]}], + "meta_rules": [{"name": "good meta rule", "description": "valid meta rule", + "subject_categories": [{"name": "test subject categories"}], + "object_categories": [{"name": "test object categories"}], + "action_categories": [{"name": "test action categories"}]}], + "subject_data": [{"name": "subject data", "description": "test subject data", + "policies": [{"name": "test policy"}], + "category": {"name": "test subject categories"}}], + "object_data": [{"name": "object data", "description": "test object data", + "policies": [{"name": "test policy"}], + "category": {"name": "test object categories"}}], + "action_data": [{"name": "action data", "description": "test action data", + "policies": [{"name": "test policy"}], + "category": {"name": "test action categories"}}]} + +SUBJECT_ASSIGNMENTS = [ + {"subject_assignments": [ + {"subject": {"name": "unknown"}, + "category": {"name": "test subject categories"}, + "assignments": [{"name": "subject data"}]}], + "exception": exceptions.InvalidJson + }, + {"subject_assignments": [ + {"subject": {"name": "testuser"}, + "category": {"name": "unknown"}, + "assignments": [{"name": "subject data"}]}], + "exception": exceptions.UnknownName + }, + {"subject_assignments": [ + {"subject": {"name": "testuser"}, + "category": {"name": "test subject categories"}, + "assignments": [{"name": "unknown"}]}], + "exception": exceptions.InvalidJson + }, + {"subject_assignments": [ + {"subject": {"name": "testuser"}, + "category": {"name": "test subject categories"}, + "assignments": [{"name": "subject data"}]}], + "exception": None + }] + +OBJECT_ASSIGNMENTS = [ + {"object_assignments": [ + {"object": {"name": "unknown"}, + "category": {"name": "test object categories"}, + "assignments": [{"name": "object data"}]}], + "exception": exceptions.InvalidJson + }, + {"object_assignments": [ + {"object": {"name": "test object"}, + "category": {"name": "unknown"}, + "assignments": [{"name": "object data"}]}], + "exception": exceptions.UnknownName + }, + {"object_assignments": [ + {"object": {"name": "test object"}, + "category": {"name": "test object categories"}, + "assignments": [{"name": "unknown"}]}], + "exception": exceptions.InvalidJson + }, + {"object_assignments": [ + {"object": {"name": "test object"}, + "category": {"name": "test object categories"}, + "assignments": [{"name": "object data"}]}], + "exception": None + }] + +ACTION_ASSIGNMENTS = [ + {"action_assignments": [ + {"action": {"name": "unknown"}, + "category": {"name": "test action categories"}, + "assignments": [{"name": "action data"}]}], + "exception": exceptions.InvalidJson + }, + {"action_assignments": [ + {"action": {"name": "test action"}, + "category": {"name": "unknown"}, + "assignments": [{"name": "action data"}]}], + "exception": exceptions.UnknownName + }, + {"action_assignments": [ + {"action": {"name": "test action"}, + "category": {"name": "test action categories"}, + "assignments": [{"name": "unknown"}]}], + "exception": exceptions.InvalidJson + }, + {"action_assignments": [ + {"action": {"name": "test action"}, + "category": {"name": "test action categories"}, + "assignments": [{"name": "action data"}]}], + "exception": None + }] + +RULES = [{"rules": [{"meta_rule": {"name": "unknown meta rule"}, "policy": {"name": "test " + "policy"}, + "instructions": [{"decision": "grant"}], "enabled": True, "rule": { + "subject_data": [{"name": "subject data"}], "object_data": [{"name": "object data"}], + "action_data": [{"name": "action data"}]}}]}, + {"rules": [{"meta_rule": {"name": "good meta rule"}, "policy": {"name": "unknown " + "policy"}, + "instructions": [{"decision": "grant"}], "enabled": True, "rule": { + "subject_data": [{"name": "subject data"}], + "object_data": [{"name": "object data"}], + "action_data": [{"name": "action data"}]}}]}, + {"rules": [{"meta_rule": {"name": "good meta rule"}, "policy": {"name": "test policy"}, + "instructions": [{"decision": "grant"}], "enabled": True, "rule": { + "subject_data": [{"name": "unknown subject data"}], + "object_data": [{"name": "object data"}], + "action_data": [{"name": "action data"}]}}]}, + {"rules": [{"meta_rule": {"name": "good meta rule"}, "policy": {"name": "test policy"}, + "instructions": [{"decision": "grant"}], "enabled": True, "rule": { + "subject_data": [{"name": "subject data"}], + "object_data": [{"name": "unknown object data"}], + "action_data": [{"name": "action data"}]}}]}, + {"rules": [{"meta_rule": {"name": "good meta rule"}, "policy": {"name": "test policy"}, + "instructions": [{"decision": "grant"}], "enabled": True, "rule": { + "subject_data": [{"name": "subject data"}], + "object_data": [{"name": "object data"}], + "action_data": [{"name": "unknown action data"}]}}]}, + {"rules": [{"meta_rule": {"name": "good meta rule"}, "policy": {"name": "test policy"}, + "instructions": [{"decision": "grant"}], "enabled": True, "rule": { + "subject_data": [{"name": "subject data"}], + "object_data": [{"name": "object data"}], + "action_data": [{"name": "action data"}]}}]}] + + +def test_import_models_without_new_meta_rules(): + from moon_utilities.auth_functions import get_api_key_for_user + + import_export_helper.clean_all() + counter = 0 + for models_description in MODEL_WITHOUT_META_RULES: + from moon_manager.api import json_import + req = hug.test.post(json_import, "/import", body=json.dumps(models_description) + , headers={'Content-Type': 'application/json', "X-Api-Key": + get_api_key_for_user("admin")}) + data = req.data + assert all(e in data for e in models_description.keys()) + req, models = test_models.get_models() + models = models["models"] + assert len(list(models.keys())) == 1 + values = list(models.values()) + assert values[0]["name"] == "test model" + if counter == 0: + assert len(values[0]["description"]) == 0 + if counter == 1 or counter == 2: + assert values[0]["description"] == "new description" + counter = counter + 1 + import_export_helper.clean_all() + + +def test_import_policies(): + from moon_utilities.auth_functions import get_api_key_for_user + + import_export_helper.clean_all() + counter = -1 + for policy_description in POLICIES: + counter = counter + 1 + from moon_manager.api import json_import + if counter == 2: + with pytest.raises(exceptions.UnknownName): + req = hug.test.post(json_import, "/import", body=json.dumps(policy_description), + headers={'Content-Type': 'application/json', + "X-Api-Key": get_api_key_for_user("admin")}) + continue + else: + req = hug.test.post(json_import, "/import", body=json.dumps(policy_description), + headers={'Content-Type': 'application/json', + "X-Api-Key": get_api_key_for_user("admin")}) + data = req.data + assert all(e in data for e in policy_description.keys()) + + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + req = test_policies.get_policies(auth_headers) + policies = req.data + policies = policies["policies"] + assert len(list(policies.keys())) == 1 + values = list(policies.values()) + assert values[0]["name"] == "test policy" + if counter < 3: + assert values[0]["genre"] == "authz" + assert values[0]["description"] == "description" + else: + assert values[0]["genre"] == "not authz ?" + assert values[0]["description"] == "changes taken into account" + assert len(values[0]["model_id"]) > 0 + import_export_helper.clean_all() + + +def test_import_subject_object_action(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + type_elements = ["object", "action"] + perimeter_id = None + + for type_element in type_elements: + import_export_helper.clean_all() + counter = -1 + # set the getters and the comparison values + if type_element == "subject": + elements = SUBJECTS + clean_method = import_export_helper.clean_subjects + name = "testuser" + key_extra = "email" + value_extra = "new-email@test.com" + elif type_element == "object": + elements = OBJECTS + clean_method = import_export_helper.clean_objects + name = "test object" + key_extra = "test" + value_extra = "test extra" + else: + elements = ACTIONS + clean_method = import_export_helper.clean_actions + name = "test action" + key_extra = "test" + value_extra = "test extra" + + for element in elements: + counter = counter + 1 + if counter == 2 or counter == 4: + clean_method() + + from moon_manager.api import perimeter + if counter == 3: + req = hug.test.patch(perimeter, "/{}s/{}".format(type_element, perimeter_id), + body=json.dumps(element["{}s".format(type_element)][0]), + headers={'Content-Type': 'application/json', + "X-Api-Key": get_api_key_for_user("admin")}) + elif counter < 2: + with pytest.raises(exceptions.PerimeterContentError) as exception_info: + req = hug.test.patch(perimeter, "/{}s/{}".format(type_element, perimeter_id), + body=json.dumps(element["{}s".format(type_element)][0]), + headers={'Content-Type': 'application/json', + "X-Api-Key": get_api_key_for_user("admin")}) + # assert req.status == hug.HTTP_400 + assert '400: Perimeter content is invalid.' == str(exception_info.value) + continue + else: + from moon_manager.api import json_import + req = hug.test.post(json_import, "/import", body=json.dumps(element), + headers={'Content-Type': 'application/json', + "X-Api-Key": get_api_key_for_user("admin")}) + + try: + data = req.data + except Exception as e: + assert False + # assert counter < 2 # Â this is an expected failure + # continue + + if counter != 3: + assert any(e in data for e in element["{}s".format(type_element)][0].keys()) #NOTE: logs are skipped for some elements + + from moon_manager.api import perimeter + get_elements = hug.test.get(perimeter, "/" + type_element + "s", headers=auth_headers ).data + get_elements = get_elements[type_element + "s"] + + perimeter_id = list(get_elements.keys())[0] + + assert len(list(get_elements.keys())) == 1 + values = list(get_elements.values()) + assert values[0]["name"] == name + if counter == 2 or counter == 4: + assert values[0]["description"] == "description of the " + type_element + # assert not values[0]["extra"] + if counter == 3: + assert values[0]["description"] == "new description of the " + type_element + assert values[0]["extra"][key_extra] == value_extra + + # Â assert len(values[0]["policy_list"]) == 1 + import_export_helper.clean_all() + + +def test_import_subject_object_action_categories(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + type_elements = ["subject", "object", "action"] + + for type_element in type_elements: + import_export_helper.clean_all() + counter = -1 + # set the getters and the comparison values + if type_element == "subject": + elements = SUBJECT_CATEGORIES + get_method = test_categories.get_subject_categories + elif type_element == "object": + elements = OBJECT_CATEGORIES + get_method = test_categories.get_object_categories + else: + elements = ACTION_CATEGORIES + get_method = test_categories.get_action_categories + + for element in elements: + from moon_manager.api import json_import + req = hug.test.post(json_import, "/import", body=json.dumps(element), + headers={'Content-Type': 'application/json', + "X-Api-Key": get_api_key_for_user("admin")} ) + counter = counter + 1 + data = req.data + assert all(e in data for e in element.keys()) + req, get_elements = get_method() + get_elements = get_elements[type_element + "_categories"] + assert len(list(get_elements.keys())) == 1 + values = list(get_elements.values()) + assert values[0]["name"] == "test " + type_element + " categories" + assert values[0]["description"] == type_element + " category description" + + +def test_import_meta_rules(): + from moon_utilities.auth_functions import get_api_key_for_user + + import_export_helper.clean_all() + # import some categories + from moon_manager.api import json_import + req = hug.test.post(json_import, "/import", body=json.dumps(PRE_META_RULES), + headers={'Content-Type': 'application/json', + "X-Api-Key": get_api_key_for_user("admin")}) + data = req.data + assert all(e in data for e in PRE_META_RULES.keys()) + + counter = -1 + for meta_rule in META_RULES: + counter = counter + 1 + if counter != 3: + with pytest.raises(exceptions.UnknownName) as exception_info: + req = hug.test.post(json_import, "/import", body=json.dumps(meta_rule), + headers={'Content-Type': 'application/json', + "X-Api-Key": get_api_key_for_user("admin")}) + # assert req.status == hug.HTTP_400 + assert '400: Unknown Name.' == str(exception_info.value) + continue + else: + req = hug.test.post(json_import, "/import", body=json.dumps(meta_rule), + headers={'Content-Type': 'application/json', + "X-Api-Key": get_api_key_for_user("admin")}) + data = req.data + assert all(e in data for e in meta_rule.keys()) + assert req.status == hug.HTTP_200 + + req, meta_rules = test_meta_rules.get_meta_rules() + meta_rules = meta_rules["meta_rules"] + key = list(meta_rules.keys())[0] + assert isinstance(meta_rules, dict) + assert meta_rules[key]["name"] == "good meta rule" + assert meta_rules[key]["description"] == "valid meta rule" + assert len(meta_rules[key]["subject_categories"]) == 1 + assert len(meta_rules[key]["object_categories"]) == 1 + assert len(meta_rules[key]["action_categories"]) == 1 + + subject_category_key = meta_rules[key]["subject_categories"][0] + object_category_key = meta_rules[key]["object_categories"][0] + action_category_key = meta_rules[key]["action_categories"][0] + + req, sub_cat = test_categories.get_subject_categories() + sub_cat = sub_cat["subject_categories"] + assert sub_cat[subject_category_key]["name"] == "test subject categories" + + req, ob_cat = test_categories.get_object_categories() + ob_cat = ob_cat["object_categories"] + assert ob_cat[object_category_key]["name"] == "test object categories" + + req, ac_cat = test_categories.get_action_categories() + ac_cat = ac_cat["action_categories"] + assert ac_cat[action_category_key]["name"] == "test action categories" + + import_export_helper.clean_all() + + +def test_import_subject_object_action_assignments(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + import_export_helper.clean_all() + + from moon_manager.api import json_import + req = hug.test.post(json_import, "/import", body=json.dumps(PRE_ASSIGNMENTS), + headers={'Content-Type': 'application/json', + "X-Api-Key": get_api_key_for_user("admin")} ) + data = req.data + assert any(e in data for e in PRE_ASSIGNMENTS.keys()) #NOTE: note assignment logs are skipped + + type_elements = ["subject", "object", "action"] + + for type_element in type_elements: + counter = -1 + if type_element == "subject": + datas = SUBJECT_ASSIGNMENTS + get_method = test_assignments.get_subject_assignment + elif type_element == "object": + datas = OBJECT_ASSIGNMENTS + get_method = test_assignments.get_object_assignment + else: + datas = ACTION_ASSIGNMENTS + get_method = test_assignments.get_action_assignment + + for assignments in datas: + counter = counter + 1 + my_exception = assignments.pop("exception") + if my_exception: + with pytest.raises(my_exception) as exception_info: + req = hug.test.post(json_import, "/import", body=json.dumps(assignments), + headers={'Content-Type': 'application/json', + "X-Api-Key": get_api_key_for_user("admin")}) + assert '400:' in str(exception_info.value) + else: + req = hug.test.post(json_import, "/import", body=json.dumps(assignments), + headers={'Content-Type': 'application/json', + "X-Api-Key": get_api_key_for_user("admin")}) + assert len(assignments.keys()) > 0 #NOTE logs for assignments are skipped + assert req.status == hug.HTTP_200 + req = test_policies.get_policies(auth_headers=auth_headers) + policies = req.data + for policy_key in policies["policies"]: + req, get_assignments = get_method(policy_key) + get_assignments = get_assignments[type_element + "_assignments"] + assert len(get_assignments) == 1 + + +def test_import_rules(): + from moon_utilities.auth_functions import get_api_key_for_user + + import_export_helper.clean_all() + from moon_manager.api import json_import + req = hug.test.post(json_import, "/import", body=json.dumps(PRE_ASSIGNMENTS), + headers={'Content-Type': 'application/json', + "X-Api-Key": get_api_key_for_user("admin")}) + data = req.data + assert all(e in data for e in PRE_ASSIGNMENTS.keys()) + + counter = -1 + for rule in RULES: + counter = counter + 1 + from moon_manager.api import json_import + if counter < 5: + with pytest.raises(exceptions.UnknownName) as exception_info: + req = hug.test.post(json_import, "/import", body=json.dumps(rule), + headers={'Content-Type': 'application/json', + "X-Api-Key": get_api_key_for_user("admin")}) + + # assert req.status == hug.HTTP_400 + assert '400: Unknown Name.' == str(exception_info.value) + continue + req = hug.test.post(json_import, "/import", body=json.dumps(rule), + headers={'Content-Type': 'application/json', + "X-Api-Key": get_api_key_for_user("admin")}) + + assert req.status == hug.HTTP_200 + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + policies = test_policies.get_policies(auth_headers=auth_headers).data + for policy in policies['policies']: + if policies['policies'][policy]['name'] == rule['rules'][0]['policy']['name']: + policy_id = policy + break + + req, rules = test_rules.test_get_rules(policy_id) + rules = rules["rules"] + rules = rules["rules"] + assert len(rules) == 1 + rules = rules[0] + assert rules["enabled"] + assert rules["instructions"][0]["decision"] == "grant" + + req, meta_rules = test_meta_rules.get_meta_rules() + assert meta_rules["meta_rules"][list(meta_rules["meta_rules"].keys())[0]][ + "name"] == "good meta rule" + + +def test_import_subject_object_action_data(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + type_elements = ["subject", "object", "action"] + + for type_element in type_elements: + import_export_helper.clean_all() + from moon_manager.api import json_import + req = hug.test.post(json_import, "/import", body=json.dumps(PRE_DATA), + headers={'Content-Type': 'application/json', "X-Api-Key": + get_api_key_for_user("admin")}) + counter = -1 + # set the getters and the comparison values + if type_element == "subject": + elements = SUBJECT_DATA + get_method = test_data.get_subject_data + get_categories = test_categories.get_subject_categories + elif type_element == "object": + elements = OBJECT_DATA + get_method = test_data.get_object_data + get_categories = test_categories.get_object_categories + else: + elements = ACTION_DATA + get_method = test_data.get_action_data + get_categories = test_categories.get_action_categories + + for element in elements: + from moon_manager.api import json_import + counter = counter + 1 + if counter == 0 or counter == 1: + with pytest.raises(exceptions.MissingIdOrName) as exception_info: + req = hug.test.post(json_import, "/import", body=json.dumps(element), headers={ + 'Content-Type': 'application/json', "X-Api-Key": get_api_key_for_user("admin")}) + # assert req.status == hug.HTTP_400 + assert '400: Missing ID or Name.' == str(exception_info.value) + continue + else: + req = hug.test.post(json_import, "/import", body=json.dumps(element), headers={ + 'Content-Type': 'application/json', "X-Api-Key": get_api_key_for_user("admin")}) + assert req.status == hug.HTTP_200 + data = req.data + assert all(e in data for e in element.keys()) + + req = test_policies.get_policies(auth_headers=auth_headers) + policies = req.data + policies = policies["policies"] + req, categories = get_categories() + categories = categories[type_element + "_categories"] + case_tested = False + for policy_key in policies.keys(): + policy = policies[policy_key] + for category_key in categories: + req, get_elements = get_method(policy_id=policy_key, + category_id=category_key) + if len(get_elements[type_element + "_data"]) == 0: + continue + + # do this because the backend gives an element with empty data if the policy_key, + # category_key couple does not have any data... + get_elements = get_elements[type_element + "_data"] + if len(get_elements[0]["data"]) == 0: + continue + + if policy["name"] == "test policy": + assert len(get_elements) == 1 + el = get_elements[0] + assert isinstance(el["data"], dict) + if counter == 2: + assert len(el["data"].keys()) == 1 + el = el["data"][list(el["data"].keys())[0]] + if "value" in el: + el = el["value"] + assert el["name"] == "one valid " + type_element + " data" + if counter == 3: + assert len(el["data"].keys()) == 2 + el1 = el["data"][list(el["data"].keys())[0]] + el2 = el["data"][list(el["data"].keys())[1]] + if "value" in el1: + el1 = el1["value"] + el2 = el2["value"] + assert (el1["name"] == "one valid " + type_element + " data" and el2[ + "name"] == "valid " + type_element + " data") or (el2[ + "name"] == "one valid " + type_element + " data" and + el1[ + "name"] == "valid " + type_element + " data") + assert el1["description"] == "description" + assert el2["description"] == "description" + + case_tested = True + + if policy["name"] == "test other policy": + if counter == 4: + assert len(get_elements) == 1 + el = get_elements[0] + assert isinstance(el["data"], dict) + assert len(el["data"].keys()) == 1 + el = el["data"][list(el["data"].keys())[0]] + if "value" in el: + el = el["value"] + assert el["name"] == "valid " + type_element + " data" + assert el["description"] == "new description" + case_tested = True + + assert case_tested is True + + +def test_clean(): + import_export_helper.clean_all() + # restore the database as previously + policy_helper.add_policies() diff --git a/moon_manager/tests/unit_python/api/test_keystone.py b/moon_manager/tests/unit_python/api/test_keystone.py new file mode 100644 index 00000000..5ed08ca7 --- /dev/null +++ b/moon_manager/tests/unit_python/api/test_keystone.py @@ -0,0 +1,63 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + + +def create_project(tenant_dict): + from moon_manager.pip_driver import InformationManager + return InformationManager["subjects"][0].create_project(**tenant_dict) + + +def list_projects(): + from moon_manager.pip_driver import InformationManager + return InformationManager["subjects"][0].get_projects() + + +def create_user(subject_dict): + from moon_manager.pip_driver import InformationManager + return InformationManager["subjects"][0].add_item(**subject_dict) + + +def test_create_project(): + tenant_dict = { + "description": "test_project", + "domain": ['domain_id_1'], + "enabled": True, + "is_domain": False, + "name": 'project_1' + } + project = create_project(tenant_dict) + assert project + assert project.get('name') == tenant_dict.get('name') + +# TODO TO BE UPDATED +# def test_create_project_without_name(): +# tenant_dict = { +# "description": "test_project", +# "domain_id": ['domain_id_1'], +# "enabled": True, +# "is_domain": False, +# } +# with pytest.raises(Exception) as exception_info: +# create_project(tenant_dict) +# assert '400: Keystone project error' == str(exception_info.value) + + +def test_create_user(): + subject_dict = { + "password": "password", + "domain": ['domain_id_1'], + "enabled": True, + "project": 'test_project', + "name": 'user_id_1' + } + user = create_user(subject_dict) + assert user diff --git a/moon_manager/tests/unit_python/api/test_meta_data.py b/moon_manager/tests/unit_python/api/test_meta_data.py new file mode 100644 index 00000000..1d37ab70 --- /dev/null +++ b/moon_manager/tests/unit_python/api/test_meta_data.py @@ -0,0 +1,370 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +import hug +import json +from helpers import data_builder +from uuid import uuid4 +import pytest +from moon_utilities import exceptions + +# subject_categories_test + + +def get_subject_categories(): + from moon_manager.api import meta_data + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + req = hug.test.get(meta_data, "/subject_categories", headers=auth_headers ) + subject_categories = req.data + return req, subject_categories + + +def add_subject_categories(name): + from moon_manager.api import meta_data + from moon_utilities.auth_functions import get_api_key_for_user + + data = { + "name": name, + "description": "description of {}".format(name) + } + req = hug.test.post(meta_data, "/subject_categories", body=json.dumps(data), + headers={'Content-Type': 'application/json', "X-Api-Key": + get_api_key_for_user("admin")}) + + subject_categories = req.data + return req, subject_categories + + +def delete_subject_categories(name): + from moon_manager.api import meta_data + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + request, subject_categories = get_subject_categories() + for key, value in subject_categories['subject_categories'].items(): + if value['name'] == name: + return hug.test.delete(meta_data, "/subject_categories/{}".format(key), headers=auth_headers ) + return hug.test.delete(meta_data, "/subject_categories/{}".format(name), headers=auth_headers ) + + +def test_get_subject_categories(): + req, subject_categories = get_subject_categories() + assert req.status == hug.HTTP_200 + assert isinstance(subject_categories, dict) + assert "subject_categories" in subject_categories + + +def test_add_subject_categories(): + name = "testuser" + uuid4().hex + req, subject_categories = add_subject_categories(name) + assert req.status == hug.HTTP_200 + assert isinstance(subject_categories, dict) + value = list(subject_categories["subject_categories"].values())[0] + assert "subject_categories" in subject_categories + assert value['name'] == name + assert value['description'] == "description of {}".format(name) + + +def test_add_subject_categories_with_existed_name(): + name = uuid4().hex + req, subject_categories = add_subject_categories(name) + assert req.status == hug.HTTP_200 + with pytest.raises(exceptions.SubjectCategoryExisting) as exception_info: + req, subject_categories = add_subject_categories(name) + assert '409: Subject Category Existing' == str(exception_info.value) + # assert req.status == hug.HTTP_409 + # assert req.data['message'] == '409: Subject Category Existing' + + +def test_add_subject_categories_name_contain_space(): + with pytest.raises(exceptions.CategoryNameInvalid) as exception_info: + req, subject_categories = add_subject_categories(" ") + assert '400: Category Name Invalid' == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data['message'] == '400: Category Name Invalid' + + +def test_add_subject_categories_with_empty_name(): + with pytest.raises(exceptions.ValidationContentError) as exception_info: + req, subject_categories = add_subject_categories("<a>") + # assert req.status == hug.HTTP_400 + # assert req.data['message'] == "Key: 'name', [Forbidden characters in string]" + assert "Key: 'name', [Forbidden characters in string]" == str(exception_info.value) + + +def test_add_subject_categories_with_name_contain_space(): + with pytest.raises(exceptions.ValidationContentError) as exception_info: + req, subject_categories = add_subject_categories("test<z>user") + assert "Key: 'name', [Forbidden characters in string]" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data['message'] == "Key: 'name', [Forbidden characters in string]" + + +def test_delete_subject_categories(): + name = "testuser" + uuid4().hex + add_subject_categories(name) + req = delete_subject_categories(name) + assert req.status == hug.HTTP_200 + + +def test_delete_subject_categories_without_id(): + with pytest.raises(exceptions.SubjectCategoryUnknown) as exception_info: + req = delete_subject_categories(uuid4().hex) + assert "400: Subject Category Unknown" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data['message'] == "400: Subject Category Unknown" + + +# --------------------------------------------------------------------------- +# object_categories_test + +def get_object_categories(): + from moon_manager.api import meta_data + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + req = hug.test.get(meta_data, "/object_categories", headers=auth_headers ) + object_categories = req.data + return req, object_categories + + +def add_object_categories(name): + from moon_manager.api import meta_data + from moon_utilities.auth_functions import get_api_key_for_user + + data = { + "name": name, + "description": "description of {}".format(name) + } + req = hug.test.post(meta_data, "/object_categories", body=json.dumps(data), + headers={'Content-Type': 'application/json', "X-Api-Key": + get_api_key_for_user("admin")} ) + object_categories = req.data + return req, object_categories + + +def delete_object_categories(name): + from moon_manager.api import meta_data + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + request, object_categories = get_object_categories() + for key, value in object_categories['object_categories'].items(): + if value['name'] == name: + return hug.test.delete(meta_data, "/object_categories/{}".format(key), + headers=auth_headers ) + return hug.test.delete(meta_data, "/object_categories/{}".format(name), headers=auth_headers ) + + +def test_get_object_categories(): + req, object_categories = get_object_categories() + assert req.status == hug.HTTP_200 + assert isinstance(object_categories, dict) + assert "object_categories" in object_categories + + +def test_add_object_categories(): + name="testuser"+uuid4().hex + req, object_categories = add_object_categories(name) + assert req.status == hug.HTTP_200 + assert isinstance(object_categories, dict) + value = list(object_categories["object_categories"].values())[0] + assert "object_categories" in object_categories + assert value['name'] == name + assert value['description'] == "description of {}".format(name) + + +def test_add_object_categories_with_existed_name(): + name = uuid4().hex + req, object_categories = add_object_categories(name) + assert req.status == hug.HTTP_200 + with pytest.raises(exceptions.ObjectCategoryExisting) as exception_info: + req, object_categories = add_object_categories(name) + assert "409: Object Category Existing" == str(exception_info.value) + # assert req.status == hug.HTTP_409 + # assert req.data['message'] == '409: Object Category Existing' + + +def test_add_object_categories_name_contain_space(): + with pytest.raises(exceptions.CategoryNameInvalid) as exception_info: + req, subject_categories = add_object_categories(" ") + assert "400: Category Name Invalid" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data['message'] == '400: Category Name Invalid' + + +def test_add_object_categories_with_empty_name(): + with pytest.raises(exceptions.ValidationContentError) as exception_info: + req, object_categories = add_object_categories("<a>") + assert "Key: 'name', [Forbidden characters in string]" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data['message'] == "Key: 'name', [Forbidden characters in string]" + + +def test_add_object_categories_with_name_contain_space(): + with pytest.raises(exceptions.ValidationContentError) as exception_info: + req, object_categories = add_object_categories("test<a>user") + assert "Key: 'name', [Forbidden characters in string]" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data['message'] == "Key: 'name', [Forbidden characters in string]" + + +def test_delete_object_categories(): + name = uuid4().hex + add_object_categories(name) + req = delete_object_categories(name) + assert req.status == hug.HTTP_200 + + +def test_delete_object_categories_without_id(): + with pytest.raises(exceptions.ObjectCategoryUnknown) as exception_info: + req = delete_object_categories(uuid4().hex) + assert "400: Object Category Unknown" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data['message'] == "400: Object Category Unknown" + + +# --------------------------------------------------------------------------- +# action_categories_test + +def get_action_categories(): + from moon_manager.api import meta_data + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + req = hug.test.get(meta_data, "/action_categories", headers=auth_headers ) + action_categories = req.data + return req, action_categories + + +def add_action_categories(name): + from moon_manager.api import meta_data + from moon_utilities.auth_functions import get_api_key_for_user + + data = { + "name": name, + "description": "description of {}".format(name) + } + req = hug.test.post(meta_data, "/action_categories", body=json.dumps(data), + headers={'Content-Type': 'application/json', "X-Api-Key": + get_api_key_for_user("admin")} ) + action_categories = req.data + return req, action_categories + + +def delete_action_categories(name): + from moon_manager.api import meta_data + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + request, action_categories = get_action_categories() + for key, value in action_categories['action_categories'].items(): + if value['name'] == name: + return hug.test.delete(meta_data, "/action_categories/{}".format(key), headers=auth_headers ) + return hug.test.delete(meta_data, "/action_categories/{}".format(name), headers=auth_headers ) + + +def test_get_action_categories(): + req, action_categories = get_action_categories() + assert req.status == hug.HTTP_200 + assert isinstance(action_categories, dict) + assert "action_categories" in action_categories + + +def test_add_action_categories(): + name = "testuser" + uuid4().hex + req, action_categories = add_action_categories(name) + assert req.status == hug.HTTP_200 + assert isinstance(action_categories, dict) + value = list(action_categories["action_categories"].values())[0] + assert "action_categories" in action_categories + assert value['name'] == name + assert value['description'] == "description of {}".format(name) + + +def test_add_action_categories_with_existed_name(): + name = uuid4().hex + req, action_categories = add_action_categories(name) + assert req.status == hug.HTTP_200 + with pytest.raises(exceptions.ActionCategoryExisting) as exception_info: + req, action_categories = add_action_categories(name) + assert "409: Action Category Existing" == str(exception_info.value) + # assert req.status == hug.HTTP_409 + # assert req.data['message'] == '409: Action Category Existing' + + +def test_add_action_categories_name_contain_space(): + with pytest.raises(exceptions.CategoryNameInvalid) as exception_info: + req, subject_categories = add_action_categories(" ") + assert "400: Category Name Invalid" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data['message'] == '400: Category Name Invalid' + + +def test_add_action_categories_with_empty_name(): + with pytest.raises(exceptions.ValidationContentError) as exception_info: + req, action_categories = add_action_categories("<a>") + assert "Key: 'name', [Forbidden characters in string]" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data['message'] == "Key: 'name', [Forbidden characters in string]" + + +def test_add_action_categories_with_name_contain_space(): + with pytest.raises(exceptions.ValidationContentError) as exception_info: + req, action_categories = add_action_categories("test<a>user") + assert "Key: 'name', [Forbidden characters in string]" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data['message'] == "Key: 'name', [Forbidden characters in string]" + + +def test_delete_action_categories(): + name = "testuser" + uuid4().hex + add_action_categories(name) + req = delete_action_categories(name) + assert req.status == hug.HTTP_200 + + +def test_delete_action_categories_without_id(): + with pytest.raises(exceptions.ActionCategoryUnknown) as exception_info: + req = delete_action_categories(uuid4().hex) + assert "400: Action Category Unknown" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data['message'] == "400: Action Category Unknown" + + +def test_delete_data_categories_connected_to_meta_rule(): + from moon_manager.api import meta_data + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + subject_category_id, object_category_id, action_category_id, meta_rule_id = data_builder.create_new_meta_rule() + + with pytest.raises(exceptions.DeleteSubjectCategoryWithMetaRule) as exception_info: + req = hug.test.delete(meta_data, "/subject_categories/{}".format(subject_category_id), + headers=auth_headers ) + assert "400: Subject Category With Meta Rule Error" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data['message'] == '400: Subject Category With Meta Rule Error' + + with pytest.raises(exceptions.DeleteObjectCategoryWithMetaRule) as exception_info: + req = hug.test.delete(meta_data, "/object_categories/{}".format(object_category_id), headers=auth_headers) + assert "400: Object Category With Meta Rule Error" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data['message'] == '400: Object Category With Meta Rule Error' + + with pytest.raises(exceptions.DeleteActionCategoryWithMetaRule) as exception_info: + req = hug.test.delete(meta_data, "/action_categories/{}".format(action_category_id), headers=auth_headers) + assert "400: Action Category With Meta Rule Error" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data['message'] == '400: Action Category With Meta Rule Error' diff --git a/moon_manager/tests/unit_python/api/test_meta_rules.py b/moon_manager/tests/unit_python/api/test_meta_rules.py new file mode 100644 index 00000000..6c6797f5 --- /dev/null +++ b/moon_manager/tests/unit_python/api/test_meta_rules.py @@ -0,0 +1,687 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +import hug +import api.utilities as utilities +from helpers import category_helper +from helpers import data_builder +from helpers import policy_helper +from helpers import model_helper +from helpers import meta_rule_helper +from uuid import uuid4 +import pytest +from moon_utilities import exceptions + + +def get_meta_rules(): + from moon_manager.api import meta_rules + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + req = hug.test.get(meta_rules, "/meta_rules", headers=auth_headers) + meta_rules = utilities.get_json(req.data) + return req, meta_rules + + +def add_meta_rules(name, data=None): + from moon_manager.api import meta_rules + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + if not data: + subject_category = category_helper.add_subject_category( + value={"name": "subject category name" + uuid4().hex, "description": "description 1"}) + subject_category_id = list(subject_category.keys())[0] + object_category = category_helper.add_object_category( + value={"name": "object category name" + uuid4().hex, "description": "description 1"}) + object_category_id = list(object_category.keys())[0] + action_category = category_helper.add_action_category( + value={"name": "action category name" + uuid4().hex, "description": "description 1"}) + action_category_id = list(action_category.keys())[0] + + data = { + "name": name, + "subject_categories": [subject_category_id], + "object_categories": [object_category_id], + "action_categories": [action_category_id] + } + req = hug.test.post(meta_rules, "/meta_rules", body=data, + headers=auth_headers) + meta_rules = utilities.get_json(req.data) + return req, meta_rules + + +def add_meta_rules_without_category_ids(name): + from moon_manager.api import meta_rules + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + data = { + "name": name + uuid4().hex, + "subject_categories": [], + "object_categories": [], + "action_categories": [] + } + req = hug.test.post(meta_rules, "/meta_rules", body=data, + headers=auth_headers) + meta_rules = utilities.get_json(req.data) + return req, meta_rules + + +def update_meta_rules(name, metaRuleId, data=None): + from moon_manager.api import meta_rules + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + if not data: + subject_category = category_helper.add_subject_category( + value={"name": "subject category name update" + uuid4().hex, + "description": "description 1"}) + subject_category_id = list(subject_category.keys())[0] + object_category = category_helper.add_object_category( + value={"name": "object category name update" + uuid4().hex, + "description": "description 1"}) + object_category_id = list(object_category.keys())[0] + action_category = category_helper.add_action_category( + value={"name": "action category name update" + uuid4().hex, + "description": "description 1"}) + action_category_id = list(action_category.keys())[0] + data = { + "name": name, + "subject_categories": [subject_category_id], + "object_categories": [object_category_id], + "action_categories": [action_category_id] + } + + req = hug.test.patch(meta_rules, "/meta_rules/{}".format(metaRuleId), body=data, + headers=auth_headers) + meta_rules = utilities.get_json(req.data) + return req, meta_rules + + +def update_meta_rules_with_categories(name, data=None, meta_rule_id=None): + from moon_manager.api import meta_rules + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + if not meta_rule_id: + subject_category_id, object_category_id, action_category_id, meta_rule_id = data_builder.create_new_meta_rule() + data = { + "name": name, + "subject_categories": [subject_category_id], + "object_categories": [object_category_id], + "action_categories": [action_category_id] + } + + req = hug.test.patch(meta_rules, "/meta_rules/{}".format(meta_rule_id), body=data, + headers=auth_headers) + meta_rules = utilities.get_json(req.data) + return req, meta_rules + + +def delete_meta_rules(name): + from moon_manager.api import meta_rules + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + request, meta_rules_data = get_meta_rules() + for key, value in meta_rules_data['meta_rules'].items(): + if value['name'] == name: + return hug.test.delete(meta_rules, "/meta_rules/{}".format(key), headers=auth_headers) + + +def delete_meta_rules_without_id(): + from moon_manager.api import meta_rules + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + req = hug.test.delete(meta_rules, "/meta_rules/{}".format(""), headers=auth_headers) + return req + + +def test_get_meta_rules(): + req, meta_rules = get_meta_rules() + assert req.status == hug.HTTP_200 + assert isinstance(meta_rules, dict) + assert "meta_rules" in meta_rules + + +def test_add_meta_rules(): + meta_rule_name = uuid4().hex + req, meta_rules = add_meta_rules(meta_rule_name) + assert req.status == hug.HTTP_200 + assert isinstance(meta_rules, dict) + value = list(meta_rules["meta_rules"].values())[0] + assert "meta_rules" in meta_rules + assert value['name'] == meta_rule_name + + +def test_add_meta_rules_space_name(): + with pytest.raises(exceptions.MetaRuleContentError) as exception_info: + req, meta_rules = add_meta_rules(" ") + assert "400: Meta Rule Error" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data["message"] == '400: Meta Rule Error' + + +def test_add_meta_rules_empty_name(): + with pytest.raises(exceptions.MetaRuleContentError) as exception_info: + req, meta_rules = add_meta_rules("") + assert "400: Meta Rule Error" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data["message"] == '400: Meta Rule Error' + + +def test_add_two_meta_rules_with_same_categories_combination(): + meta_rule_name = uuid4().hex + req, meta_rules = add_meta_rules(meta_rule_name) + data = None + assert req.status == hug.HTTP_200 + for meta_rule_id in meta_rules['meta_rules']: + if meta_rules['meta_rules'][meta_rule_id]['name'] == meta_rule_name: + data = meta_rules['meta_rules'][meta_rule_id] + + assert data + data['name'] = uuid4().hex + with pytest.raises(exceptions.MetaRuleExisting) as exception_info: + req, meta_rules = add_meta_rules(name=data['name'], data=data) + assert "409: Meta Rule Existing" == str(exception_info.value) + # assert req.status == hug.HTTP_409 + # assert req.data["message"] == '409: Meta Rule Existing' + + +def test_add_three_meta_rules_with_different_combination_but_similar_items(): + meta_rule_name1 = uuid4().hex + req, meta_rules = add_meta_rules(meta_rule_name1) + assert req.status == hug.HTTP_200 + for meta_rule_id in meta_rules['meta_rules']: + if meta_rules['meta_rules'][meta_rule_id]['name'] == meta_rule_name1: + data = meta_rules['meta_rules'][meta_rule_id] + break + + meta_rule_name2 = uuid4().hex + + req, meta_rules = add_meta_rules(meta_rule_name2) + + for meta_rule_id in meta_rules['meta_rules']: + if meta_rules['meta_rules'][meta_rule_id]['name'] == meta_rule_name2: + data['subject_categories'] += meta_rules['meta_rules'][meta_rule_id][ + 'subject_categories'] + data['object_categories'] += meta_rules['meta_rules'][meta_rule_id]['object_categories'] + data['action_categories'] += meta_rules['meta_rules'][meta_rule_id]['action_categories'] + break + + data['name'] = uuid4().hex + + req, meta_rules = add_meta_rules(name=data['name'], data=data) + assert req.status == hug.HTTP_200 + + +def test_add_two_meta_rules_with_different_combination_but_similar_items(): + meta_rule_name1 = uuid4().hex + meta_rule_name2 = uuid4().hex + + subject_category = category_helper.add_subject_category( + value={"name": "subject category name" + uuid4().hex, "description": "description 1"}) + subject_category_id1 = list(subject_category.keys())[0] + + object_category = category_helper.add_object_category( + value={"name": "object category name" + uuid4().hex, "description": "description 1"}) + object_category_id1 = list(object_category.keys())[0] + + action_category = category_helper.add_action_category( + value={"name": "action category name" + uuid4().hex, "description": "description 1"}) + action_category_id1 = list(action_category.keys())[0] + + subject_category = category_helper.add_subject_category( + value={"name": "subject category name" + uuid4().hex, "description": "description 1"}) + subject_category_id2 = list(subject_category.keys())[0] + + object_category = category_helper.add_object_category( + value={"name": "object category name" + uuid4().hex, "description": "description 1"}) + object_category_id2 = list(object_category.keys())[0] + + action_category = category_helper.add_action_category( + value={"name": "action category name" + uuid4().hex, "description": "description 1"}) + action_category_id2 = list(action_category.keys())[0] + + data = { + "name": meta_rule_name1, + "subject_categories": [subject_category_id1, subject_category_id2], + "object_categories": [object_category_id1, object_category_id2], + "action_categories": [action_category_id1, action_category_id2] + } + req, meta_rules = add_meta_rules(meta_rule_name1, data=data) + assert req.status == hug.HTTP_200 + data = { + "name": meta_rule_name2, + "subject_categories": [subject_category_id2], + "object_categories": [object_category_id1], + "action_categories": [action_category_id2] + } + + req, meta_rules = add_meta_rules(meta_rule_name1, data=data) + assert req.status == hug.HTTP_200 + + +# This test Succeed as it's okay to have empty id in adding meta rule, as it is not attached to model yet +def test_add_meta_rules_with_empty_subject_in_mid(): + from moon_manager.api import meta_rules + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + value = meta_rule_helper.get_body_meta_rule_with_empty_category_in_mid('subject') + with pytest.raises(exceptions.SubjectCategoryUnknown) as exception_info: + req = hug.test.post(meta_rules, "/meta_rules", body=value, + headers=auth_headers) + # assert req.status == hug.HTTP_200 + assert str(exception_info.value) == "400: Subject Category Unknown" + + +def test_add_meta_rules_with_empty_object_in_mid(): + from moon_manager.api import meta_rules + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + value = meta_rule_helper.get_body_meta_rule_with_empty_category_in_mid('object') + with pytest.raises(exceptions.ObjectCategoryUnknown) as exception_info: + req = hug.test.post(meta_rules, "/meta_rules", body=value, + headers=auth_headers) + assert str(exception_info.value) == "400: Object Category Unknown" + + +def test_add_meta_rules_with_empty_action_in_mid(): + from moon_manager.api import meta_rules + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + value = meta_rule_helper.get_body_meta_rule_with_empty_category_in_mid('action') + with pytest.raises(exceptions.ActionCategoryUnknown) as exception_info: + req = hug.test.post(meta_rules, "/meta_rules", body=value, + headers=auth_headers) + assert str(exception_info.value) == "400: Action Category Unknown" + + +def test_add_meta_rule_with_existing_name_error(): + name = uuid4().hex + req, meta_rules = add_meta_rules(name) + assert req.status == hug.HTTP_200 + with pytest.raises(exceptions.MetaRuleExisting) as exception_info: + req, meta_rules = add_meta_rules(name) + assert "409: Meta Rule Existing" == str(exception_info.value) + # assert req.status == hug.HTTP_409 + # assert req.data["message"] == '409: Meta Rule Existing' + + +def test_add_meta_rules_with_forbidden_char_in_name(): + with pytest.raises(exceptions.ValidationContentError) as exception_info: + req, meta_rules = add_meta_rules("<a>") + assert "Key: 'name', [Forbidden characters in string]" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data["message"] == "Key: 'name', [Forbidden characters in string]" + + +def test_add_meta_rules_with_blank_name(): + with pytest.raises(exceptions.MetaRuleContentError) as exception_info: + req, meta_rules = add_meta_rules("") + assert "400: Meta Rule Error" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data["message"] == '400: Meta Rule Error' + + +def test_add_meta_rules_without_subject_categories(): + name_meta_rule = uuid4().hex + req, meta_rules = add_meta_rules_without_category_ids(name_meta_rule) + assert req.status == hug.HTTP_200 + + +def test_delete_meta_rules(): + name_meta_rule = uuid4().hex + req, meta_rules = add_meta_rules_without_category_ids(name_meta_rule) + meta_rule_id = next(iter(meta_rules['meta_rules'])) + req = delete_meta_rules(meta_rules['meta_rules'][meta_rule_id]['name']) + assert req.status == hug.HTTP_200 + + +def test_delete_meta_rules_without_id(): + with pytest.raises(exceptions.MetaRuleUnknown) as exception_info: + req = delete_meta_rules_without_id() + assert "400: Meta Rule Unknown" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data["message"] == "400: Meta Rule Unknown" + + +def test_update_meta_rules(): + name = "testuser" + uuid4().hex + req = add_meta_rules(name) + meta_rule_id = list(req[1]['meta_rules'])[0] + req_update = update_meta_rules(name, meta_rule_id) + assert req_update[0].status == hug.HTTP_200 + delete_meta_rules("testuser") + get_meta_rules() + + +def test_update_meta_rules_empty_name(): + req = add_meta_rules("testuser" + uuid4().hex) + meta_rule_id = list(req[1]['meta_rules'])[0] + with pytest.raises(exceptions.MetaRuleContentError) as exception_info: + req_update = update_meta_rules("", meta_rule_id) + assert "400: Meta Rule Error" == str(exception_info.value) + # assert req_update[0].status == hug.HTTP_400 + # assert req_update[1]['message'] == '400: Meta Rule Error' + + +def test_update_meta_rules_space_name(): + req = add_meta_rules("testuser" + uuid4().hex) + meta_rule_id = list(req[1]['meta_rules'])[0] + with pytest.raises(exceptions.MetaRuleContentError) as exception_info: + req_update = update_meta_rules(" ", meta_rule_id) + assert "400: Meta Rule Error" == str(exception_info.value) + # assert req_update[0].status == hug.HTTP_400 + # assert req_update[1]['message'] == '400: Meta Rule Error' + + +def test_update_meta_rule_with_combination_existed(): + meta_rule_name1 = uuid4().hex + req, meta_rules = add_meta_rules(meta_rule_name1) + meta_rule_id1 = next(iter(meta_rules['meta_rules'])) + data1 = meta_rules['meta_rules'][meta_rule_id1] + + meta_rule_name2 = uuid4().hex + req, meta_rules = add_meta_rules(meta_rule_name2) + meta_rule_id2 = next(iter(meta_rules['meta_rules'])) + data2 = meta_rules['meta_rules'][meta_rule_id2] + data1['name'] = data2['name'] + with pytest.raises(exceptions.MetaRuleExisting) as exception_info: + req_update = update_meta_rules(name=meta_rule_name2, metaRuleId=meta_rule_id2, + data=data1) + assert "409: Meta Rule Existing" == str(exception_info.value) + # assert req_update[0].status == hug.HTTP_409 + # assert req_update[1]['message'] == '409: Meta Rule Existing' + + +def test_update_meta_rule_with_different_combination_but_same_data(): + meta_rule_name1 = uuid4().hex + subject_category = category_helper.add_subject_category( + value={"name": "subject category name" + uuid4().hex, "description": "description 1"}) + subject_category_id1 = list(subject_category.keys())[0] + object_category = category_helper.add_object_category( + value={"name": "object category name" + uuid4().hex, "description": "description 1"}) + object_category_id1 = list(object_category.keys())[0] + action_category = category_helper.add_action_category( + value={"name": "action category name" + uuid4().hex, "description": "description 1"}) + action_category_id1 = list(action_category.keys())[0] + subject_category = category_helper.add_subject_category( + value={"name": "subject category name" + uuid4().hex, "description": "description 1"}) + subject_category_id2 = list(subject_category.keys())[0] + object_category = category_helper.add_object_category( + value={"name": "object category name" + uuid4().hex, "description": "description 1"}) + object_category_id2 = list(object_category.keys())[0] + action_category = category_helper.add_action_category( + value={"name": "action category name" + uuid4().hex, "description": "description 1"}) + action_category_id2 = list(action_category.keys())[0] + + data = { + "name": meta_rule_name1, + "subject_categories": [subject_category_id1, subject_category_id2], + "object_categories": [object_category_id1, object_category_id2], + "action_categories": [action_category_id1, action_category_id2] + } + req, meta_rules = add_meta_rules(meta_rule_name1, data=data) + assert req.status == hug.HTTP_200 + + meta_rule_name2 = uuid4().hex + req, meta_rules = add_meta_rules(meta_rule_name2) + meta_rule_id2 = next(iter(meta_rules['meta_rules'])) + data2 = { + "name": meta_rule_name2, + "subject_categories": [subject_category_id1, subject_category_id2], + "object_categories": [object_category_id1], + "action_categories": [action_category_id1, action_category_id2] + } + + req_update = update_meta_rules(name=meta_rule_name2, metaRuleId=meta_rule_id2, + data=data2) + assert req_update[0].status == hug.HTTP_200 + + +def test_update_meta_rules_without_id(): + with pytest.raises(exceptions.MetaRuleUnknown) as exception_info: + req_update = update_meta_rules("testuser", "") + assert "400: Meta Rule Unknown" == str(exception_info.value) + # assert req_update[0].status == hug.HTTP_400 + # assert req_update[0].data["message"] == "400: Meta Rule Unknown" + + +def test_update_meta_rules_without_name(): + with pytest.raises(exceptions.ValidationContentError) as exception_info: + req_update = update_meta_rules("<br/>", "1234567") + assert "Key: 'name', [Forbidden characters in string]" == str(exception_info.value) + # assert req_update[0].status == hug.HTTP_400 + # assert req_update[0].data["message"] == "Key: 'name', [Forbidden characters in string]" + + +def test_update_meta_rules_without_categories(): + req_update = update_meta_rules_with_categories("testuser" + uuid4().hex) + assert req_update[0].status == hug.HTTP_200 + + +def test_update_meta_rules_with_empty_categories(): + subject_category_id, object_category_id, action_category_id, meta_rule_id = data_builder.create_new_meta_rule() + data = { + "name": "testuser", + "subject_categories": [""], + "object_categories": [""], + "action_categories": [""] + } + req_update = update_meta_rules_with_categories("testuser", data=data, + meta_rule_id=meta_rule_id) + assert req_update[0].status == hug.HTTP_200 + # assert "400: Subject Category Unknown" == str(exception_info.value) + # assert req_update[0].status == hug.HTTP_400 + # assert req_update[1]['message'] == '400: Subject Category Unknown' + + +def test_update_meta_rules_with_blank_subject_categories(): + subject_category_id, object_category_id, action_category_id, meta_rule_id = data_builder.create_new_meta_rule() + data = { + "name": "testuser1", + "subject_categories": [], + "object_categories": [object_category_id], + "action_categories": [action_category_id] + } + req_update = update_meta_rules_with_categories("testuser", data=data, + meta_rule_id=meta_rule_id) + + assert req_update[0].status == hug.HTTP_200 + + +def test_update_meta_rules_with_blank_object_categories(): + subject_category_id, object_category_id, action_category_id, meta_rule_id = data_builder.create_new_meta_rule() + data = { + "name": "testuser1", + "subject_categories": [subject_category_id], + "object_categories": [], + "action_categories": [action_category_id] + } + req_update = update_meta_rules_with_categories("testuser", data=data, + meta_rule_id=meta_rule_id) + + assert req_update[0].status == hug.HTTP_200 + + +def test_update_meta_rules_with_blank_action_categories(): + subject_category_id, object_category_id, action_category_id, meta_rule_id = data_builder.create_new_meta_rule() + data = { + "name": "testuser1", + "subject_categories": [subject_category_id], + "object_categories": [object_category_id], + "action_categories": [] + } + req_update = update_meta_rules_with_categories("testuser", data=data, + meta_rule_id=meta_rule_id) + + assert req_update[0].status == hug.HTTP_200 + + +def test_update_meta_rules_with_empty_subject_category(): + subject_category_id, object_category_id, action_category_id, meta_rule_id = data_builder.create_new_meta_rule() + data = { + "name": "testuser", + "subject_categories": [""], + "object_categories": [object_category_id], + "action_categories": [action_category_id] + } + req_update = update_meta_rules_with_categories("testuser", data=data, + meta_rule_id=meta_rule_id) + assert req_update[0].status == hug.HTTP_200 + # assert "400: Subject Category Unknown" == str(exception_info.value) + # assert req_update[0].status == hug.HTTP_400 + # assert req_update[1]['message'] == '400: Subject Category Unknown' + + +def test_update_meta_rules_with_empty_action_category(): + subject_category_id, object_category_id, action_category_id, meta_rule_id = data_builder.create_new_meta_rule() + data = { + "name": "testuser", + "subject_categories": [subject_category_id], + "object_categories": [object_category_id], + "action_categories": [""] + } + req_update = update_meta_rules_with_categories("testuser", data=data, + meta_rule_id=meta_rule_id) + assert req_update[0].status == hug.HTTP_200 + # assert "400: Action Category Unknown" == str(exception_info.value) + # assert req_update[0].status == hug.HTTP_400 + # assert req_update[1]['message'] == '400: Action Category Unknown' + + +def test_update_meta_rules_with_empty_object_category(): + subject_category_id, object_category_id, action_category_id, meta_rule_id = data_builder.create_new_meta_rule() + data = { + "name": "testuser", + "subject_categories": [subject_category_id], + "object_categories": [""], + "action_categories": [action_category_id] + } + req_update = update_meta_rules_with_categories("testuser", data=data, + meta_rule_id=meta_rule_id) + + assert req_update[0].status == hug.HTTP_200 + # assert "400: Object Category Unknown" == str(exception_info.value) + # assert req_update[0].status == hug.HTTP_400 + # assert req_update[1]['message'] == '400: Object Category Unknown' + + +def test_update_meta_rules_with_categories_and_one_empty(): + subject_category_id, object_category_id, action_category_id, meta_rule_id = data_builder.create_new_meta_rule() + data = { + "name": "testuser", + "subject_categories": [subject_category_id, ""], + "object_categories": [object_category_id, ""], + "action_categories": [action_category_id, ""] + } + with pytest.raises(exceptions.SubjectCategoryUnknown) as exception_info: + req_update = update_meta_rules_with_categories("testuser", data=data, + meta_rule_id=meta_rule_id) + assert "400: Subject Category Unknown" == str(exception_info.value) + # assert req_update[0].status == hug.HTTP_400 + # assert req_update[1]['message'] == '400: Subject Category Unknown' + + +def test_add_one_meta_rules_with_different_combination_but_similar_items(): + meta_rule_name1 = uuid4().hex + + subject_category = category_helper.add_subject_category( + value={"name": "subject category name" + uuid4().hex, "description": "description 1"}) + subject_category_id1 = list(subject_category.keys())[0] + + object_category = category_helper.add_object_category( + value={"name": "object category name" + uuid4().hex, "description": "description 1"}) + object_category_id1 = list(object_category.keys())[0] + + action_category = category_helper.add_action_category( + value={"name": "action category name" + uuid4().hex, "description": "description 1"}) + action_category_id1 = list(action_category.keys())[0] + + subject_category = category_helper.add_subject_category( + value={"name": "subject category name" + uuid4().hex, "description": "description 1"}) + subject_category_id2 = list(subject_category.keys())[0] + + object_category = category_helper.add_object_category( + value={"name": "object category name" + uuid4().hex, "description": "description 1"}) + object_category_id2 = list(object_category.keys())[0] + + action_category = category_helper.add_action_category( + value={"name": "action category name" + uuid4().hex, "description": "description 1"}) + action_category_id2 = list(action_category.keys())[0] + + data = { + "name": meta_rule_name1, + "subject_categories": [subject_category_id1, subject_category_id2], + "object_categories": [object_category_id1, object_category_id2], + "action_categories": [action_category_id1, action_category_id2] + } + req, meta_rules = add_meta_rules(meta_rule_name1, data=data) + assert req.status == hug.HTTP_200 + + value = { + "name": "name_model", + "description": "test", + "meta_rules": [next(iter(meta_rules['meta_rules']))] + } + mode_id = next(iter(model_helper.add_model(value=value))) + + value = { + "name": "test_policy" + uuid4().hex, + "model_id": mode_id, + "genre": "authz", + "description": "test", + } + + policy_id = next(iter(policy_helper.add_policies(value=value))) + + data_id_1 = data_builder.create_subject_data(policy_id, subject_category_id1) + data_id_2 = data_builder.create_subject_data(policy_id, subject_category_id2) + data_id_3 = data_builder.create_object_data(policy_id, object_category_id2) + data_id_4 = data_builder.create_object_data(policy_id, object_category_id1) + data_id_5 = data_builder.create_action_data(policy_id, action_category_id1) + data_id_5 = data_builder.create_action_data(policy_id, action_category_id2) + + from moon_utilities.auth_functions import get_api_key_for_user + from falcon import HTTP_200, HTTP_400, HTTP_405, HTTP_409 + + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import policy + + req = hug.test.delete(policy, "policies/{}".format(policy_id), headers=auth_headers) + assert req.status == HTTP_200 + + +def test_update_meta_rules_with_blank_action_categories_assigned_to_used_model(): + from moon_utilities.auth_functions import get_api_key_for_user + from moon_manager.api import meta_rules + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + policies_list = policy_helper.add_policies_with_model() + policy_id = list(policies_list.keys())[0] + model_id = policies_list[policy_id]['model_id'] + models_list = model_helper.get_models(model_id=model_id) + meta_rule_id = models_list[model_id]["meta_rules"][0] + meta_rules_list = meta_rule_helper.get_meta_rules(meta_rule_id=meta_rule_id); + data = meta_rules_list[meta_rule_id] + + data["action_categories"] = [] + + with pytest.raises(exceptions.MetaRuleUpdateError) as exception_info: + hug.test.patch(meta_rules, "/meta_rules/{}".format(meta_rule_id), body=data, + headers=auth_headers) + assert "400: Meta_Rule Update Error" == str(exception_info.value) diff --git a/moon_manager/tests/unit_python/api/test_models.py b/moon_manager/tests/unit_python/api/test_models.py index 3c205d1d..569fe1b4 100644 --- a/moon_manager/tests/unit_python/api/test_models.py +++ b/moon_manager/tests/unit_python/api/test_models.py @@ -1,67 +1,475 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + import json -import api.utilities as utilities +import hug +import pytest +from moon_utilities import exceptions +from helpers import data_builder as builder +from helpers import policy_helper +from helpers import model_helper +from uuid import uuid4 + + +def get_models(): + from moon_manager.api import models + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + req = hug.test.get(models, "/models", headers=auth_headers) + models = req.data + return req, models + + +def add_models(name, headers, data=None, ): + from moon_manager.api import models + subject_category_id, object_category_id, action_category_id, meta_rule_id = \ + builder.create_new_meta_rule() + if not data: + data = { + "name": name, + "description": "description of {}".format(name), + "meta_rules": [meta_rule_id] + } + headers['Content-Type'] = 'application/json' + req = hug.test.post(models, "/models", body=json.dumps(data), + headers=headers) + models = req.data + return req, models + + +def update_model(name, model_id, headers): + from moon_manager.api import models + subject_category_id, object_category_id, action_category_id, meta_rule_id = \ + builder.create_new_meta_rule() + data = { + "name": name, + "description": "description of {}".format(name), + "meta_rules": [meta_rule_id] + } + headers['Content-Type'] = 'application/json' + req = hug.test.patch(models, "/models/{}".format(model_id), body=json.dumps(data), + headers=headers) + if req.status == hug.HTTP_405: + return req + models = req.data + return req, models + + +def add_model_without_meta_rules_ids(name, headers): + from moon_manager.api import models + data = { + "name": name, + "description": "description of {}".format(name), + "meta_rules": [] + } + headers['Content-Type'] = 'application/json' + req = hug.test.post(models, "/models", body=json.dumps(data), + headers=headers) + models = req.data + return req, models -def get_models(client): - req = client.get("/models") - models = utilities.get_json(req.data) +def add_model_with_empty_meta_rule_id(name, headers): + from moon_manager.api import models + data = { + "name": name, + "description": "description of {}".format(name), + "meta_rules": [""] + } + headers['Content-Type'] = 'application/json' + req = hug.test.post(models, "/models", body=json.dumps(data), + headers=headers) + models = req.data return req, models -def add_models(client, name): +def update_model_without_meta_rules_ids(model_id, headers): + from moon_manager.api import models + name = "model_id" + uuid4().hex data = { "name": name, "description": "description of {}".format(name), - "meta_rules": ["meta_rule_id1", "meta_rule_id2"] + "meta_rules": [] } - req = client.post("/models", data=json.dumps(data), - headers={'Content-Type': 'application/json'}) - models = utilities.get_json(req.data) + headers['Content-Type'] = 'application/json' + req = hug.test.patch(models, "/models/{}".format(model_id), body=json.dumps(data), + headers=headers) + models = req.data return req, models -def delete_models(client, name): - request, models = get_models(client) +def delete_models(name, headers): + request, models = get_models() for key, value in models['models'].items(): if value['name'] == name: - req = client.delete("/models/{}".format(key)) + from moon_manager.api import models + req = hug.test.delete(models, "/models/{}".format(key), headers=headers) break return req -def delete_models_without_id(client): - req = client.delete("/models/{}".format("")) +def delete_models_without_id(headers): + from moon_manager.api import models + req = hug.test.delete(models, "/models/{}".format(""), headers=headers) return req +def clean_models(headers): + req, models = get_models() + for key, value in models['models'].items(): + print(key) + print(value) + from moon_manager.api import models + hug.test.delete(models, "/models/{}".format(key), headers=headers) + + +def test_delete_model_assigned_to_policy(): + policy_name = "testuser" + uuid4().hex + req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex) + model_id = list(req.keys())[0] + data = { + "name": policy_name, + "description": "description of {}".format(policy_name), + "model_id": model_id, + "genre": "genre" + } + from moon_manager.api import policy + from moon_manager.api import models + from moon_utilities.auth_functions import get_api_key_for_user + headers = {"X-Api-Key": get_api_key_for_user("admin"), 'Content-Type': 'application/json'} + hug.test.post(policy, "/policies", body=json.dumps(data), headers=headers) + with pytest.raises(exceptions.DeleteModelWithPolicy) as exception_info: + req = hug.test.delete(models, "/models/{}".format(model_id), headers={"X-Api-Key": + get_api_key_for_user("admin")}) + assert "400: Model With Policy Error" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data["message"] == '400: Model With Policy Error' + + def test_get_models(): - client = utilities.register_client() - req, models= get_models(client) - assert req.status_code == 200 + req, models = get_models() + assert req.status == hug.HTTP_200 assert isinstance(models, dict) assert "models" in models def test_add_models(): - client = utilities.register_client() - req, models = add_models(client, "testuser") - assert req.status_code == 200 + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + clean_models(headers=auth_headers) + req, models = add_models("testuser", auth_headers) + assert req.status == hug.HTTP_200 assert isinstance(models, dict) - value = list(models["models"].values())[0] + model_id = list(models["models"])[0] assert "models" in models - assert value['name'] == "testuser" - assert value["description"] == "description of {}".format("testuser") - assert value["meta_rules"][0] == "meta_rule_id1" + assert models['models'][model_id]['name'] == "testuser" + assert models['models'][model_id]["description"] == "description of {}".format("testuser") + + +def test_add_models_with_meta_rule_has_blank_subject(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + clean_models(headers=auth_headers) + name = "testuser1" + from moon_manager.api import models + subject_category_id, object_category_id, action_category_id, meta_rule_id = \ + builder.create_new_meta_rule(empty="subject") + data = { + "name": name, + "description": "description of {}".format(name), + "meta_rules": [meta_rule_id] + } + auth_headers['Content-Type'] = 'application/json' + req = hug.test.post(models, "/models", body=json.dumps(data), + headers=auth_headers) + assert req.status == hug.HTTP_200 + + +def test_add_models_with_meta_rule_has_blank_object(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + clean_models(headers=auth_headers) + name = "testuser1" + from moon_manager.api import models + subject_category_id, object_category_id, action_category_id, meta_rule_id = \ + builder.create_new_meta_rule(empty="object") + data = { + "name": name, + "description": "description of {}".format(name), + "meta_rules": [meta_rule_id] + } + auth_headers['Content-Type'] = 'application/json' + req = hug.test.post(models, "/models", body=json.dumps(data), + headers=auth_headers) + assert req.status == hug.HTTP_200 + + +def test_add_models_with_meta_rule_has_blank_action(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + clean_models(headers=auth_headers) + name = "testuser1" + from moon_manager.api import models + subject_category_id, object_category_id, action_category_id, meta_rule_id = \ + builder.create_new_meta_rule(empty="action") + data = { + "name": name, + "description": "description of {}".format(name), + "meta_rules": [meta_rule_id] + } + auth_headers['Content-Type'] = 'application/json' + req = hug.test.post(models, "/models", body=json.dumps(data), + headers=auth_headers) + assert req.status == hug.HTTP_200 def test_delete_models(): - client = utilities.register_client() - req = delete_models(client, "testuser") - assert req.status_code == 200 + name = uuid4().hex + "testuser" + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + add_models(name, auth_headers) + req = delete_models(name, headers=auth_headers) + assert req.status == hug.HTTP_200 + + +def test_update_models_with_assigned_policy(): + from moon_manager.api import models + model = model_helper.add_model(model_id="mls_model_id" + uuid4().hex) + model_id = list(model.keys())[0] + value = { + "name": "test_policy" + uuid4().hex, + "model_id": model_id, + "description": "test", + } + policy_helper.add_policies(value=value) + data = { + "name": "model_" + uuid4().hex, + "description": "description of model_2", + "meta_rules": [] + } + from moon_utilities.auth_functions import get_api_key_for_user + headers = {"X-Api-Key": get_api_key_for_user("admin"), 'Content-Type': 'application/json'} + with pytest.raises(exceptions.DeleteModelWithPolicy) as exception_info: + req = hug.test.patch(models, "/models/{}".format(model_id), body=json.dumps(data), + headers=headers) + assert "400: Model With Policy Error" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data["message"] == "400: Model With Policy Error" + + +def test_update_models_with_no_assigned_policy(): + from moon_manager.api import models + model = model_helper.add_model(model_id="mls_model_id" + uuid4().hex) + model_id = list(model.keys())[0] + data = { + "name": "model_" + uuid4().hex, + "description": "description of model_2", + "meta_rules": [] + } + from moon_utilities.auth_functions import get_api_key_for_user + headers = {"X-Api-Key": get_api_key_for_user("admin"), 'Content-Type': 'application/json'} + req = hug.test.patch(models, "/models/{}".format(model_id), body=json.dumps(data), + headers=headers) + assert req.status == hug.HTTP_200 + + +def test_update_models_without_meta_rule_key(): + from moon_manager.api import models + model = model_helper.add_model(model_id="mls_model_id" + uuid4().hex) + model_id = list(model.keys())[0] + + data = { + "name": "model_" + uuid4().hex, + "description": "description of model_2", + } + from moon_utilities.auth_functions import get_api_key_for_user + headers = {"X-Api-Key": get_api_key_for_user("admin"), 'Content-Type': 'application/json'} + with pytest.raises(exceptions.MetaRuleUnknown) as exception_info: + req = hug.test.patch(models, "/models/{}".format(model_id), body=json.dumps(data), + headers=headers) + assert "400: Meta Rule Unknown" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data["message"] == "400: Meta Rule Unknown" def test_delete_models_without_id(): - client = utilities.register_client() - req = delete_models_without_id(client) - assert req.status_code == 500 + from moon_utilities.auth_functions import get_api_key_for_user + headers = {"X-Api-Key": get_api_key_for_user("admin")} + req = delete_models_without_id(headers=headers) + assert req.status == hug.HTTP_405 + + +def test_add_model_with_empty_name(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + clean_models(headers=auth_headers) + with pytest.raises(exceptions.ValidationContentError) as exception_info: + req, models = add_models("<br/>", headers=auth_headers) + assert "Key: 'name', [Forbidden characters in string]" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data["message"] == "Key: 'name', [Forbidden characters in string]" + + +def test_add_model_with_name_contain_space(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + clean_models(headers=auth_headers) + with pytest.raises(exceptions.ValidationContentError) as exception_info: + req, models = add_models("test<br>user", headers=auth_headers) + assert "Key: 'name', [Forbidden characters in string]" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data["message"] == "Key: 'name', [Forbidden characters in string]" + + +def test_add_model_with_name_space(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + clean_models(headers=auth_headers) + with pytest.raises(exceptions.ModelContentError) as exception_info: + req, models = add_models(" ", headers=auth_headers) + assert "400: Model Unknown" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data["message"] == '400: Model Unknown' + + +def test_add_model_with_empty_meta_rule_id(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + clean_models(headers=auth_headers) + with pytest.raises(exceptions.MetaRuleUnknown) as exception_info: + req, meta_rules = add_model_with_empty_meta_rule_id("testuser", headers=auth_headers) + assert "400: Meta Rule Unknown" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data["message"] == '400: Meta Rule Unknown' + +def test_add_model_with_existed_name(): + name = uuid4().hex + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + clean_models(headers=auth_headers) + req, models = add_models(name, headers=auth_headers) + assert req.status == hug.HTTP_200 + with pytest.raises(exceptions.ModelExisting) as exception_info: + req, models = add_models(name, headers=auth_headers) + assert "409: Model Error" == str(exception_info.value) + # assert req.status == hug.HTTP_409 + # assert req.data["message"] == '409: Model Error' + + +def test_add_model_with_existed_meta_rules_list(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + clean_models(headers=auth_headers) + name = uuid4().hex + subject_category_id, object_category_id, action_category_id, meta_rule_id = \ + builder.create_new_meta_rule() + data = { + "name": name, + "description": "description of {}".format(name), + "meta_rules": [meta_rule_id] + } + name = uuid4().hex + req, models = add_models(name=name, headers=auth_headers, data=data) + assert req.status == hug.HTTP_200 + data = { + "name": name, + "description": "description of {}".format(name), + "meta_rules": [meta_rule_id] + } + with pytest.raises(exceptions.ModelExisting) as exception_info: + req, models = add_models(name=name, headers=auth_headers, data=data) + assert "409: Model Error" == str(exception_info.value) + # assert req.status == hug.HTTP_409 + # assert req.data["message"] == '409: Model Error' + + +def test_add_model_without_meta_rules(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + clean_models(headers=auth_headers) + req, meta_rules = add_model_without_meta_rules_ids("testuser", headers=auth_headers) + assert req.status == hug.HTTP_200 + + +def test_update_model(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + clean_models(headers=auth_headers) + req = add_models("testuser", headers=auth_headers) + model_id = list(req[1]['models'])[0] + req_update = update_model("testuser", model_id, headers=auth_headers) + assert req_update[0].status == hug.HTTP_200 + model_id = list(req_update[1]["models"])[0] + assert req_update[1]["models"][model_id]["meta_rules"][0] is not None + delete_models("testuser", headers=auth_headers) + + +def test_update_model_name_with_space(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + clean_models(headers=auth_headers) + req = add_models("testuser", headers=auth_headers) + model_id = list(req[1]['models'])[0] + with pytest.raises(exceptions.ModelContentError) as exception_info: + req_update = update_model(" ", model_id, headers=auth_headers) + assert "400: Model Unknown" == str(exception_info.value) + # assert req_update[0].status == hug.HTTP_400 + # assert req_update[1]["message"] == '400: Model Unknown' + + +def test_update_model_with_empty_name(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + clean_models(headers=auth_headers) + req = add_models("testuser", headers=auth_headers) + model_id = list(req[1]['models'])[0] + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + with pytest.raises(exceptions.ModelContentError) as exception_info: + req_update = update_model("", model_id, headers=auth_headers) + assert "400: Model Unknown" == str(exception_info.value) + # assert req_update[0].status == hug.HTTP_400 + # assert req_update[1]['message'] == '400: Model Unknown' + + +def test_update_meta_rules_without_id(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + clean_models(headers=auth_headers) + req_update = update_model("testuser", "", headers=auth_headers) + assert req_update.status == hug.HTTP_405 + + +def test_update_meta_rules_without_name(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + with pytest.raises(exceptions.ValidationContentError) as exception_info: + req_update = update_model("<a></a>", "1234567", headers=auth_headers) + assert "Key: 'name', [Forbidden characters in string]" == str(exception_info.value) + # assert req_update[0].status == hug.HTTP_400 + # assert req_update[0].data["message"] == "Key: 'name', [Forbidden characters in string]" + + +def test_update_meta_rules_without_meta_rules(): + value = { + "name": "mls_model_id" + uuid4().hex, + "description": "test", + "meta_rules": [] + } + model = model_helper.add_model(value=value) + model_id = list(model.keys())[0] + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + req_update = update_model_without_meta_rules_ids(model_id, headers=auth_headers) + assert req_update[0].status == hug.HTTP_200 diff --git a/moon_manager/tests/unit_python/api/test_nova.py b/moon_manager/tests/unit_python/api/test_nova.py new file mode 100644 index 00000000..10118cc3 --- /dev/null +++ b/moon_manager/tests/unit_python/api/test_nova.py @@ -0,0 +1,58 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + + +def create_project(tenant_dict): + from moon_manager.pip_driver import InformationManager + return InformationManager["objects"][0].create_project(**tenant_dict) + + +def list_projects(): + from moon_manager.pip_driver import InformationManager + return InformationManager["objects"][0].get_projects() + + +def list_objects(): + from moon_manager.pip_driver import InformationManager + print(f"IM : {InformationManager['objects'][0].driver.__dict__}") + return InformationManager["objects"][0].get_items() + + +def test_create_project(): + tenant_dict = { + "description": "test_project", + "domain": ['domain_id_1'], + "enabled": True, + "is_domain": False, + "name": 'project_1' + } + project = create_project(tenant_dict) + assert project + assert project.get('name') == tenant_dict.get('name') + + +def test_list_objects(): + objects = list_objects() + assert objects + assert objects["servers"][0].get('name') == "vm1" + +# TODO TO BE UPDATED +# def test_create_project_without_name(): +# tenant_dict = { +# "description": "test_project", +# "domain_id": ['domain_id_1'], +# "enabled": True, +# "is_domain": False, +# } +# with pytest.raises(Exception) as exception_info: +# create_project(tenant_dict) +# assert '400: Keystone project error' == str(exception_info.value) diff --git a/moon_manager/tests/unit_python/api/test_pdp.py b/moon_manager/tests/unit_python/api/test_pdp.py index a2d0cb5a..32b75726 100644 --- a/moon_manager/tests/unit_python/api/test_pdp.py +++ b/moon_manager/tests/unit_python/api/test_pdp.py @@ -1,62 +1,479 @@ -import json -import api.utilities as utilities +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +from falcon import HTTP_200, HTTP_400, HTTP_405 +import hug import pytest +from moon_utilities import exceptions +from uuid import uuid4 +from helpers import data_builder as builder + + +def test_get_pdp(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import pdp + req = hug.test.get(pdp, 'pdp/', headers=auth_headers) + assert req.status == HTTP_200 + assert isinstance(req.data, dict) + assert "pdps" in req.data + +def test_add_pdp_invalid_security_pipeline(mocker): + from moon_manager.api import pdp + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + mocker.patch('moon_manager.plugins.pyorchestrator.get_server_url', + return_value="http://127.0.0.1:20000") + mocker.patch("subprocess.Popen", return_value=True) + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex, + model_name="model1" + uuid4().hex) + data_no_pipeline = { + "name": "testuser" + uuid4().hex, + "security_pipeline": [], + "vim_project_id": "vim_project_id", + "description": "description of testuser" + } + data_no_project_no_pipeline = { + "name": "testuser" + uuid4().hex, + "security_pipeline": [], + "vim_project_id": None, + "description": "description of testuser" + } + data_no_project = { + "name": "testuser" + uuid4().hex, + "security_pipeline": [policy_id], + "vim_project_id": None, + "description": "description of testuser" + } + + req = hug.test.post(pdp, "pdp/", data_no_project_no_pipeline, headers=auth_headers) + assert req.status == HTTP_200 + + with pytest.raises(exceptions.PdpContentError) as exception_info: + req = hug.test.post(pdp, "pdp/", data_no_pipeline, headers=auth_headers) + assert "400: Pdp Error" == str(exception_info.value) + + with pytest.raises(exceptions.PdpContentError) as exception_info: + req = hug.test.post(pdp, "pdp/", data_no_project, headers=auth_headers) + assert "400: Pdp Error" == str(exception_info.value) + +def test_update_pdp_invalid_security_pipeline(mocker): + from moon_manager.api import pdp + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + mocker.patch('moon_manager.plugins.pyorchestrator.get_server_url', + return_value="http://127.0.0.1:20000") + mocker.patch("subprocess.Popen", return_value=True) + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex, + model_name="model1" + uuid4().hex) + data_no_pipeline = { + "name": "testuser" + uuid4().hex, + "security_pipeline": [], + "vim_project_id": "vim_project_id", + "description": "description of testuser" + } + data_no_project_no_pipeline = { + "name": "testuser" + uuid4().hex, + "security_pipeline": [], + "vim_project_id": None, + "description": "description of testuser" + } + data_no_project = { + "name": "testuser" + uuid4().hex, + "security_pipeline": [policy_id], + "vim_project_id": None, + "description": "description of testuser" + } + + data_valid = { + "name": "testuser" + uuid4().hex, + "security_pipeline": [policy_id], + "vim_project_id": "vim_project_id", + "description": "description of testuser" + } + req = hug.test.post(pdp, "pdp/", data_valid, headers=auth_headers) + assert req.status == HTTP_200 + pip_id = list(req.data['pdps'])[0] + + req = hug.test.patch(pdp, "pdp/{}".format(pip_id), data_no_project_no_pipeline, headers=auth_headers) + assert req.status == HTTP_200 + with pytest.raises(exceptions.PdpContentError) as exception_info: + req = hug.test.patch(pdp, "pdp/{}".format(pip_id), data_no_pipeline, headers=auth_headers) + assert "400: Pdp Error" == str(exception_info.value) -def get_pdp(client): - req = client.get("/pdp") - pdp = utilities.get_json(req.data) - return req, pdp + with pytest.raises(exceptions.PdpContentError) as exception_info: + req = hug.test.patch(pdp, "pdp/{}".format(pip_id), data_no_project, headers=auth_headers) + assert "400: Pdp Error" == str(exception_info.value) + +def test_add_pdp(mocker): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import pdp + mocker.patch('moon_manager.plugins.pyorchestrator.get_server_url', + return_value="http://127.0.0.1:20000") + mocker.patch("subprocess.Popen", return_value=True) + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex, + model_name="model1" + uuid4().hex) + data = { + "name": "testuser" + uuid4().hex, + "security_pipeline": [policy_id], + "vim_project_id": "vim_project_id", + "description": "description of testuser" + } + req = hug.test.post(pdp, "pdp/", data, headers=auth_headers) + assert req.status == HTTP_200 + assert isinstance(req.data, dict) + found = False + assert "pdps" in req.data + for value in req.data["pdps"].values(): + if value['name'] == data['name']: + found = True + assert value["description"] == "description of {}".format("testuser") + assert value["vim_project_id"] == "vim_project_id" + break + assert found -def add_pdp(client, data): - req = client.post("/pdp", data=json.dumps(data), - headers={'Content-Type': 'application/json'}) - pdp = utilities.get_json(req.data) - return req, pdp +def test_add_pdp_name_existed(mocker): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import pdp + mocker.patch('moon_manager.plugins.pyorchestrator.get_server_url', + return_value="http://127.0.0.1:20000") + mocker.patch("subprocess.Popen", return_value=True) + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id1 = builder.create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex, + model_name="model1" + uuid4().hex) + name = "testuser" + uuid4().hex + data = { + "name": name, + "security_pipeline": [policy_id1], + "vim_project_id": "vim_project_id", + "description": "description of testuser" + } + req = hug.test.post(pdp, "pdp/", data, headers=auth_headers) + assert req.status == HTTP_200 -def delete_pdp(client, key): - req = client.delete("/pdp/{}".format(key)) - return req + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id2 = builder.create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex, + model_name="model1" + uuid4().hex) + data = { + "name": name, + "security_pipeline": [policy_id2], + "vim_project_id": "vim_project_id" + uuid4().hex, + "description": "description of testuser" + uuid4().hex + } + with pytest.raises(exceptions.PdpExisting) as exception_info: + req = hug.test.post(pdp, "pdp/", data, headers=auth_headers) + assert "409: Pdp Error" == str(exception_info.value) + # assert req.status == hug.HTTP_409 + # assert req.data['message'] == '409: Pdp Error' -def delete_pdp_without_id(client): - req = client.delete("/pdp/{}".format("")) - return req +def test_add_pdp_policy_used(mocker): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import pdp + mocker.patch('moon_manager.plugins.pyorchestrator.get_server_url', + return_value="http://127.0.0.1:20000") + mocker.patch("subprocess.Popen", return_value=True) + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id1 = builder.create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex, + model_name="model1" + uuid4().hex) -def test_get_pdp(): - client = utilities.register_client() - req, pdp = get_pdp(client) - assert req.status_code == 200 - assert isinstance(pdp, dict) - assert "pdps" in pdp + data = { + "name": "testuser" + uuid4().hex, + "security_pipeline": [policy_id1], + "vim_project_id": "vim_project_id", + "description": "description of testuser" + } + req = hug.test.post(pdp, "pdp/", data, headers=auth_headers) + assert req.status == HTTP_200 + + name_uuid = "testuser" + uuid4().hex + data = { + "name": name_uuid, + "security_pipeline": [policy_id1], + "vim_project_id": "vim_project_id " + name_uuid, + "description": "description of testuser " + name_uuid + } + with pytest.raises(exceptions.PdpInUse) as exception_info: + req = hug.test.post(pdp, "pdp/", data, headers=auth_headers) + assert "400: Pdp Inuse" == str(exception_info.value) + # assert req.status == hug.HTTP_409 + # assert req.data['message'] == '409: Pdp Conflict' -def test_add_pdp(): + +def test_delete_pdp(mocker): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import pdp + mocker.patch('moon_manager.plugins.pyorchestrator.get_server_url', + return_value="http://127.0.0.1:20000") + mocker.patch("subprocess.Popen", return_value=True) + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex, + model_name="model1" + uuid4().hex) data = { + "name": "testuser" + uuid4().hex, + "security_pipeline": [policy_id], + "vim_project_id": "vim_project_id", + "description": "description of testuser" + } + req = hug.test.post(pdp, "pdp/", data, headers=auth_headers) + assert req.status == HTTP_200 + assert isinstance(req.data, dict) + req = hug.test.get(pdp, 'pdp/', headers=auth_headers) + success_req = None + for key, value in req.data['pdps'].items(): + if value['name'] == data['name']: + success_req = hug.test.delete(pdp, 'pdp/{}'.format(key), headers=auth_headers) + break + assert success_req + assert success_req.status == HTTP_200 + + +# Fixme: should re-enabled the input validation for those tests +# def test_add_pdp_with_forbidden_char_in_user(): +# data = { +# "name": "<a>", +# "security_pipeline": ["policy_id_1", "policy_id_2"], +# "vim_project_id": "vim_project_id", +# "description": "description of testuser" +# } +# req = hug.test.post(pdp, "pdp/", data) +# assert req.status == HTTP_400 +# print(req.data) +# assert req.data["message"] == "Key: 'name', [Forbidden characters in string]" +# +# +# def test_add_pdp_with_forbidden_char_in_keystone(): +# data = { +# "name": "testuser", +# "security_pipeline": ["policy_id_1", "policy_id_2"], +# "vim_project_id": "<a>", +# "description": "description of testuser" +# } +# req = hug.test.post(pdp, "pdp/", data) +# assert req.status == 400 +# assert req.data["message"] == "Key: 'vim_project_id', [Forbidden characters in string]" + + +def test_update_pdp(mocker): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import pdp + mocker.patch('moon_manager.plugins.pyorchestrator.get_server_url', + return_value="http://127.0.0.1:20000") + mocker.patch("subprocess.Popen", return_value=True) + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex, + model_name="model1" + uuid4().hex) + data_add = { "name": "testuser", + "security_pipeline": [policy_id], + "vim_project_id": "vim_project_id", + "description": "description of testuser" + } + + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id_update = builder.create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex, + model_name="model1" + uuid4().hex) + data_update = { + "name": "testuser_updated", + "security_pipeline": [policy_id_update], + "vim_project_id": "vim_project_id_update", + "description": "description of testuser_updated" + } + req = hug.test.post(pdp, "pdp/", data_add, headers=auth_headers) + pdp_id = list(req.data['pdps'])[0] + req_update = hug.test.patch(pdp, "pdp/{}".format(pdp_id), data_update, headers=auth_headers) + assert req_update.status == HTTP_200 + value = list(req_update.data["pdps"].values())[0] + assert value["vim_project_id"] == data_update["vim_project_id"] + assert value["description"] == data_update["description"] + assert value["name"] == data_update['name'] + assert value["security_pipeline"] == data_update['security_pipeline'] + req = hug.test.get(pdp, 'pdp/', headers=auth_headers) + for key, value in req.data['pdps'].items(): + if value['name'] == "testuser": + hug.test.delete(pdp, 'pdp/{}'.format(key), headers=auth_headers) + break + + +def test_update_pdp_without_id(mocker): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import pdp + mocker.patch('moon_manager.plugins.pyorchestrator.get_server_url', + return_value="http://127.0.0.1:20000") + mocker.patch("subprocess.Popen", return_value=True) + req = hug.test.patch(pdp, "pdp/", "testuser", headers=auth_headers) + assert req.status == HTTP_405 + # assert req.data["message"] == 'Invalid Key :name not found' + + +def test_update_pdp_without_user(mocker): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import pdp + mocker.patch('moon_manager.plugins.pyorchestrator.get_server_url', + return_value="http://127.0.0.1:20000") + mocker.patch("subprocess.Popen", return_value=True) + data = { + "name": "", "security_pipeline": ["policy_id_1", "policy_id_2"], - "keystone_project_id": "keystone_project_id", + "vim_project_id": "vim_project_id", "description": "description of testuser" } - client = utilities.register_client() - req, pdp = add_pdp(client, data) - assert req.status_code == 200 - assert isinstance(pdp, dict) - value = list(pdp["pdps"].values())[0] - assert "pdps" in pdp - assert value['name'] == "testuser" - assert value["description"] == "description of {}".format("testuser") - assert value["keystone_project_id"] == "keystone_project_id" + req = hug.test.patch(pdp, "pdp/<a>", data, headers=auth_headers) + assert req.status == HTTP_400 + print(req.data) + assert req.data["errors"] == {'uuid': 'Invalid UUID provided'} -def test_delete_pdp(): - client = utilities.register_client() - request, pdp = get_pdp(client) - for key, value in pdp['pdps'].items(): - if value['name'] == "testuser": - success_req = delete_pdp(client, key) +def test_update_pdp_name_existed(mocker): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import pdp + mocker.patch('moon_manager.plugins.pyorchestrator.get_server_url', + return_value="http://127.0.0.1:20000") + mocker.patch("subprocess.Popen", return_value=True) + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id1 = builder.create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex, + model_name="model1" + uuid4().hex) + uuid1 = uuid4().hex + data1 = { + "name": "testuser1" + uuid1, + "security_pipeline": [policy_id1], + "vim_project_id": "vim_project_id" + uuid1, + "description": "description of testuser1" + uuid1 + } + req = hug.test.post(pdp, "pdp/", data1, headers=auth_headers) + assert req.status == HTTP_200 + + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id2 = builder.create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex, + model_name="model1" + uuid4().hex) + + uuid2 = uuid4().hex + data2 = { + "name": "testuser2" + uuid2, + "security_pipeline": [policy_id2], + "vim_project_id": "vim_project_id" + uuid2, + "description": "description of testuser2" + uuid2 + } + req = hug.test.post(pdp, "pdp/", data2, headers=auth_headers) + pdp_id = list(req.data['pdps'])[0] + for item in list(req.data['pdps']): + if req.data['pdps'][item]['name']==data2['name']: + pdp_id=item + break + data2['name'] = data1['name'] + with pytest.raises(exceptions.PdpExisting) as exception_info: + req_update = hug.test.patch(pdp, "pdp/{}".format(pdp_id), data2, headers=auth_headers) + # assert req_update.data['message'] == '409: Pdp Error' + assert "409: Pdp Error" == str(exception_info.value) + + + +def test_update_pdp_policy_used(mocker): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import pdp + mocker.patch('moon_manager.plugins.pyorchestrator.get_server_url', + return_value="http://127.0.0.1:20000") + mocker.patch("subprocess.Popen", return_value=True) + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id1 = builder.create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex, + model_name="model1" + uuid4().hex) + uuid1 = uuid4().hex + data1 = { + "name": "testuser1" + uuid1, + "security_pipeline": [policy_id1], + "vim_project_id": "vim_project_id" + uuid1, + "description": "description of testuser1" + uuid1 + } + req = hug.test.post(pdp, "pdp/", data1, headers=auth_headers) + assert req.status == HTTP_200 + + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id2 = builder.create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex, + model_name="model1" + uuid4().hex) + + uuid2 = uuid4().hex + data2 = { + "name": "testuser2" + uuid2, + "security_pipeline": [policy_id2], + "vim_project_id": "vim_project_id" + uuid2, + "description": "description of testuser2" + uuid2 + } + req = hug.test.post(pdp, "pdp/", data2, headers=auth_headers) + pdp_id = list(req.data['pdps'])[0] + for item in list(req.data['pdps']): + if req.data['pdps'][item]['name']==data2['name']: + pdp_id=item break - assert success_req.status_code == 200 + data2['security_pipeline'] = data1['security_pipeline'] + + with pytest.raises(exceptions.PdpInUse) as exception_info: + req_update = hug.test.patch(pdp, "pdp/{}".format(pdp_id), data2, headers=auth_headers) + assert "400: Pdp Inuse" == str(exception_info.value) + # assert req_update.data['message'] == '409: Pdp Conflict' + + diff --git a/moon_manager/tests/unit_python/api/test_perimeter.py b/moon_manager/tests/unit_python/api/test_perimeter.py index db09780f..c741adf7 100644 --- a/moon_manager/tests/unit_python/api/test_perimeter.py +++ b/moon_manager/tests/unit_python/api/test_perimeter.py @@ -1,157 +1,1304 @@ -# import moon_manager -# import moon_manager.api -import json +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +import hug import api.utilities as utilities +from helpers import data_builder as builder +import helpers.policy_helper as policy_helper +from uuid import uuid4 +import pytest +from moon_utilities import exceptions -def get_subjects(client): - req = client.get("/subjects") - assert req.status_code == 200 +def get_subjects(subject_id=None): + from moon_manager.api import perimeter + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + if not subject_id: + req = hug.test.get(perimeter, 'subjects/', headers=auth_headers) + else: + req = hug.test.get(perimeter, 'subjects/{}'.format(subject_id), headers=auth_headers) subjects = utilities.get_json(req.data) + return req, subjects + + +def add_subjects(policy_id, name, perimeter_id=None, data=None, auth_headers=None): + from moon_manager.api import perimeter + if not data: + name = name + uuid4().hex + data = { + "name": name, + "description": "description of {}".format(name), + "password": "password for {}".format(name), + "email": "{}@moon".format(name) + } + if not perimeter_id: + req = hug.test.post(perimeter, "/policies/{}/subjects".format(policy_id), + body=data, headers=auth_headers) + else: + req = hug.test.post(perimeter, "/policies/{}/subjects/{}".format(policy_id, perimeter_id), + body=data, headers=auth_headers) + subjects = utilities.get_json(req.data) + return req, subjects + + +def delete_subjects_without_perimeter_id(auth_headers=None): + from moon_manager.api import perimeter + req = hug.test.delete(perimeter, "/subjects/{}".format(""), headers=auth_headers) + return req + + +def test_perimeter_get_subject(): + req, subjects = get_subjects() + assert req.status == hug.HTTP_200 assert isinstance(subjects, dict) assert "subjects" in subjects - return subjects -def add_subjects(client, name): +def test_perimeter_add_subject(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + policies = policy_helper.add_policies() + policy_id = list(policies.keys())[0] + + req, subjects = add_subjects(policy_id, "testuser", auth_headers=auth_headers) + value = list(subjects["subjects"].values())[0] + assert req.status == hug.HTTP_200 + assert value["name"] + assert value["email"] + + +def test_perimeter_add_same_subject_perimeter_id_with_new_policy_id(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + policies1 = policy_helper.add_policies() + policy_id1 = list(policies1.keys())[0] + name = "testuser" + perimeter_id = uuid4().hex + data = { + "name": name + uuid4().hex, + "description": "description of {}".format(name), + "password": "password for {}".format(name), + "email": "{}@moon".format(name) + } + add_subjects(policy_id1, data['name'], perimeter_id=perimeter_id, data=data, + auth_headers=auth_headers) + policies2 = policy_helper.add_policies() + policy_id2 = list(policies2.keys())[0] + req, subjects = add_subjects(policy_id2, data['name'], + perimeter_id=perimeter_id, data=data, + auth_headers=auth_headers) + value = list(subjects["subjects"].values())[0] + assert req.status == hug.HTTP_200 + assert value["name"] + assert value["email"] + assert len(value['policy_list']) == 2 + assert policy_id1 in value['policy_list'] + assert policy_id2 in value['policy_list'] + + +def test_perimeter_add_same_subject_perimeter_id_with_different_name(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + policies1 = policy_helper.add_policies() + policy_id1 = list(policies1.keys())[0] + perimeter_id = uuid4().hex + add_subjects(policy_id1, "testuser", perimeter_id=perimeter_id, auth_headers=auth_headers) + policies2 = policy_helper.add_policies() + policy_id2 = list(policies2.keys())[0] + with pytest.raises(exceptions.PerimeterContentError) as exception_info: + req, subjects = add_subjects(policy_id2, "testuser", perimeter_id=perimeter_id, + auth_headers=auth_headers) + assert "400: Perimeter content is invalid." == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data["message"] == '400: Perimeter content is invalid.' + + +def test_perimeter_add_same_subject_name_with_new_policy_id(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + policies1 = policy_helper.add_policies() + policy_id1 = list(policies1.keys())[0] + perimeter_id = uuid4().hex + name = "testuser" + uuid4().hex data = { "name": name, "description": "description of {}".format(name), "password": "password for {}".format(name), "email": "{}@moon".format(name) } - req = client.post("/subjects", data=json.dumps(data), - headers={'Content-Type': 'application/json'}) - assert req.status_code == 200 - subjects = utilities.get_json(req.data) - assert isinstance(subjects, dict) - key = list(subjects["subjects"].keys())[0] + req, subjects = add_subjects(policy_id1, None, perimeter_id=perimeter_id, data=data, + auth_headers=auth_headers) + policies2 = policy_helper.add_policies() + policy_id2 = list(policies2.keys())[0] value = list(subjects["subjects"].values())[0] - assert "subjects" in subjects - assert key == "1111111111111" - assert value['id'] == "1111111111111" - assert value['name'] == name - assert value["description"] == "description of {}".format(name) - assert value["email"] == "{}@moon".format(name) - return subjects + data = { + "name": value['name'], + "description": "description of {}".format(value['name']), + "password": "password for {}".format(value['name']), + "email": "{}@moon".format(value['name']) + } + req, subjects = add_subjects(policy_id2, None, data=data, auth_headers=auth_headers) + value = list(subjects["subjects"].values())[0] + assert req.status == hug.HTTP_200 + assert value["name"] + assert value["email"] + assert len(value['policy_list']) == 2 + assert policy_id1 in value['policy_list'] + assert policy_id2 in value['policy_list'] -def add_subjects_without_name(client, name): +def test_perimeter_add_same_subject_name_with_same_policy_id(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + policies1 = policy_helper.add_policies() + policy_id1 = list(policies1.keys())[0] + perimeter_id = uuid4().hex + name = "testuser" + uuid4().hex data = { "name": name, "description": "description of {}".format(name), "password": "password for {}".format(name), "email": "{}@moon".format(name) } - req = client.post("/subjects", data=json.dumps(data), - headers={'Content-Type': 'application/json'}) - assert req.status_code == 500 + req, subjects = add_subjects(policy_id1, None, perimeter_id=perimeter_id, + data=data, auth_headers=auth_headers) + value = list(subjects["subjects"].values())[0] + data = { + "name": value['name'], + "description": "description of {}".format(value['name']), + "password": "password for {}".format(value['name']), + "email": "{}@moon".format(value['name']) + } + with pytest.raises(exceptions.PolicyExisting) as exception_info: + req, subjects = add_subjects(policy_id1, None, data=data, auth_headers=auth_headers) + assert "409: Policy Already Exists" == str(exception_info.value) + # assert req.status == hug.HTTP_409 + # assert req.data["message"] == '409: Policy Already Exists' -def delete_subject(client, name): - subjects = get_subjects(client) - for key, value in subjects['subjects'].items(): - if value['name'] == name: - req = client.delete("/subjects/{}".format(key)) - assert req.status_code == 200 - break - subjects = get_subjects(client) - assert name not in [x['name'] for x in subjects["subjects"].values()] +def test_perimeter_add_same_subject_perimeter_id_with_existed_policy_id_in_list(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import perimeter + policies = policy_helper.add_policies() + policy_id = list(policies.keys())[0] + name = "testuser" + uuid4().hex + data = { + "name": name + uuid4().hex, + "description": "description of {}".format(name), + "password": "password for {}".format(name), + "email": "{}@moon".format(name) + } + subj_id = "b34e5a29-5494-4cc5-9356-daa244b8c254" + req, subjects = get_subjects(subj_id) + if subjects['subjects']: + for __policy_id in subjects['subjects'][subj_id]['policy_list']: + req = hug.test.delete(perimeter, + "/policies/{}/subjects/{}".format(__policy_id, subj_id), + headers=auth_headers) + req, subjects = add_subjects(policy_id, name, data=data, auth_headers=auth_headers) + perimeter_id = list(subjects["subjects"].values())[0]['id'] + with pytest.raises(exceptions.PolicyExisting) as exception_info: + req, subjects = add_subjects(policy_id, name, perimeter_id=perimeter_id, data=data, + auth_headers=auth_headers) + assert "409: Policy Already Exists" == str(exception_info.value) + # assert req.status == hug.HTTP_409 + # assert req.data["message"] == '409: Policy Already Exists' + + +def test_perimeter_add_subject_invalid_policy_id(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + policies = policy_helper.add_policies() + policy_id = list(policies.keys())[0] + name = "testuser" + data = { + "name": name + uuid4().hex, + "description": "description of {}".format(name), + "password": "password for {}".format(name), + "email": "{}@moon".format(name) + } + with pytest.raises(exceptions.PolicyUnknown) as exception_info: + req, subjects = add_subjects( policy_id + "0", "testuser", data, auth_headers=auth_headers) + assert "400: Policy Unknown" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data["message"] == '400: Policy Unknown' + + +def test_perimeter_add_subject_blank_data(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import perimeter + policies = policy_helper.add_policies() + policy_id = list(policies.keys())[0] + with pytest.raises(exceptions.ValidationKeyError) as exception_info: + req = hug.test.post(perimeter, "/policies/{}/subjects".format(policy_id), body={'test':"aa"}, + headers=auth_headers) + assert "Invalid Key :name not found" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data["message"] == 'Invalid Key :name not found' + + +def test_perimeter_add_subject_with_forbidden_char_in_name(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import perimeter + data = { + "name": "<a>", + "description": "description of {}".format(""), + "password": "password for {}".format(""), + "email": "{}@moon".format("") + } + subj_id = "a34e5a29-5494-4cc5-9356-daa244b8c888" + with pytest.raises(exceptions.ValidationContentError) as exception_info: + req = hug.test.post(perimeter, "/policies/{}/subjects".format(subj_id), body=data, + headers=auth_headers) + assert "Key: 'name', [Forbidden characters in string]" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data["message"] == "Key: 'name', [Forbidden characters in string]" + + +def test_perimeter_update_subject_name(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import perimeter + policies = policy_helper.add_policies() + policy_id = list(policies.keys())[0] + req, subjects = add_subjects(policy_id, "testuser", auth_headers=auth_headers) + value1 = list(subjects["subjects"].values())[0] + perimeter_id = value1['id'] + data = { + 'name': value1['name'] + "update" + } + req = hug.test.patch(perimeter, "/subjects/{}".format(perimeter_id), body=data, + headers=auth_headers) + subjects = utilities.get_json(req.data) + value2 = list(subjects["subjects"].values())[0] + assert req.status == hug.HTTP_200 + assert value1['name'] + 'update' == value2['name'] + assert value1['id'] == value2['id'] + assert value1['description'] == value2['description'] + + +def test_perimeter_update_subject_description(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import perimeter + policies = policy_helper.add_policies() + policy_id = list(policies.keys())[0] + req, subjects = add_subjects(policy_id, "testuser", auth_headers=auth_headers) + value1 = list(subjects["subjects"].values())[0] + perimeter_id = value1['id'] + data = { + 'description': value1['description'] + "update", + } + req = hug.test.patch(perimeter, "/subjects/{}".format(perimeter_id), body=data, + headers=auth_headers) + subjects = utilities.get_json(req.data) + value2 = list(subjects["subjects"].values())[0] + + assert req.status == hug.HTTP_200 + assert value1['name'] == value2['name'] + assert value1['id'] == value2['id'] + assert value1['description'] + 'update' == value2['description'] + +def test_perimeter_update_subject_description_and_name(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import perimeter + policies = policy_helper.add_policies() + policy_id = list(policies.keys())[0] -def test_subject(): - client = utilities.register_client() - get_subjects(client) - add_subjects(client, "testuser") - add_subjects_without_name(client, "") - delete_subject(client, "testuser") + req, subjects = add_subjects(policy_id, "testuser", auth_headers=auth_headers) + value1 = list(subjects["subjects"].values())[0] + perimeter_id = value1['id'] + data = { + 'description': value1['description'] + "update", + 'name': value1['name'] + "update" + } + from moon_manager.api import perimeter + req = hug.test.patch(perimeter, "/subjects/{}".format(perimeter_id), body=data, + headers=auth_headers) + subjects = utilities.get_json(req.data) + value2 = list(subjects["subjects"].values())[0] + + assert req.status == hug.HTTP_200 + assert value1['name'] + 'update' == value2['name'] + assert value1['id'] == value2['id'] + assert value1['description'] + 'update' == value2['description'] + + +def test_perimeter_update_subject_wrong_id(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import perimeter + name = 'testuser' + uuid4().hex + policies1 = policy_helper.add_policies() + policy_id1 = list(policies1.keys())[0] + data = { + "name": name, + "description": "description of {}".format('testuser'), + } + req, subjects = add_subjects(policy_id=policy_id1, name='testuser', data=data, + auth_headers=auth_headers) + value1 = list(subjects["subjects"].values())[0] + perimeter_id = value1['id'] + data = { + 'name': value1['name'] + "update", + 'description': value1['description'] + "update" + } + with pytest.raises(exceptions.PerimeterContentError) as exception_info: + req = hug.test.patch(perimeter, "/subjects/{}".format(perimeter_id + "wrong"), + body=data, headers=auth_headers) + assert "400: Perimeter content is invalid." == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data["message"] == '400: Perimeter content is invalid.' -def get_objects(client): - req = client.get("/objects") - assert req.status_code == 200 +def test_perimeter_update_subject_name_with_existed_one(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import perimeter + name1 = 'testuser' + uuid4().hex + policies1 = policy_helper.add_policies() + policy_id1 = list(policies1.keys())[0] + perimeter_id1 = uuid4().hex + req, subjects = add_subjects(policy_id=policy_id1, name=name1, + perimeter_id=perimeter_id1, auth_headers=auth_headers) + value1 = list(subjects["subjects"].values())[0] + perimeter_id2 = uuid4().hex + name2 = 'testuser' + uuid4().hex + req, subjects = add_subjects(policy_id=policy_id1, name=name2, + perimeter_id=perimeter_id2, auth_headers=auth_headers) + data = { + 'name': value1['name'], + } + with pytest.raises(exceptions.SubjectExisting) as exception_info: + req = hug.test.patch(perimeter, "/subjects/{}".format(perimeter_id2), body=data, + headers=auth_headers) + assert "409: Subject Existing" == str(exception_info.value) + # assert req.status == hug.HTTP_409 + + +def test_perimeter_delete_subject(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import perimeter + policies = policy_helper.add_policies() + policy_id = list(policies.keys())[0] + req, subjects = add_subjects(policy_id, "testuser", auth_headers=auth_headers) + subject_id = list(subjects["subjects"].values())[0]["id"] + req = hug.test.delete(perimeter, "/policies/{}/subjects/{}".format(policy_id, subject_id), + headers=auth_headers) + assert req.status == hug.HTTP_200 + + +def test_perimeter_delete_subjects_without_perimeter_id(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + with pytest.raises(exceptions.SubjectUnknown) as exception_info: + req = delete_subjects_without_perimeter_id(auth_headers) + assert "400: Subject Unknown" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data["message"] == "400: Subject Unknown" + + +def get_objects(): + from moon_manager.api import perimeter + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + req = hug.test.get(perimeter, "/objects", headers=auth_headers) objects = utilities.get_json(req.data) + return req, objects + + +def add_objects(name, policyId=None, data=None, perimeter_id=None, auth_headers=None): + from moon_manager.api import perimeter + if not policyId: + subject_category_id, object_category_id, action_category_id, meta_rule_id, policyId = builder.create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex, + model_name="model1" + uuid4().hex) + if not data: + data = { + "name": name + uuid4().hex, + "description": "description of {}".format(name), + } + if not perimeter_id: + req = hug.test.post(perimeter, "/policies/{}/objects/".format(policyId), body=data, + headers=auth_headers) + else: + req = hug.test.post(perimeter, "/policies/{}/objects/{}".format(policyId, perimeter_id), + body=data, headers=auth_headers) + + objects = utilities.get_json(req.data) + return req, objects + + +def delete_objects_without_perimeter_id(auth_headers=None): + from moon_manager.api import perimeter + req = hug.test.delete(perimeter, "/objects/{}".format(""), headers=auth_headers) + return req + + +def test_perimeter_get_object(): + + req, objects = get_objects() + assert req.status == hug.HTTP_200 assert isinstance(objects, dict) assert "objects" in objects - return objects -def add_objects(client, name): +def test_perimeter_add_object(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + req, objects = add_objects("testuser", auth_headers=auth_headers) + value = list(objects["objects"].values())[0] + assert req.status == hug.HTTP_200 + assert value['name'] + + +def test_perimeter_add_object_with_wrong_policy_id(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + with pytest.raises(exceptions.PolicyUnknown) as exception_info: + req, objects = add_objects("testuser", policyId='wrong', auth_headers=auth_headers) + assert "400: Policy Unknown" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data["message"] == '400: Policy Unknown' + + +def test_perimeter_add_object_with_policy_id_none(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import perimeter + data = { + "name": "testuser" + uuid4().hex, + "description": "description of {}".format("testuser"), + } + with pytest.raises(exceptions.PolicyUnknown) as exception_info: + req = hug.test.post(perimeter, "/policies/{}/objects/".format(None), body=data, + headers=auth_headers) + assert "400: Policy Unknown" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data["message"] == '400: Policy Unknown' + + +def test_perimeter_add_same_object_name_with_new_policy_id(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + req, objects = add_objects("testuser", auth_headers=auth_headers) + value1 = list(objects["objects"].values())[0] + policies1 = policy_helper.add_policies() + policy_id1 = list(policies1.keys())[0] + data = { + "name": value1['name'], + "description": "description of {}".format('testuser'), + } + req, objects = add_objects('testuser', policyId=policy_id1, data=data, auth_headers=auth_headers) + value2 = list(objects["objects"].values())[0] + assert req.status == hug.HTTP_200 + assert value1['id'] == value2['id'] + assert value1['name'] == value2['name'] + + +def test_perimeter_add_same_object_perimeter_id_with_new_policy_id(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + req, objects = add_objects( "testuser", auth_headers=auth_headers) + value1 = list(objects["objects"].values())[0] + policies1 = policy_helper.add_policies() + policy_id1 = list(policies1.keys())[0] + data = { + "name": value1['name'], + "description": "description of {}".format('testuser'), + } + req, objects = add_objects('testuser', policyId=policy_id1, data=data, + perimeter_id=value1['id'],auth_headers=auth_headers) + value2 = list(objects["objects"].values())[0] + assert req.status == hug.HTTP_200 + assert value1['id'] == value2['id'] + assert value1['name'] == value2['name'] + + +def test_perimeter_add_same_object_perimeter_id_with_different_name(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + req, objects = add_objects( "testuser", auth_headers=auth_headers) + value1 = list(objects["objects"].values())[0] + policies1 = policy_helper.add_policies() + policy_id1 = list(policies1.keys())[0] + data = { + "name": value1['name'] + 'different', + "description": "description of {}".format('testuser'), + } + with pytest.raises(exceptions.PerimeterContentError) as exception_info: + req, objects = add_objects('testuser', policyId=policy_id1, data=data, + perimeter_id=value1['id'], auth_headers=auth_headers) + assert "400: Perimeter content is invalid." == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data["message"] == '400: Perimeter content is invalid.' + + +def test_perimeter_add_same_object_name_with_same_policy_id(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + name = 'testuser' + uuid4().hex + policies1 = policy_helper.add_policies() + policy_id1 = list(policies1.keys())[0] data = { "name": name, - "description": "description of {}".format(name), + "description": "description of {}".format('testuser'), } - req = client.post("/objects", data=json.dumps(data), - headers={'Content-Type': 'application/json'}) - assert req.status_code == 200 - objects = utilities.get_json(req.data) - assert isinstance(objects, dict) - key = list(objects["objects"].keys())[0] + req, objects = add_objects('testuser', policyId=policy_id1, data=data, auth_headers=auth_headers) value = list(objects["objects"].values())[0] - assert "objects" in objects - assert value['name'] == name - assert value["description"] == "description of {}".format(name) - return objects + assert req.status == hug.HTTP_200 + with pytest.raises(exceptions.PolicyExisting) as exception_info: + req, objects = add_objects('testuser', policyId=policy_id1, data=data, auth_headers=auth_headers) + assert "409: Policy Already Exists" == str(exception_info.value) + # assert req.status == hug.HTTP_409 + # assert req.data["message"] == '409: Policy Already Exists' -def delete_objects(client, name): - objects = get_objects(client) - for key, value in objects['objects'].items(): - if value['name'] == name: - req = client.delete("/objects/{}".format(key)) - assert req.status_code == 200 - break - objects = get_objects(client) - assert name not in [x['name'] for x in objects["objects"].values()] +def test_perimeter_add_same_object_perimeter_id_with_existed_policy_id_in_list(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + name = 'testuser' + uuid4().hex + policies1 = policy_helper.add_policies() + policy_id1 = list(policies1.keys())[0] + data = { + "name": name, + "description": "description of {}".format('testuser'), + } + req, objects = add_objects( 'testuser', policyId=policy_id1, data=data, + auth_headers=auth_headers) + value = list(objects["objects"].values())[0] + with pytest.raises(exceptions.PolicyExisting) as exception_info: + req, objects = add_objects('testuser', policyId=policy_id1, data=data, + perimeter_id=value['id'], auth_headers=auth_headers) + assert "409: Policy Already Exists" == str(exception_info.value) + # assert req.status == hug.HTTP_409 + # assert req.data["message"] == '409: Policy Already Exists' -def test_objects(): - client = utilities.register_client() - get_objects(client) - add_objects(client, "testuser") - delete_objects(client, "testuser") +def test_perimeter_update_object_name(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import perimeter + name = 'testuser' + uuid4().hex + policies1 = policy_helper.add_policies() + policy_id1 = list(policies1.keys())[0] + data = { + "name": name, + "description": "description of {}".format('testuser'), + } + req, objects = add_objects('testuser', policyId=policy_id1, data=data, + auth_headers=auth_headers) -def get_actions(client): - req = client.get("/actions") - assert req.status_code == 200 - actions = utilities.get_json(req.data) - assert isinstance(actions, dict) - assert "actions" in actions - return actions + value1 = list(objects["objects"].values())[0] + perimeter_id = value1['id'] + data = { + 'name': value1['name'] + "update" + } + req = hug.test.patch(perimeter, "/objects/{}".format(perimeter_id), body=data, + headers=auth_headers) + + objects = utilities.get_json(req.data) + value2 = list(objects["objects"].values())[0] + + assert req.status == hug.HTTP_200 + assert value1['name'] + 'update' == value2['name'] + assert value1['id'] == value2['id'] + assert value1['description'] == value2['description'] -def add_actions(client, name): +def test_perimeter_update_object_description(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import perimeter + name = 'testuser' + uuid4().hex + policies1 = policy_helper.add_policies() + policy_id1 = list(policies1.keys())[0] data = { "name": name, - "description": "description of {}".format(name), + "description": "description of {}".format('testuser'), + } + req, objects = add_objects('testuser', policyId=policy_id1, data=data, + auth_headers=auth_headers) + + value1 = list(objects["objects"].values())[0] + perimeter_id = value1['id'] + data = { + 'description': value1['description'] + "update" + } + req = hug.test.patch(perimeter, "/objects/{}".format(perimeter_id), body=data, + headers=auth_headers) + + objects = utilities.get_json(req.data) + value2 = list(objects["objects"].values())[0] + + assert req.status == hug.HTTP_200 + assert value1['name'] == value2['name'] + assert value1['id'] == value2['id'] + assert value1['description'] + 'update' == value2['description'] + + +def test_perimeter_update_object_description_and_name(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import perimeter + name = 'testuser' + uuid4().hex + policies1 = policy_helper.add_policies() + policy_id1 = list(policies1.keys())[0] + data = { + "name": name, + "description": "description of {}".format('testuser'), + } + req, objects = add_objects('testuser', policyId=policy_id1, data=data, + auth_headers=auth_headers) + + value1 = list(objects["objects"].values())[0] + perimeter_id = value1['id'] + data = { + 'name': value1['name'] + "update", + 'description': value1['description'] + "update" + } + req = hug.test.patch(perimeter, "/objects/{}".format(perimeter_id), body=data, + headers=auth_headers) + + objects = utilities.get_json(req.data) + value2 = list(objects["objects"].values())[0] + assert req.status == hug.HTTP_200 + assert value1['name'] + 'update' == value2['name'] + assert value1['id'] == value2['id'] + assert value1['description'] + 'update' == value2['description'] + + +def test_perimeter_update_object_wrong_id(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import perimeter + name = 'testuser' + uuid4().hex + policies1 = policy_helper.add_policies() + policy_id1 = list(policies1.keys())[0] + data = { + "name": name, + "description": "description of {}".format('testuser'), + } + req, objects = add_objects('testuser', policyId=policy_id1, data=data, + auth_headers=auth_headers) + + value1 = list(objects["objects"].values())[0] + perimeter_id = value1['id'] + data = { + 'name': value1['name'] + "update", + 'description': value1['description'] + "update" + } + with pytest.raises(exceptions.PerimeterContentError) as exception_info: + req = hug.test.patch(perimeter, "/objects/{}".format(perimeter_id + "wrong"), body=data, + headers=auth_headers) + assert "400: Perimeter content is invalid." == str(exception_info.value) + # assert req.status == hug.HTTP_400 + + +def test_perimeter_update_object_name_with_existed_one(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import perimeter + name = 'testuser' + uuid4().hex + policies1 = policy_helper.add_policies() + policy_id1 = list(policies1.keys())[0] + data1 = { + "name": name, + "description": "description of {}".format('testuser'), + } + req, objects = add_objects('testuser', policyId=policy_id1, data=data1, + auth_headers=auth_headers) + value1 = list(objects["objects"].values())[0] + + name = 'testuser' + uuid4().hex + + data2 = { + "name": name, + "description": "description of {}".format('testuser'), + } + req, objects = add_objects('testuser', policyId=policy_id1, data=data2, + auth_headers=auth_headers) + + value2 = list(objects["objects"].values())[0] + perimeter_id2 = value2['id'] + + data3 = { + 'name': value1['name'] } - req = client.post("/actions", data=json.dumps(data), - headers={'Content-Type': 'application/json'}) - assert req.status_code == 200 + with pytest.raises(exceptions.ObjectExisting) as exception_info: + req = hug.test.patch(perimeter, "/objects/{}".format(perimeter_id2), body=data3, + headers=auth_headers) + assert "409: Object Existing" == str(exception_info.value) + # assert req.status == hug.HTTP_409 + # assert req.data["message"] == '409: Object Existing' + + +def test_perimeter_add_object_without_name(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import perimeter + data = { + "name": "<br/>", + "description": "description of {}".format(""), + } + with pytest.raises(exceptions.ValidationContentError) as exception_info: + req = hug.test.post(perimeter, "/policies/{}/objects/".format( + "a34e5a29-5494-4cc5-9356-daa244b8c888"), + body=data, headers=auth_headers) + assert "Key: 'name', [Forbidden characters in string]" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data["message"] == "Key: 'name', [Forbidden characters in string]" + + +def test_perimeter_add_object_blank_data(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import perimeter + with pytest.raises(exceptions.ValidationKeyError) as exception_info: + req = hug.test.post(perimeter, "/policies/{}/objects/".format( + "a34e5a29-5494-4cc5-9356-daa244b8c888"), + body={}, headers=auth_headers) + assert "Invalid Key :name not found" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data["message"] == 'Invalid Key :name not found' + + +def test_perimeter_add_object_with_name_contain_spaces(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import perimeter + data = { + "name": "test<a>user", + "description": "description of {}".format("test user"), + } + with pytest.raises(exceptions.ValidationContentError) as exception_info: + req = hug.test.post(perimeter, "/policies/{}/objects/".format( + "a34e5a29-5494-4cc5-9356-daa244b8c888"), body=data, + headers=auth_headers) + assert "Key: 'name', [Forbidden characters in string]" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data["message"] == "Key: 'name', [Forbidden characters in string]" + + +def test_perimeter_add_object_with_name_space(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import perimeter + data = { + "name": " ", + "description": "description of {}".format("test user"), + } + with pytest.raises(exceptions.PerimeterContentError) as exception_info: + req = hug.test.post(perimeter, "/policies/{}/objects/".format( + "a34e5a29-5494-4cc5-9356-daa244b8c888"), + body =data, headers=auth_headers) + assert "400: Perimeter content is invalid." == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data["message"] == '400: Perimeter content is invalid.' + + +def test_perimeter_delete_object(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import perimeter + policies = policy_helper.add_policies() + policy_id = list(policies.keys())[0] + object_id = builder.create_object(policy_id) + req = hug.test.delete(perimeter, "/policies/{}/objects/{}".format(policy_id, object_id), headers=auth_headers) + + assert req.status == hug.HTTP_200 + + +def test_perimeter_delete_objects_without_perimeter_id(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + with pytest.raises(exceptions.ObjectUnknown) as exception_info: + req = delete_objects_without_perimeter_id(auth_headers=auth_headers) + assert "400: Object Unknown" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data["message"] == "400: Object Unknown" + + +def get_actions(): + from moon_manager.api import perimeter + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + req = hug.test.get(perimeter, "/actions", headers=auth_headers) + actions = utilities.get_json(req.data) + return req, actions + + +def add_actions(name, policy_id=None, data=None, perimeter_id=None, auth_headers=None): + from moon_manager.api import perimeter + if not policy_id: + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex, + model_name="model1" + uuid4().hex) + + if not data: + data = { + "name": name + uuid4().hex, + "description": "description of {}".format(name), + } + if not perimeter_id: + req = hug.test.post(perimeter, "/policies/{}/actions/".format(policy_id), body=data, + headers=auth_headers) + else: + req = hug.test.post(perimeter, "/policies/{}/actions/{}".format(policy_id, perimeter_id), + body=data, headers=auth_headers) + actions = utilities.get_json(req.data) + return req, actions + + +def delete_actions_without_perimeter_id(auth_headers=None): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import perimeter + req = hug.test.delete(perimeter, "/actions/{}".format(""), headers=auth_headers) + return req + + +def test_perimeter_get_actions(): + + req, actions = get_actions() + + assert req.status == hug.HTTP_200 assert isinstance(actions, dict) - key = list(actions["actions"].keys())[0] - value = list(actions["actions"].values())[0] assert "actions" in actions - assert value['name'] == name - assert value["description"] == "description of {}".format(name) - return actions - - -def delete_actions(client, name): - actions = get_actions(client) - for key, value in actions['actions'].items(): - if value['name'] == name: - req = client.delete("/actions/{}".format(key)) - assert req.status_code == 200 - break - actions = get_actions(client) - assert name not in [x['name'] for x in actions["actions"].values()] - - -def test_actions(): - client = utilities.register_client() - get_actions(client) - add_actions(client, "testuser") - delete_actions(client, "testuser") + + +def test_perimeter_add_actions(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + req, actions = add_actions("testuser", auth_headers=auth_headers) + value = list(actions["actions"].values())[0] + assert req.status == hug.HTTP_200 + assert value['name'] + + +def test_perimeter_add_action_with_wrong_policy_id(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + with pytest.raises(exceptions.PolicyUnknown) as exception_info: + req, actions = add_actions("testuser", policy_id="wrong", auth_headers=auth_headers) + assert "400: Policy Unknown" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data["message"] == '400: Policy Unknown' + + +def test_perimeter_add_action_with_policy_id_none(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import perimeter + data = { + "name": "testuser" + uuid4().hex, + "description": "description of {}".format("testuser"), + } + with pytest.raises(exceptions.PolicyUnknown) as exception_info: + req = hug.test.post(perimeter, "/policies/{}/actions/".format(None), body=data, + headers=auth_headers) + assert "400: Policy Unknown" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data["message"] == '400: Policy Unknown' + + +def test_perimeter_add_same_action_name_with_new_policy_id(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + req, action = add_actions("testuser", auth_headers=auth_headers) + value1 = list(action["actions"].values())[0] + policies1 = policy_helper.add_policies() + policy_id1 = list(policies1.keys())[0] + data = { + "name": value1['name'], + "description": "description of {}".format('testuser'), + } + req, action = add_actions('testuser', policy_id=policy_id1, data=data, + auth_headers=auth_headers) + value2 = list(action["actions"].values())[0] + assert req.status == hug.HTTP_200 + assert value1['id'] == value2['id'] + assert value1['name'] == value2['name'] + + +def test_perimeter_add_same_action_perimeter_id_with_new_policy_id(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + req, action = add_actions("testuser", auth_headers=auth_headers) + value1 = list(action["actions"].values())[0] + policies1 = policy_helper.add_policies() + policy_id1 = list(policies1.keys())[0] + data = { + "name": value1['name'], + "description": "description of {}".format('testuser'), + } + req, action = add_actions('testuser', policy_id=policy_id1, data=data, + perimeter_id=value1['id'], auth_headers=auth_headers) + value2 = list(action["actions"].values())[0] + + assert req.status == hug.HTTP_200 + assert value1['id'] == value2['id'] + assert value1['name'] == value2['name'] + + +def test_perimeter_add_same_action_perimeter_id_with_different_name(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + req, action = add_actions("testuser", auth_headers=auth_headers) + value1 = list(action["actions"].values())[0] + policies1 = policy_helper.add_policies() + policy_id1 = list(policies1.keys())[0] + data = { + "name": value1['name'] + 'different', + "description": "description of {}".format('testuser'), + } + with pytest.raises(exceptions.PerimeterContentError) as exception_info: + req, action = add_actions('testuser', policy_id=policy_id1, data=data, + perimeter_id=value1['id'], auth_headers=auth_headers) + assert "400: Perimeter content is invalid." == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data["message"] == '400: Perimeter content is invalid.' + + +def test_perimeter_add_same_action_name_with_same_policy_id(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + policies1 = policy_helper.add_policies() + policy_id1 = list(policies1.keys())[0] + req, action = add_actions("testuser", policy_id=policy_id1, auth_headers=auth_headers) + value1 = list(action["actions"].values())[0] + data = { + "name": value1['name'], + "description": "description of {}".format('testuser'), + } + with pytest.raises(exceptions.PolicyExisting) as exception_info: + req, action = add_actions('testuser', policy_id=policy_id1, data=data, + auth_headers=auth_headers) + assert "409: Policy Already Exists" == str(exception_info.value) + # assert req.status == hug.HTTP_409 + # assert req.data["message"] == '409: Policy Already Exists' + + +def test_perimeter_add_same_action_perimeter_id_with_existed_policy_id_in_list(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + policies1 = policy_helper.add_policies() + policy_id1 = list(policies1.keys())[0] + req, action = add_actions("testuser", policy_id=policy_id1, auth_headers=auth_headers) + value1 = list(action["actions"].values())[0] + data = { + "name": value1['name'], + "description": "description of {}".format('testuser'), + } + with pytest.raises(exceptions.PolicyExisting) as exception_info: + req, action = add_actions('testuser', policy_id=policy_id1, data=data, + perimeter_id=value1['id'], auth_headers=auth_headers) + assert "409: Policy Already Exists" == str(exception_info.value) + # assert req.status == hug.HTTP_409 + # assert req.data["message"] == '409: Policy Already Exists' + + +def test_perimeter_add_actions_without_name(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import perimeter + data = { + "name": "<a>", + "description": "description of {}".format(""), + } + with pytest.raises(exceptions.ValidationContentError) as exception_info: + req = hug.test.post(perimeter, "/policies/{}/actions".format( + "a34e5a29-5494-4cc5-9356-daa244b8c888"), + body=data, headers=auth_headers) + assert "Key: 'name', [Forbidden characters in string]" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data["message"] == "Key: 'name', [Forbidden characters in string]" + + +def test_perimeter_add_actions_with_name_contain_spaces(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import perimeter + data = { + "name": "test<a>user", + "description": "description of {}".format("test user"), + } + with pytest.raises(exceptions.ValidationContentError) as exception_info: + req = hug.test.post(perimeter, "/policies/{}/actions".format( + "a34e5a29-5494-4cc5-9356-daa244b8c888"), + body=data, headers=auth_headers) + assert "Key: 'name', [Forbidden characters in string]" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data["message"] == "Key: 'name', [Forbidden characters in string]" + + +def test_add_subjects_without_policy_id(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import perimeter + data = { + "name": "testuser", + "description": "description of {}".format("test user"), + } + with pytest.raises(exceptions.PolicyUnknown) as exception_info: + req = hug.test.post(perimeter, "/policies/{}/subjects".format( + "a34e5a29-5494-4cc5-9356-daa244b8c888"), + body=data, headers=auth_headers) + assert "400: Policy Unknown" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data["message"] == "400: Policy Unknown" + + +def test_add_objects_without_policy_id(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import perimeter + data = { + "name": "testuser", + "description": "description of {}".format("test user"), + } + with pytest.raises(exceptions.PolicyUnknown) as exception_info: + req = hug.test.post(perimeter, "/policies/{}/objects".format( + "a34e5a29-5494-4cc5-9356-daa244b8c888"), + body=data, headers=auth_headers) + assert "400: Policy Unknown" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data["message"] == "400: Policy Unknown" + + +def test_add_action_without_policy_id(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import perimeter + data = { + "name": "testuser", + "description": "description of {}".format("test user"), + } + with pytest.raises(exceptions.PolicyUnknown) as exception_info: + req = hug.test.post(perimeter, "/policies/{}/actions".format( + "a34e5a29-5494-4cc5-9356-daa244b8c888"), body=data, + headers=auth_headers) + assert "400: Policy Unknown" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data["message"] == "400: Policy Unknown" + + +def test_perimeter_update_action_name(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import perimeter + req, actions = add_actions("testuser", auth_headers=auth_headers) + value1 = list(actions["actions"].values())[0] + perimeter_id = value1['id'] + data = { + 'name': value1['name'] + "update" + } + req = hug.test.patch(perimeter, "/actions/{}".format(perimeter_id), body=data, + headers=auth_headers) + subjects = utilities.get_json(req.data) + value2 = list(subjects["actions"].values())[0] + + assert req.status == hug.HTTP_200 + assert value1['name'] + 'update' == value2['name'] + assert value1['id'] == value2['id'] + assert value1['description'] == value2['description'] + + +def test_perimeter_update_actions_description(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import perimeter + req, actions = add_actions("testuser", auth_headers=auth_headers) + value1 = list(actions["actions"].values())[0] + perimeter_id = value1['id'] + data = { + 'description': value1['description'] + "update" + } + req = hug.test.patch(perimeter, "/actions/{}".format(perimeter_id), body=data, + headers=auth_headers) + subjects = utilities.get_json(req.data) + value2 = list(subjects["actions"].values())[0] + + assert req.status == hug.HTTP_200 + assert value1['name'] == value2['name'] + assert value1['id'] == value2['id'] + assert value1['description'] + 'update' == value2['description'] + + +def test_perimeter_update_actions_description_and_name(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import perimeter + req, actions = add_actions("testuser", auth_headers=auth_headers) + value1 = list(actions["actions"].values())[0] + perimeter_id = value1['id'] + data = { + 'name': value1['name'] + "update", + 'description': value1['description'] + "update" + } + req = hug.test.patch(perimeter, "/actions/{}".format(perimeter_id), body=data, + headers=auth_headers) + subjects = utilities.get_json(req.data) + value2 = list(subjects["actions"].values())[0] + + assert req.status == hug.HTTP_200 + assert value1['name'] + 'update' == value2['name'] + assert value1['id'] == value2['id'] + assert value1['description'] + 'update' == value2['description'] + + +def test_perimeter_update_action_wrong_id(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import perimeter + req, actions = add_actions("testuser", auth_headers=auth_headers) + value1 = list(actions["actions"].values())[0] + perimeter_id = value1['id'] + data = { + 'name': value1['name'] + "update", + 'description': value1['description'] + "update" + } + with pytest.raises(exceptions.PerimeterContentError) as exception_info: + req = hug.test.patch(perimeter, "/actions/{}".format(perimeter_id + "wrong"), body=data, + headers=auth_headers) + assert "400: Perimeter content is invalid." == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data["message"] == '400: Perimeter content is invalid.' + + +def test_perimeter_update_action_name_with_existed_one(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import perimeter + req, actions = add_actions("testuser", auth_headers=auth_headers) + value1 = list(actions["actions"].values())[0] + req, actions = add_actions("testuser", auth_headers=auth_headers) + value2 = list(actions["actions"].values())[0] + perimeter_id2 = value2['id'] + data = { + 'name': value1['name'], + } + with pytest.raises(exceptions.ActionExisting) as exception_info: + req = hug.test.patch(perimeter, "/actions/{}".format(perimeter_id2), body=data, + headers=auth_headers) + assert "409: Action Existing" == str(exception_info.value) + # assert req.status == hug.HTTP_409 + # assert req.data["message"] == '409: Action Existing' + + +def test_perimeter_delete_actions(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import perimeter + policies = policy_helper.add_policies() + policy_id = list(policies.keys())[0] + action_id = builder.create_action(policy_id) + req = hug.test.delete(perimeter, "/policies/{}/actions/{}".format(policy_id, action_id), + headers=auth_headers) + + + assert req.status == hug.HTTP_200 + +def test_delete_subject_assigned_to_policy(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import perimeter + from moon_manager.db_driver import PolicyManager + + policies = policy_helper.add_policies() + policy_id = list(policies.keys())[0] + subject_id = builder.create_subject(policy_id) + PolicyManager.delete_policy(moon_user_id="admin", policy_id=policy_id) + PolicyManager.delete_subject(moon_user_id="admin", policy_id=None ,perimeter_id=subject_id) + + req = hug.test.get(perimeter, "subjects/{}".format(subject_id), headers=auth_headers) + assert req.data['subjects'] == {} + + +def test_delete_subject_without_policy(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import perimeter + policies = policy_helper.add_policies() + policy_id = list(policies.keys())[0] + + subject_id = builder.create_subject(policy_id) + + with pytest.raises(exceptions.PolicyUnknown) as exception_info: + req = hug.test.delete(perimeter, "/subjects/{}".format(subject_id), headers=auth_headers) + assert "400: Policy Unknown" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data["message"] == "400: Policy Unknown" + + +def test_delete_objects_without_policy(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import perimeter + policies = policy_helper.add_policies() + policy_id = list(policies.keys())[0] + + object_id = builder.create_object(policy_id) + + with pytest.raises(exceptions.PolicyUnknown) as exception_info: + req = hug.test.delete(perimeter, "/objects/{}".format(object_id), headers=auth_headers) + + assert "400: Policy Unknown" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data["message"] == "400: Policy Unknown" + + +def test_delete_actions_without_policy(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import perimeter + policies = policy_helper.add_policies() + policy_id = list(policies.keys())[0] + + action_id = builder.create_action(policy_id) + + with pytest.raises(exceptions.PolicyUnknown) as exception_info: + req = hug.test.delete(perimeter, "/actions/{}".format(action_id), headers=auth_headers) + assert "400: Policy Unknown" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data["message"] == "400: Policy Unknown" + + +def test_perimeter_delete_actions_without_perimeter_id(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + with pytest.raises(exceptions.ActionUnknown) as exception_info: + req = delete_actions_without_perimeter_id(auth_headers=auth_headers) + assert "400: Action Unknown" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data["message"] == "400: Action Unknown" diff --git a/moon_manager/tests/unit_python/api/test_perimeter_examples.py b/moon_manager/tests/unit_python/api/test_perimeter_examples.py new file mode 100644 index 00000000..0598629c --- /dev/null +++ b/moon_manager/tests/unit_python/api/test_perimeter_examples.py @@ -0,0 +1,55 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +import hug +import json + + +# def test_local_perimeter_get_subject(): +# from moon_manager.api import perimeter +# subjects = perimeter.Subjects.get() +# assert isinstance(subjects, dict) +# assert "subjects" in subjects + + + +# def test_http_perimeter_post_subject(): +# from moon_manager.api import perimeter +# result = hug.test.post(perimeter, 'subjects/b34e5a2954944cc59356daa244b8c254', +# body={'name': 'ha'}, +# headers={'Content-Type': 'application/json'}) +# assert result.status == hug.HTTP_200 +# assert isinstance(result.data, dict) +# assert "subjects" in result.data +# +# +# def test_http_perimeter_get_subject_2(): +# from moon_manager.api import perimeter +# result = hug.test.get(perimeter, 'subjects/b34e5a29-5494-4cc5-9356-daa244b8c254') +# assert result.status == hug.HTTP_200 +# assert isinstance(result.data, dict) +# assert "subjects" in result.data +# +# def test_http_perimeter_get_subject_3(): +# from moon_manager.api import perimeter +# result = hug.test.get(perimeter, 'policies/b34e5a29-5494-4cc5-9356-daa244b8c254/subjects/') +# assert result.status == hug.HTTP_200 +# assert isinstance(result.data, dict) +# assert "subjects" in result.data +# +# +# def test_http_perimeter_get_subject_4(): +# from moon_manager.api import perimeter +# result = hug.test.get(perimeter, 'policies/b34e5a29-5494-4cc5-9356-daa244b8c254/subjects/b34e5a29-5494-4cc5-9356-daa244b8c254') +# assert result.status == hug.HTTP_200 +# assert isinstance(result.data, dict) +# assert "subjects" in result.data diff --git a/moon_manager/tests/unit_python/api/test_policies.py b/moon_manager/tests/unit_python/api/test_policies.py index 4d4e387e..a07ba725 100644 --- a/moon_manager/tests/unit_python/api/test_policies.py +++ b/moon_manager/tests/unit_python/api/test_policies.py @@ -1,69 +1,424 @@ -import json -import api.utilities as utilities +# Software Name: MOON +# Version: 5.4 -def get_policies(client): - req = client.get("/policies") - policies = utilities.get_json(req.data) - return req, policies +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. -def add_policies(client, name): + +from falcon import HTTP_200, HTTP_400, HTTP_405, HTTP_409 +import hug +from uuid import uuid4 +import pytest +from moon_utilities import exceptions +from helpers import model_helper +from helpers import policy_helper + + +def get_policies(auth_headers): + from moon_manager.api import policy + req = hug.test.get(policy, "policies", headers=auth_headers) + return req + + +def add_policies(name, auth_headers): + from moon_manager.api import policy + req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex) + model_id = list(req.keys())[0] data = { "name": name, "description": "description of {}".format(name), - "model_id": "modelId", + "model_id": model_id, "genre": "genre" } - req = client.post("/policies", data=json.dumps(data), - headers={'Content-Type': 'application/json'}) - policies = utilities.get_json(req.data) - return req, policies - - -def delete_policies(client, name): - request, policies = get_policies(client) - for key, value in policies['policies'].items(): - if value['name'] == name: - req = client.delete("/policies/{}".format(key)) - break + req = hug.test.post(policy, "policies", data, headers=auth_headers) return req -def delete_policies_without_id(client): - req = client.delete("/policies/{}".format("")) +def delete_policies_without_id(auth_headers): + from moon_manager.api import policy + req = hug.test.delete(policy, "policies/{}".format(""), headers=auth_headers) return req def test_get_policies(): - client = utilities.register_client() - req, policies = get_policies(client) - assert req.status_code == 200 - assert isinstance(policies, dict) - assert "policies" in policies + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + req = get_policies(auth_headers=auth_headers) + assert req.status == HTTP_200 + assert isinstance(req.data, dict) + assert "policies" in req.data def test_add_policies(): - client = utilities.register_client() - req, policies = add_policies(client, "testuser") - assert req.status_code == 200 - assert isinstance(policies, dict) - value = list(policies["policies"].values())[0] - assert "policies" in policies - assert value['name'] == "testuser" - assert value["description"] == "description of {}".format("testuser") - assert value["model_id"] == "modelId" - assert value["genre"] == "genre" + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + policy_name = "testuser" + uuid4().hex + req = add_policies(policy_name, auth_headers=auth_headers) + assert req.status == HTTP_200 + assert isinstance(req.data, dict) + value = list(req.data["policies"].values())[0] + assert "policies" in req.data + assert value['name'] == policy_name + assert value["description"] == "description of {}".format(policy_name) + + +def test_add_policies_without_model(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import policy + policy_name = "testuser" + uuid4().hex + data = { + "name": policy_name, + "description": "description of {}".format(policy_name), + "model_id": "", + "genre": "genre" + } + req = hug.test.post(policy, "policies/", data, headers=auth_headers) + + assert req.status == HTTP_200 + + +def test_add_policies_with_same_name(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + name = uuid4().hex + policy_name = name + req = add_policies(policy_name, auth_headers=auth_headers) + assert req.status == HTTP_200 + assert isinstance(req.data, dict) + value = list(req.data["policies"].values())[0] + assert "policies" in req.data + assert value['name'] == policy_name + assert value["description"] == "description of {}".format(policy_name) + with pytest.raises(exceptions.PolicyExisting) as exception_info: + req = add_policies(policy_name, auth_headers=auth_headers) + assert "409: Policy Already Exists" == str(exception_info.value) + # assert req.status == HTTP_409 + # assert req.data["message"] == '409: Policy Already Exists' + + +def test_add_policy_with_empty_name(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + policy_name = "" + with pytest.raises(exceptions.PolicyContentError) as exception_info: + req = add_policies(policy_name, auth_headers=auth_headers) + assert "400: Policy Content Error" == str(exception_info.value) + # assert req.status == HTTP_400 + # assert req.data["message"] == '400: Policy Content Error' + + +def test_add_policy_with_model_has_no_meta_rule(): + from moon_utilities.auth_functions import get_api_key_for_user + from moon_manager.api import policy + + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + policy_name = "testuser" + uuid4().hex + req = model_helper.add_model_without_meta_rule() + model_id = list(req.keys())[0] + data = { + "name": policy_name, + "description": "description of {}".format(policy_name), + "model_id": model_id, + "genre": "genre" + } + with pytest.raises(exceptions.MetaRuleUnknown) as exception_info: + hug.test.post(policy, "policies/", data, headers=auth_headers) + assert "400: Meta Rule Unknown" == str(exception_info.value) + + +def test_add_policy_with_model_has_blank_subject_meta_rule(): + from moon_utilities.auth_functions import get_api_key_for_user + from moon_manager.api import policy + + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + policy_name = "testuser" + uuid4().hex + req = model_helper.add_model_with_blank_subject_meta_rule() + model_id = list(req.keys())[0] + data = { + "name": policy_name, + "description": "description of {}".format(policy_name), + "model_id": model_id, + "genre": "genre" + } + with pytest.raises(exceptions.MetaRuleContentError) as exception_info: + hug.test.post(policy, "policies/", data, headers=auth_headers) + assert "400: Meta Rule Error" == str(exception_info.value) + + + +# FIXME: uncomment when model API is re-inserted +# def test_update_policies_with_model(): +# from moon_manager.api import policy +# policy_name = "testuser" + uuid4().hex +# data = { +# "name": policy_name, +# "description": "description of {}".format(policy_name), +# "model_id": "", +# "genre": "genre" +# } +# req = hug.test.post(policy, "policies/", data) +# policy_id = next(iter(req.data['policies'])) +# req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex) +# model_id = list(req.data.keys())[0] +# data = { +# "name": policy_name + "-2", +# "description": "description of {}".format(policy_name), +# "model_id": model_id, +# "genre": "genre" +# } +# req = hug.test.patch("policies/{}".format(policy_id), data) +# assert req.status == HTTP_200 +# assert req.data['policies'][policy_id]['name'] == policy_name + '-2' + + +def test_update_policies_name_success(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import policy + policy_name = "testuser" + uuid4().hex + req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex) + model_id = list(req.keys())[0] + data = { + "name": policy_name, + "description": "description of {}".format(policy_name), + "model_id": model_id, + "genre": "genre" + } + req = hug.test.post(policy, "policies/", data, headers=auth_headers) + policy_id = next(iter(req.data['policies'])) + + data = { + "name": policy_name + "-2", + "description": "description of {}".format(policy_name), + "model_id": model_id, + "genre": "genre" + } + req = hug.test.patch(policy, "policies/{}".format(policy_id), data, headers=auth_headers) + assert req.status == HTTP_200 + assert req.data['policies'][policy_id]['name'] == policy_name + '-2' + + +def test_update_blank_policies_with_model(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import policy + policy_name = uuid4().hex + data = { + "name": policy_name, + "description": "description of {}".format(policy_name), + "model_id": "", + "genre": "genre" + } + req = hug.test.post(policy, "policies/", data, headers=auth_headers) + policy_id = next(iter(req.data['policies'])) + req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex) + model_id = list(req.keys())[0] + data = { + "name": policy_name, + "description": "description of {}".format(policy_name), + "model_id": model_id, + "genre": "genre" + } + req = hug.test.patch(policy, "policies/{}".format(policy_id), data, headers=auth_headers) + assert req.status == HTTP_200 + + +def test_update_policies_model_unused(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import policy + policy_name = uuid4().hex + req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex) + model_id = list(req.keys())[0] + data = { + "name": policy_name, + "description": "description of {}".format(policy_name), + "model_id": model_id, + "genre": "genre" + } + req = hug.test.post(policy, "policies/", data, headers=auth_headers) + policy_id = next(iter(req.data['policies'])) + req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex) + model_id = list(req.keys())[0] + data = { + "name": policy_name, + "description": "description of {}".format(policy_name), + "model_id": model_id, + "genre": "genre" + } + + with pytest.raises(exceptions.PolicyUpdateError) as exception_info: + req = hug.test.patch(policy, "policies/{}".format(policy_id), data, headers=auth_headers) + assert "400: Policy update error" == str(exception_info.value) + + +# FIXME: uncomment when model API is re-inserted +# def test_update_policy_name_with_existed_one(): +# from moon_manager.api import policy +# policy_name1 = "testuser" + uuid4().hex +# req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex) +# model_id = list(req.keys())[0] +# data = { +# "name": policy_name1, +# "description": "description of {}".format(policy_name1), +# "model_id": model_id, +# "genre": "genre" +# } +# req = hug.test.post(policy, "policies/", data) +# policy_id1 = next(iter(req.data['policies'])) +# +# policy_name2 = "testuser" + uuid4().hex +# eq = model_helper.add_model(model_id="mls_model_id" + uuid4().hex) +# model_id = list(req.data.keys())[0] +# data = { +# "name": policy_name2, +# "description": "description of {}".format(policy_name2), +# "model_id": model_id, +# "genre": "genre" +# } +# req = hug.test.post(policy, "policies/", data) +# policy_id2 = next(iter(req.data['policies'])) +# +# data = { +# "name": policy_name1, +# "description": "description of {}".format(policy_name1), +# "model_id": model_id, +# "genre": "genre" +# } +# req = hug.test.patch(policy, "policies/{}".format(policy_id2), data) +# assert req.status == HTTP_409 +# assert req.data["message"] == '409: Policy Already Exists' + + +def test_update_policies_with_empty_name(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import policy + policy_name = "testuser" + uuid4().hex + req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex) + model_id = list(req.keys())[0] + data = { + "name": policy_name, + "description": "description of {}".format(policy_name), + "model_id": model_id, + "genre": "genre" + } + req = hug.test.post(policy, "policies/", data, headers=auth_headers) + policy_id = next(iter(req.data['policies'])) + + data = { + "name": "", + "description": "description of {}".format(policy_name), + "model_id": model_id, + "genre": "genre" + } + with pytest.raises(exceptions.PolicyContentError) as exception_info: + req = hug.test.patch(policy, "policies/{}".format(policy_id), data, headers=auth_headers) + assert "400: Policy Content Error" == str(exception_info.value) + # assert req.status == HTTP_400 + # assert req.data["message"] == '400: Policy Content Error' + + +def test_update_policies_with_blank_model(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import policy + policy_name = "testuser" + uuid4().hex + req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex) + model_id = list(req.keys())[0] + data = { + "name": policy_name, + "description": "description of {}".format(policy_name), + "model_id": model_id, + "genre": "genre" + } + req = hug.test.post(policy, "policies/", data, headers=auth_headers) + policy_id = next(iter(req.data['policies'])) + + data = { + "name": policy_name, + "description": "description of {}".format(policy_name), + "model_id": "", + "genre": "genre" + } + + with pytest.raises(exceptions.PolicyUpdateError) as exception_info: + req = hug.test.patch(policy, "policies/{}".format(policy_id), data, headers=auth_headers) + assert "400: Policy update error" == str(exception_info.value) + + +# FIXME: uncomment when model API is re-inserted +# def test_update_policies_connected_to_rules_with_blank_model(): +# from moon_manager.api import policy +# req, rules, policy_id = data_builder.add_rules() +# req = hug.test.get(policy, "policies") +# for policy_obj_id in req.data['policies']: +# if policy_obj_id == policy_id: +# policy = req.data['policies'][policy_obj_id] +# policy['model_id'] = '' +# req = hug.test.patch("/policies/{}".format(policy_id), req.data) +# assert req.status == HTTP_400 +# assert req.data["message"] == '400: Policy update error' def test_delete_policies(): - client = utilities.register_client() - req = delete_policies(client, "testuser") - assert req.status_code == 200 + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import policy + _policy = policy_helper.add_policies() + policy_id = list(_policy.keys())[0] + req = hug.test.delete(policy, "policies/{}".format(policy_id), headers=auth_headers) + assert req.status == HTTP_200 -def test_delete_policies_without_id(): - client = utilities.register_client() - req = delete_policies_without_id(client) - assert req.status_code == 500 +# FIXME: uncomment when rule API is re-inserted +# def test_delete_policy_with_dependencies_rule(): +# from moon_manager.api import policy +# req, rules, policy_id = data_builder.add_rules() +# req = hug.test.delete(policy, "policies/{}".format(policy_id)) +# assert req.status == HTTP_400 +# assert req.data["message"] == '400: Policy With Rule Error' + +# FIXME: uncomment when perimeter API is re-inserted +# def test_delete_policy_with_dependencies_subject_data(): +# from moon_manager.api import policy +# req, rules, policy_id = data_builder.add_rules() +# req = hug.test.delete(policy, "policies/{}/rules/{}".format(policy_id, next(iter(rules['rules'])))) +# assert req.status == HTTP_200 +# req = hug.test.delete(policy, "policies/{}".format(policy_id)) +# assert req.status == HTTP_400 +# assert req.data["message"] == '400: Policy With Data Error' + + +# FIXME: uncomment when perimeter API is re-inserted +# def test_delete_policy_with_dependencies_perimeter(): +# from moon_manager.api import policy +# _policy = policy_helper.add_policies() +# policy_id = next(iter(_policy)) +# +# data = { +# "name": 'testuser'+uuid4().hex, +# "description": "description of {}".format(uuid4().hex), +# "password": "password for {}".format(uuid4().hex), +# "email": "{}@moon".format(uuid4().hex) +# } +# req = hug.test.post(policy, "policies/{}/subjects".format(policy_id), data) +# +# assert req.status == HTTP_200 +# req = hug.test.delete(policy, "policies/{}".format(policy_id)) +# assert req.status == HTTP_400 +# assert req.data["message"] == '400: Policy With Perimeter Error' + + +def test_delete_policies_without_id(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + req = delete_policies_without_id(auth_headers=auth_headers) + assert req.status == HTTP_405 diff --git a/moon_manager/tests/unit_python/api/test_rules.py b/moon_manager/tests/unit_python/api/test_rules.py index 86a3d390..2bb7a96f 100644 --- a/moon_manager/tests/unit_python/api/test_rules.py +++ b/moon_manager/tests/unit_python/api/test_rules.py @@ -1,58 +1,333 @@ -import api.utilities as utilities +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +import pytest +from moon_utilities import exceptions import json +from helpers import data_builder as builder +from helpers import policy_helper +from helpers import rules_helper +import hug + +def get_rules(policy_id): + from moon_manager.api import rules + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} -def get_rules(client, policy_id): - req = client.get("/policies/{}/rules".format(policy_id)) - rules = utilities.get_json(req.data) + req = hug.test.get(rules, "/policies/{}/rules".format(policy_id), headers=auth_headers) + rules = req.data return req, rules -def add_rules(client, policy_id): +def add_rules_without_policy_id(headers): + from moon_manager.api import rules + subject_category_id, object_category_id, action_category_id, meta_rule_id = builder.create_new_meta_rule() data = { - "meta_rule_id": "meta_rule_id1", + "meta_rule_id": meta_rule_id, + "rule": [subject_category_id, object_category_id, action_category_id], + "instructions": [ + {"decision": "grant"}, + ], + "enabled": True + } + headers['Content-Type'] = 'application/json' + req = hug.test.post(rules, "/policies/{}/rules".format(None), body=json.dumps(data), + headers=headers) + rules = req.data + return req, rules + + +def add_rules_without_meta_rule_id(policy_id, headers): + from moon_manager.api import rules + data = { + "meta_rule_id": "", "rule": ["subject_data_id2", "object_data_id2", "action_data_id2"], - "instructions": ( + "instructions": [ {"decision": "grant"}, - ), + ], "enabled": True } - req = client.post("/policies/{}/rules".format(policy_id), data=json.dumps(data), - headers={'Content-Type': 'application/json'}) - rules = utilities.get_json(req.data) + headers['Content-Type'] = 'application/json' + req = hug.test.post(rules, "/policies/{}/rules".format(policy_id), body=json.dumps(data), + headers=headers) + rules = req.data return req, rules -def delete_rules(client, policy_id, meta_rule_id): - req = client.delete("/policies/{}/rules/{}".format(policy_id, meta_rule_id)) +def add_rules_without_rule(policy_id, headers): + from moon_manager.api import rules + data = { + "meta_rule_id": "meta_rule_id1", + "instructions": [ + {"decision": "grant"}, + ], + "enabled": True + } + headers['Content-Type'] = 'application/json' + req = hug.test.post(rules, "/policies/{}/rules".format(policy_id), body=json.dumps(data), + headers=headers) + rules = req.data + return req, rules + + +def delete_rules(policy_id, meta_rule_id, headers): + from moon_manager.api import rules + req = hug.test.delete(rules, "/policies/{}/rules/{}".format(policy_id, meta_rule_id), + headers=headers) + return req + + +def update_rule(policy_id, rule_id, instructions, headers): + from moon_manager.api import rules + req = hug.test.patch(rules, "/policies/{}/rules/{}".format(policy_id, rule_id), + headers=headers, + body=instructions) return req -def test_get_rules(): - policy_id = utilities.get_policy_id() - client = utilities.register_client() - req, rules = get_rules(client, policy_id) - assert req.status_code == 200 +def test_add_rules_with_invalid_decision_instructions(): + from moon_manager.api import rules + + auth_headers = rules_helper.get_headers() + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy() + + data = { + "meta_rule_id": meta_rule_id, + "rule": [subject_category_id, object_category_id, action_category_id], + "instructions": [ + {"decision": "invalid"}, + ], + "enabled": True + } + + with pytest.raises(exceptions.RuleContentError) as exception_info: + hug.test.post(rules, "/policies/{}/rules".format(policy_id), body=json.dumps(data), + headers=auth_headers) + assert "400: Rule Error" == str(exception_info.value) + + +def test_add_rules_with_meta_rule_not_linked_with_policy_model(): + from moon_manager.api import rules + + auth_headers = rules_helper.get_headers() + policy_id = builder.create_new_policy()[-1] + meta_rule_id = builder.create_new_meta_rule()[-1] + + data = { + "meta_rule_id": meta_rule_id, + "rule": ["subject_data_id2", "object_data_id2", "action_data_id2"], + "instructions": [ + {"decision": "grant"}, + ], + "enabled": True + } + + with pytest.raises(exceptions.MetaRuleNotLinkedWithPolicyModel) as exception_info: + hug.test.post(rules, "/policies/{}/rules".format(policy_id), body=json.dumps(data), + headers=auth_headers) + assert "400: MetaRule Not Linked With Model - Policy" == str(exception_info.value) + + +def test_add_rules_with_invalid_rule(): + from moon_manager.api import rules + + auth_headers = rules_helper.get_headers() + + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy() + sub_data_id = builder.create_subject_data(policy_id, subject_category_id) + obj_data_id = builder.create_object_data(policy_id, object_category_id) + act_data_id = builder.create_action_data(policy_id, action_category_id) + + data = { + "meta_rule_id": meta_rule_id, + "rule": [obj_data_id, sub_data_id, act_data_id], + "instructions": [ + {"decision": "grant"}, + ], + "enabled": True + } + + with pytest.raises(exceptions.RuleContentError) as exception_info: + hug.test.post(rules, "/policies/{}/rules".format(policy_id), body=json.dumps(data), + headers=auth_headers) + assert "400: Rule Error" == str(exception_info.value) + + +def test_add_rules_with_no_given_decision_instructions(policy_id=None): + from moon_manager.api import rules + + auth_headers = rules_helper.get_headers() + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy() + sub_data_id = builder.create_subject_data(policy_id, subject_category_id) + obj_data_id = builder.create_object_data(policy_id, object_category_id) + act_data_id = builder.create_action_data(policy_id, action_category_id) + + data = { + "meta_rule_id": meta_rule_id, + "rule": [sub_data_id, obj_data_id, act_data_id], + "instructions": [], + "enabled": True + } + + req = hug.test.post(rules, "/policies/{}/rules".format(policy_id), body=json.dumps(data), + headers=auth_headers) + + assert req.status == hug.HTTP_200 + + default_instruction = {"decision": "grant"} + rules = req.data['rules'] + rule_id = next(iter(req.data['rules'])) + assert rules[rule_id]["instructions"][0] == default_instruction + + +def test_get_rules(policy_id=None): + if policy_id == None: + policy = policy_helper.add_policies() + policy_id = next(iter(policy)) + + req, rules = get_rules(policy_id) + assert req.status == hug.HTTP_200 assert isinstance(rules, dict) assert "rules" in rules + return req, rules def test_add_rules(): - policy_id = utilities.get_policy_id() - client = utilities.register_client() - req, rules = add_rules(client, policy_id) - assert req.status_code == 200 - assert isinstance(rules, dict) - value = rules["rules"] - assert "rules" in rules - id = list(value.keys())[0] - assert value[id]["meta_rule_id"] == "meta_rule_id1" + req, rules, policy = builder.add_rules() + assert req.status == hug.HTTP_200 + + +def test_add_rules_without_policy_id(): + from moon_manager.api import rules + + subject_category_id, object_category_id, action_category_id, meta_rule_id = builder.create_new_meta_rule() + data = { + "meta_rule_id": meta_rule_id, + "rule": [subject_category_id, object_category_id, action_category_id], + "instructions": [ + {"decision": "grant"}, + ], + "enabled": True + } + + headers = rules_helper.get_headers() + with pytest.raises(exceptions.PolicyUnknown) as exception_info: + req = hug.test.post(rules, "/policies/{}/rules".format(None), body=json.dumps(data), + headers=headers) + assert "400: Policy Unknown" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data["message"] == "400: Policy Unknown" + + +# +# def test_add_rules_without_meta_rule_id(): +# policy_id = utilities.get_policy_id() +# client = utilities.register_client() +# req, rules = add_rules_without_meta_rule_id(client, policy_id) +# assert req.status == 400 +# assert json.loads(req.data)["message"] == "Key: 'meta_rule_id', [Empty String]" + +def test_add_rules_without_rule(): + from moon_utilities.auth_functions import get_api_key_for_user + policy = policy_helper.add_policies() + policy_id = next(iter(policy)) + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + with pytest.raises(exceptions.ValidationKeyError) as exception_info: + req, rules = add_rules_without_rule(policy_id, headers=auth_headers) + assert "Invalid Key :rule not found" == str(exception_info.value) + # assert req.status == hug.HTTP_400 + # assert req.data["message"] == 'Invalid Key :rule not found' -def test_delete_rules(): - client = utilities.register_client() - policy_id = utilities.get_policy_id() - req, added_rules = get_rules(client, policy_id) - id = added_rules["rules"]['rules'][0]['id'] - rules = delete_rules(client, policy_id, id) - assert rules.status_code == 200 + +def test_update_rule_without_body(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + req, rules, policy_id = builder.add_rules() + rule_id = list(rules['rules'].keys())[0] + + req = update_rule(policy_id, rule_id, instructions=None, headers=auth_headers) + + assert req.status == hug.HTTP_400 + + +def test_update_rule_without_instructions_in_body(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + data = {"instruction": [ # faute de frappe + {"decision": "deny"}, + ]} + + req, rules, policy_id = builder.add_rules() + rule_id = list(rules['rules'].keys())[0] + + req = update_rule(policy_id, rule_id, instructions=None, headers=auth_headers) + + assert req.status == hug.HTTP_400 + + +def test_update_rule(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + req, rules, policy_id = builder.add_rules() + rule_id = list(rules['rules'].keys())[0] + + data = {"instructions": [ + {"decision": "deny"}, + ]} + req = update_rule(policy_id, rule_id, data, headers=auth_headers) + + rules = get_rules(policy_id)[1]['rules']['rules'] + + rule = None + for rule_ in rules: + if rule_['id'] == rule_id: + rule = rule_ + break + + assert req.status == hug.HTTP_200 and rule['instructions'][0]['decision'] == "deny" + + +def test_delete_rules_with_invalid_parameters(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + req = delete_rules("", "", headers=auth_headers) + assert req.status == hug.HTTP_405 + + +def test_delete_rules_without_policy_id(): + from moon_manager.api import rules + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy() + sub_data_id = builder.create_subject_data(policy_id, subject_category_id) + obj_data_id = builder.create_object_data(policy_id, object_category_id) + act_data_id = builder.create_action_data(policy_id, action_category_id) + data = { + "meta_rule_id": meta_rule_id, + "rule": [sub_data_id, obj_data_id, act_data_id], + "instructions": [ + {"decision": "grant"}, + ], + "enabled": True + } + hug.test.post(rules, "/policies/{}/rules".format(policy_id), body=json.dumps(data), + headers={'Content-Type': 'application/json', + "X-Api-Key": get_api_key_for_user("admin")}) + req, added_rules = get_rules(policy_id) + id = list(added_rules["rules"]["rules"])[0]["id"] + rules = delete_rules(None, id, headers=auth_headers) + assert rules.status == hug.HTTP_200 diff --git a/moon_manager/tests/unit_python/api/test_slaves.py b/moon_manager/tests/unit_python/api/test_slaves.py new file mode 100644 index 00000000..29d5e62e --- /dev/null +++ b/moon_manager/tests/unit_python/api/test_slaves.py @@ -0,0 +1,90 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +from falcon import HTTP_200, HTTP_400, HTTP_405 +import hug +from uuid import uuid4 +from helpers import data_builder as builder + + +def test_get_slaves(): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import slave + req = hug.test.get(slave, 'slaves/', headers=auth_headers) + assert req.status == HTTP_200 + assert isinstance(req.data, dict) + assert "slaves" in req.data + for slave in req.data.get("slaves"): + assert "name" in slave + assert "description" in slave + assert "status" in slave + assert "server_ip" in slave + assert "port" in slave + + +def test_add_slave(mocker): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import slave + mocker.patch('moon_manager.plugins.pyorchestrator.get_server_url', + return_value="http://127.0.0.1:10000") + mocker.patch("subprocess.Popen", return_value=True) + data = { + "name": "test_slave_" + uuid4().hex, + "description": "description of test_slave" + } + req = hug.test.post(slave, "slave/", data, headers=auth_headers) + assert req.status == HTTP_200 + assert isinstance(req.data, dict) + found = False + assert "slaves" in req.data + for value in req.data["slaves"].values(): + assert "name" in value + assert "description" in value + assert "api_key" in value + assert "process" in value + assert "log" in value + assert "extra" in value + if value['name'] == data['name']: + found = True + assert value["description"] == "description of test_slave" + assert "port" in value.get("extra") + assert "status" in value.get("extra") + assert "server_ip" in value.get("extra") + break + assert found + + +def test_delete_slave(mocker): + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + from moon_manager.api import slave + mocker.patch('moon_manager.plugins.pyorchestrator.get_server_url', + return_value="http://127.0.0.1:10000") + mocker.patch("subprocess.Popen", return_value=True) + data = { + "name": "test_slave_" + uuid4().hex, + "description": "description of test_slave" + } + req = hug.test.post(slave, "slave/", data, headers=auth_headers) + assert req.status == HTTP_200 + assert isinstance(req.data, dict) + req = hug.test.get(slave, 'slaves/', headers=auth_headers) + success_req = None + for key, value in req.data['slaves'].items(): + if value['name'] == data['name']: + success_req = hug.test.delete(slave, 'slave/{}'.format(key), headers=auth_headers) + break + assert success_req + assert success_req.status == HTTP_200 + diff --git a/moon_manager/tests/unit_python/api/utilities.py b/moon_manager/tests/unit_python/api/utilities.py index 66ca30c5..baf59a51 100644 --- a/moon_manager/tests/unit_python/api/utilities.py +++ b/moon_manager/tests/unit_python/api/utilities.py @@ -1,26 +1,16 @@ -import json - +# Software Name: MOON -def get_json(data): - return json.loads(data.decode("utf-8")) +# Version: 5.4 +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 -def register_client(): - import moon_manager.server - server = moon_manager.server.create_server() - client = server.app.test_client() - return client +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. -def get_policy_id(): - import api.test_policies as policies - client = register_client() - policy_id = '' - req, policy = policies.get_policies(client) - for id in policy['policies']: - if id: - policy_id = id - break - if not policy_id: - policies.add_policies(client, "testuser") - return policy_id +import json +from uuid import uuid4 +def get_json(data): + return data;#json.loads(data.decode("utf-8")) |