aboutsummaryrefslogtreecommitdiffstats
path: root/moon_engine/tests/unit_python/api
diff options
context:
space:
mode:
Diffstat (limited to 'moon_engine/tests/unit_python/api')
-rw-r--r--moon_engine/tests/unit_python/api/__init__.py12
-rw-r--r--moon_engine/tests/unit_python/api/moon.yaml54
-rw-r--r--moon_engine/tests/unit_python/api/pipeline/__init__.py11
-rw-r--r--moon_engine/tests/unit_python/api/pipeline/test_authz.py37
-rw-r--r--moon_engine/tests/unit_python/api/pipeline/test_update.py525
-rw-r--r--moon_engine/tests/unit_python/api/test_status.py21
-rw-r--r--moon_engine/tests/unit_python/api/wrapper/__init__.py11
-rw-r--r--moon_engine/tests/unit_python/api/wrapper/test_oslo_wrapper.py209
-rw-r--r--moon_engine/tests/unit_python/api/wrapper/test_pipeline.py32
-rw-r--r--moon_engine/tests/unit_python/api/wrapper/test_update.py602
-rw-r--r--moon_engine/tests/unit_python/api/wrapper/test_wrapper_authz.py57
11 files changed, 1571 insertions, 0 deletions
diff --git a/moon_engine/tests/unit_python/api/__init__.py b/moon_engine/tests/unit_python/api/__init__.py
new file mode 100644
index 00000000..1856aa2c
--- /dev/null
+++ b/moon_engine/tests/unit_python/api/__init__.py
@@ -0,0 +1,12 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
diff --git a/moon_engine/tests/unit_python/api/moon.yaml b/moon_engine/tests/unit_python/api/moon.yaml
new file mode 100644
index 00000000..9272ccce
--- /dev/null
+++ b/moon_engine/tests/unit_python/api/moon.yaml
@@ -0,0 +1,54 @@
+type: "pipeline"
+uuid: "a64beb1c-c224-474f-b4ba-dd43173e7101"
+manager_url: "http://127.0.0.1:20000"
+incremental_updates: false
+api_token:
+data: conf/policy_example.json
+debug: true
+management:
+ token_file: moon_engine_users.json
+
+orchestration:
+ driver: moon_engine.plugins.pyorchestrator
+ connection: local
+ port: 20000...20100
+ config_dir: /tmp
+
+authorization:
+ driver: moon_engine.plugins.authz
+
+plugins:
+ directory: /tmp
+
+logging:
+ version: 1
+
+ formatters:
+ brief:
+ format: "%(levelname)s %(name)s %(message)-30s"
+ custom:
+ format: "%(asctime)-15s %(levelname)s %(name)s %(message)s"
+
+ handlers:
+ console:
+ class : logging.StreamHandler
+ formatter: custom
+ level : INFO
+ stream : ext://sys.stdout
+ file:
+ class : logging.handlers.RotatingFileHandler
+ formatter: custom
+ level : DEBUG
+ filename: /tmp/moon_engine.log
+ maxBytes: 1048576
+ backupCount: 3
+
+ loggers:
+ moon:
+ level: DEBUG
+ handlers: [console, file]
+ propagate: no
+
+ root:
+ level: ERROR
+ handlers: [console] \ No newline at end of file
diff --git a/moon_engine/tests/unit_python/api/pipeline/__init__.py b/moon_engine/tests/unit_python/api/pipeline/__init__.py
new file mode 100644
index 00000000..582be686
--- /dev/null
+++ b/moon_engine/tests/unit_python/api/pipeline/__init__.py
@@ -0,0 +1,11 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
diff --git a/moon_engine/tests/unit_python/api/pipeline/test_authz.py b/moon_engine/tests/unit_python/api/pipeline/test_authz.py
new file mode 100644
index 00000000..c405ddfd
--- /dev/null
+++ b/moon_engine/tests/unit_python/api/pipeline/test_authz.py
@@ -0,0 +1,37 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+import hug
+
+
+def test_get_authz_authorized():
+ from moon_engine.api.pipeline import authz
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ # FIXME: https://gitlab.forge.orange-labs.fr/moon/moon_cache/issues/1
+ # req = benchmark(hug.test.get, authz,
+ # "/authz/89ba91c1-8dd5-4abf-bfde-7a66936c51a6/67b8008a-3f8d-4f8e-847e-b628f0f7ca0e/cdb3df22-0dc0"
+ # "-5a6e-a333-4b994827b068", headers=auth_headers)
+ # # req = hug.test.get(authz, "/authz/test/test/test", headers=auth_headers)
+ # assert req.status == hug.HTTP_204
+
+
+def test_get_authz_unauthorized():
+ from moon_engine.api.pipeline import authz
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ # FIXME: https://gitlab.forge.orange-labs.fr/moon/moon_cache/issues/1
+ # req = benchmark(hug.test.get, authz,
+ # "/authz/31fd15ad-1478-4a96-96fc-c887dddbfaf9/67b8008a-3f8d-4f8e-847e-b628f0f7ca0e/cdb3df22-0dc0"
+ # "-5a6e-a333-4b994827b068", headers=auth_headers)
+ # # req = hug.test.get(authz, "/authz/test/test/test", headers=auth_headers)
+ # assert req.status == hug.HTTP_403
diff --git a/moon_engine/tests/unit_python/api/pipeline/test_update.py b/moon_engine/tests/unit_python/api/pipeline/test_update.py
new file mode 100644
index 00000000..94d5a4e4
--- /dev/null
+++ b/moon_engine/tests/unit_python/api/pipeline/test_update.py
@@ -0,0 +1,525 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+import hug
+import requests
+from uuid import uuid4
+
+
+def test_pipeline_update_policy_existed():
+ from moon_engine.api.pipeline import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_engine.api.configuration import get_configuration
+
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ response = requests.get("{}/policies".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+
+ policies = response.json()
+ key = next(iter(policies['policies']))
+ policies['policies'][key]['name'] = "new " + policies['policies'][key]['name']
+ req = hug.test.put(update, "/update/policy/{}".format(key),
+ policies['policies'][key], headers=auth_headers)
+ assert req.status == hug.HTTP_208
+
+
+def test_pipeline_update_policy_not_existed():
+ from moon_engine.api.pipeline import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_engine.api.configuration import get_configuration
+
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ response = requests.get("{}/policies".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+
+ policies = response.json()
+ key = next(iter(policies['policies']))
+ policies['policies'][key]['name'] = "new " + policies['policies'][key]['name']
+ req = hug.test.put(update, "/update/policy/{}".format(uuid4().hex),
+ policies['policies'][key], headers=auth_headers)
+ assert req.status == hug.HTTP_208
+
+
+def test_pipeline_delete_policy():
+ from moon_engine.api.pipeline import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_engine.api.configuration import get_configuration
+
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ response = requests.get("{}/policies".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+ policies = response.json()
+ key = next(iter(policies['policies']))
+ req = hug.test.delete(update, "/update/policy/{}".format(key),
+ headers=auth_headers)
+ assert req.status == hug.HTTP_202
+
+
+def test_pipeline_update_pdp_existed():
+ from moon_engine.api.pipeline import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_engine.api.configuration import get_configuration
+
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ response = requests.get("{}/pdp".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+
+ pdps = response.json()
+ key = next(iter(pdps['pdps']))
+ pdps['pdps'][key]['name'] = "new " + pdps['pdps'][key]['name']
+ req = hug.test.put(update, "/update/pdp/{}".format(key),
+ pdps['pdps'][key], headers=auth_headers)
+ assert req.status == hug.HTTP_208
+
+
+def test_pipeline_update_pdp_not_existed():
+ from moon_engine.api.pipeline import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_engine.api.configuration import get_configuration
+
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ response = requests.get("{}/pdp".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+
+ pdps = response.json()
+ key = next(iter(pdps['pdps']))
+ pdps['pdps'][key]['name'] = "new " + pdps['pdps'][key]['name']
+ req = hug.test.put(update, "/update/pdp/{}".format(uuid4().hex),
+ pdps['pdps'][key], headers=auth_headers)
+ assert req.status == hug.HTTP_208
+
+
+def test_pipeline_delete_pdp_existed():
+ from moon_engine.api.pipeline import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_engine.api.configuration import get_configuration
+
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ response = requests.get("{}/pdp".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+
+ pdps = response.json()
+ key = next(iter(pdps['pdps']))
+ req = hug.test.delete(update, "/update/pdp/{}".format(key), headers=auth_headers)
+ assert req.status == hug.HTTP_202
+
+
+def test_pipeline_update_perimeter_existed():
+ from moon_engine.api.pipeline import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_engine.api.configuration import get_configuration
+
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ response = requests.get("{}/policies".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+
+ policies = response.json()
+ policy_key = next(iter(policies['policies']))
+
+ subject_response = requests.get("{}/policies/{}/subjects".format(get_configuration(
+ "management").get("url"), policy_key), headers=auth_headers)
+ subjects = subject_response.json()
+ subj_key = next(iter(subjects['subjects']))
+ subjects['subjects'][subj_key]['name'] = "updated_" + subjects['subjects'][subj_key]['name']
+
+ subject_req = hug.test.put(update, "/update/perimeter/{}/{}/{}".format(subj_key, policy_key,
+ "subject"),
+ subjects['subjects'][subj_key], headers=auth_headers)
+ assert subject_req.status == hug.HTTP_208
+
+ """ object category """
+ object_response = requests.get("{}/policies/{}/objects".format(get_configuration(
+ "management").get("url"), policy_key), headers=auth_headers)
+ objects = object_response.json()
+ obj_key = next(iter(objects['objects']))
+ objects['objects'][obj_key]['name'] = "updated_" + objects['objects'][obj_key]['name']
+
+ object_req = hug.test.put(update, "/update/perimeter/{}/{}/{}".format(obj_key, policy_key, "object")
+ , objects['objects'][obj_key], headers=auth_headers)
+ assert object_req.status == hug.HTTP_208
+
+ """ action category """
+ action_response = requests.get("{}/policies/{}/actions".format(get_configuration(
+ "management").get("url"), policy_key), headers=auth_headers)
+ actions = action_response.json()
+ action_key = next(iter(actions['actions']))
+ actions['actions'][action_key]['name'] = "updated_" + actions['actions'][action_key]['name']
+
+ action_req = hug.test.put(update, "/update/perimeter/{}/{}/{}".format(action_key, policy_key,
+ "action"),
+ actions['actions'][action_key], headers=auth_headers)
+ assert action_req.status == hug.HTTP_208
+
+
+def test_pipeline_update_perimeter_not_existed():
+ from moon_engine.api.pipeline import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_engine.api.configuration import get_configuration
+
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ response = requests.get("{}/policies".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+
+ policies = response.json()
+ policy_key = next(iter(policies['policies']))
+
+ subject_response = requests.get("{}/policies/{}/subjects".format(get_configuration(
+ "management").get("url"), policy_key), headers=auth_headers)
+ subjects = subject_response.json()
+ subj_key = next(iter(subjects['subjects']))
+ subjects['subjects'][subj_key]['name'] = "updated_" + subjects['subjects'][subj_key]['name']
+
+ subject_req = hug.test.put(update, "/update/perimeter/{}/{}/{}".format(uuid4().hex, policy_key,
+ "subject"),
+ subjects['subjects'][subj_key], headers=auth_headers)
+ assert subject_req.status == hug.HTTP_208
+
+ """ object category """
+ object_response = requests.get("{}/policies/{}/objects".format(get_configuration(
+ "management").get("url"), policy_key), headers=auth_headers)
+ objects = object_response.json()
+ obj_key = next(iter(objects['objects']))
+ objects['objects'][obj_key]['name'] = "updated_" + objects['objects'][obj_key]['name']
+
+ object_req = hug.test.put(update, "/update/perimeter/{}/{}/{}".format(uuid4().hex, policy_key, "object")
+ , objects['objects'][obj_key], headers=auth_headers)
+ assert object_req.status == hug.HTTP_208
+
+ """ action category """
+ action_response = requests.get("{}/policies/{}/actions".format(get_configuration(
+ "management").get("url"), policy_key), headers=auth_headers)
+ actions = action_response.json()
+ action_key = next(iter(actions['actions']))
+ actions['actions'][action_key]['name'] = "updated_" + actions['actions'][action_key]['name']
+
+ action_req = hug.test.put(update, "/update/perimeter/{}/{}/{}".format(uuid4().hex, policy_key,
+ "action"),
+ actions['actions'][action_key], headers=auth_headers)
+ assert action_req.status == hug.HTTP_208
+
+
+def test_pipeline_delete_perimeter_existed():
+ from moon_engine.api.pipeline import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_engine.api.configuration import get_configuration
+
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ policies_response = requests.get("{}/policies".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+
+ policies = policies_response.json()
+ policy_key = next(iter(policies['policies']))
+
+ subject_response = requests.get("{}/policies/{}/subjects".format(get_configuration(
+ "management").get("url"), policy_key), headers=auth_headers)
+ subjects = subject_response.json()
+ subj_key = next(iter(subjects['subjects']))
+ subjects['subjects'][subj_key]['name'] = "updated_" + subjects['subjects'][subj_key]['name']
+
+ delete_subject_response = hug.test.delete(update, "/update/perimeter/{}/{}/{}".format(subj_key, policy_key,
+ "subject"),
+ headers=auth_headers)
+ assert delete_subject_response.status == hug.HTTP_202
+
+ object_response = requests.get("{}/policies/{}/objects".format(get_configuration(
+ "management").get("url"), policy_key), headers=auth_headers)
+ objects = object_response.json()
+ assert 'objects' in objects and len(objects['objects'])
+
+ for key in objects['objects']:
+ delete_object_response = hug.test.delete(update, "/update/perimeter/{}/{}/{}".format(key, policy_key, 'object'),
+ headers=auth_headers)
+ assert delete_object_response.status == hug.HTTP_202
+
+ action_response = requests.get("{}/policies/{}/actions".format(get_configuration(
+ "management").get("url"), policy_key), headers=auth_headers)
+ actions = action_response.json()
+ assert 'actions' in actions and len(actions['actions'])
+
+ for key in actions['actions']:
+ delete_action_response = hug.test.delete(update, "/update/perimeter/{}/{}/{}".format(key, policy_key, 'action'),
+ headers=auth_headers)
+ assert delete_action_response.status == hug.HTTP_202
+
+
+def test_pipeline_delete_assignments():
+ from moon_engine.api.pipeline import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_engine.api.configuration import get_configuration
+
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ response = requests.get("{}/policies".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+ policies = response.json()
+ policy_id = next(iter(policies['policies']))
+
+ sub_assig_response = requests.get("{}/policies/{}/subject_assignments".format(get_configuration("management").get("url"),
+ policy_id), headers=auth_headers)
+
+ subject_assignments = sub_assig_response.json()
+ assert 'subject_assignments' in subject_assignments and len(
+ subject_assignments['subject_assignments'])
+ req = hug.test.delete(update,
+ "/update/assignment/{}/{}/".format(policy_id, "subject"),
+ headers=auth_headers)
+ assert req.status == hug.HTTP_202
+
+ obj_assig_response = requests.get("{}/policies/{}/object_assignments".format(get_configuration(
+ "management").get("url"), policy_id),
+ headers=auth_headers)
+ object_assignments = obj_assig_response.json()
+ assert 'object_assignments' in object_assignments and len(
+ object_assignments['object_assignments'])
+ req = hug.test.delete(update,
+ "/update/assignment/{}/{}/".format(policy_id, "object"),
+ headers=auth_headers)
+ assert req.status == hug.HTTP_202
+
+ action_assig_response = requests.get("{}/policies/{}/action_assignments".format(get_configuration(
+ "management").get("url"), policy_id),
+ headers=auth_headers)
+ action_assignments = action_assig_response.json()
+ assert 'action_assignments' in action_assignments and len(
+ action_assignments['action_assignments'])
+ req = hug.test.delete(update,
+ "/update/assignment/{}/{}/".format(policy_id, "action"),
+ headers=auth_headers)
+ assert req.status == hug.HTTP_202
+
+
+def test_pipeline_delete_rule():
+ from moon_engine.api.pipeline import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_engine.api.configuration import get_configuration
+
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ policy_response = requests.get("{}/policies".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+ policies = policy_response.json()
+ policy_id = next(iter(policies['policies']))
+
+ rules_response = requests.get(
+ "{}/policies/{}/rules".format(get_configuration("management").get("url"), policy_id),
+ headers=auth_headers)
+ rules = rules_response.json()
+
+ assert len(rules['rules']['rules'])
+ for i in range(0, len(rules['rules']['rules'])):
+ req = hug.test.delete(update, "/update/rule/{}/{}".format(policy_id, rules['rules'][
+ 'rules'][i]['id']), headers=auth_headers)
+ assert req.status == hug.HTTP_202
+
+
+def test_pipeline_update_model_existed():
+ from moon_engine.api.pipeline import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_engine.api.configuration import get_configuration
+
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ model_response = requests.get("{}/models".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+
+ models = model_response.json()
+ key = next(iter(models['models']))
+ models['models'][key]['name'] = "new " + models['models'][key]['name']
+ req = hug.test.put(update, "/update/model/{}".format(key),
+ models['models'][key], headers=auth_headers)
+ assert req.status == hug.HTTP_208
+
+
+def test_pipeline_update_model_not_existed():
+ from moon_engine.api.pipeline import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_engine.api.configuration import get_configuration
+
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ model_response = requests.get("{}/models".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+
+ models = model_response.json()
+ key = next(iter(models['models']))
+ models['models'][key]['name'] = "new " + models['models'][key]['name']
+ req = hug.test.put(update, "/update/model/{}".format(uuid4().hex),
+ models['models'][key], headers=auth_headers)
+ assert req.status == hug.HTTP_208
+
+
+def test_pipeline_delete_model_existed():
+ from moon_engine.api.pipeline import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_engine.api.configuration import get_configuration
+
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ model_response = requests.get("{}/models".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+
+ models = model_response.json()
+ key = next(iter(models['models']))
+ req = hug.test.delete(update, "/update/model/{}".format(key), headers=auth_headers)
+ assert req.status == hug.HTTP_202
+
+
+def test_pipeline_delete_category():
+ from moon_engine.api.pipeline import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_engine.api.configuration import get_configuration
+
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ subject_cat_response = requests.get("{}/subject_categories".format(get_configuration(
+ "management").get("url")), headers=auth_headers)
+ subject_categories = subject_cat_response.json()
+ category_id = next(iter(subject_categories['subject_categories']))
+ req = hug.test.delete(update, "/update/meta_data/{}/{}".format(category_id, 'subject'),
+ headers=auth_headers)
+ assert req.status == hug.HTTP_202
+
+ action_cat_response = requests.get("{}/action_categories".format(get_configuration(
+ "management").get("url")),
+ headers=auth_headers)
+ action_categories = action_cat_response.json()
+ category_id = next(iter(action_categories['action_categories']))
+ req = hug.test.delete(update, "/update/meta_data/{}/{}".format(category_id, 'action'),
+ headers=auth_headers)
+ assert req.status == hug.HTTP_202
+
+ obj_cat_response = requests.get("{}/object_categories".format(get_configuration(
+ "management").get("url")),
+ headers=auth_headers)
+ object_categories = obj_cat_response.json()
+ category_id = next(iter(object_categories['object_categories']))
+ req = hug.test.delete(update, "/update/meta_data/{}/{}".format(category_id, 'object'),
+ headers=auth_headers)
+ assert req.status == hug.HTTP_202
+
+
+def test_pipeline_update_meta_rule_existed():
+ from moon_engine.api.pipeline import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_engine.api.configuration import get_configuration
+
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ meta_rules_response = requests.get("{}/meta_rules".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+
+ meta_rules = meta_rules_response.json()
+ key = next(iter(meta_rules['meta_rules']))
+ meta_rules['meta_rules'][key]['name'] = "new " + meta_rules['meta_rules'][key]['name']
+ req = hug.test.put(update, "/update/meta_rule/{}".format(key),
+ meta_rules['meta_rules'][key], headers=auth_headers)
+ assert req.status == hug.HTTP_208
+
+
+def test_pipeline_update_meta_rule_not_existed():
+ from moon_engine.api.pipeline import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_engine.api.configuration import get_configuration
+
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ meta_rules_response = requests.get("{}/meta_rules".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+
+ meta_rules = meta_rules_response.json()
+ key = next(iter(meta_rules['meta_rules']))
+ meta_rules['meta_rules'][key]['name'] = "new " + meta_rules['meta_rules'][key]['name']
+ req = hug.test.put(update, "/update/meta_rule/{}".format(uuid4().hex),
+ meta_rules['meta_rules'][key], headers=auth_headers)
+ assert req.status == hug.HTTP_208
+
+
+def test_pipeline_delete_meta_rule():
+ from moon_engine.api.pipeline import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_engine.api.configuration import get_configuration
+
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ meta_rules_response = requests.get("{}/meta_rules".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+
+ meta_rules = meta_rules_response.json()
+ key = next(iter(meta_rules['meta_rules']))
+ req = hug.test.delete(update, "/update/meta_rule/{}".format(key), headers=auth_headers)
+ assert req.status == hug.HTTP_202
+
+
+def test_pipeline_delete_data():
+ from moon_engine.api.pipeline import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_engine.api.configuration import get_configuration
+
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ policies_response = requests.get("{}/policies".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+
+ policies = policies_response.json()
+ policy_id = next(iter(policies['policies']))
+
+ subject_data_response = requests.get(
+ "{}/policies/{}/subject_data".format(get_configuration("management").get("url"), policy_id),
+ headers=auth_headers)
+ subject_data_id = next(iter(subject_data_response.json()['subject_data'][0]['data']))
+ delete_subject_data_response = hug.test.delete(update, "/update/data/{}/{}".format(subject_data_id, 'subject'),
+ headers=auth_headers)
+ assert delete_subject_data_response.status == hug.HTTP_202
+
+ object_data_response = requests.get(
+ "{}/policies/{}/object_data".format(get_configuration("management").get("url"), policy_id),
+ headers=auth_headers)
+ object_data_id = next(iter(object_data_response.json()['object_data'][0]['data']))
+ delete_object_data_response = hug.test.delete(update, "/update/data/{}/{}".format(object_data_id, 'object'),
+ headers=auth_headers)
+ assert delete_object_data_response.status == hug.HTTP_202
+
+ action_data_response = requests.get(
+ "{}/policies/{}/action_data".format(get_configuration("management").get("url"), policy_id),
+ headers=auth_headers)
+ action_data_id = next(iter(action_data_response.json()['action_data'][0]['data']))
+ delete_action_data_response = hug.test.delete(update, "/update/data/{}/{}".format(action_data_id, 'action'),
+ headers=auth_headers)
+ assert delete_action_data_response.status == hug.HTTP_202
+
+
+# def test_pipeline_delete_attributes():
+# from moon_engine.api.pipeline import update
+# from moon_utilities.auth_functions import get_api_key_for_user
+# from moon_engine.api.configuration import get_configuration
+#
+# auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+#
+# attributes_response = requests.get(
+# "{}/attributes".format(get_configuration("management").get("url")),
+# headers=auth_headers)
+#
+# attributes = attributes_response.json()
+# key = next(iter(attributes['attributes']))
+# req = hug.test.delete(update, "/update/attributes/{}".format(key), headers=auth_headers)
+# assert req.status == hug.HTTP_202
+
+
diff --git a/moon_engine/tests/unit_python/api/test_status.py b/moon_engine/tests/unit_python/api/test_status.py
new file mode 100644
index 00000000..e38f35e6
--- /dev/null
+++ b/moon_engine/tests/unit_python/api/test_status.py
@@ -0,0 +1,21 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+import hug
+
+
+def test_status(benchmark):
+ from moon_engine.api import status
+
+ # Fixme: Add tests on enforce function to have a benchmark on enforce rapidity
+ req = benchmark(hug.test.get, status, "/status")
+ assert isinstance(req.data, dict)
diff --git a/moon_engine/tests/unit_python/api/wrapper/__init__.py b/moon_engine/tests/unit_python/api/wrapper/__init__.py
new file mode 100644
index 00000000..582be686
--- /dev/null
+++ b/moon_engine/tests/unit_python/api/wrapper/__init__.py
@@ -0,0 +1,11 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
diff --git a/moon_engine/tests/unit_python/api/wrapper/test_oslo_wrapper.py b/moon_engine/tests/unit_python/api/wrapper/test_oslo_wrapper.py
new file mode 100644
index 00000000..770e9bb6
--- /dev/null
+++ b/moon_engine/tests/unit_python/api/wrapper/test_oslo_wrapper.py
@@ -0,0 +1,209 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+import hug
+import json
+import requests
+
+
+def get_subject_object_action():
+ from moon_engine.api.configuration import get_configuration
+ from moon_cache.cache import Cache
+ from moon_utilities.auth_functions import get_api_key_for_user
+
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"),
+ incremental=get_configuration("incremental_updates"),
+ manager_api_key=get_configuration("api_token"))
+
+ CACHE.set_current_server(url=get_configuration("management").get("url"),
+ api_key=get_api_key_for_user("admin"))
+
+ # Note: patching the cache for the test
+ CACHE.add_pipeline("b3d3e18a-bf33-40e8-b635-fd49e6634ccd", {
+ "name": "test",
+ "description": "test",
+ "host": "127.0.0.1",
+ "port": 20000,
+ })
+ CACHE.add_pipeline("f8f49a77-9ceb-47b3-ac81-0f01ef71b4e0", {
+ "name": "test",
+ "description": "test",
+ "host": "127.0.0.1",
+ "port": 20000,
+ })
+ response = requests.get("{}/pdp".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+ pdp = response.json()
+ pdp_id = next(iter(pdp['pdps']))
+ policy_id = pdp['pdps'][pdp_id].get("security_pipeline")[0]
+ project_id = pdp['pdps'][pdp_id].get("vim_project_id")
+
+ # response = requests.get("{}/policies".format(get_configuration("management").get("url")),
+ # headers=auth_headers)
+ # policies = response.json()
+ # policy_id = next(iter(policies['policies']))
+
+ response = requests.get("{}/policies/{}/subjects".format(
+ get_configuration("management").get("url"), policy_id), headers=auth_headers)
+ subjects = response.json()
+
+ response = requests.get("{}/policies/{}/objects".format(
+ get_configuration("management").get("url"), policy_id), headers=auth_headers)
+ objects = response.json()
+
+ response = requests.get("{}/policies/{}/actions".format(
+ get_configuration("management").get("url"), policy_id), headers=auth_headers)
+ actions = response.json()
+ return subjects, objects, actions, project_id
+
+
+def test_post_authz():
+ from moon_engine.plugins import oslowrapper
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_cache.cache import Cache
+ from moon_engine.api.configuration import get_configuration
+ CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"),
+ incremental=get_configuration("incremental_updates"),
+ manager_api_key=get_configuration("api_token"))
+
+ CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user(
+ "admin"))
+
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ response = requests.get("{}/pdp".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+ pdp = response.json()
+ pdp_id = next(iter(pdp['pdps']))
+ policy_id = pdp['pdps'][pdp_id].get("security_pipeline")[0]
+ project_id = pdp['pdps'][pdp_id].get("vim_project_id")
+
+ response = requests.get("{}/policies/{}/subjects".format(get_configuration(
+ "management").get("url"), policy_id), headers=auth_headers)
+ subjects = response.json()
+
+ response = requests.get("{}/policies/{}/objects".format(get_configuration(
+ "management").get("url"), policy_id), headers=auth_headers)
+ objects = response.json()
+
+ response = requests.get("{}/policies/{}/actions".format(get_configuration(
+ "management").get("url"), policy_id), headers=auth_headers)
+ actions = response.json()
+
+ subjects_name = subjects['subjects'][next(iter(subjects['subjects']))]['name']
+ objects_name = objects['objects'][next(iter(objects['objects']))]['name']
+ actions_name = actions['actions'][next(iter(actions['actions']))]['name']
+
+ # Note: patching the cache for the test
+ from moon_cache.cache import Cache
+ CACHE = Cache.getInstance()
+ CACHE.add_pipeline("b3d3e18abf3340e8b635fd49e6634ccd", {
+ "name": "test",
+ "description": "test",
+ "host": "127.0.0.1",
+ "port": 20000,
+ })
+ CACHE.add_pipeline("f8f49a779ceb47b3ac810f01ef71b4e0", {
+ "name": "test",
+ "description": "test",
+ "host": "127.0.0.1",
+ "port": 20000,
+ })
+
+ _target = {
+ 'target': {
+ "name": objects_name,
+ },
+ "project_id": project_id,
+ "user_id": subjects_name
+ }
+ _credentials = {
+ "project_id": project_id,
+ "user_id": subjects_name
+ }
+
+ authz_data = {
+ 'rule': actions_name,
+ 'target': json.dumps(_target),
+ 'credentials': json.dumps(_credentials)}
+ req = hug.test.post(oslowrapper, "/authz/oslo", authz_data)
+ assert req.status == hug.HTTP_200
+
+
+def test_authz_true():
+ from moon_engine.plugins import oslowrapper
+
+ subjects, objects, actions, project_id = get_subject_object_action()
+
+ _target = {
+ 'target': {
+ "name": objects['objects'][next(iter(objects['objects']))]['name'],
+ },
+ "project_id": project_id,
+ "user_id": subjects['subjects'][next(iter(subjects['subjects']))]['name']
+ }
+ _credentials = {
+ "project_id": project_id,
+ "user_id": subjects['subjects'][next(iter(subjects['subjects']))]['name']
+ }
+ authz_data = {
+ 'rule': actions['actions'][next(iter(actions['actions']))]['name'],
+ 'target': json.dumps(_target),
+ 'credentials': json.dumps(_credentials)}
+
+ req = hug.test.post(oslowrapper, "/authz/oslo", body=authz_data)
+
+ assert req.status == hug.HTTP_200 and req.data is not None and req.data
+
+
+def test_authz_error_response_code():
+ from moon_engine.plugins import oslowrapper
+
+ subjects, objects, actions, project_id = get_subject_object_action()
+
+ _target = {
+ 'target': {
+ "name": objects['objects'][next(iter(objects['objects']))]['name'],
+ },
+ "project_id": "a64beb1cc224474fb4badd431f3e7106", # invalid project id
+ "user_id": subjects['subjects'][next(iter(subjects['subjects']))]['name']
+ }
+ authz_data = {
+ 'rule': actions['actions'][next(iter(actions['actions']))]['name'],
+ 'target': json.dumps(_target),
+ 'credentials': 'null'}
+
+ print(authz_data)
+ req = hug.test.post(oslowrapper, "/authz/oslo", body=authz_data)
+
+ assert req.status != hug.HTTP_200
+
+# def test_authz_error_no_interface_key(context):
+# import moon_wrapper.server
+# server = moon_wrapper.server.main()
+# client = server.app.test_client()
+# _target = {
+# 'target': {
+# "name": context.get('object_name'),
+# },
+# "project_id": context.get('project_with_no_interface_key'),
+# "user_id": context.get('subject_name')
+# }
+# authz_data = {
+# 'rule': context.get('action_name'),
+# 'target': json.dumps(_target),
+# 'credentials': 'null'}
+# req = client.post("/authz/oslo", data=json.dumps(authz_data))
+#
+# assert req.data == b"False"
diff --git a/moon_engine/tests/unit_python/api/wrapper/test_pipeline.py b/moon_engine/tests/unit_python/api/wrapper/test_pipeline.py
new file mode 100644
index 00000000..ed16883d
--- /dev/null
+++ b/moon_engine/tests/unit_python/api/wrapper/test_pipeline.py
@@ -0,0 +1,32 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+import hug
+
+
+def test_get_pipelines():
+ from moon_engine.api.wrapper.api import pipeline
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_cache.cache import Cache
+ from moon_engine.api.configuration import get_configuration
+ CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"),
+ incremental=get_configuration("incremental_updates"),
+ manager_api_key=get_configuration("api_token"))
+
+ CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user(
+ "admin"))
+
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ req = hug.test.get(pipeline, "/pipelines", headers=auth_headers )
+ assert req.status == hug.HTTP_200
+ assert isinstance(req.data, dict)
+ assert "pipelines" in req.data
diff --git a/moon_engine/tests/unit_python/api/wrapper/test_update.py b/moon_engine/tests/unit_python/api/wrapper/test_update.py
new file mode 100644
index 00000000..75e7ef2d
--- /dev/null
+++ b/moon_engine/tests/unit_python/api/wrapper/test_update.py
@@ -0,0 +1,602 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+import hug
+import requests
+
+
+def test_wrapper_update_policy_existed():
+ from moon_engine.api.wrapper.api import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_cache.cache import Cache
+ from moon_engine.api.configuration import get_configuration
+ CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"),
+ incremental=get_configuration("incremental_updates"),
+ manager_api_key=get_configuration("api_token"))
+
+ CACHE.set_current_server(url=get_configuration("management").get("url"),
+ api_key=get_api_key_for_user("admin"))
+
+ CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", {
+ "name": "test",
+ "description": "test",
+ "host": "127.0.0.1",
+ "port": 20000,
+ })
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ response = requests.get("{}/policies".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+
+ policies = response.json()
+ key = next(iter(policies['policies']))
+ policies['policies'][key]['name'] = "new " + policies['policies'][key]['name']
+ req = hug.test.put(update, "/update/policy/{}".format(key),
+ policies['policies'][key], headers=auth_headers)
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
+
+
+def test_wrapper_delete_policy():
+ from moon_engine.api.wrapper.api import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_cache.cache import Cache
+ from moon_engine.api.configuration import get_configuration
+ CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"),
+ incremental=get_configuration("incremental_updates"),
+ manager_api_key=get_configuration("api_token"))
+
+ CACHE.set_current_server(url=get_configuration("management").get("url"),
+ api_key=get_api_key_for_user("admin"))
+
+ CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", {
+ "name": "test",
+ "description": "test",
+ "host": "127.0.0.1",
+ "port": 20000,
+ })
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ response = requests.get("{}/policies".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+ policies = response.json()
+ key = next(iter(policies['policies']))
+ req = hug.test.delete(update, "/update/policy/{}".format(key),
+ headers=auth_headers)
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
+
+
+def test_wrapper_update_policy_not_existed():
+ from moon_engine.api.wrapper.api import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_cache.cache import Cache
+ from moon_engine.api.configuration import get_configuration
+ CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"),
+ incremental=get_configuration("incremental_updates"),
+ manager_api_key=get_configuration("api_token"))
+
+ CACHE.set_current_server(url=get_configuration("management").get("url"),
+ api_key=get_api_key_for_user("admin"))
+
+ CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", {
+ "name": "test",
+ "description": "test",
+ "host": "127.0.0.1",
+ "port": 20000,
+ })
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ response = requests.get("{}/policies".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+ policies = response.json()
+ key = next(iter(policies['policies']))
+ policies['policies'][key]['name'] = "new " + policies['policies'][key]['name']
+ req = hug.test.put(update, "/update/policy/{}".format("eac0ecd09ceb47b3ac810f01ef71b4e0"),
+ policies['policies'][key], headers=auth_headers)
+ assert req.status == hug.HTTP_208
+
+
+def test_wrapper_update_pdp_existed():
+ from moon_engine.api.wrapper.api import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_cache.cache import Cache
+ from moon_engine.api.configuration import get_configuration
+ CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"),
+ incremental=get_configuration("incremental_updates"),
+ manager_api_key=get_configuration("api_token"))
+
+ CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user(
+ "admin"))
+
+ CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", {
+ "name": "test",
+ "description": "test",
+ "host": "127.0.0.1",
+ "port": 20000,
+ })
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ response = requests.get("{}/pdp".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+
+ pdps = response.json()
+ key = next(iter(pdps['pdps']))
+ pdps['pdps'][key]['name'] = "new " + pdps['pdps'][key]['name']
+ req = hug.test.put(update, "/update/pdp/{}".format(key),
+ pdps['pdps'][key], headers=auth_headers)
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
+
+
+def test_wrapper_delete_pdp():
+ from moon_engine.api.wrapper.api import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_cache.cache import Cache
+ from moon_engine.api.configuration import get_configuration
+ CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"),
+ incremental=get_configuration("incremental_updates"),
+ manager_api_key=get_configuration("api_token"))
+
+ CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user(
+ "admin"))
+
+ CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", {
+ "name": "test",
+ "description": "test",
+ "host": "127.0.0.1",
+ "port": 20000,
+ })
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ response = requests.get("{}/pdp".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+
+ pdps = response.json()
+ key = next(iter(pdps['pdps']))
+ req = hug.test.delete(update, "/update/pdp/{}".format(key), headers=auth_headers)
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
+
+
+def test_wrapper_delete_assignments():
+ from moon_engine.api.wrapper.api import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_cache.cache import Cache
+ from moon_engine.api.configuration import get_configuration
+ CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"),
+ incremental=get_configuration("incremental_updates"),
+ manager_api_key=get_configuration("api_token"))
+
+ CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user(
+ "admin"))
+
+ CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", {
+ "name": "test",
+ "description": "test",
+ "host": "127.0.0.1",
+ "port": 20000,
+ })
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ response = requests.get("{}/policies".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+ policies = response.json()
+ policy_id = next(iter(policies['policies']))
+
+ response = requests.get("{}/policies/{}/subject_assignments".format(get_configuration(
+ "management").get("url"), policy_id),
+ headers=auth_headers)
+ subject_assignments = response.json()
+ assert 'subject_assignments' in subject_assignments and len(
+ subject_assignments['subject_assignments'])
+ req = hug.test.delete(update,
+ "/update/assignment/{}/{}/".format(policy_id, "subject"),
+ headers=auth_headers)
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
+
+ response = requests.get("{}/policies/{}/object_assignments".format(get_configuration(
+ "management").get("url"), policy_id),
+ headers=auth_headers)
+ object_assignments = response.json()
+ assert 'object_assignments' in object_assignments and len(
+ object_assignments['object_assignments'])
+ req = hug.test.delete(update,
+ "/update/assignment/{}/{}/".format(policy_id, "object"),
+ headers=auth_headers)
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
+
+ response = requests.get("{}/policies/{}/action_assignments".format(get_configuration(
+ "management").get("url"), policy_id),
+ headers=auth_headers)
+ action_assignments = response.json()
+ assert 'action_assignments' in action_assignments and len(
+ action_assignments['action_assignments'])
+ req = hug.test.delete(update,
+ "/update/assignment/{}/{}/".format(policy_id, "action"),
+ headers=auth_headers)
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
+
+
+def test_wrapper_update_perimeter_existed():
+ from moon_engine.api.wrapper.api import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_cache.cache import Cache
+ from moon_engine.api.configuration import get_configuration
+ CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"),
+ incremental=get_configuration("incremental_updates"),
+ manager_api_key=get_configuration("api_token"))
+
+ CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user(
+ "admin"))
+
+ CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", {
+ "name": "test",
+ "description": "test",
+ "host": "127.0.0.1",
+ "port": 20000,
+ })
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ response = requests.get("{}/policies".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+
+ policies = response.json()
+ policy_id = next(iter(policies['policies']))
+
+ response = requests.get("{}/policies/{}/subjects".format(get_configuration(
+ "management").get("url"), policy_id), headers=auth_headers)
+ subjects = response.json()
+ assert 'subjects' in subjects and len(subjects['subjects'])
+
+ for key in subjects['subjects']:
+ subjects['subjects'][key]['name'] = "new " + subjects['subjects'][key]['name']
+ req = hug.test.put(update, "/update/perimeter/{}/{}/{}".format(key, policy_id, 'subject'),
+ subjects['subjects'][key], headers=auth_headers)
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
+
+ response = requests.get("{}/policies/{}/objects".format(get_configuration(
+ "management").get("url"), policy_id), headers=auth_headers)
+ objects = response.json()
+ assert 'objects' in objects and len(objects['objects'])
+
+ for key in objects['objects']:
+ objects['objects'][key]['name'] = "new " + objects['objects'][key]['name']
+ req = hug.test.put(update, "/update/perimeter/{}/{}/{}".format(key, policy_id, 'object'),
+ objects['objects'][key], headers=auth_headers)
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
+
+ response = requests.get("{}/policies/{}/actions".format(get_configuration(
+ "management").get("url"), policy_id), headers=auth_headers)
+ actions = response.json()
+ assert 'actions' in actions and len(actions['actions'])
+
+ for key in actions['actions']:
+ actions['actions'][key]['name'] = "new " + actions['actions'][key]['name']
+ req = hug.test.put(update, "/update/perimeter/{}/{}/{}".format(key, policy_id, 'action'),
+ actions['actions'][key], headers=auth_headers)
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
+
+
+def test_wrapper_delete_perimeter():
+ from moon_engine.api.wrapper.api import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_cache.cache import Cache
+ from moon_engine.api.configuration import get_configuration
+ CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"),
+ incremental=get_configuration("incremental_updates"),
+ manager_api_key=get_configuration("api_token"))
+
+ CACHE.set_current_server(url=get_configuration("management").get("url"),
+ api_key=get_api_key_for_user("admin"))
+
+ CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", {
+ "name": "test",
+ "description": "test",
+ "host": "127.0.0.1",
+ "port": 20000,
+ })
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ response = requests.get("{}/policies".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+
+ policies = response.json()
+ policy_id = next(iter(policies['policies']))
+
+ response = requests.get("{}/policies/{}/subjects".format(get_configuration(
+ "management").get("url"), policy_id), headers=auth_headers)
+ subjects = response.json()
+ assert 'subjects' in subjects and len(subjects['subjects'])
+
+ for key in subjects['subjects']:
+ req = hug.test.delete(update,
+ "/update/perimeter/{}/{}/{}".format(key, policy_id, 'subject'),
+ headers=auth_headers)
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
+
+ response = requests.get("{}/policies/{}/objects".format(get_configuration(
+ "management").get("url"), policy_id), headers=auth_headers)
+ objects = response.json()
+ assert 'objects' in objects and len(objects['objects'])
+
+ for key in objects['objects']:
+ req = hug.test.delete(update, "/update/perimeter/{}/{}/{}".format(key, policy_id, 'object'),
+ headers=auth_headers)
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
+
+ response = requests.get("{}/policies/{}/actions".format(get_configuration(
+ "management").get("url"), policy_id), headers=auth_headers)
+ actions = response.json()
+ assert 'actions' in actions and len(actions['actions'])
+
+ for key in actions['actions']:
+ req = hug.test.delete(update, "/update/perimeter/{}/{}/{}".format(key, policy_id, 'action'),
+ headers=auth_headers)
+
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
+
+
+def test_wrapper_delete_rule():
+ from moon_engine.api.wrapper.api import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_cache.cache import Cache
+ from moon_engine.api.configuration import get_configuration
+ CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"),
+ incremental=get_configuration("incremental_updates"),
+ manager_api_key=get_configuration("api_token"))
+
+ CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user(
+ "admin"))
+
+ CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", {
+ "name": "test",
+ "description": "test",
+ "host": "127.0.0.1",
+ "port": 20000,
+ })
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ response = requests.get("{}/policies".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+ policies = response.json()
+ policy_id = next(iter(policies['policies']))
+
+ response = requests.get(
+ "{}/policies/{}/rules".format(get_configuration("management").get("url"), policy_id),
+ headers=auth_headers)
+ rules = response.json()
+ assert 'rules' in rules and 'policy_id' in rules['rules']
+ assert rules['rules']['policy_id'] == policy_id
+ assert len(rules['rules']['rules'])
+ for i in range(0, len(rules['rules']['rules'])):
+ req = hug.test.delete(update, "/update/rule/{}/{}".format(policy_id,
+ rules['rules']['rules'][i]['id']), headers=auth_headers)
+
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
+
+
+def test_wrapper_update_model_existed():
+ from moon_engine.api.wrapper.api import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_cache.cache import Cache
+ from moon_engine.api.configuration import get_configuration
+ CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"),
+ incremental=get_configuration("incremental_updates"),
+ manager_api_key=get_configuration("api_token"))
+
+ CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user(
+ "admin"))
+
+ CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", {
+ "name": "test",
+ "description": "test",
+ "host": "127.0.0.1",
+ "port": 20000,
+ })
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ response = requests.get("{}/models".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+
+ models = response.json()
+ key = next(iter(models['models']))
+ models['models'][key]['name'] = "new " + models['models'][key]['name']
+ req = hug.test.put(update, "/update/model/{}".format(key),
+ models['models'][key], headers=auth_headers)
+
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
+
+
+def test_wrapper_delete_model():
+ from moon_engine.api.wrapper.api import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_cache.cache import Cache
+ from moon_engine.api.configuration import get_configuration
+ CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"),
+ incremental=get_configuration("incremental_updates"),
+ manager_api_key=get_configuration("api_token"))
+
+ CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user(
+ "admin"))
+
+ CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", {
+ "name": "test",
+ "description": "test",
+ "host": "127.0.0.1",
+ "port": 20000,
+ })
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ response = requests.get("{}/models".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+
+ models = response.json()
+ key = next(iter(models['models']))
+ req = hug.test.delete(update, "/update/model/{}".format(key), headers=auth_headers)
+
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
+
+
+def test_wrapper_delete_category():
+ from moon_engine.api.wrapper.api import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_cache.cache import Cache
+ from moon_engine.api.configuration import get_configuration
+ CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"),
+ incremental=get_configuration("incremental_updates"),
+ manager_api_key=get_configuration("api_token"))
+
+ CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user(
+ "admin"))
+
+ CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", {
+ "name": "test",
+ "description": "test",
+ "host": "127.0.0.1",
+ "port": 20000,
+ })
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ response = requests.get("{}/subject_categories".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+ subject_categories = response.json()
+ category_id = next(iter(subject_categories['subject_categories']))
+ req = hug.test.delete(update, "/update/meta_data/{}/{}".format(category_id, 'subject'),
+ headers=auth_headers)
+
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
+
+ response = requests.get("{}/action_categories".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+ action_categories = response.json()
+ category_id = next(iter(action_categories['action_categories']))
+ req = hug.test.delete(update, "/update/meta_data/{}/{}".format(category_id, 'action'),
+ headers=auth_headers)
+
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
+
+ response = requests.get("{}/object_categories".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+ object_categories = response.json()
+ category_id = next(iter(object_categories['object_categories']))
+ req = hug.test.delete(update, "/update/meta_data/{}/{}".format(category_id, 'object'),
+ headers=auth_headers)
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
+
+
+def test_wrapper_update_meta_rule_existed():
+ from moon_engine.api.wrapper.api import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_cache.cache import Cache
+ from moon_engine.api.configuration import get_configuration
+ CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"),
+ incremental=get_configuration("incremental_updates"),
+ manager_api_key=get_configuration("api_token"))
+
+ CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user(
+ "admin"))
+
+ CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", {
+ "name": "test",
+ "description": "test",
+ "host": "127.0.0.1",
+ "port": 20000,
+ })
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ response = requests.get("{}/meta_rules".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+
+ meta_rules = response.json()
+ key = next(iter(meta_rules['meta_rules']))
+ meta_rules['meta_rules'][key]['name'] = "new " + meta_rules['meta_rules'][key]['name']
+ req = hug.test.put(update, "/update/meta_rule/{}".format(key),
+ meta_rules['meta_rules'][key], headers=auth_headers)
+
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
+
+
+def test_wrapper_delete_meta_rule():
+ from moon_engine.api.wrapper.api import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_cache.cache import Cache
+ from moon_engine.api.configuration import get_configuration
+ CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"),
+ incremental=get_configuration("incremental_updates"),
+ manager_api_key=get_configuration("api_token"))
+
+ CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user(
+ "admin"))
+
+ CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", {
+ "name": "test",
+ "description": "test",
+ "host": "127.0.0.1",
+ "port": 20000,
+ })
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ response = requests.get("{}/meta_rules".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+
+ meta_rules = response.json()
+ key = next(iter(meta_rules['meta_rules']))
+ req = hug.test.delete(update, "/update/meta_rule/{}".format(key), headers=auth_headers)
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
+
+
+def test_wrapper_delete_data():
+ from moon_engine.api.wrapper.api import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_cache.cache import Cache
+ from moon_engine.api.configuration import get_configuration
+ CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"),
+ incremental=get_configuration("incremental_updates"),
+ manager_api_key=get_configuration("api_token"))
+
+ CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user(
+ "admin"))
+
+ CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", {
+ "name": "test",
+ "description": "test",
+ "host": "127.0.0.1",
+ "port": 20000,
+ })
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ response = requests.get("{}/policies".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+
+ policies = response.json()
+ policy_id = next(iter(policies['policies']))
+
+ response = requests.get(
+ "{}/policies/{}/subject_data".format(get_configuration("management").get("url"), policy_id),
+ headers=auth_headers)
+ subject_data_id = next(iter(response.json()['subject_data'][0]['data']))
+ req = hug.test.delete(update, "/update/data/{}/{}".format(subject_data_id,'subject'),
+ headers=auth_headers)
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
+
+ response = requests.get(
+ "{}/policies/{}/object_data".format(get_configuration("management").get("url"), policy_id),
+ headers=auth_headers)
+ object_data_id = next(iter(response.json()['object_data'][0]['data']))
+ req = hug.test.delete(update, "/update/data/{}/{}".format(object_data_id, 'object'),
+ headers=auth_headers)
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
+
+ response = requests.get(
+ "{}/policies/{}/action_data".format(get_configuration("management").get("url"), policy_id),
+ headers=auth_headers)
+ action_data_id = next(iter(response.json()['action_data'][0]['data']))
+ req = hug.test.delete(update, "/update/data/{}/{}".format(action_data_id, 'action'),
+ headers=auth_headers)
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
diff --git a/moon_engine/tests/unit_python/api/wrapper/test_wrapper_authz.py b/moon_engine/tests/unit_python/api/wrapper/test_wrapper_authz.py
new file mode 100644
index 00000000..c4df249f
--- /dev/null
+++ b/moon_engine/tests/unit_python/api/wrapper/test_wrapper_authz.py
@@ -0,0 +1,57 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+import hug
+import requests
+
+
+def test_wrapper_get_authz():
+ from moon_engine.api.wrapper.api import authz
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_cache.cache import Cache
+ from moon_engine.api.configuration import get_configuration
+ CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"),
+ incremental=get_configuration("incremental_updates"),
+ manager_api_key=get_configuration("api_token"))
+
+ CACHE.set_current_server(url=get_configuration("management").get("url"),
+ api_key=get_api_key_for_user("admin"))
+
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ response = requests.get("{}/pdp".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+
+ pdp = response.json()
+ pdp_id = next(iter(pdp['pdps']))
+ policy_id = pdp['pdps'][pdp_id].get("security_pipeline")[0]
+ project_id = pdp['pdps'][pdp_id].get("vim_project_id")
+
+ response = requests.get("{}/policies/{}/subjects".format(get_configuration(
+ "management").get("url"), policy_id), headers=auth_headers)
+ subjects = response.json()
+
+ response = requests.get("{}/policies/{}/objects".format(get_configuration(
+ "management").get("url"), policy_id), headers=auth_headers)
+ objects = response.json()
+
+ response = requests.get("{}/policies/{}/actions".format(get_configuration(
+ "management").get("url"), policy_id), headers=auth_headers)
+ actions = response.json()
+
+ subjects_name = subjects['subjects'][next(iter(subjects['subjects']))]['name']
+ objects_name = objects['objects'][next(iter(objects['objects']))]['name']
+ actions_name = actions['actions'][next(iter(actions['actions']))]['name']
+
+ req = hug.test.get(authz, "/authz/{}/{}/{}/{}".format(
+ project_id, subjects_name, objects_name, actions_name))
+ assert req.status == hug.HTTP_200