diff options
author | Thomas Duval <thomas.duval@orange.com> | 2020-06-03 10:06:52 +0200 |
---|---|---|
committer | Thomas Duval <thomas.duval@orange.com> | 2020-06-03 10:06:52 +0200 |
commit | 7bb53c64da2dcf88894bfd31503accdd81498f3d (patch) | |
tree | 4310e12366818af27947b5e2c80cb162da93a4b5 /moon_engine/tests/unit_python/api | |
parent | cbea4e360e9bfaa9698cf7c61c83c96a1ba89b8c (diff) |
Update to new version 5.4HEADstable/jermamaster
Signed-off-by: Thomas Duval <thomas.duval@orange.com>
Change-Id: Idcd868133d75928a1ffd74d749ce98503e0555ea
Diffstat (limited to 'moon_engine/tests/unit_python/api')
11 files changed, 1571 insertions, 0 deletions
diff --git a/moon_engine/tests/unit_python/api/__init__.py b/moon_engine/tests/unit_python/api/__init__.py new file mode 100644 index 00000000..1856aa2c --- /dev/null +++ b/moon_engine/tests/unit_python/api/__init__.py @@ -0,0 +1,12 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + diff --git a/moon_engine/tests/unit_python/api/moon.yaml b/moon_engine/tests/unit_python/api/moon.yaml new file mode 100644 index 00000000..9272ccce --- /dev/null +++ b/moon_engine/tests/unit_python/api/moon.yaml @@ -0,0 +1,54 @@ +type: "pipeline" +uuid: "a64beb1c-c224-474f-b4ba-dd43173e7101" +manager_url: "http://127.0.0.1:20000" +incremental_updates: false +api_token: +data: conf/policy_example.json +debug: true +management: + token_file: moon_engine_users.json + +orchestration: + driver: moon_engine.plugins.pyorchestrator + connection: local + port: 20000...20100 + config_dir: /tmp + +authorization: + driver: moon_engine.plugins.authz + +plugins: + directory: /tmp + +logging: + version: 1 + + formatters: + brief: + format: "%(levelname)s %(name)s %(message)-30s" + custom: + format: "%(asctime)-15s %(levelname)s %(name)s %(message)s" + + handlers: + console: + class : logging.StreamHandler + formatter: custom + level : INFO + stream : ext://sys.stdout + file: + class : logging.handlers.RotatingFileHandler + formatter: custom + level : DEBUG + filename: /tmp/moon_engine.log + maxBytes: 1048576 + backupCount: 3 + + loggers: + moon: + level: DEBUG + handlers: [console, file] + propagate: no + + root: + level: ERROR + handlers: [console]
\ No newline at end of file diff --git a/moon_engine/tests/unit_python/api/pipeline/__init__.py b/moon_engine/tests/unit_python/api/pipeline/__init__.py new file mode 100644 index 00000000..582be686 --- /dev/null +++ b/moon_engine/tests/unit_python/api/pipeline/__init__.py @@ -0,0 +1,11 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + diff --git a/moon_engine/tests/unit_python/api/pipeline/test_authz.py b/moon_engine/tests/unit_python/api/pipeline/test_authz.py new file mode 100644 index 00000000..c405ddfd --- /dev/null +++ b/moon_engine/tests/unit_python/api/pipeline/test_authz.py @@ -0,0 +1,37 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +import hug + + +def test_get_authz_authorized(): + from moon_engine.api.pipeline import authz + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + # FIXME: https://gitlab.forge.orange-labs.fr/moon/moon_cache/issues/1 + # req = benchmark(hug.test.get, authz, + # "/authz/89ba91c1-8dd5-4abf-bfde-7a66936c51a6/67b8008a-3f8d-4f8e-847e-b628f0f7ca0e/cdb3df22-0dc0" + # "-5a6e-a333-4b994827b068", headers=auth_headers) + # # req = hug.test.get(authz, "/authz/test/test/test", headers=auth_headers) + # assert req.status == hug.HTTP_204 + + +def test_get_authz_unauthorized(): + from moon_engine.api.pipeline import authz + from moon_utilities.auth_functions import get_api_key_for_user + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + # FIXME: https://gitlab.forge.orange-labs.fr/moon/moon_cache/issues/1 + # req = benchmark(hug.test.get, authz, + # "/authz/31fd15ad-1478-4a96-96fc-c887dddbfaf9/67b8008a-3f8d-4f8e-847e-b628f0f7ca0e/cdb3df22-0dc0" + # "-5a6e-a333-4b994827b068", headers=auth_headers) + # # req = hug.test.get(authz, "/authz/test/test/test", headers=auth_headers) + # assert req.status == hug.HTTP_403 diff --git a/moon_engine/tests/unit_python/api/pipeline/test_update.py b/moon_engine/tests/unit_python/api/pipeline/test_update.py new file mode 100644 index 00000000..94d5a4e4 --- /dev/null +++ b/moon_engine/tests/unit_python/api/pipeline/test_update.py @@ -0,0 +1,525 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +import hug +import requests +from uuid import uuid4 + + +def test_pipeline_update_policy_existed(): + from moon_engine.api.pipeline import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_engine.api.configuration import get_configuration + + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + response = requests.get("{}/policies".format(get_configuration("management").get("url")), + headers=auth_headers) + + policies = response.json() + key = next(iter(policies['policies'])) + policies['policies'][key]['name'] = "new " + policies['policies'][key]['name'] + req = hug.test.put(update, "/update/policy/{}".format(key), + policies['policies'][key], headers=auth_headers) + assert req.status == hug.HTTP_208 + + +def test_pipeline_update_policy_not_existed(): + from moon_engine.api.pipeline import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_engine.api.configuration import get_configuration + + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + response = requests.get("{}/policies".format(get_configuration("management").get("url")), + headers=auth_headers) + + policies = response.json() + key = next(iter(policies['policies'])) + policies['policies'][key]['name'] = "new " + policies['policies'][key]['name'] + req = hug.test.put(update, "/update/policy/{}".format(uuid4().hex), + policies['policies'][key], headers=auth_headers) + assert req.status == hug.HTTP_208 + + +def test_pipeline_delete_policy(): + from moon_engine.api.pipeline import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_engine.api.configuration import get_configuration + + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + response = requests.get("{}/policies".format(get_configuration("management").get("url")), + headers=auth_headers) + policies = response.json() + key = next(iter(policies['policies'])) + req = hug.test.delete(update, "/update/policy/{}".format(key), + headers=auth_headers) + assert req.status == hug.HTTP_202 + + +def test_pipeline_update_pdp_existed(): + from moon_engine.api.pipeline import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_engine.api.configuration import get_configuration + + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + response = requests.get("{}/pdp".format(get_configuration("management").get("url")), + headers=auth_headers) + + pdps = response.json() + key = next(iter(pdps['pdps'])) + pdps['pdps'][key]['name'] = "new " + pdps['pdps'][key]['name'] + req = hug.test.put(update, "/update/pdp/{}".format(key), + pdps['pdps'][key], headers=auth_headers) + assert req.status == hug.HTTP_208 + + +def test_pipeline_update_pdp_not_existed(): + from moon_engine.api.pipeline import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_engine.api.configuration import get_configuration + + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + response = requests.get("{}/pdp".format(get_configuration("management").get("url")), + headers=auth_headers) + + pdps = response.json() + key = next(iter(pdps['pdps'])) + pdps['pdps'][key]['name'] = "new " + pdps['pdps'][key]['name'] + req = hug.test.put(update, "/update/pdp/{}".format(uuid4().hex), + pdps['pdps'][key], headers=auth_headers) + assert req.status == hug.HTTP_208 + + +def test_pipeline_delete_pdp_existed(): + from moon_engine.api.pipeline import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_engine.api.configuration import get_configuration + + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + response = requests.get("{}/pdp".format(get_configuration("management").get("url")), + headers=auth_headers) + + pdps = response.json() + key = next(iter(pdps['pdps'])) + req = hug.test.delete(update, "/update/pdp/{}".format(key), headers=auth_headers) + assert req.status == hug.HTTP_202 + + +def test_pipeline_update_perimeter_existed(): + from moon_engine.api.pipeline import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_engine.api.configuration import get_configuration + + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + response = requests.get("{}/policies".format(get_configuration("management").get("url")), + headers=auth_headers) + + policies = response.json() + policy_key = next(iter(policies['policies'])) + + subject_response = requests.get("{}/policies/{}/subjects".format(get_configuration( + "management").get("url"), policy_key), headers=auth_headers) + subjects = subject_response.json() + subj_key = next(iter(subjects['subjects'])) + subjects['subjects'][subj_key]['name'] = "updated_" + subjects['subjects'][subj_key]['name'] + + subject_req = hug.test.put(update, "/update/perimeter/{}/{}/{}".format(subj_key, policy_key, + "subject"), + subjects['subjects'][subj_key], headers=auth_headers) + assert subject_req.status == hug.HTTP_208 + + """ object category """ + object_response = requests.get("{}/policies/{}/objects".format(get_configuration( + "management").get("url"), policy_key), headers=auth_headers) + objects = object_response.json() + obj_key = next(iter(objects['objects'])) + objects['objects'][obj_key]['name'] = "updated_" + objects['objects'][obj_key]['name'] + + object_req = hug.test.put(update, "/update/perimeter/{}/{}/{}".format(obj_key, policy_key, "object") + , objects['objects'][obj_key], headers=auth_headers) + assert object_req.status == hug.HTTP_208 + + """ action category """ + action_response = requests.get("{}/policies/{}/actions".format(get_configuration( + "management").get("url"), policy_key), headers=auth_headers) + actions = action_response.json() + action_key = next(iter(actions['actions'])) + actions['actions'][action_key]['name'] = "updated_" + actions['actions'][action_key]['name'] + + action_req = hug.test.put(update, "/update/perimeter/{}/{}/{}".format(action_key, policy_key, + "action"), + actions['actions'][action_key], headers=auth_headers) + assert action_req.status == hug.HTTP_208 + + +def test_pipeline_update_perimeter_not_existed(): + from moon_engine.api.pipeline import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_engine.api.configuration import get_configuration + + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + response = requests.get("{}/policies".format(get_configuration("management").get("url")), + headers=auth_headers) + + policies = response.json() + policy_key = next(iter(policies['policies'])) + + subject_response = requests.get("{}/policies/{}/subjects".format(get_configuration( + "management").get("url"), policy_key), headers=auth_headers) + subjects = subject_response.json() + subj_key = next(iter(subjects['subjects'])) + subjects['subjects'][subj_key]['name'] = "updated_" + subjects['subjects'][subj_key]['name'] + + subject_req = hug.test.put(update, "/update/perimeter/{}/{}/{}".format(uuid4().hex, policy_key, + "subject"), + subjects['subjects'][subj_key], headers=auth_headers) + assert subject_req.status == hug.HTTP_208 + + """ object category """ + object_response = requests.get("{}/policies/{}/objects".format(get_configuration( + "management").get("url"), policy_key), headers=auth_headers) + objects = object_response.json() + obj_key = next(iter(objects['objects'])) + objects['objects'][obj_key]['name'] = "updated_" + objects['objects'][obj_key]['name'] + + object_req = hug.test.put(update, "/update/perimeter/{}/{}/{}".format(uuid4().hex, policy_key, "object") + , objects['objects'][obj_key], headers=auth_headers) + assert object_req.status == hug.HTTP_208 + + """ action category """ + action_response = requests.get("{}/policies/{}/actions".format(get_configuration( + "management").get("url"), policy_key), headers=auth_headers) + actions = action_response.json() + action_key = next(iter(actions['actions'])) + actions['actions'][action_key]['name'] = "updated_" + actions['actions'][action_key]['name'] + + action_req = hug.test.put(update, "/update/perimeter/{}/{}/{}".format(uuid4().hex, policy_key, + "action"), + actions['actions'][action_key], headers=auth_headers) + assert action_req.status == hug.HTTP_208 + + +def test_pipeline_delete_perimeter_existed(): + from moon_engine.api.pipeline import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_engine.api.configuration import get_configuration + + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + policies_response = requests.get("{}/policies".format(get_configuration("management").get("url")), + headers=auth_headers) + + policies = policies_response.json() + policy_key = next(iter(policies['policies'])) + + subject_response = requests.get("{}/policies/{}/subjects".format(get_configuration( + "management").get("url"), policy_key), headers=auth_headers) + subjects = subject_response.json() + subj_key = next(iter(subjects['subjects'])) + subjects['subjects'][subj_key]['name'] = "updated_" + subjects['subjects'][subj_key]['name'] + + delete_subject_response = hug.test.delete(update, "/update/perimeter/{}/{}/{}".format(subj_key, policy_key, + "subject"), + headers=auth_headers) + assert delete_subject_response.status == hug.HTTP_202 + + object_response = requests.get("{}/policies/{}/objects".format(get_configuration( + "management").get("url"), policy_key), headers=auth_headers) + objects = object_response.json() + assert 'objects' in objects and len(objects['objects']) + + for key in objects['objects']: + delete_object_response = hug.test.delete(update, "/update/perimeter/{}/{}/{}".format(key, policy_key, 'object'), + headers=auth_headers) + assert delete_object_response.status == hug.HTTP_202 + + action_response = requests.get("{}/policies/{}/actions".format(get_configuration( + "management").get("url"), policy_key), headers=auth_headers) + actions = action_response.json() + assert 'actions' in actions and len(actions['actions']) + + for key in actions['actions']: + delete_action_response = hug.test.delete(update, "/update/perimeter/{}/{}/{}".format(key, policy_key, 'action'), + headers=auth_headers) + assert delete_action_response.status == hug.HTTP_202 + + +def test_pipeline_delete_assignments(): + from moon_engine.api.pipeline import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_engine.api.configuration import get_configuration + + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + response = requests.get("{}/policies".format(get_configuration("management").get("url")), + headers=auth_headers) + policies = response.json() + policy_id = next(iter(policies['policies'])) + + sub_assig_response = requests.get("{}/policies/{}/subject_assignments".format(get_configuration("management").get("url"), + policy_id), headers=auth_headers) + + subject_assignments = sub_assig_response.json() + assert 'subject_assignments' in subject_assignments and len( + subject_assignments['subject_assignments']) + req = hug.test.delete(update, + "/update/assignment/{}/{}/".format(policy_id, "subject"), + headers=auth_headers) + assert req.status == hug.HTTP_202 + + obj_assig_response = requests.get("{}/policies/{}/object_assignments".format(get_configuration( + "management").get("url"), policy_id), + headers=auth_headers) + object_assignments = obj_assig_response.json() + assert 'object_assignments' in object_assignments and len( + object_assignments['object_assignments']) + req = hug.test.delete(update, + "/update/assignment/{}/{}/".format(policy_id, "object"), + headers=auth_headers) + assert req.status == hug.HTTP_202 + + action_assig_response = requests.get("{}/policies/{}/action_assignments".format(get_configuration( + "management").get("url"), policy_id), + headers=auth_headers) + action_assignments = action_assig_response.json() + assert 'action_assignments' in action_assignments and len( + action_assignments['action_assignments']) + req = hug.test.delete(update, + "/update/assignment/{}/{}/".format(policy_id, "action"), + headers=auth_headers) + assert req.status == hug.HTTP_202 + + +def test_pipeline_delete_rule(): + from moon_engine.api.pipeline import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_engine.api.configuration import get_configuration + + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + policy_response = requests.get("{}/policies".format(get_configuration("management").get("url")), + headers=auth_headers) + policies = policy_response.json() + policy_id = next(iter(policies['policies'])) + + rules_response = requests.get( + "{}/policies/{}/rules".format(get_configuration("management").get("url"), policy_id), + headers=auth_headers) + rules = rules_response.json() + + assert len(rules['rules']['rules']) + for i in range(0, len(rules['rules']['rules'])): + req = hug.test.delete(update, "/update/rule/{}/{}".format(policy_id, rules['rules'][ + 'rules'][i]['id']), headers=auth_headers) + assert req.status == hug.HTTP_202 + + +def test_pipeline_update_model_existed(): + from moon_engine.api.pipeline import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_engine.api.configuration import get_configuration + + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + model_response = requests.get("{}/models".format(get_configuration("management").get("url")), + headers=auth_headers) + + models = model_response.json() + key = next(iter(models['models'])) + models['models'][key]['name'] = "new " + models['models'][key]['name'] + req = hug.test.put(update, "/update/model/{}".format(key), + models['models'][key], headers=auth_headers) + assert req.status == hug.HTTP_208 + + +def test_pipeline_update_model_not_existed(): + from moon_engine.api.pipeline import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_engine.api.configuration import get_configuration + + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + model_response = requests.get("{}/models".format(get_configuration("management").get("url")), + headers=auth_headers) + + models = model_response.json() + key = next(iter(models['models'])) + models['models'][key]['name'] = "new " + models['models'][key]['name'] + req = hug.test.put(update, "/update/model/{}".format(uuid4().hex), + models['models'][key], headers=auth_headers) + assert req.status == hug.HTTP_208 + + +def test_pipeline_delete_model_existed(): + from moon_engine.api.pipeline import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_engine.api.configuration import get_configuration + + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + model_response = requests.get("{}/models".format(get_configuration("management").get("url")), + headers=auth_headers) + + models = model_response.json() + key = next(iter(models['models'])) + req = hug.test.delete(update, "/update/model/{}".format(key), headers=auth_headers) + assert req.status == hug.HTTP_202 + + +def test_pipeline_delete_category(): + from moon_engine.api.pipeline import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_engine.api.configuration import get_configuration + + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + subject_cat_response = requests.get("{}/subject_categories".format(get_configuration( + "management").get("url")), headers=auth_headers) + subject_categories = subject_cat_response.json() + category_id = next(iter(subject_categories['subject_categories'])) + req = hug.test.delete(update, "/update/meta_data/{}/{}".format(category_id, 'subject'), + headers=auth_headers) + assert req.status == hug.HTTP_202 + + action_cat_response = requests.get("{}/action_categories".format(get_configuration( + "management").get("url")), + headers=auth_headers) + action_categories = action_cat_response.json() + category_id = next(iter(action_categories['action_categories'])) + req = hug.test.delete(update, "/update/meta_data/{}/{}".format(category_id, 'action'), + headers=auth_headers) + assert req.status == hug.HTTP_202 + + obj_cat_response = requests.get("{}/object_categories".format(get_configuration( + "management").get("url")), + headers=auth_headers) + object_categories = obj_cat_response.json() + category_id = next(iter(object_categories['object_categories'])) + req = hug.test.delete(update, "/update/meta_data/{}/{}".format(category_id, 'object'), + headers=auth_headers) + assert req.status == hug.HTTP_202 + + +def test_pipeline_update_meta_rule_existed(): + from moon_engine.api.pipeline import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_engine.api.configuration import get_configuration + + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + meta_rules_response = requests.get("{}/meta_rules".format(get_configuration("management").get("url")), + headers=auth_headers) + + meta_rules = meta_rules_response.json() + key = next(iter(meta_rules['meta_rules'])) + meta_rules['meta_rules'][key]['name'] = "new " + meta_rules['meta_rules'][key]['name'] + req = hug.test.put(update, "/update/meta_rule/{}".format(key), + meta_rules['meta_rules'][key], headers=auth_headers) + assert req.status == hug.HTTP_208 + + +def test_pipeline_update_meta_rule_not_existed(): + from moon_engine.api.pipeline import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_engine.api.configuration import get_configuration + + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + meta_rules_response = requests.get("{}/meta_rules".format(get_configuration("management").get("url")), + headers=auth_headers) + + meta_rules = meta_rules_response.json() + key = next(iter(meta_rules['meta_rules'])) + meta_rules['meta_rules'][key]['name'] = "new " + meta_rules['meta_rules'][key]['name'] + req = hug.test.put(update, "/update/meta_rule/{}".format(uuid4().hex), + meta_rules['meta_rules'][key], headers=auth_headers) + assert req.status == hug.HTTP_208 + + +def test_pipeline_delete_meta_rule(): + from moon_engine.api.pipeline import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_engine.api.configuration import get_configuration + + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + meta_rules_response = requests.get("{}/meta_rules".format(get_configuration("management").get("url")), + headers=auth_headers) + + meta_rules = meta_rules_response.json() + key = next(iter(meta_rules['meta_rules'])) + req = hug.test.delete(update, "/update/meta_rule/{}".format(key), headers=auth_headers) + assert req.status == hug.HTTP_202 + + +def test_pipeline_delete_data(): + from moon_engine.api.pipeline import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_engine.api.configuration import get_configuration + + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + policies_response = requests.get("{}/policies".format(get_configuration("management").get("url")), + headers=auth_headers) + + policies = policies_response.json() + policy_id = next(iter(policies['policies'])) + + subject_data_response = requests.get( + "{}/policies/{}/subject_data".format(get_configuration("management").get("url"), policy_id), + headers=auth_headers) + subject_data_id = next(iter(subject_data_response.json()['subject_data'][0]['data'])) + delete_subject_data_response = hug.test.delete(update, "/update/data/{}/{}".format(subject_data_id, 'subject'), + headers=auth_headers) + assert delete_subject_data_response.status == hug.HTTP_202 + + object_data_response = requests.get( + "{}/policies/{}/object_data".format(get_configuration("management").get("url"), policy_id), + headers=auth_headers) + object_data_id = next(iter(object_data_response.json()['object_data'][0]['data'])) + delete_object_data_response = hug.test.delete(update, "/update/data/{}/{}".format(object_data_id, 'object'), + headers=auth_headers) + assert delete_object_data_response.status == hug.HTTP_202 + + action_data_response = requests.get( + "{}/policies/{}/action_data".format(get_configuration("management").get("url"), policy_id), + headers=auth_headers) + action_data_id = next(iter(action_data_response.json()['action_data'][0]['data'])) + delete_action_data_response = hug.test.delete(update, "/update/data/{}/{}".format(action_data_id, 'action'), + headers=auth_headers) + assert delete_action_data_response.status == hug.HTTP_202 + + +# def test_pipeline_delete_attributes(): +# from moon_engine.api.pipeline import update +# from moon_utilities.auth_functions import get_api_key_for_user +# from moon_engine.api.configuration import get_configuration +# +# auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} +# +# attributes_response = requests.get( +# "{}/attributes".format(get_configuration("management").get("url")), +# headers=auth_headers) +# +# attributes = attributes_response.json() +# key = next(iter(attributes['attributes'])) +# req = hug.test.delete(update, "/update/attributes/{}".format(key), headers=auth_headers) +# assert req.status == hug.HTTP_202 + + diff --git a/moon_engine/tests/unit_python/api/test_status.py b/moon_engine/tests/unit_python/api/test_status.py new file mode 100644 index 00000000..e38f35e6 --- /dev/null +++ b/moon_engine/tests/unit_python/api/test_status.py @@ -0,0 +1,21 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +import hug + + +def test_status(benchmark): + from moon_engine.api import status + + # Fixme: Add tests on enforce function to have a benchmark on enforce rapidity + req = benchmark(hug.test.get, status, "/status") + assert isinstance(req.data, dict) diff --git a/moon_engine/tests/unit_python/api/wrapper/__init__.py b/moon_engine/tests/unit_python/api/wrapper/__init__.py new file mode 100644 index 00000000..582be686 --- /dev/null +++ b/moon_engine/tests/unit_python/api/wrapper/__init__.py @@ -0,0 +1,11 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + diff --git a/moon_engine/tests/unit_python/api/wrapper/test_oslo_wrapper.py b/moon_engine/tests/unit_python/api/wrapper/test_oslo_wrapper.py new file mode 100644 index 00000000..770e9bb6 --- /dev/null +++ b/moon_engine/tests/unit_python/api/wrapper/test_oslo_wrapper.py @@ -0,0 +1,209 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +import hug +import json +import requests + + +def get_subject_object_action(): + from moon_engine.api.configuration import get_configuration + from moon_cache.cache import Cache + from moon_utilities.auth_functions import get_api_key_for_user + + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"), + incremental=get_configuration("incremental_updates"), + manager_api_key=get_configuration("api_token")) + + CACHE.set_current_server(url=get_configuration("management").get("url"), + api_key=get_api_key_for_user("admin")) + + # Note: patching the cache for the test + CACHE.add_pipeline("b3d3e18a-bf33-40e8-b635-fd49e6634ccd", { + "name": "test", + "description": "test", + "host": "127.0.0.1", + "port": 20000, + }) + CACHE.add_pipeline("f8f49a77-9ceb-47b3-ac81-0f01ef71b4e0", { + "name": "test", + "description": "test", + "host": "127.0.0.1", + "port": 20000, + }) + response = requests.get("{}/pdp".format(get_configuration("management").get("url")), + headers=auth_headers) + pdp = response.json() + pdp_id = next(iter(pdp['pdps'])) + policy_id = pdp['pdps'][pdp_id].get("security_pipeline")[0] + project_id = pdp['pdps'][pdp_id].get("vim_project_id") + + # response = requests.get("{}/policies".format(get_configuration("management").get("url")), + # headers=auth_headers) + # policies = response.json() + # policy_id = next(iter(policies['policies'])) + + response = requests.get("{}/policies/{}/subjects".format( + get_configuration("management").get("url"), policy_id), headers=auth_headers) + subjects = response.json() + + response = requests.get("{}/policies/{}/objects".format( + get_configuration("management").get("url"), policy_id), headers=auth_headers) + objects = response.json() + + response = requests.get("{}/policies/{}/actions".format( + get_configuration("management").get("url"), policy_id), headers=auth_headers) + actions = response.json() + return subjects, objects, actions, project_id + + +def test_post_authz(): + from moon_engine.plugins import oslowrapper + from moon_utilities.auth_functions import get_api_key_for_user + from moon_cache.cache import Cache + from moon_engine.api.configuration import get_configuration + CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"), + incremental=get_configuration("incremental_updates"), + manager_api_key=get_configuration("api_token")) + + CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user( + "admin")) + + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + response = requests.get("{}/pdp".format(get_configuration("management").get("url")), + headers=auth_headers) + pdp = response.json() + pdp_id = next(iter(pdp['pdps'])) + policy_id = pdp['pdps'][pdp_id].get("security_pipeline")[0] + project_id = pdp['pdps'][pdp_id].get("vim_project_id") + + response = requests.get("{}/policies/{}/subjects".format(get_configuration( + "management").get("url"), policy_id), headers=auth_headers) + subjects = response.json() + + response = requests.get("{}/policies/{}/objects".format(get_configuration( + "management").get("url"), policy_id), headers=auth_headers) + objects = response.json() + + response = requests.get("{}/policies/{}/actions".format(get_configuration( + "management").get("url"), policy_id), headers=auth_headers) + actions = response.json() + + subjects_name = subjects['subjects'][next(iter(subjects['subjects']))]['name'] + objects_name = objects['objects'][next(iter(objects['objects']))]['name'] + actions_name = actions['actions'][next(iter(actions['actions']))]['name'] + + # Note: patching the cache for the test + from moon_cache.cache import Cache + CACHE = Cache.getInstance() + CACHE.add_pipeline("b3d3e18abf3340e8b635fd49e6634ccd", { + "name": "test", + "description": "test", + "host": "127.0.0.1", + "port": 20000, + }) + CACHE.add_pipeline("f8f49a779ceb47b3ac810f01ef71b4e0", { + "name": "test", + "description": "test", + "host": "127.0.0.1", + "port": 20000, + }) + + _target = { + 'target': { + "name": objects_name, + }, + "project_id": project_id, + "user_id": subjects_name + } + _credentials = { + "project_id": project_id, + "user_id": subjects_name + } + + authz_data = { + 'rule': actions_name, + 'target': json.dumps(_target), + 'credentials': json.dumps(_credentials)} + req = hug.test.post(oslowrapper, "/authz/oslo", authz_data) + assert req.status == hug.HTTP_200 + + +def test_authz_true(): + from moon_engine.plugins import oslowrapper + + subjects, objects, actions, project_id = get_subject_object_action() + + _target = { + 'target': { + "name": objects['objects'][next(iter(objects['objects']))]['name'], + }, + "project_id": project_id, + "user_id": subjects['subjects'][next(iter(subjects['subjects']))]['name'] + } + _credentials = { + "project_id": project_id, + "user_id": subjects['subjects'][next(iter(subjects['subjects']))]['name'] + } + authz_data = { + 'rule': actions['actions'][next(iter(actions['actions']))]['name'], + 'target': json.dumps(_target), + 'credentials': json.dumps(_credentials)} + + req = hug.test.post(oslowrapper, "/authz/oslo", body=authz_data) + + assert req.status == hug.HTTP_200 and req.data is not None and req.data + + +def test_authz_error_response_code(): + from moon_engine.plugins import oslowrapper + + subjects, objects, actions, project_id = get_subject_object_action() + + _target = { + 'target': { + "name": objects['objects'][next(iter(objects['objects']))]['name'], + }, + "project_id": "a64beb1cc224474fb4badd431f3e7106", # invalid project id + "user_id": subjects['subjects'][next(iter(subjects['subjects']))]['name'] + } + authz_data = { + 'rule': actions['actions'][next(iter(actions['actions']))]['name'], + 'target': json.dumps(_target), + 'credentials': 'null'} + + print(authz_data) + req = hug.test.post(oslowrapper, "/authz/oslo", body=authz_data) + + assert req.status != hug.HTTP_200 + +# def test_authz_error_no_interface_key(context): +# import moon_wrapper.server +# server = moon_wrapper.server.main() +# client = server.app.test_client() +# _target = { +# 'target': { +# "name": context.get('object_name'), +# }, +# "project_id": context.get('project_with_no_interface_key'), +# "user_id": context.get('subject_name') +# } +# authz_data = { +# 'rule': context.get('action_name'), +# 'target': json.dumps(_target), +# 'credentials': 'null'} +# req = client.post("/authz/oslo", data=json.dumps(authz_data)) +# +# assert req.data == b"False" diff --git a/moon_engine/tests/unit_python/api/wrapper/test_pipeline.py b/moon_engine/tests/unit_python/api/wrapper/test_pipeline.py new file mode 100644 index 00000000..ed16883d --- /dev/null +++ b/moon_engine/tests/unit_python/api/wrapper/test_pipeline.py @@ -0,0 +1,32 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +import hug + + +def test_get_pipelines(): + from moon_engine.api.wrapper.api import pipeline + from moon_utilities.auth_functions import get_api_key_for_user + from moon_cache.cache import Cache + from moon_engine.api.configuration import get_configuration + CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"), + incremental=get_configuration("incremental_updates"), + manager_api_key=get_configuration("api_token")) + + CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user( + "admin")) + + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + req = hug.test.get(pipeline, "/pipelines", headers=auth_headers ) + assert req.status == hug.HTTP_200 + assert isinstance(req.data, dict) + assert "pipelines" in req.data diff --git a/moon_engine/tests/unit_python/api/wrapper/test_update.py b/moon_engine/tests/unit_python/api/wrapper/test_update.py new file mode 100644 index 00000000..75e7ef2d --- /dev/null +++ b/moon_engine/tests/unit_python/api/wrapper/test_update.py @@ -0,0 +1,602 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +import hug +import requests + + +def test_wrapper_update_policy_existed(): + from moon_engine.api.wrapper.api import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_cache.cache import Cache + from moon_engine.api.configuration import get_configuration + CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"), + incremental=get_configuration("incremental_updates"), + manager_api_key=get_configuration("api_token")) + + CACHE.set_current_server(url=get_configuration("management").get("url"), + api_key=get_api_key_for_user("admin")) + + CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", { + "name": "test", + "description": "test", + "host": "127.0.0.1", + "port": 20000, + }) + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + response = requests.get("{}/policies".format(get_configuration("management").get("url")), + headers=auth_headers) + + policies = response.json() + key = next(iter(policies['policies'])) + policies['policies'][key]['name'] = "new " + policies['policies'][key]['name'] + req = hug.test.put(update, "/update/policy/{}".format(key), + policies['policies'][key], headers=auth_headers) + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) + + +def test_wrapper_delete_policy(): + from moon_engine.api.wrapper.api import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_cache.cache import Cache + from moon_engine.api.configuration import get_configuration + CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"), + incremental=get_configuration("incremental_updates"), + manager_api_key=get_configuration("api_token")) + + CACHE.set_current_server(url=get_configuration("management").get("url"), + api_key=get_api_key_for_user("admin")) + + CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", { + "name": "test", + "description": "test", + "host": "127.0.0.1", + "port": 20000, + }) + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + response = requests.get("{}/policies".format(get_configuration("management").get("url")), + headers=auth_headers) + policies = response.json() + key = next(iter(policies['policies'])) + req = hug.test.delete(update, "/update/policy/{}".format(key), + headers=auth_headers) + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) + + +def test_wrapper_update_policy_not_existed(): + from moon_engine.api.wrapper.api import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_cache.cache import Cache + from moon_engine.api.configuration import get_configuration + CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"), + incremental=get_configuration("incremental_updates"), + manager_api_key=get_configuration("api_token")) + + CACHE.set_current_server(url=get_configuration("management").get("url"), + api_key=get_api_key_for_user("admin")) + + CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", { + "name": "test", + "description": "test", + "host": "127.0.0.1", + "port": 20000, + }) + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + response = requests.get("{}/policies".format(get_configuration("management").get("url")), + headers=auth_headers) + policies = response.json() + key = next(iter(policies['policies'])) + policies['policies'][key]['name'] = "new " + policies['policies'][key]['name'] + req = hug.test.put(update, "/update/policy/{}".format("eac0ecd09ceb47b3ac810f01ef71b4e0"), + policies['policies'][key], headers=auth_headers) + assert req.status == hug.HTTP_208 + + +def test_wrapper_update_pdp_existed(): + from moon_engine.api.wrapper.api import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_cache.cache import Cache + from moon_engine.api.configuration import get_configuration + CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"), + incremental=get_configuration("incremental_updates"), + manager_api_key=get_configuration("api_token")) + + CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user( + "admin")) + + CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", { + "name": "test", + "description": "test", + "host": "127.0.0.1", + "port": 20000, + }) + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + response = requests.get("{}/pdp".format(get_configuration("management").get("url")), + headers=auth_headers) + + pdps = response.json() + key = next(iter(pdps['pdps'])) + pdps['pdps'][key]['name'] = "new " + pdps['pdps'][key]['name'] + req = hug.test.put(update, "/update/pdp/{}".format(key), + pdps['pdps'][key], headers=auth_headers) + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) + + +def test_wrapper_delete_pdp(): + from moon_engine.api.wrapper.api import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_cache.cache import Cache + from moon_engine.api.configuration import get_configuration + CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"), + incremental=get_configuration("incremental_updates"), + manager_api_key=get_configuration("api_token")) + + CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user( + "admin")) + + CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", { + "name": "test", + "description": "test", + "host": "127.0.0.1", + "port": 20000, + }) + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + response = requests.get("{}/pdp".format(get_configuration("management").get("url")), + headers=auth_headers) + + pdps = response.json() + key = next(iter(pdps['pdps'])) + req = hug.test.delete(update, "/update/pdp/{}".format(key), headers=auth_headers) + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) + + +def test_wrapper_delete_assignments(): + from moon_engine.api.wrapper.api import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_cache.cache import Cache + from moon_engine.api.configuration import get_configuration + CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"), + incremental=get_configuration("incremental_updates"), + manager_api_key=get_configuration("api_token")) + + CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user( + "admin")) + + CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", { + "name": "test", + "description": "test", + "host": "127.0.0.1", + "port": 20000, + }) + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + response = requests.get("{}/policies".format(get_configuration("management").get("url")), + headers=auth_headers) + policies = response.json() + policy_id = next(iter(policies['policies'])) + + response = requests.get("{}/policies/{}/subject_assignments".format(get_configuration( + "management").get("url"), policy_id), + headers=auth_headers) + subject_assignments = response.json() + assert 'subject_assignments' in subject_assignments and len( + subject_assignments['subject_assignments']) + req = hug.test.delete(update, + "/update/assignment/{}/{}/".format(policy_id, "subject"), + headers=auth_headers) + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) + + response = requests.get("{}/policies/{}/object_assignments".format(get_configuration( + "management").get("url"), policy_id), + headers=auth_headers) + object_assignments = response.json() + assert 'object_assignments' in object_assignments and len( + object_assignments['object_assignments']) + req = hug.test.delete(update, + "/update/assignment/{}/{}/".format(policy_id, "object"), + headers=auth_headers) + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) + + response = requests.get("{}/policies/{}/action_assignments".format(get_configuration( + "management").get("url"), policy_id), + headers=auth_headers) + action_assignments = response.json() + assert 'action_assignments' in action_assignments and len( + action_assignments['action_assignments']) + req = hug.test.delete(update, + "/update/assignment/{}/{}/".format(policy_id, "action"), + headers=auth_headers) + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) + + +def test_wrapper_update_perimeter_existed(): + from moon_engine.api.wrapper.api import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_cache.cache import Cache + from moon_engine.api.configuration import get_configuration + CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"), + incremental=get_configuration("incremental_updates"), + manager_api_key=get_configuration("api_token")) + + CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user( + "admin")) + + CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", { + "name": "test", + "description": "test", + "host": "127.0.0.1", + "port": 20000, + }) + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + response = requests.get("{}/policies".format(get_configuration("management").get("url")), + headers=auth_headers) + + policies = response.json() + policy_id = next(iter(policies['policies'])) + + response = requests.get("{}/policies/{}/subjects".format(get_configuration( + "management").get("url"), policy_id), headers=auth_headers) + subjects = response.json() + assert 'subjects' in subjects and len(subjects['subjects']) + + for key in subjects['subjects']: + subjects['subjects'][key]['name'] = "new " + subjects['subjects'][key]['name'] + req = hug.test.put(update, "/update/perimeter/{}/{}/{}".format(key, policy_id, 'subject'), + subjects['subjects'][key], headers=auth_headers) + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) + + response = requests.get("{}/policies/{}/objects".format(get_configuration( + "management").get("url"), policy_id), headers=auth_headers) + objects = response.json() + assert 'objects' in objects and len(objects['objects']) + + for key in objects['objects']: + objects['objects'][key]['name'] = "new " + objects['objects'][key]['name'] + req = hug.test.put(update, "/update/perimeter/{}/{}/{}".format(key, policy_id, 'object'), + objects['objects'][key], headers=auth_headers) + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) + + response = requests.get("{}/policies/{}/actions".format(get_configuration( + "management").get("url"), policy_id), headers=auth_headers) + actions = response.json() + assert 'actions' in actions and len(actions['actions']) + + for key in actions['actions']: + actions['actions'][key]['name'] = "new " + actions['actions'][key]['name'] + req = hug.test.put(update, "/update/perimeter/{}/{}/{}".format(key, policy_id, 'action'), + actions['actions'][key], headers=auth_headers) + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) + + +def test_wrapper_delete_perimeter(): + from moon_engine.api.wrapper.api import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_cache.cache import Cache + from moon_engine.api.configuration import get_configuration + CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"), + incremental=get_configuration("incremental_updates"), + manager_api_key=get_configuration("api_token")) + + CACHE.set_current_server(url=get_configuration("management").get("url"), + api_key=get_api_key_for_user("admin")) + + CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", { + "name": "test", + "description": "test", + "host": "127.0.0.1", + "port": 20000, + }) + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + response = requests.get("{}/policies".format(get_configuration("management").get("url")), + headers=auth_headers) + + policies = response.json() + policy_id = next(iter(policies['policies'])) + + response = requests.get("{}/policies/{}/subjects".format(get_configuration( + "management").get("url"), policy_id), headers=auth_headers) + subjects = response.json() + assert 'subjects' in subjects and len(subjects['subjects']) + + for key in subjects['subjects']: + req = hug.test.delete(update, + "/update/perimeter/{}/{}/{}".format(key, policy_id, 'subject'), + headers=auth_headers) + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) + + response = requests.get("{}/policies/{}/objects".format(get_configuration( + "management").get("url"), policy_id), headers=auth_headers) + objects = response.json() + assert 'objects' in objects and len(objects['objects']) + + for key in objects['objects']: + req = hug.test.delete(update, "/update/perimeter/{}/{}/{}".format(key, policy_id, 'object'), + headers=auth_headers) + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) + + response = requests.get("{}/policies/{}/actions".format(get_configuration( + "management").get("url"), policy_id), headers=auth_headers) + actions = response.json() + assert 'actions' in actions and len(actions['actions']) + + for key in actions['actions']: + req = hug.test.delete(update, "/update/perimeter/{}/{}/{}".format(key, policy_id, 'action'), + headers=auth_headers) + + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) + + +def test_wrapper_delete_rule(): + from moon_engine.api.wrapper.api import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_cache.cache import Cache + from moon_engine.api.configuration import get_configuration + CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"), + incremental=get_configuration("incremental_updates"), + manager_api_key=get_configuration("api_token")) + + CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user( + "admin")) + + CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", { + "name": "test", + "description": "test", + "host": "127.0.0.1", + "port": 20000, + }) + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + response = requests.get("{}/policies".format(get_configuration("management").get("url")), + headers=auth_headers) + policies = response.json() + policy_id = next(iter(policies['policies'])) + + response = requests.get( + "{}/policies/{}/rules".format(get_configuration("management").get("url"), policy_id), + headers=auth_headers) + rules = response.json() + assert 'rules' in rules and 'policy_id' in rules['rules'] + assert rules['rules']['policy_id'] == policy_id + assert len(rules['rules']['rules']) + for i in range(0, len(rules['rules']['rules'])): + req = hug.test.delete(update, "/update/rule/{}/{}".format(policy_id, + rules['rules']['rules'][i]['id']), headers=auth_headers) + + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) + + +def test_wrapper_update_model_existed(): + from moon_engine.api.wrapper.api import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_cache.cache import Cache + from moon_engine.api.configuration import get_configuration + CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"), + incremental=get_configuration("incremental_updates"), + manager_api_key=get_configuration("api_token")) + + CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user( + "admin")) + + CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", { + "name": "test", + "description": "test", + "host": "127.0.0.1", + "port": 20000, + }) + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + response = requests.get("{}/models".format(get_configuration("management").get("url")), + headers=auth_headers) + + models = response.json() + key = next(iter(models['models'])) + models['models'][key]['name'] = "new " + models['models'][key]['name'] + req = hug.test.put(update, "/update/model/{}".format(key), + models['models'][key], headers=auth_headers) + + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) + + +def test_wrapper_delete_model(): + from moon_engine.api.wrapper.api import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_cache.cache import Cache + from moon_engine.api.configuration import get_configuration + CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"), + incremental=get_configuration("incremental_updates"), + manager_api_key=get_configuration("api_token")) + + CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user( + "admin")) + + CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", { + "name": "test", + "description": "test", + "host": "127.0.0.1", + "port": 20000, + }) + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + response = requests.get("{}/models".format(get_configuration("management").get("url")), + headers=auth_headers) + + models = response.json() + key = next(iter(models['models'])) + req = hug.test.delete(update, "/update/model/{}".format(key), headers=auth_headers) + + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) + + +def test_wrapper_delete_category(): + from moon_engine.api.wrapper.api import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_cache.cache import Cache + from moon_engine.api.configuration import get_configuration + CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"), + incremental=get_configuration("incremental_updates"), + manager_api_key=get_configuration("api_token")) + + CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user( + "admin")) + + CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", { + "name": "test", + "description": "test", + "host": "127.0.0.1", + "port": 20000, + }) + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + response = requests.get("{}/subject_categories".format(get_configuration("management").get("url")), + headers=auth_headers) + subject_categories = response.json() + category_id = next(iter(subject_categories['subject_categories'])) + req = hug.test.delete(update, "/update/meta_data/{}/{}".format(category_id, 'subject'), + headers=auth_headers) + + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) + + response = requests.get("{}/action_categories".format(get_configuration("management").get("url")), + headers=auth_headers) + action_categories = response.json() + category_id = next(iter(action_categories['action_categories'])) + req = hug.test.delete(update, "/update/meta_data/{}/{}".format(category_id, 'action'), + headers=auth_headers) + + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) + + response = requests.get("{}/object_categories".format(get_configuration("management").get("url")), + headers=auth_headers) + object_categories = response.json() + category_id = next(iter(object_categories['object_categories'])) + req = hug.test.delete(update, "/update/meta_data/{}/{}".format(category_id, 'object'), + headers=auth_headers) + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) + + +def test_wrapper_update_meta_rule_existed(): + from moon_engine.api.wrapper.api import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_cache.cache import Cache + from moon_engine.api.configuration import get_configuration + CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"), + incremental=get_configuration("incremental_updates"), + manager_api_key=get_configuration("api_token")) + + CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user( + "admin")) + + CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", { + "name": "test", + "description": "test", + "host": "127.0.0.1", + "port": 20000, + }) + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + response = requests.get("{}/meta_rules".format(get_configuration("management").get("url")), + headers=auth_headers) + + meta_rules = response.json() + key = next(iter(meta_rules['meta_rules'])) + meta_rules['meta_rules'][key]['name'] = "new " + meta_rules['meta_rules'][key]['name'] + req = hug.test.put(update, "/update/meta_rule/{}".format(key), + meta_rules['meta_rules'][key], headers=auth_headers) + + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) + + +def test_wrapper_delete_meta_rule(): + from moon_engine.api.wrapper.api import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_cache.cache import Cache + from moon_engine.api.configuration import get_configuration + CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"), + incremental=get_configuration("incremental_updates"), + manager_api_key=get_configuration("api_token")) + + CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user( + "admin")) + + CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", { + "name": "test", + "description": "test", + "host": "127.0.0.1", + "port": 20000, + }) + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + response = requests.get("{}/meta_rules".format(get_configuration("management").get("url")), + headers=auth_headers) + + meta_rules = response.json() + key = next(iter(meta_rules['meta_rules'])) + req = hug.test.delete(update, "/update/meta_rule/{}".format(key), headers=auth_headers) + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) + + +def test_wrapper_delete_data(): + from moon_engine.api.wrapper.api import update + from moon_utilities.auth_functions import get_api_key_for_user + from moon_cache.cache import Cache + from moon_engine.api.configuration import get_configuration + CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"), + incremental=get_configuration("incremental_updates"), + manager_api_key=get_configuration("api_token")) + + CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user( + "admin")) + + CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", { + "name": "test", + "description": "test", + "host": "127.0.0.1", + "port": 20000, + }) + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + response = requests.get("{}/policies".format(get_configuration("management").get("url")), + headers=auth_headers) + + policies = response.json() + policy_id = next(iter(policies['policies'])) + + response = requests.get( + "{}/policies/{}/subject_data".format(get_configuration("management").get("url"), policy_id), + headers=auth_headers) + subject_data_id = next(iter(response.json()['subject_data'][0]['data'])) + req = hug.test.delete(update, "/update/data/{}/{}".format(subject_data_id,'subject'), + headers=auth_headers) + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) + + response = requests.get( + "{}/policies/{}/object_data".format(get_configuration("management").get("url"), policy_id), + headers=auth_headers) + object_data_id = next(iter(response.json()['object_data'][0]['data'])) + req = hug.test.delete(update, "/update/data/{}/{}".format(object_data_id, 'object'), + headers=auth_headers) + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) + + response = requests.get( + "{}/policies/{}/action_data".format(get_configuration("management").get("url"), policy_id), + headers=auth_headers) + action_data_id = next(iter(response.json()['action_data'][0]['data'])) + req = hug.test.delete(update, "/update/data/{}/{}".format(action_data_id, 'action'), + headers=auth_headers) + assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200) diff --git a/moon_engine/tests/unit_python/api/wrapper/test_wrapper_authz.py b/moon_engine/tests/unit_python/api/wrapper/test_wrapper_authz.py new file mode 100644 index 00000000..c4df249f --- /dev/null +++ b/moon_engine/tests/unit_python/api/wrapper/test_wrapper_authz.py @@ -0,0 +1,57 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +import hug +import requests + + +def test_wrapper_get_authz(): + from moon_engine.api.wrapper.api import authz + from moon_utilities.auth_functions import get_api_key_for_user + from moon_cache.cache import Cache + from moon_engine.api.configuration import get_configuration + CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"), + incremental=get_configuration("incremental_updates"), + manager_api_key=get_configuration("api_token")) + + CACHE.set_current_server(url=get_configuration("management").get("url"), + api_key=get_api_key_for_user("admin")) + + auth_headers = {"X-Api-Key": get_api_key_for_user("admin")} + + response = requests.get("{}/pdp".format(get_configuration("management").get("url")), + headers=auth_headers) + + pdp = response.json() + pdp_id = next(iter(pdp['pdps'])) + policy_id = pdp['pdps'][pdp_id].get("security_pipeline")[0] + project_id = pdp['pdps'][pdp_id].get("vim_project_id") + + response = requests.get("{}/policies/{}/subjects".format(get_configuration( + "management").get("url"), policy_id), headers=auth_headers) + subjects = response.json() + + response = requests.get("{}/policies/{}/objects".format(get_configuration( + "management").get("url"), policy_id), headers=auth_headers) + objects = response.json() + + response = requests.get("{}/policies/{}/actions".format(get_configuration( + "management").get("url"), policy_id), headers=auth_headers) + actions = response.json() + + subjects_name = subjects['subjects'][next(iter(subjects['subjects']))]['name'] + objects_name = objects['objects'][next(iter(objects['objects']))]['name'] + actions_name = actions['actions'][next(iter(actions['actions']))]['name'] + + req = hug.test.get(authz, "/authz/{}/{}/{}/{}".format( + project_id, subjects_name, objects_name, actions_name)) + assert req.status == hug.HTTP_200 |