diff options
Diffstat (limited to 'moon_engine/tests/unit_python/api/wrapper')
5 files changed, 911 insertions, 0 deletions
diff --git a/moon_engine/tests/unit_python/api/wrapper/__init__.py b/moon_engine/tests/unit_python/api/wrapper/__init__.py new file mode 100644 index 00000000..582be686 --- /dev/null +++ b/moon_engine/tests/unit_python/api/wrapper/__init__.py @@ -0,0 +1,11 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + diff --git a/moon_engine/tests/unit_python/api/wrapper/test_oslo_wrapper.py b/moon_engine/tests/unit_python/api/wrapper/test_oslo_wrapper.py new file mode 100644 index 00000000..770e9bb6 --- /dev/null +++ b/moon_engine/tests/unit_python/api/wrapper/test_oslo_wrapper.py @@ -0,0 +1,209 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +import hug +import json +import requests + + +def get_subject_object_action(): + from moon_engine.api.configuration import get_configuration + from moon_cache.cache import Cache + from moon_utilities.auth_functions import get_api_key_for_user + + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"), + incremental=get_configuration("incremental_updates"), + manager_api_key=get_configuration("api_token")) + + CACHE.set_current_server(url=get_configuration("management").get("url"), + api_key=get_api_key_for_user("admin")) + + # Note: patching the cache for the test + CACHE.add_pipeline("b3d3e18a-bf33-40e8-b635-fd49e6634ccd", { + "name": "test", + "description": "test", + "host": "127.0.0.1", + "port": 20000, + }) + CACHE.add_pipeline("f8f49a77-9ceb-47b3-ac81-0f01ef71b4e0", { + "name": "test", + "description": "test", + "host": "127.0.0.1", + "port": 20000, + }) + response = requests.get("{}/pdp".format(get_configuration("management").get("url")), + headers=auth_headers) + pdp = response.json() + pdp_id = next(iter(pdp['pdps'])) + policy_id = pdp['pdps'][pdp_id].get("security_pipeline")[0] + project_id = pdp['pdps'][pdp_id].get("vim_project_id") + + # response = requests.get("{}/policies".format(get_configuration("management").get("url")), + # headers=auth_headers) + # policies = response.json() + # policy_id = next(iter(policies['policies'])) + + response = requests.get("{}/policies/{}/subjects".format( + get_configuration("management").get("url"), policy_id), headers=auth_headers) + subjects = response.json() + + response = requests.get("{}/policies/{}/objects".format( + get_configuration("management").get("url"), policy_id), headers=auth_headers) + objects = response.json() + + response = requests.get("{}/policies/{}/actions".format( + get_configuration("management").get("url"), policy_id), headers=auth_headers) + actions = response.json() + return subjects, objects, actions, project_id + + +def test_post_authz(): + from moon_engine.plugins import oslowrapper + from moon_utilities.auth_functions import get_api_key_for_user + from moon_cache.cache import Cache + from moon_engine.api.configuration import get_configuration + CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"), + incremental=get_configuration("incremental_updates"), + manager_api_key=get_configuration("api_token")) + + CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user( + "admin")) + + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + response = requests.get("{}/pdp".format(get_configuration("management").get("url")), + headers=auth_headers) + pdp = response.json() + pdp_id = next(iter(pdp['pdps'])) + policy_id = pdp['pdps'][pdp_id].get("security_pipeline")[0] + project_id = pdp['pdps'][pdp_id].get("vim_project_id") + + response = requests.get("{}/policies/{}/subjects".format(get_configuration( + "management").get("url"), policy_id), headers=auth_headers) + subjects = response.json() + + response = requests.get("{}/policies/{}/objects".format(get_configuration( + "management").get("url"), policy_id), headers=auth_headers) + objects = response.json() + + response = requests.get("{}/policies/{}/actions".format(get_configuration( + "management").get("url"), policy_id), headers=auth_headers) + actions = response.json() + + subjects_name = subjects['subjects'][next(iter(subjects['subjects']))]['name'] + objects_name = objects['objects'][next(iter(objects['objects']))]['name'] + actions_name = actions['actions'][next(iter(actions['actions']))]['name'] + + # Note: patching the cache for the test + from moon_cache.cache import Cache + CACHE = Cache.getInstance() + CACHE.add_pipeline("b3d3e18abf3340e8b635fd49e6634ccd", { + "name": "test", + "description": "test", + "host": "127.0.0.1", + "port": 20000, + }) + CACHE.add_pipeline("f8f49a779ceb47b3ac810f01ef71b4e0", { + "name": "test", + "description": "test", + "host": "127.0.0.1", + "port": 20000, + }) + + _target = { + 'target': { + "name": objects_name, + }, + "project_id": project_id, + "user_id": subjects_name + } + _credentials = { + "project_id": project_id, + "user_id": subjects_name + } + + authz_data = { + 'rule': actions_name, + 'target': json.dumps(_target), + 'credentials': json.dumps(_credentials)} + req = hug.test.post(oslowrapper, "/authz/oslo", authz_data) + assert req.status == hug.HTTP_200 + + +def test_authz_true(): + from moon_engine.plugins import oslowrapper + + subjects, objects, actions, project_id = get_subject_object_action() + + _target = { + 'target': { + "name": objects['objects'][next(iter(objects['objects']))]['name'], + }, + "project_id": project_id, + "user_id": subjects['subjects'][next(iter(subjects['subjects']))]['name'] + } + _credentials = { + "project_id": project_id, + "user_id": subjects['subjects'][next(iter(subjects['subjects']))]['name'] + } + authz_data = { + 'rule': actions['actions'][next(iter(actions['actions']))]['name'], + 'target': json.dumps(_target), + 'credentials': json.dumps(_credentials)} + + req = hug.test.post(oslowrapper, "/authz/oslo", body=authz_data) + + assert req.status == hug.HTTP_200 and req.data is not None and req.data + + +def test_authz_error_response_code(): + from moon_engine.plugins import oslowrapper + + subjects, objects, actions, project_id = get_subject_object_action() + + _target = { + 'target': { + "name": objects['objects'][next(iter(objects['objects']))]['name'], + }, + "project_id": "a64beb1cc224474fb4badd431f3e7106", # invalid project id + "user_id": subjects['subjects'][next(iter(subjects['subjects']))]['name'] + } + authz_data = { + 'rule': actions['actions'][next(iter(actions['actions']))]['name'], + 'target': json.dumps(_target), + 'credentials': 'null'} + + print(authz_data) + req = hug.test.post(oslowrapper, "/authz/oslo", body=authz_data) + + assert req.status != hug.HTTP_200 + +# def test_authz_error_no_interface_key(context): +# import moon_wrapper.server +# server = moon_wrapper.server.main() +# client = server.app.test_client() +# _target = { +# 'target': { +# "name": context.get('object_name'), +# }, +# "project_id": context.get('project_with_no_interface_key'), +# "user_id": context.get('subject_name') +# } +# authz_data = { +# 'rule': context.get('action_name'), +# 'target': json.dumps(_target), +# 'credentials': 'null'} +# req = client.post("/authz/oslo", data=json.dumps(authz_data)) +# +# assert req.data == b"False" diff --git a/moon_engine/tests/unit_python/api/wrapper/test_pipeline.py b/moon_engine/tests/unit_python/api/wrapper/test_pipeline.py new file mode 100644 index 00000000..ed16883d --- /dev/null +++ b/moon_engine/tests/unit_python/api/wrapper/test_pipeline.py @@ -0,0 +1,32 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +import hug + + +def test_get_pipelines(): + from moon_engine.api.wrapper.api import pipeline + from moon_utilities.auth_functions import get_api_key_for_user + from moon_cache.cache import Cache + from moon_engine.api.configuration import get_configuration + CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"), + incremental=get_configuration("incremental_updates"), + manager_api_key=get_configuration("api_token")) + + CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user( + "admin")) + + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + req = hug.test.get(pipeline, "/pipelines", headers=auth_headers ) + assert req.status == hug.HTTP_200 + assert isinstance(req.data, dict) + assert "pipelines" in req.data diff --git a/moon_engine/tests/unit_python/api/wrapper/test_update.py b/moon_engine/tests/unit_python/api/wrapper/test_update.py new file mode 100644 index 00000000..75e7ef2d --- /dev/null +++ b/moon_engine/tests/unit_python/api/wrapper/test_update.py @@ -0,0 +1,602 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +import hug +import requests + + +def test_wrapper_update_policy_existed(): + from moon_engine.api.wrapper.api import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_cache.cache import Cache + from moon_engine.api.configuration import get_configuration + CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"), + incremental=get_configuration("incremental_updates"), + manager_api_key=get_configuration("api_token")) + + CACHE.set_current_server(url=get_configuration("management").get("url"), + api_key=get_api_key_for_user("admin")) + + CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", { + "name": "test", + "description": "test", + "host": "127.0.0.1", + "port": 20000, + }) + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + response = requests.get("{}/policies".format(get_configuration("management").get("url")), + headers=auth_headers) + + policies = response.json() + key = next(iter(policies['policies'])) + policies['policies'][key]['name'] = "new " + policies['policies'][key]['name'] + req = hug.test.put(update, "/update/policy/{}".format(key), + policies['policies'][key], headers=auth_headers) + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) + + +def test_wrapper_delete_policy(): + from moon_engine.api.wrapper.api import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_cache.cache import Cache + from moon_engine.api.configuration import get_configuration + CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"), + incremental=get_configuration("incremental_updates"), + manager_api_key=get_configuration("api_token")) + + CACHE.set_current_server(url=get_configuration("management").get("url"), + api_key=get_api_key_for_user("admin")) + + CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", { + "name": "test", + "description": "test", + "host": "127.0.0.1", + "port": 20000, + }) + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + response = requests.get("{}/policies".format(get_configuration("management").get("url")), + headers=auth_headers) + policies = response.json() + key = next(iter(policies['policies'])) + req = hug.test.delete(update, "/update/policy/{}".format(key), + headers=auth_headers) + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) + + +def test_wrapper_update_policy_not_existed(): + from moon_engine.api.wrapper.api import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_cache.cache import Cache + from moon_engine.api.configuration import get_configuration + CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"), + incremental=get_configuration("incremental_updates"), + manager_api_key=get_configuration("api_token")) + + CACHE.set_current_server(url=get_configuration("management").get("url"), + api_key=get_api_key_for_user("admin")) + + CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", { + "name": "test", + "description": "test", + "host": "127.0.0.1", + "port": 20000, + }) + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + response = requests.get("{}/policies".format(get_configuration("management").get("url")), + headers=auth_headers) + policies = response.json() + key = next(iter(policies['policies'])) + policies['policies'][key]['name'] = "new " + policies['policies'][key]['name'] + req = hug.test.put(update, "/update/policy/{}".format("eac0ecd09ceb47b3ac810f01ef71b4e0"), + policies['policies'][key], headers=auth_headers) + assert req.status == hug.HTTP_208 + + +def test_wrapper_update_pdp_existed(): + from moon_engine.api.wrapper.api import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_cache.cache import Cache + from moon_engine.api.configuration import get_configuration + CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"), + incremental=get_configuration("incremental_updates"), + manager_api_key=get_configuration("api_token")) + + CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user( + "admin")) + + CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", { + "name": "test", + "description": "test", + "host": "127.0.0.1", + "port": 20000, + }) + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + response = requests.get("{}/pdp".format(get_configuration("management").get("url")), + headers=auth_headers) + + pdps = response.json() + key = next(iter(pdps['pdps'])) + pdps['pdps'][key]['name'] = "new " + pdps['pdps'][key]['name'] + req = hug.test.put(update, "/update/pdp/{}".format(key), + pdps['pdps'][key], headers=auth_headers) + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) + + +def test_wrapper_delete_pdp(): + from moon_engine.api.wrapper.api import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_cache.cache import Cache + from moon_engine.api.configuration import get_configuration + CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"), + incremental=get_configuration("incremental_updates"), + manager_api_key=get_configuration("api_token")) + + CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user( + "admin")) + + CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", { + "name": "test", + "description": "test", + "host": "127.0.0.1", + "port": 20000, + }) + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + response = requests.get("{}/pdp".format(get_configuration("management").get("url")), + headers=auth_headers) + + pdps = response.json() + key = next(iter(pdps['pdps'])) + req = hug.test.delete(update, "/update/pdp/{}".format(key), headers=auth_headers) + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) + + +def test_wrapper_delete_assignments(): + from moon_engine.api.wrapper.api import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_cache.cache import Cache + from moon_engine.api.configuration import get_configuration + CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"), + incremental=get_configuration("incremental_updates"), + manager_api_key=get_configuration("api_token")) + + CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user( + "admin")) + + CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", { + "name": "test", + "description": "test", + "host": "127.0.0.1", + "port": 20000, + }) + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + response = requests.get("{}/policies".format(get_configuration("management").get("url")), + headers=auth_headers) + policies = response.json() + policy_id = next(iter(policies['policies'])) + + response = requests.get("{}/policies/{}/subject_assignments".format(get_configuration( + "management").get("url"), policy_id), + headers=auth_headers) + subject_assignments = response.json() + assert 'subject_assignments' in subject_assignments and len( + subject_assignments['subject_assignments']) + req = hug.test.delete(update, + "/update/assignment/{}/{}/".format(policy_id, "subject"), + headers=auth_headers) + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) + + response = requests.get("{}/policies/{}/object_assignments".format(get_configuration( + "management").get("url"), policy_id), + headers=auth_headers) + object_assignments = response.json() + assert 'object_assignments' in object_assignments and len( + object_assignments['object_assignments']) + req = hug.test.delete(update, + "/update/assignment/{}/{}/".format(policy_id, "object"), + headers=auth_headers) + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) + + response = requests.get("{}/policies/{}/action_assignments".format(get_configuration( + "management").get("url"), policy_id), + headers=auth_headers) + action_assignments = response.json() + assert 'action_assignments' in action_assignments and len( + action_assignments['action_assignments']) + req = hug.test.delete(update, + "/update/assignment/{}/{}/".format(policy_id, "action"), + headers=auth_headers) + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) + + +def test_wrapper_update_perimeter_existed(): + from moon_engine.api.wrapper.api import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_cache.cache import Cache + from moon_engine.api.configuration import get_configuration + CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"), + incremental=get_configuration("incremental_updates"), + manager_api_key=get_configuration("api_token")) + + CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user( + "admin")) + + CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", { + "name": "test", + "description": "test", + "host": "127.0.0.1", + "port": 20000, + }) + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + response = requests.get("{}/policies".format(get_configuration("management").get("url")), + headers=auth_headers) + + policies = response.json() + policy_id = next(iter(policies['policies'])) + + response = requests.get("{}/policies/{}/subjects".format(get_configuration( + "management").get("url"), policy_id), headers=auth_headers) + subjects = response.json() + assert 'subjects' in subjects and len(subjects['subjects']) + + for key in subjects['subjects']: + subjects['subjects'][key]['name'] = "new " + subjects['subjects'][key]['name'] + req = hug.test.put(update, "/update/perimeter/{}/{}/{}".format(key, policy_id, 'subject'), + subjects['subjects'][key], headers=auth_headers) + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) + + response = requests.get("{}/policies/{}/objects".format(get_configuration( + "management").get("url"), policy_id), headers=auth_headers) + objects = response.json() + assert 'objects' in objects and len(objects['objects']) + + for key in objects['objects']: + objects['objects'][key]['name'] = "new " + objects['objects'][key]['name'] + req = hug.test.put(update, "/update/perimeter/{}/{}/{}".format(key, policy_id, 'object'), + objects['objects'][key], headers=auth_headers) + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) + + response = requests.get("{}/policies/{}/actions".format(get_configuration( + "management").get("url"), policy_id), headers=auth_headers) + actions = response.json() + assert 'actions' in actions and len(actions['actions']) + + for key in actions['actions']: + actions['actions'][key]['name'] = "new " + actions['actions'][key]['name'] + req = hug.test.put(update, "/update/perimeter/{}/{}/{}".format(key, policy_id, 'action'), + actions['actions'][key], headers=auth_headers) + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) + + +def test_wrapper_delete_perimeter(): + from moon_engine.api.wrapper.api import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_cache.cache import Cache + from moon_engine.api.configuration import get_configuration + CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"), + incremental=get_configuration("incremental_updates"), + manager_api_key=get_configuration("api_token")) + + CACHE.set_current_server(url=get_configuration("management").get("url"), + api_key=get_api_key_for_user("admin")) + + CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", { + "name": "test", + "description": "test", + "host": "127.0.0.1", + "port": 20000, + }) + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + response = requests.get("{}/policies".format(get_configuration("management").get("url")), + headers=auth_headers) + + policies = response.json() + policy_id = next(iter(policies['policies'])) + + response = requests.get("{}/policies/{}/subjects".format(get_configuration( + "management").get("url"), policy_id), headers=auth_headers) + subjects = response.json() + assert 'subjects' in subjects and len(subjects['subjects']) + + for key in subjects['subjects']: + req = hug.test.delete(update, + "/update/perimeter/{}/{}/{}".format(key, policy_id, 'subject'), + headers=auth_headers) + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) + + response = requests.get("{}/policies/{}/objects".format(get_configuration( + "management").get("url"), policy_id), headers=auth_headers) + objects = response.json() + assert 'objects' in objects and len(objects['objects']) + + for key in objects['objects']: + req = hug.test.delete(update, "/update/perimeter/{}/{}/{}".format(key, policy_id, 'object'), + headers=auth_headers) + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) + + response = requests.get("{}/policies/{}/actions".format(get_configuration( + "management").get("url"), policy_id), headers=auth_headers) + actions = response.json() + assert 'actions' in actions and len(actions['actions']) + + for key in actions['actions']: + req = hug.test.delete(update, "/update/perimeter/{}/{}/{}".format(key, policy_id, 'action'), + headers=auth_headers) + + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) + + +def test_wrapper_delete_rule(): + from moon_engine.api.wrapper.api import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_cache.cache import Cache + from moon_engine.api.configuration import get_configuration + CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"), + incremental=get_configuration("incremental_updates"), + manager_api_key=get_configuration("api_token")) + + CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user( + "admin")) + + CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", { + "name": "test", + "description": "test", + "host": "127.0.0.1", + "port": 20000, + }) + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + response = requests.get("{}/policies".format(get_configuration("management").get("url")), + headers=auth_headers) + policies = response.json() + policy_id = next(iter(policies['policies'])) + + response = requests.get( + "{}/policies/{}/rules".format(get_configuration("management").get("url"), policy_id), + headers=auth_headers) + rules = response.json() + assert 'rules' in rules and 'policy_id' in rules['rules'] + assert rules['rules']['policy_id'] == policy_id + assert len(rules['rules']['rules']) + for i in range(0, len(rules['rules']['rules'])): + req = hug.test.delete(update, "/update/rule/{}/{}".format(policy_id, + rules['rules']['rules'][i]['id']), headers=auth_headers) + + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) + + +def test_wrapper_update_model_existed(): + from moon_engine.api.wrapper.api import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_cache.cache import Cache + from moon_engine.api.configuration import get_configuration + CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"), + incremental=get_configuration("incremental_updates"), + manager_api_key=get_configuration("api_token")) + + CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user( + "admin")) + + CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", { + "name": "test", + "description": "test", + "host": "127.0.0.1", + "port": 20000, + }) + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + response = requests.get("{}/models".format(get_configuration("management").get("url")), + headers=auth_headers) + + models = response.json() + key = next(iter(models['models'])) + models['models'][key]['name'] = "new " + models['models'][key]['name'] + req = hug.test.put(update, "/update/model/{}".format(key), + models['models'][key], headers=auth_headers) + + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) + + +def test_wrapper_delete_model(): + from moon_engine.api.wrapper.api import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_cache.cache import Cache + from moon_engine.api.configuration import get_configuration + CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"), + incremental=get_configuration("incremental_updates"), + manager_api_key=get_configuration("api_token")) + + CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user( + "admin")) + + CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", { + "name": "test", + "description": "test", + "host": "127.0.0.1", + "port": 20000, + }) + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + response = requests.get("{}/models".format(get_configuration("management").get("url")), + headers=auth_headers) + + models = response.json() + key = next(iter(models['models'])) + req = hug.test.delete(update, "/update/model/{}".format(key), headers=auth_headers) + + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) + + +def test_wrapper_delete_category(): + from moon_engine.api.wrapper.api import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_cache.cache import Cache + from moon_engine.api.configuration import get_configuration + CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"), + incremental=get_configuration("incremental_updates"), + manager_api_key=get_configuration("api_token")) + + CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user( + "admin")) + + CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", { + "name": "test", + "description": "test", + "host": "127.0.0.1", + "port": 20000, + }) + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + response = requests.get("{}/subject_categories".format(get_configuration("management").get("url")), + headers=auth_headers) + subject_categories = response.json() + category_id = next(iter(subject_categories['subject_categories'])) + req = hug.test.delete(update, "/update/meta_data/{}/{}".format(category_id, 'subject'), + headers=auth_headers) + + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) + + response = requests.get("{}/action_categories".format(get_configuration("management").get("url")), + headers=auth_headers) + action_categories = response.json() + category_id = next(iter(action_categories['action_categories'])) + req = hug.test.delete(update, "/update/meta_data/{}/{}".format(category_id, 'action'), + headers=auth_headers) + + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) + + response = requests.get("{}/object_categories".format(get_configuration("management").get("url")), + headers=auth_headers) + object_categories = response.json() + category_id = next(iter(object_categories['object_categories'])) + req = hug.test.delete(update, "/update/meta_data/{}/{}".format(category_id, 'object'), + headers=auth_headers) + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) + + +def test_wrapper_update_meta_rule_existed(): + from moon_engine.api.wrapper.api import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_cache.cache import Cache + from moon_engine.api.configuration import get_configuration + CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"), + incremental=get_configuration("incremental_updates"), + manager_api_key=get_configuration("api_token")) + + CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user( + "admin")) + + CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", { + "name": "test", + "description": "test", + "host": "127.0.0.1", + "port": 20000, + }) + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + response = requests.get("{}/meta_rules".format(get_configuration("management").get("url")), + headers=auth_headers) + + meta_rules = response.json() + key = next(iter(meta_rules['meta_rules'])) + meta_rules['meta_rules'][key]['name'] = "new " + meta_rules['meta_rules'][key]['name'] + req = hug.test.put(update, "/update/meta_rule/{}".format(key), + meta_rules['meta_rules'][key], headers=auth_headers) + + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) + + +def test_wrapper_delete_meta_rule(): + from moon_engine.api.wrapper.api import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_cache.cache import Cache + from moon_engine.api.configuration import get_configuration + CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"), + incremental=get_configuration("incremental_updates"), + manager_api_key=get_configuration("api_token")) + + CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user( + "admin")) + + CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", { + "name": "test", + "description": "test", + "host": "127.0.0.1", + "port": 20000, + }) + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + response = requests.get("{}/meta_rules".format(get_configuration("management").get("url")), + headers=auth_headers) + + meta_rules = response.json() + key = next(iter(meta_rules['meta_rules'])) + req = hug.test.delete(update, "/update/meta_rule/{}".format(key), headers=auth_headers) + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) + + +def test_wrapper_delete_data(): + from moon_engine.api.wrapper.api import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_cache.cache import Cache + from moon_engine.api.configuration import get_configuration + CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"), + incremental=get_configuration("incremental_updates"), + manager_api_key=get_configuration("api_token")) + + CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user( + "admin")) + + CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", { + "name": "test", + "description": "test", + "host": "127.0.0.1", + "port": 20000, + }) + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + response = requests.get("{}/policies".format(get_configuration("management").get("url")), + headers=auth_headers) + + policies = response.json() + policy_id = next(iter(policies['policies'])) + + response = requests.get( + "{}/policies/{}/subject_data".format(get_configuration("management").get("url"), policy_id), + headers=auth_headers) + subject_data_id = next(iter(response.json()['subject_data'][0]['data'])) + req = hug.test.delete(update, "/update/data/{}/{}".format(subject_data_id,'subject'), + headers=auth_headers) + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) + + response = requests.get( + "{}/policies/{}/object_data".format(get_configuration("management").get("url"), policy_id), + headers=auth_headers) + object_data_id = next(iter(response.json()['object_data'][0]['data'])) + req = hug.test.delete(update, "/update/data/{}/{}".format(object_data_id, 'object'), + headers=auth_headers) + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) + + response = requests.get( + "{}/policies/{}/action_data".format(get_configuration("management").get("url"), policy_id), + headers=auth_headers) + action_data_id = next(iter(response.json()['action_data'][0]['data'])) + req = hug.test.delete(update, "/update/data/{}/{}".format(action_data_id, 'action'), + headers=auth_headers) + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) diff --git a/moon_engine/tests/unit_python/api/wrapper/test_wrapper_authz.py b/moon_engine/tests/unit_python/api/wrapper/test_wrapper_authz.py new file mode 100644 index 00000000..c4df249f --- /dev/null +++ b/moon_engine/tests/unit_python/api/wrapper/test_wrapper_authz.py @@ -0,0 +1,57 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +import hug +import requests + + +def test_wrapper_get_authz(): + from moon_engine.api.wrapper.api import authz + from moon_utilities.auth_functions import get_api_key_for_user + from moon_cache.cache import Cache + from moon_engine.api.configuration import get_configuration + CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"), + incremental=get_configuration("incremental_updates"), + manager_api_key=get_configuration("api_token")) + + CACHE.set_current_server(url=get_configuration("management").get("url"), + api_key=get_api_key_for_user("admin")) + + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + response = requests.get("{}/pdp".format(get_configuration("management").get("url")), + headers=auth_headers) + + pdp = response.json() + pdp_id = next(iter(pdp['pdps'])) + policy_id = pdp['pdps'][pdp_id].get("security_pipeline")[0] + project_id = pdp['pdps'][pdp_id].get("vim_project_id") + + response = requests.get("{}/policies/{}/subjects".format(get_configuration( + "management").get("url"), policy_id), headers=auth_headers) + subjects = response.json() + + response = requests.get("{}/policies/{}/objects".format(get_configuration( + "management").get("url"), policy_id), headers=auth_headers) + objects = response.json() + + response = requests.get("{}/policies/{}/actions".format(get_configuration( + "management").get("url"), policy_id), headers=auth_headers) + actions = response.json() + + subjects_name = subjects['subjects'][next(iter(subjects['subjects']))]['name'] + objects_name = objects['objects'][next(iter(objects['objects']))]['name'] + actions_name = actions['actions'][next(iter(actions['actions']))]['name'] + + req = hug.test.get(authz, "/authz/{}/{}/{}/{}".format( + project_id, subjects_name, objects_name, actions_name)) + assert req.status == hug.HTTP_200 |