aboutsummaryrefslogtreecommitdiffstats
path: root/moon_engine/tests/unit_python/api/wrapper
diff options
context:
space:
mode:
Diffstat (limited to 'moon_engine/tests/unit_python/api/wrapper')
-rw-r--r--moon_engine/tests/unit_python/api/wrapper/__init__.py11
-rw-r--r--moon_engine/tests/unit_python/api/wrapper/test_oslo_wrapper.py209
-rw-r--r--moon_engine/tests/unit_python/api/wrapper/test_pipeline.py32
-rw-r--r--moon_engine/tests/unit_python/api/wrapper/test_update.py602
-rw-r--r--moon_engine/tests/unit_python/api/wrapper/test_wrapper_authz.py57
5 files changed, 911 insertions, 0 deletions
diff --git a/moon_engine/tests/unit_python/api/wrapper/__init__.py b/moon_engine/tests/unit_python/api/wrapper/__init__.py
new file mode 100644
index 00000000..582be686
--- /dev/null
+++ b/moon_engine/tests/unit_python/api/wrapper/__init__.py
@@ -0,0 +1,11 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
diff --git a/moon_engine/tests/unit_python/api/wrapper/test_oslo_wrapper.py b/moon_engine/tests/unit_python/api/wrapper/test_oslo_wrapper.py
new file mode 100644
index 00000000..770e9bb6
--- /dev/null
+++ b/moon_engine/tests/unit_python/api/wrapper/test_oslo_wrapper.py
@@ -0,0 +1,209 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+import hug
+import json
+import requests
+
+
+def get_subject_object_action():
+ from moon_engine.api.configuration import get_configuration
+ from moon_cache.cache import Cache
+ from moon_utilities.auth_functions import get_api_key_for_user
+
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"),
+ incremental=get_configuration("incremental_updates"),
+ manager_api_key=get_configuration("api_token"))
+
+ CACHE.set_current_server(url=get_configuration("management").get("url"),
+ api_key=get_api_key_for_user("admin"))
+
+ # Note: patching the cache for the test
+ CACHE.add_pipeline("b3d3e18a-bf33-40e8-b635-fd49e6634ccd", {
+ "name": "test",
+ "description": "test",
+ "host": "127.0.0.1",
+ "port": 20000,
+ })
+ CACHE.add_pipeline("f8f49a77-9ceb-47b3-ac81-0f01ef71b4e0", {
+ "name": "test",
+ "description": "test",
+ "host": "127.0.0.1",
+ "port": 20000,
+ })
+ response = requests.get("{}/pdp".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+ pdp = response.json()
+ pdp_id = next(iter(pdp['pdps']))
+ policy_id = pdp['pdps'][pdp_id].get("security_pipeline")[0]
+ project_id = pdp['pdps'][pdp_id].get("vim_project_id")
+
+ # response = requests.get("{}/policies".format(get_configuration("management").get("url")),
+ # headers=auth_headers)
+ # policies = response.json()
+ # policy_id = next(iter(policies['policies']))
+
+ response = requests.get("{}/policies/{}/subjects".format(
+ get_configuration("management").get("url"), policy_id), headers=auth_headers)
+ subjects = response.json()
+
+ response = requests.get("{}/policies/{}/objects".format(
+ get_configuration("management").get("url"), policy_id), headers=auth_headers)
+ objects = response.json()
+
+ response = requests.get("{}/policies/{}/actions".format(
+ get_configuration("management").get("url"), policy_id), headers=auth_headers)
+ actions = response.json()
+ return subjects, objects, actions, project_id
+
+
+def test_post_authz():
+ from moon_engine.plugins import oslowrapper
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_cache.cache import Cache
+ from moon_engine.api.configuration import get_configuration
+ CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"),
+ incremental=get_configuration("incremental_updates"),
+ manager_api_key=get_configuration("api_token"))
+
+ CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user(
+ "admin"))
+
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ response = requests.get("{}/pdp".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+ pdp = response.json()
+ pdp_id = next(iter(pdp['pdps']))
+ policy_id = pdp['pdps'][pdp_id].get("security_pipeline")[0]
+ project_id = pdp['pdps'][pdp_id].get("vim_project_id")
+
+ response = requests.get("{}/policies/{}/subjects".format(get_configuration(
+ "management").get("url"), policy_id), headers=auth_headers)
+ subjects = response.json()
+
+ response = requests.get("{}/policies/{}/objects".format(get_configuration(
+ "management").get("url"), policy_id), headers=auth_headers)
+ objects = response.json()
+
+ response = requests.get("{}/policies/{}/actions".format(get_configuration(
+ "management").get("url"), policy_id), headers=auth_headers)
+ actions = response.json()
+
+ subjects_name = subjects['subjects'][next(iter(subjects['subjects']))]['name']
+ objects_name = objects['objects'][next(iter(objects['objects']))]['name']
+ actions_name = actions['actions'][next(iter(actions['actions']))]['name']
+
+ # Note: patching the cache for the test
+ from moon_cache.cache import Cache
+ CACHE = Cache.getInstance()
+ CACHE.add_pipeline("b3d3e18abf3340e8b635fd49e6634ccd", {
+ "name": "test",
+ "description": "test",
+ "host": "127.0.0.1",
+ "port": 20000,
+ })
+ CACHE.add_pipeline("f8f49a779ceb47b3ac810f01ef71b4e0", {
+ "name": "test",
+ "description": "test",
+ "host": "127.0.0.1",
+ "port": 20000,
+ })
+
+ _target = {
+ 'target': {
+ "name": objects_name,
+ },
+ "project_id": project_id,
+ "user_id": subjects_name
+ }
+ _credentials = {
+ "project_id": project_id,
+ "user_id": subjects_name
+ }
+
+ authz_data = {
+ 'rule': actions_name,
+ 'target': json.dumps(_target),
+ 'credentials': json.dumps(_credentials)}
+ req = hug.test.post(oslowrapper, "/authz/oslo", authz_data)
+ assert req.status == hug.HTTP_200
+
+
+def test_authz_true():
+ from moon_engine.plugins import oslowrapper
+
+ subjects, objects, actions, project_id = get_subject_object_action()
+
+ _target = {
+ 'target': {
+ "name": objects['objects'][next(iter(objects['objects']))]['name'],
+ },
+ "project_id": project_id,
+ "user_id": subjects['subjects'][next(iter(subjects['subjects']))]['name']
+ }
+ _credentials = {
+ "project_id": project_id,
+ "user_id": subjects['subjects'][next(iter(subjects['subjects']))]['name']
+ }
+ authz_data = {
+ 'rule': actions['actions'][next(iter(actions['actions']))]['name'],
+ 'target': json.dumps(_target),
+ 'credentials': json.dumps(_credentials)}
+
+ req = hug.test.post(oslowrapper, "/authz/oslo", body=authz_data)
+
+ assert req.status == hug.HTTP_200 and req.data is not None and req.data
+
+
+def test_authz_error_response_code():
+ from moon_engine.plugins import oslowrapper
+
+ subjects, objects, actions, project_id = get_subject_object_action()
+
+ _target = {
+ 'target': {
+ "name": objects['objects'][next(iter(objects['objects']))]['name'],
+ },
+ "project_id": "a64beb1cc224474fb4badd431f3e7106", # invalid project id
+ "user_id": subjects['subjects'][next(iter(subjects['subjects']))]['name']
+ }
+ authz_data = {
+ 'rule': actions['actions'][next(iter(actions['actions']))]['name'],
+ 'target': json.dumps(_target),
+ 'credentials': 'null'}
+
+ print(authz_data)
+ req = hug.test.post(oslowrapper, "/authz/oslo", body=authz_data)
+
+ assert req.status != hug.HTTP_200
+
+# def test_authz_error_no_interface_key(context):
+# import moon_wrapper.server
+# server = moon_wrapper.server.main()
+# client = server.app.test_client()
+# _target = {
+# 'target': {
+# "name": context.get('object_name'),
+# },
+# "project_id": context.get('project_with_no_interface_key'),
+# "user_id": context.get('subject_name')
+# }
+# authz_data = {
+# 'rule': context.get('action_name'),
+# 'target': json.dumps(_target),
+# 'credentials': 'null'}
+# req = client.post("/authz/oslo", data=json.dumps(authz_data))
+#
+# assert req.data == b"False"
diff --git a/moon_engine/tests/unit_python/api/wrapper/test_pipeline.py b/moon_engine/tests/unit_python/api/wrapper/test_pipeline.py
new file mode 100644
index 00000000..ed16883d
--- /dev/null
+++ b/moon_engine/tests/unit_python/api/wrapper/test_pipeline.py
@@ -0,0 +1,32 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+import hug
+
+
+def test_get_pipelines():
+ from moon_engine.api.wrapper.api import pipeline
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_cache.cache import Cache
+ from moon_engine.api.configuration import get_configuration
+ CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"),
+ incremental=get_configuration("incremental_updates"),
+ manager_api_key=get_configuration("api_token"))
+
+ CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user(
+ "admin"))
+
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ req = hug.test.get(pipeline, "/pipelines", headers=auth_headers )
+ assert req.status == hug.HTTP_200
+ assert isinstance(req.data, dict)
+ assert "pipelines" in req.data
diff --git a/moon_engine/tests/unit_python/api/wrapper/test_update.py b/moon_engine/tests/unit_python/api/wrapper/test_update.py
new file mode 100644
index 00000000..75e7ef2d
--- /dev/null
+++ b/moon_engine/tests/unit_python/api/wrapper/test_update.py
@@ -0,0 +1,602 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+import hug
+import requests
+
+
+def test_wrapper_update_policy_existed():
+ from moon_engine.api.wrapper.api import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_cache.cache import Cache
+ from moon_engine.api.configuration import get_configuration
+ CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"),
+ incremental=get_configuration("incremental_updates"),
+ manager_api_key=get_configuration("api_token"))
+
+ CACHE.set_current_server(url=get_configuration("management").get("url"),
+ api_key=get_api_key_for_user("admin"))
+
+ CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", {
+ "name": "test",
+ "description": "test",
+ "host": "127.0.0.1",
+ "port": 20000,
+ })
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ response = requests.get("{}/policies".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+
+ policies = response.json()
+ key = next(iter(policies['policies']))
+ policies['policies'][key]['name'] = "new " + policies['policies'][key]['name']
+ req = hug.test.put(update, "/update/policy/{}".format(key),
+ policies['policies'][key], headers=auth_headers)
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
+
+
+def test_wrapper_delete_policy():
+ from moon_engine.api.wrapper.api import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_cache.cache import Cache
+ from moon_engine.api.configuration import get_configuration
+ CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"),
+ incremental=get_configuration("incremental_updates"),
+ manager_api_key=get_configuration("api_token"))
+
+ CACHE.set_current_server(url=get_configuration("management").get("url"),
+ api_key=get_api_key_for_user("admin"))
+
+ CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", {
+ "name": "test",
+ "description": "test",
+ "host": "127.0.0.1",
+ "port": 20000,
+ })
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ response = requests.get("{}/policies".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+ policies = response.json()
+ key = next(iter(policies['policies']))
+ req = hug.test.delete(update, "/update/policy/{}".format(key),
+ headers=auth_headers)
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
+
+
+def test_wrapper_update_policy_not_existed():
+ from moon_engine.api.wrapper.api import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_cache.cache import Cache
+ from moon_engine.api.configuration import get_configuration
+ CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"),
+ incremental=get_configuration("incremental_updates"),
+ manager_api_key=get_configuration("api_token"))
+
+ CACHE.set_current_server(url=get_configuration("management").get("url"),
+ api_key=get_api_key_for_user("admin"))
+
+ CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", {
+ "name": "test",
+ "description": "test",
+ "host": "127.0.0.1",
+ "port": 20000,
+ })
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ response = requests.get("{}/policies".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+ policies = response.json()
+ key = next(iter(policies['policies']))
+ policies['policies'][key]['name'] = "new " + policies['policies'][key]['name']
+ req = hug.test.put(update, "/update/policy/{}".format("eac0ecd09ceb47b3ac810f01ef71b4e0"),
+ policies['policies'][key], headers=auth_headers)
+ assert req.status == hug.HTTP_208
+
+
+def test_wrapper_update_pdp_existed():
+ from moon_engine.api.wrapper.api import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_cache.cache import Cache
+ from moon_engine.api.configuration import get_configuration
+ CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"),
+ incremental=get_configuration("incremental_updates"),
+ manager_api_key=get_configuration("api_token"))
+
+ CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user(
+ "admin"))
+
+ CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", {
+ "name": "test",
+ "description": "test",
+ "host": "127.0.0.1",
+ "port": 20000,
+ })
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ response = requests.get("{}/pdp".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+
+ pdps = response.json()
+ key = next(iter(pdps['pdps']))
+ pdps['pdps'][key]['name'] = "new " + pdps['pdps'][key]['name']
+ req = hug.test.put(update, "/update/pdp/{}".format(key),
+ pdps['pdps'][key], headers=auth_headers)
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
+
+
+def test_wrapper_delete_pdp():
+ from moon_engine.api.wrapper.api import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_cache.cache import Cache
+ from moon_engine.api.configuration import get_configuration
+ CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"),
+ incremental=get_configuration("incremental_updates"),
+ manager_api_key=get_configuration("api_token"))
+
+ CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user(
+ "admin"))
+
+ CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", {
+ "name": "test",
+ "description": "test",
+ "host": "127.0.0.1",
+ "port": 20000,
+ })
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ response = requests.get("{}/pdp".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+
+ pdps = response.json()
+ key = next(iter(pdps['pdps']))
+ req = hug.test.delete(update, "/update/pdp/{}".format(key), headers=auth_headers)
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
+
+
+def test_wrapper_delete_assignments():
+ from moon_engine.api.wrapper.api import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_cache.cache import Cache
+ from moon_engine.api.configuration import get_configuration
+ CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"),
+ incremental=get_configuration("incremental_updates"),
+ manager_api_key=get_configuration("api_token"))
+
+ CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user(
+ "admin"))
+
+ CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", {
+ "name": "test",
+ "description": "test",
+ "host": "127.0.0.1",
+ "port": 20000,
+ })
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ response = requests.get("{}/policies".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+ policies = response.json()
+ policy_id = next(iter(policies['policies']))
+
+ response = requests.get("{}/policies/{}/subject_assignments".format(get_configuration(
+ "management").get("url"), policy_id),
+ headers=auth_headers)
+ subject_assignments = response.json()
+ assert 'subject_assignments' in subject_assignments and len(
+ subject_assignments['subject_assignments'])
+ req = hug.test.delete(update,
+ "/update/assignment/{}/{}/".format(policy_id, "subject"),
+ headers=auth_headers)
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
+
+ response = requests.get("{}/policies/{}/object_assignments".format(get_configuration(
+ "management").get("url"), policy_id),
+ headers=auth_headers)
+ object_assignments = response.json()
+ assert 'object_assignments' in object_assignments and len(
+ object_assignments['object_assignments'])
+ req = hug.test.delete(update,
+ "/update/assignment/{}/{}/".format(policy_id, "object"),
+ headers=auth_headers)
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
+
+ response = requests.get("{}/policies/{}/action_assignments".format(get_configuration(
+ "management").get("url"), policy_id),
+ headers=auth_headers)
+ action_assignments = response.json()
+ assert 'action_assignments' in action_assignments and len(
+ action_assignments['action_assignments'])
+ req = hug.test.delete(update,
+ "/update/assignment/{}/{}/".format(policy_id, "action"),
+ headers=auth_headers)
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
+
+
+def test_wrapper_update_perimeter_existed():
+ from moon_engine.api.wrapper.api import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_cache.cache import Cache
+ from moon_engine.api.configuration import get_configuration
+ CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"),
+ incremental=get_configuration("incremental_updates"),
+ manager_api_key=get_configuration("api_token"))
+
+ CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user(
+ "admin"))
+
+ CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", {
+ "name": "test",
+ "description": "test",
+ "host": "127.0.0.1",
+ "port": 20000,
+ })
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ response = requests.get("{}/policies".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+
+ policies = response.json()
+ policy_id = next(iter(policies['policies']))
+
+ response = requests.get("{}/policies/{}/subjects".format(get_configuration(
+ "management").get("url"), policy_id), headers=auth_headers)
+ subjects = response.json()
+ assert 'subjects' in subjects and len(subjects['subjects'])
+
+ for key in subjects['subjects']:
+ subjects['subjects'][key]['name'] = "new " + subjects['subjects'][key]['name']
+ req = hug.test.put(update, "/update/perimeter/{}/{}/{}".format(key, policy_id, 'subject'),
+ subjects['subjects'][key], headers=auth_headers)
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
+
+ response = requests.get("{}/policies/{}/objects".format(get_configuration(
+ "management").get("url"), policy_id), headers=auth_headers)
+ objects = response.json()
+ assert 'objects' in objects and len(objects['objects'])
+
+ for key in objects['objects']:
+ objects['objects'][key]['name'] = "new " + objects['objects'][key]['name']
+ req = hug.test.put(update, "/update/perimeter/{}/{}/{}".format(key, policy_id, 'object'),
+ objects['objects'][key], headers=auth_headers)
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
+
+ response = requests.get("{}/policies/{}/actions".format(get_configuration(
+ "management").get("url"), policy_id), headers=auth_headers)
+ actions = response.json()
+ assert 'actions' in actions and len(actions['actions'])
+
+ for key in actions['actions']:
+ actions['actions'][key]['name'] = "new " + actions['actions'][key]['name']
+ req = hug.test.put(update, "/update/perimeter/{}/{}/{}".format(key, policy_id, 'action'),
+ actions['actions'][key], headers=auth_headers)
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
+
+
+def test_wrapper_delete_perimeter():
+ from moon_engine.api.wrapper.api import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_cache.cache import Cache
+ from moon_engine.api.configuration import get_configuration
+ CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"),
+ incremental=get_configuration("incremental_updates"),
+ manager_api_key=get_configuration("api_token"))
+
+ CACHE.set_current_server(url=get_configuration("management").get("url"),
+ api_key=get_api_key_for_user("admin"))
+
+ CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", {
+ "name": "test",
+ "description": "test",
+ "host": "127.0.0.1",
+ "port": 20000,
+ })
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ response = requests.get("{}/policies".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+
+ policies = response.json()
+ policy_id = next(iter(policies['policies']))
+
+ response = requests.get("{}/policies/{}/subjects".format(get_configuration(
+ "management").get("url"), policy_id), headers=auth_headers)
+ subjects = response.json()
+ assert 'subjects' in subjects and len(subjects['subjects'])
+
+ for key in subjects['subjects']:
+ req = hug.test.delete(update,
+ "/update/perimeter/{}/{}/{}".format(key, policy_id, 'subject'),
+ headers=auth_headers)
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
+
+ response = requests.get("{}/policies/{}/objects".format(get_configuration(
+ "management").get("url"), policy_id), headers=auth_headers)
+ objects = response.json()
+ assert 'objects' in objects and len(objects['objects'])
+
+ for key in objects['objects']:
+ req = hug.test.delete(update, "/update/perimeter/{}/{}/{}".format(key, policy_id, 'object'),
+ headers=auth_headers)
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
+
+ response = requests.get("{}/policies/{}/actions".format(get_configuration(
+ "management").get("url"), policy_id), headers=auth_headers)
+ actions = response.json()
+ assert 'actions' in actions and len(actions['actions'])
+
+ for key in actions['actions']:
+ req = hug.test.delete(update, "/update/perimeter/{}/{}/{}".format(key, policy_id, 'action'),
+ headers=auth_headers)
+
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
+
+
+def test_wrapper_delete_rule():
+ from moon_engine.api.wrapper.api import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_cache.cache import Cache
+ from moon_engine.api.configuration import get_configuration
+ CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"),
+ incremental=get_configuration("incremental_updates"),
+ manager_api_key=get_configuration("api_token"))
+
+ CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user(
+ "admin"))
+
+ CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", {
+ "name": "test",
+ "description": "test",
+ "host": "127.0.0.1",
+ "port": 20000,
+ })
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ response = requests.get("{}/policies".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+ policies = response.json()
+ policy_id = next(iter(policies['policies']))
+
+ response = requests.get(
+ "{}/policies/{}/rules".format(get_configuration("management").get("url"), policy_id),
+ headers=auth_headers)
+ rules = response.json()
+ assert 'rules' in rules and 'policy_id' in rules['rules']
+ assert rules['rules']['policy_id'] == policy_id
+ assert len(rules['rules']['rules'])
+ for i in range(0, len(rules['rules']['rules'])):
+ req = hug.test.delete(update, "/update/rule/{}/{}".format(policy_id,
+ rules['rules']['rules'][i]['id']), headers=auth_headers)
+
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
+
+
+def test_wrapper_update_model_existed():
+ from moon_engine.api.wrapper.api import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_cache.cache import Cache
+ from moon_engine.api.configuration import get_configuration
+ CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"),
+ incremental=get_configuration("incremental_updates"),
+ manager_api_key=get_configuration("api_token"))
+
+ CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user(
+ "admin"))
+
+ CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", {
+ "name": "test",
+ "description": "test",
+ "host": "127.0.0.1",
+ "port": 20000,
+ })
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ response = requests.get("{}/models".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+
+ models = response.json()
+ key = next(iter(models['models']))
+ models['models'][key]['name'] = "new " + models['models'][key]['name']
+ req = hug.test.put(update, "/update/model/{}".format(key),
+ models['models'][key], headers=auth_headers)
+
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
+
+
+def test_wrapper_delete_model():
+ from moon_engine.api.wrapper.api import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_cache.cache import Cache
+ from moon_engine.api.configuration import get_configuration
+ CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"),
+ incremental=get_configuration("incremental_updates"),
+ manager_api_key=get_configuration("api_token"))
+
+ CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user(
+ "admin"))
+
+ CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", {
+ "name": "test",
+ "description": "test",
+ "host": "127.0.0.1",
+ "port": 20000,
+ })
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ response = requests.get("{}/models".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+
+ models = response.json()
+ key = next(iter(models['models']))
+ req = hug.test.delete(update, "/update/model/{}".format(key), headers=auth_headers)
+
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
+
+
+def test_wrapper_delete_category():
+ from moon_engine.api.wrapper.api import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_cache.cache import Cache
+ from moon_engine.api.configuration import get_configuration
+ CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"),
+ incremental=get_configuration("incremental_updates"),
+ manager_api_key=get_configuration("api_token"))
+
+ CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user(
+ "admin"))
+
+ CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", {
+ "name": "test",
+ "description": "test",
+ "host": "127.0.0.1",
+ "port": 20000,
+ })
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ response = requests.get("{}/subject_categories".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+ subject_categories = response.json()
+ category_id = next(iter(subject_categories['subject_categories']))
+ req = hug.test.delete(update, "/update/meta_data/{}/{}".format(category_id, 'subject'),
+ headers=auth_headers)
+
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
+
+ response = requests.get("{}/action_categories".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+ action_categories = response.json()
+ category_id = next(iter(action_categories['action_categories']))
+ req = hug.test.delete(update, "/update/meta_data/{}/{}".format(category_id, 'action'),
+ headers=auth_headers)
+
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
+
+ response = requests.get("{}/object_categories".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+ object_categories = response.json()
+ category_id = next(iter(object_categories['object_categories']))
+ req = hug.test.delete(update, "/update/meta_data/{}/{}".format(category_id, 'object'),
+ headers=auth_headers)
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
+
+
+def test_wrapper_update_meta_rule_existed():
+ from moon_engine.api.wrapper.api import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_cache.cache import Cache
+ from moon_engine.api.configuration import get_configuration
+ CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"),
+ incremental=get_configuration("incremental_updates"),
+ manager_api_key=get_configuration("api_token"))
+
+ CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user(
+ "admin"))
+
+ CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", {
+ "name": "test",
+ "description": "test",
+ "host": "127.0.0.1",
+ "port": 20000,
+ })
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ response = requests.get("{}/meta_rules".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+
+ meta_rules = response.json()
+ key = next(iter(meta_rules['meta_rules']))
+ meta_rules['meta_rules'][key]['name'] = "new " + meta_rules['meta_rules'][key]['name']
+ req = hug.test.put(update, "/update/meta_rule/{}".format(key),
+ meta_rules['meta_rules'][key], headers=auth_headers)
+
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
+
+
+def test_wrapper_delete_meta_rule():
+ from moon_engine.api.wrapper.api import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_cache.cache import Cache
+ from moon_engine.api.configuration import get_configuration
+ CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"),
+ incremental=get_configuration("incremental_updates"),
+ manager_api_key=get_configuration("api_token"))
+
+ CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user(
+ "admin"))
+
+ CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", {
+ "name": "test",
+ "description": "test",
+ "host": "127.0.0.1",
+ "port": 20000,
+ })
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ response = requests.get("{}/meta_rules".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+
+ meta_rules = response.json()
+ key = next(iter(meta_rules['meta_rules']))
+ req = hug.test.delete(update, "/update/meta_rule/{}".format(key), headers=auth_headers)
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
+
+
+def test_wrapper_delete_data():
+ from moon_engine.api.wrapper.api import update
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_cache.cache import Cache
+ from moon_engine.api.configuration import get_configuration
+ CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"),
+ incremental=get_configuration("incremental_updates"),
+ manager_api_key=get_configuration("api_token"))
+
+ CACHE.set_current_server(url=get_configuration("management").get("url"), api_key=get_api_key_for_user(
+ "admin"))
+
+ CACHE.add_pipeline("eac0ecd09ceb47b3ac810f01ef71b4e0", {
+ "name": "test",
+ "description": "test",
+ "host": "127.0.0.1",
+ "port": 20000,
+ })
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ response = requests.get("{}/policies".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+
+ policies = response.json()
+ policy_id = next(iter(policies['policies']))
+
+ response = requests.get(
+ "{}/policies/{}/subject_data".format(get_configuration("management").get("url"), policy_id),
+ headers=auth_headers)
+ subject_data_id = next(iter(response.json()['subject_data'][0]['data']))
+ req = hug.test.delete(update, "/update/data/{}/{}".format(subject_data_id,'subject'),
+ headers=auth_headers)
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
+
+ response = requests.get(
+ "{}/policies/{}/object_data".format(get_configuration("management").get("url"), policy_id),
+ headers=auth_headers)
+ object_data_id = next(iter(response.json()['object_data'][0]['data']))
+ req = hug.test.delete(update, "/update/data/{}/{}".format(object_data_id, 'object'),
+ headers=auth_headers)
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
+
+ response = requests.get(
+ "{}/policies/{}/action_data".format(get_configuration("management").get("url"), policy_id),
+ headers=auth_headers)
+ action_data_id = next(iter(response.json()['action_data'][0]['data']))
+ req = hug.test.delete(update, "/update/data/{}/{}".format(action_data_id, 'action'),
+ headers=auth_headers)
+ assert (req.status == hug.HTTP_202 or req.status == hug.HTTP_200)
diff --git a/moon_engine/tests/unit_python/api/wrapper/test_wrapper_authz.py b/moon_engine/tests/unit_python/api/wrapper/test_wrapper_authz.py
new file mode 100644
index 00000000..c4df249f
--- /dev/null
+++ b/moon_engine/tests/unit_python/api/wrapper/test_wrapper_authz.py
@@ -0,0 +1,57 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+import hug
+import requests
+
+
+def test_wrapper_get_authz():
+ from moon_engine.api.wrapper.api import authz
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_cache.cache import Cache
+ from moon_engine.api.configuration import get_configuration
+ CACHE = Cache.getInstance(manager_url=get_configuration("management").get("url"),
+ incremental=get_configuration("incremental_updates"),
+ manager_api_key=get_configuration("api_token"))
+
+ CACHE.set_current_server(url=get_configuration("management").get("url"),
+ api_key=get_api_key_for_user("admin"))
+
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ response = requests.get("{}/pdp".format(get_configuration("management").get("url")),
+ headers=auth_headers)
+
+ pdp = response.json()
+ pdp_id = next(iter(pdp['pdps']))
+ policy_id = pdp['pdps'][pdp_id].get("security_pipeline")[0]
+ project_id = pdp['pdps'][pdp_id].get("vim_project_id")
+
+ response = requests.get("{}/policies/{}/subjects".format(get_configuration(
+ "management").get("url"), policy_id), headers=auth_headers)
+ subjects = response.json()
+
+ response = requests.get("{}/policies/{}/objects".format(get_configuration(
+ "management").get("url"), policy_id), headers=auth_headers)
+ objects = response.json()
+
+ response = requests.get("{}/policies/{}/actions".format(get_configuration(
+ "management").get("url"), policy_id), headers=auth_headers)
+ actions = response.json()
+
+ subjects_name = subjects['subjects'][next(iter(subjects['subjects']))]['name']
+ objects_name = objects['objects'][next(iter(objects['objects']))]['name']
+ actions_name = actions['actions'][next(iter(actions['actions']))]['name']
+
+ req = hug.test.get(authz, "/authz/{}/{}/{}/{}".format(
+ project_id, subjects_name, objects_name, actions_name))
+ assert req.status == hug.HTTP_200