aboutsummaryrefslogtreecommitdiffstats
path: root/moon_manager/tests/unit_python/api
diff options
context:
space:
mode:
Diffstat (limited to 'moon_manager/tests/unit_python/api')
-rw-r--r--moon_manager/tests/unit_python/api/__init__.py12
-rw-r--r--moon_manager/tests/unit_python/api/meta_data_test.py196
-rw-r--r--moon_manager/tests/unit_python/api/meta_rules_test.py69
-rw-r--r--moon_manager/tests/unit_python/api/test_assignement.py469
-rw-r--r--moon_manager/tests/unit_python/api/test_assignemnt.py174
-rw-r--r--moon_manager/tests/unit_python/api/test_auth.py71
-rw-r--r--moon_manager/tests/unit_python/api/test_data.py303
-rw-r--r--moon_manager/tests/unit_python/api/test_json_export.py321
-rw-r--r--moon_manager/tests/unit_python/api/test_json_import.py832
-rw-r--r--moon_manager/tests/unit_python/api/test_keystone.py63
-rw-r--r--moon_manager/tests/unit_python/api/test_meta_data.py370
-rw-r--r--moon_manager/tests/unit_python/api/test_meta_rules.py687
-rw-r--r--moon_manager/tests/unit_python/api/test_models.py468
-rw-r--r--moon_manager/tests/unit_python/api/test_nova.py58
-rw-r--r--moon_manager/tests/unit_python/api/test_pdp.py499
-rw-r--r--moon_manager/tests/unit_python/api/test_perimeter.py1357
-rw-r--r--moon_manager/tests/unit_python/api/test_perimeter_examples.py55
-rw-r--r--moon_manager/tests/unit_python/api/test_policies.py443
-rw-r--r--moon_manager/tests/unit_python/api/test_rules.py343
-rw-r--r--moon_manager/tests/unit_python/api/test_slaves.py90
-rw-r--r--moon_manager/tests/unit_python/api/utilities.py32
21 files changed, 6123 insertions, 789 deletions
diff --git a/moon_manager/tests/unit_python/api/__init__.py b/moon_manager/tests/unit_python/api/__init__.py
index e69de29b..1856aa2c 100644
--- a/moon_manager/tests/unit_python/api/__init__.py
+++ b/moon_manager/tests/unit_python/api/__init__.py
@@ -0,0 +1,12 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
diff --git a/moon_manager/tests/unit_python/api/meta_data_test.py b/moon_manager/tests/unit_python/api/meta_data_test.py
deleted file mode 100644
index 8fb39ae1..00000000
--- a/moon_manager/tests/unit_python/api/meta_data_test.py
+++ /dev/null
@@ -1,196 +0,0 @@
-import json
-import api.utilities as utilities
-
-#subject_categories_test
-
-
-def get_subject_categories(client):
- req = client.get("/subject_categories")
- subject_categories = utilities.get_json(req.data)
- return req, subject_categories
-
-
-def add_subject_categories(client, name):
- data = {
- "name": name,
- "description": "description of {}".format(name)
- }
- req = client.post("/subject_categories", data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
- subject_categories = utilities.get_json(req.data)
- return req, subject_categories
-
-
-def delete_subject_categories(client, name):
- request, subject_categories = get_subject_categories(client)
- for key, value in subject_categories['subject_categories'].items():
- if value['name'] == name:
- req = client.delete("/subject_categories/{}".format(key))
- break
- return req
-
-
-def delete_subject_categories_without_id(client):
- req = client.delete("/subject_categories/{}".format(""))
- return req
-
-
-def test_get_subject_categories():
- client = utilities.register_client()
- req, subject_categories = get_subject_categories(client)
- assert req.status_code == 200
- assert isinstance(subject_categories, dict)
- assert "subject_categories" in subject_categories
-
-
-def test_add_subject_categories():
- client = utilities.register_client()
- req, subject_categories = add_subject_categories(client, "testuser")
- assert req.status_code == 200
- assert isinstance(subject_categories, dict)
- value = list(subject_categories["subject_categories"].values())[0]
- assert "subject_categories" in subject_categories
- assert value['name'] == "testuser"
- assert value['description'] == "description of {}".format("testuser")
-
-
-def test_delete_subject_categories():
- client = utilities.register_client()
- req = delete_subject_categories(client, "testuser")
- assert req.status_code == 200
-
-
-def test_delete_subject_categories_without_id():
- client = utilities.register_client()
- req = delete_subject_categories_without_id(client)
- assert req.status_code == 500
-
-
-#---------------------------------------------------------------------------
-#object_categories_test
-
-def get_object_categories(client):
- req = client.get("/object_categories")
- object_categories = utilities.get_json(req.data)
- return req, object_categories
-
-
-def add_object_categories(client, name):
- data = {
- "name": name,
- "description": "description of {}".format(name)
- }
- req = client.post("/object_categories", data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
- object_categories = utilities.get_json(req.data)
- return req, object_categories
-
-
-def delete_object_categories(client, name):
- request, object_categories = get_object_categories(client)
- for key, value in object_categories['object_categories'].items():
- if value['name'] == name:
- req = client.delete("/object_categories/{}".format(key))
- break
- return req
-
-
-def delete_object_categories_without_id(client):
- req = client.delete("/object_categories/{}".format(""))
- return req
-
-
-def test_get_object_categories():
- client = utilities.register_client()
- req, object_categories = get_object_categories(client)
- assert req.status_code == 200
- assert isinstance(object_categories, dict)
- assert "object_categories" in object_categories
-
-
-def test_add_object_categories():
- client = utilities.register_client()
- req, object_categories = add_object_categories(client, "testuser")
- assert req.status_code == 200
- assert isinstance(object_categories, dict)
- value = list(object_categories["object_categories"].values())[0]
- assert "object_categories" in object_categories
- assert value['name'] == "testuser"
- assert value['description'] == "description of {}".format("testuser")
-
-
-def test_delete_object_categories():
- client = utilities.register_client()
- req = delete_object_categories(client, "testuser")
- assert req.status_code == 200
-
-
-def test_delete_object_categories_without_id():
- client = utilities.register_client()
- req = delete_object_categories_without_id(client)
- assert req.status_code == 500
-
-
-#---------------------------------------------------------------------------
-#action_categories_test
-
-def get_action_categories(client):
- req = client.get("/action_categories")
- action_categories = utilities.get_json(req.data)
- return req, action_categories
-
-
-def add_action_categories(client, name):
- data = {
- "name": name,
- "description": "description of {}".format(name)
- }
- req = client.post("/action_categories", data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
- action_categories = utilities.get_json(req.data)
- return req, action_categories
-
-
-def delete_action_categories(client, name):
- request, action_categories = get_action_categories(client)
- for key, value in action_categories['action_categories'].items():
- if value['name'] == name:
- req = client.delete("/action_categories/{}".format(key))
- break
- return req
-
-
-def delete_action_categories_without_id(client):
- req = client.delete("/action_categories/{}".format(""))
- return req
-
-
-def test_get_action_categories():
- client = utilities.register_client()
- req, action_categories = get_action_categories(client)
- assert req.status_code == 200
- assert isinstance(action_categories, dict)
- assert "action_categories" in action_categories
-
-
-def test_add_action_categories():
- client = utilities.register_client()
- req, action_categories = add_action_categories(client, "testuser")
- assert req.status_code == 200
- assert isinstance(action_categories, dict)
- value = list(action_categories["action_categories"].values())[0]
- assert "action_categories" in action_categories
- assert value['name'] == "testuser"
- assert value['description'] == "description of {}".format("testuser")
-
-
-def test_delete_action_categories():
- client = utilities.register_client()
- req = delete_action_categories(client, "testuser")
- assert req.status_code == 200
-
-
-def test_delete_action_categories_without_id():
- client = utilities.register_client()
- req = delete_action_categories_without_id(client)
- assert req.status_code == 500 \ No newline at end of file
diff --git a/moon_manager/tests/unit_python/api/meta_rules_test.py b/moon_manager/tests/unit_python/api/meta_rules_test.py
deleted file mode 100644
index b5b1ecf8..00000000
--- a/moon_manager/tests/unit_python/api/meta_rules_test.py
+++ /dev/null
@@ -1,69 +0,0 @@
-import json
-import api.utilities as utilities
-
-
-def get_meta_rules(client):
- req = client.get("/meta_rules")
- meta_rules = utilities.get_json(req.data)
- return req, meta_rules
-
-
-def add_meta_rules(client, name):
- data = {
- "name": name,
- "subject_categories": ["subject_category_id1",
- "subject_category_id2"],
- "object_categories": ["object_category_id1"],
- "action_categories": ["action_category_id1"]
- }
- req = client.post("/meta_rules", data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
- meta_rules = utilities.get_json(req.data)
- return req, meta_rules
-
-
-def delete_meta_rules(client, name):
- request, meta_rules = get_meta_rules(client)
- for key, value in meta_rules['meta_rules'].items():
- if value['name'] == name:
- req = client.delete("/meta_rules/{}".format(key))
- break
- return req
-
-
-def delete_meta_rules_without_id(client):
- req = client.delete("/meta_rules/{}".format(""))
- return req
-
-
-def test_get_meta_rules():
- client = utilities.register_client()
- req, meta_rules = get_meta_rules(client)
- assert req.status_code == 200
- assert isinstance(meta_rules, dict)
- assert "meta_rules" in meta_rules
-
-
-def test_add_meta_rules():
- client = utilities.register_client()
- req, meta_rules = add_meta_rules(client, "testuser")
- assert req.status_code == 200
- assert isinstance(meta_rules, dict)
- value = list(meta_rules["meta_rules"].values())[0]
- assert "meta_rules" in meta_rules
- assert value['name'] == "testuser"
- assert value["subject_categories"][0] == "subject_category_id1"
- assert value["object_categories"][0] == "object_category_id1"
- assert value["action_categories"][0] == "action_category_id1"
-
-
-def test_delete_meta_rules():
- client = utilities.register_client()
- req = delete_meta_rules(client, "testuser")
- assert req.status_code == 200
-
-
-def test_delete_meta_rules_without_id():
- client = utilities.register_client()
- req = delete_meta_rules_without_id(client)
- assert req.status_code == 500
diff --git a/moon_manager/tests/unit_python/api/test_assignement.py b/moon_manager/tests/unit_python/api/test_assignement.py
new file mode 100644
index 00000000..3a127477
--- /dev/null
+++ b/moon_manager/tests/unit_python/api/test_assignement.py
@@ -0,0 +1,469 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+import hug
+from uuid import uuid4
+import pytest
+import api.utilities as utilities
+from helpers import data_builder as builder
+from moon_utilities import exceptions
+
+
+def delete_assignment_based_on_parameters(type, policy_id, pre_id=None, cat_id=None, data_id=None):
+ if type in ["subject_assignments", "object_assignments", "action_assignments"] and policy_id:
+ url = "/policies/" + policy_id + "/" + type
+ if pre_id:
+ url += "/" + pre_id
+ if cat_id:
+ url += "/" + cat_id
+ if data_id:
+ url += "/" + data_id
+ else:
+ return ""
+ from moon_manager.api import assignments
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ req = hug.test.delete(assignments, url, headers=auth_headers)
+ return req
+
+
+# subject_categories_test
+
+
+def get_subject_assignment(policy_id):
+ from moon_manager.api import assignments
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ req = hug.test.get(assignments, "/policies/{}/subject_assignments".format(policy_id), headers=auth_headers)
+ subject_assignment = utilities.get_json(req.data)
+ return req, subject_assignment
+
+
+def add_subject_assignment():
+ from moon_manager.api import assignments
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy(
+ subject_category_name="subject_category1" + uuid4().hex,
+ object_category_name="object_category1" + uuid4().hex,
+ action_category_name="action_category1" + uuid4().hex,
+ meta_rule_name="meta_rule_1" + uuid4().hex)
+ subject_id = builder.create_subject(policy_id)
+ data_id = builder.create_subject_data(policy_id=policy_id, category_id=subject_category_id)
+
+ data = {
+ "id": subject_id,
+ "category_id": subject_category_id,
+ "data_id": data_id
+ }
+ req = hug.test.post(assignments, "/policies/{}/subject_assignments/".format(policy_id),
+ body=data, headers=auth_headers)
+ subject_assignment = utilities.get_json(req.data)
+ return req, subject_assignment
+
+
+def add_subject_assignment_without_cat_id():
+ from moon_manager.api import assignments
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ data = {
+ "id": "subject_id",
+ "category_id": "",
+ "data_id": "data_id"
+ }
+ req = hug.test.post(assignments, "/policies/{}/subject_assignments".format("1111"), body=data,
+ headers=auth_headers)
+ subject_assignment = utilities.get_json(req.data)
+ return req, subject_assignment
+
+
+def delete_subject_assignment(policy_id, sub_id, cat_id, data_id):
+ from moon_manager.api import assignments
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ req = hug.test.delete(assignments, "/policies/{}/subject_assignments/{}/{}/{}".format(
+ policy_id, sub_id, cat_id, data_id), headers=auth_headers)
+ return req
+
+
+def test_add_subject_assignment():
+ req, subject_assignment = add_subject_assignment()
+ assert req.status == hug.HTTP_200
+ assert isinstance(subject_assignment, dict)
+ assert "subject_assignments" in subject_assignment
+
+
+# def test_add_subject_assignment_without_cat_id():
+# client = utilities.register_client()
+# req, subject_assignment = add_subject_assignment_without_cat_id(client)
+# assert req.status == hug.HTTP_400
+# assert json.loads(req.data)["message"] == "Key: 'category_id', [Empty String]"
+
+
+def test_get_subject_assignment():
+ policy_id = builder.get_policy_id_with_subject_assignment()
+ req, subject_assignment = get_subject_assignment(policy_id)
+ assert req.status == hug.HTTP_200
+ assert isinstance(subject_assignment, dict)
+ assert "subject_assignments" in subject_assignment
+
+
+def test_delete_subject_assignment():
+ policy_id = builder.get_policy_id_with_subject_assignment()
+ req, subject_assignment = get_subject_assignment(policy_id)
+ value = subject_assignment["subject_assignments"]
+ _id = list(value.keys())[0]
+ success_req = delete_subject_assignment(
+ policy_id,
+ value[_id]['subject_id'],
+ value[_id]['category_id'],
+ value[_id]['assignments'][0])
+ assert success_req.status == hug.HTTP_200
+
+
+def test_delete_subject_assignment_using_policy():
+ policy_id = builder.get_policy_id_with_subject_assignment()
+ req, subject_assignment = get_subject_assignment(policy_id)
+ value = subject_assignment["subject_assignments"]
+ _id = list(value.keys())[0]
+ success_req = delete_assignment_based_on_parameters(
+ "subject_assignments",
+ policy_id)
+ assert success_req.status == hug.HTTP_200
+
+
+def test_delete_subject_assignment_using_policy_perimeter_id():
+ policy_id = builder.get_policy_id_with_subject_assignment()
+ req, subject_assignment = get_subject_assignment(policy_id)
+ value = subject_assignment["subject_assignments"]
+ _id = list(value.keys())[0]
+ success_req = delete_assignment_based_on_parameters(
+ "subject_assignments",
+ policy_id,
+ value[_id]['subject_id'])
+ assert success_req.status == hug.HTTP_200
+
+
+def test_delete_subject_assignment_using_policy_perimeter_id_category_id():
+ policy_id = builder.get_policy_id_with_subject_assignment()
+ req, subject_assignment = get_subject_assignment(policy_id)
+ value = subject_assignment["subject_assignments"]
+ _id = list(value.keys())[0]
+ success_req = delete_assignment_based_on_parameters(
+ "subject_assignments",
+ policy_id,
+ value[_id]['subject_id'],
+ value[_id]['category_id'])
+ assert success_req.status == hug.HTTP_200
+
+
+def test_delete_subject_assignment_without_policy_id():
+
+ with pytest.raises(exceptions.PolicyUnknown) as exception_info:
+ success_req = delete_subject_assignment("", "id1", "111", "data_id1")
+
+ assert '400: Policy Unknown' == str(exception_info.value)
+
+ # assert success_req.status == hug.HTTP_400
+ # assert success_req.data["message"] == "400: Policy Unknown"
+
+
+# ---------------------------------------------------------------------------
+# object_categories_test
+
+
+def get_object_assignment(policy_id):
+ from moon_manager.api import assignments
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ req = hug.test.get(assignments, "/policies/{}/object_assignments".format(policy_id), headers=auth_headers)
+ object_assignment = utilities.get_json(req.data)
+ return req, object_assignment
+
+
+def add_object_assignment():
+ from moon_manager.api import assignments
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy(
+ subject_category_name="subject_category1" + uuid4().hex,
+ object_category_name="object_category1" + uuid4().hex,
+ action_category_name="action_category1" + uuid4().hex,
+ meta_rule_name="meta_rule_1" + uuid4().hex)
+ object_id = builder.create_object(policy_id)
+ data_id = builder.create_object_data(policy_id=policy_id, category_id=object_category_id)
+
+ data = {
+ "id": object_id,
+ "category_id": object_category_id,
+ "data_id": data_id
+ }
+
+ req = hug.test.post(assignments, "/policies/{}/object_assignments".format(policy_id),
+ body=data, headers=auth_headers)
+ object_assignment = utilities.get_json(req.data)
+ return req, object_assignment
+
+
+def add_object_assignment_without_cat_id():
+ from moon_manager.api import assignments
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ data = {
+ "id": "object_id",
+ "category_id": "",
+ "data_id": "data_id"
+ }
+ req = hug.test.post(assignments, "/policies/{}/object_assignments".format("1111"),
+ body=data, headers=auth_headers)
+ object_assignment = utilities.get_json(req.data)
+ return req, object_assignment
+
+
+def delete_object_assignment(policy_id, obj_id, cat_id, data_id):
+ from moon_manager.api import assignments
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ req = hug.test.delete(assignments, "/policies/{}/object_assignments/{}/{}/{}".format(
+ policy_id, obj_id, cat_id, data_id), headers=auth_headers)
+ return req
+
+
+def test_get_object_assignment():
+ policy_id = builder.get_policy_id_with_object_assignment()
+
+ req, object_assignment = get_object_assignment(policy_id)
+ assert req.status == hug.HTTP_200
+ assert isinstance(object_assignment, dict)
+ assert "object_assignments" in object_assignment
+
+
+def test_add_object_assignment():
+ req, object_assignment = add_object_assignment()
+ assert req.status == hug.HTTP_200
+ assert "object_assignments" in object_assignment
+
+
+# def test_add_object_assignment_without_cat_id():
+# client = utilities.register_client()
+# req, object_assignment = add_object_assignment_without_cat_id(client)
+# assert req.status == hug.HTTP_400
+# assert json.loads(req.data)["message"] == "Key: 'category_id', [Empty String]"
+
+
+def test_delete_object_assignment():
+ policy_id = builder.get_policy_id_with_object_assignment()
+ req, object_assignment = get_object_assignment(policy_id)
+ value = object_assignment["object_assignments"]
+ _id = list(value.keys())[0]
+ success_req = delete_object_assignment(policy_id,
+ value[_id]['object_id'],
+ value[_id]['category_id'],
+ value[_id]['assignments'][0])
+ assert success_req.status == hug.HTTP_200
+
+
+def test_delete_object_assignment_using_policy():
+ policy_id = builder.get_policy_id_with_object_assignment()
+ req, object_assignment = get_object_assignment(policy_id)
+ value = object_assignment["object_assignments"]
+ _id = list(value.keys())[0]
+ success_req = delete_assignment_based_on_parameters(
+ "object_assignments",
+ policy_id)
+ assert success_req.status == hug.HTTP_200
+
+
+def test_delete_object_assignment_using_policy_perimeter_id():
+ policy_id = builder.get_policy_id_with_object_assignment()
+ req, object_assignment = get_object_assignment(policy_id)
+ value = object_assignment["object_assignments"]
+ _id = list(value.keys())[0]
+ success_req = delete_assignment_based_on_parameters(
+ "object_assignments",
+ policy_id,
+ value[_id]['object_id'])
+ assert success_req.status == hug.HTTP_200
+
+
+def test_delete_object_assignment_using_policy_perimeter_id_category_id():
+ policy_id = builder.get_policy_id_with_object_assignment()
+ req, object_assignment = get_object_assignment(policy_id)
+ value = object_assignment["object_assignments"]
+ _id = list(value.keys())[0]
+ success_req = delete_assignment_based_on_parameters(
+ "object_assignments",
+ policy_id,
+ value[_id]['object_id'],
+ value[_id]['category_id'])
+ assert success_req.status == hug.HTTP_200
+
+
+def test_delete_object_assignment_without_policy_id():
+ with pytest.raises(exceptions.PolicyUnknown) as exception_info:
+ success_req = delete_object_assignment("", "id1", "111", "data_id1")
+ # assert success_req.status == hug.HTTP_400
+ # assert success_req.data["message"] == "400: Policy Unknown"
+ assert '400: Policy Unknown' == str(exception_info.value)
+
+
+# ---------------------------------------------------------------------------
+# action_categories_test
+
+
+def get_action_assignment(policy_id):
+ from moon_manager.api import assignments
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ req = hug.test.get(assignments, "/policies/{}/action_assignments".format(policy_id), headers=auth_headers)
+ action_assignment = utilities.get_json(req.data)
+ return req, action_assignment
+
+
+def add_action_assignment():
+ from moon_manager.api import assignments
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy(
+ subject_category_name="subject_category1" + uuid4().hex,
+ object_category_name="object_category1" + uuid4().hex,
+ action_category_name="action_category1" + uuid4().hex,
+ meta_rule_name="meta_rule_1" + uuid4().hex)
+ action_id = builder.create_action(policy_id)
+ data_id = builder.create_action_data(policy_id=policy_id, category_id=action_category_id)
+
+ data = {
+ "id": action_id,
+ "category_id": action_category_id,
+ "data_id": data_id
+ }
+ req = hug.test.post(assignments, "/policies/{}/action_assignments".format(policy_id),
+ body=data,
+ headers=auth_headers)
+ action_assignment = utilities.get_json(req.data)
+ return req, action_assignment
+
+
+def add_action_assignment_without_cat_id():
+ from moon_manager.api import assignments
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ data = {
+ "id": "action_id",
+ "category_id": "",
+ "data_id": "data_id"
+ }
+ req = hug.test.post(assignments, "/policies/{}/action_assignments".format("1111"),
+ body=data, headers=auth_headers)
+ action_assignment = utilities.get_json(req.data)
+ return req, action_assignment
+
+
+def delete_action_assignment(policy_id, action_id, cat_id, data_id):
+ from moon_manager.api import assignments
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ req = hug.test.delete(assignments, "/policies/{}/action_assignments/{}/{}/{}".format(
+ policy_id, action_id, cat_id, data_id), headers=auth_headers)
+ return req
+
+
+def test_get_action_assignment():
+ policy_id = builder.get_policy_id_with_action_assignment()
+ req, action_assignment = get_action_assignment(policy_id)
+ assert req.status == hug.HTTP_200
+ assert isinstance(action_assignment, dict)
+ assert "action_assignments" in action_assignment
+
+
+def test_add_action_assignment():
+ req, action_assignment = add_action_assignment()
+ assert req.status == hug.HTTP_200
+ assert "action_assignments" in action_assignment
+
+
+# def test_add_action_assignment_without_cat_id():
+# client = utilities.register_client()
+# req, action_assignment = add_action_assignment_without_cat_id(client)
+# assert req.status == hug.HTTP_400
+# assert json.loads(req.data)["message"] == "Key: 'category_id', [Empty String]"
+
+
+def test_delete_action_assignment():
+ policy_id = builder.get_policy_id_with_action_assignment()
+ req, action_assignment = get_action_assignment(policy_id)
+ value = action_assignment["action_assignments"]
+ id = list(value.keys())[0]
+ success_req = delete_action_assignment(policy_id,
+ value[id]['action_id'],
+ value[id]['category_id'],
+ value[id]['assignments'][0])
+ assert success_req.status == hug.HTTP_200
+
+
+def test_delete_action_assignment_policy():
+ policy_id = builder.get_policy_id_with_action_assignment()
+ req, action_assignment = get_action_assignment(policy_id)
+ value = action_assignment["action_assignments"]
+ id = list(value.keys())[0]
+ success_req = delete_assignment_based_on_parameters(
+ "action_assignments",
+ policy_id)
+ assert success_req.status == hug.HTTP_200
+
+
+def test_delete_action_assignment_policy_perimeter_id():
+ policy_id = builder.get_policy_id_with_action_assignment()
+ req, action_assignment = get_action_assignment(policy_id)
+ value = action_assignment["action_assignments"]
+ id = list(value.keys())[0]
+ success_req = delete_assignment_based_on_parameters(
+ "action_assignments",
+ policy_id,
+ value[id]['action_id'])
+ assert success_req.status == hug.HTTP_200
+
+
+def test_delete_action_assignment_policy_perimeter_id_category_id():
+ policy_id = builder.get_policy_id_with_action_assignment()
+ req, action_assignment = get_action_assignment(policy_id)
+ value = action_assignment["action_assignments"]
+ id = list(value.keys())[0]
+ success_req = delete_assignment_based_on_parameters(
+ "action_assignments",
+ policy_id,
+ value[id]['action_id'],
+ value[id]['category_id'])
+ assert success_req.status == hug.HTTP_200
+
+
+def test_delete_action_assignment_without_policy_id():
+ with pytest.raises(exceptions.PolicyUnknown) as exception_info:
+ success_req = delete_action_assignment("", "id1", "111", "data_id1")
+ # assert success_req.status == hug.HTTP_400
+ # assert success_req.data["message"] == "400: Policy Unknown"
+ assert '400: Policy Unknown' == str(exception_info.value)
+
+# ---------------------------------------------------------------------------
diff --git a/moon_manager/tests/unit_python/api/test_assignemnt.py b/moon_manager/tests/unit_python/api/test_assignemnt.py
deleted file mode 100644
index 08688e04..00000000
--- a/moon_manager/tests/unit_python/api/test_assignemnt.py
+++ /dev/null
@@ -1,174 +0,0 @@
-import api.utilities as utilities
-import json
-
-
-# subject_categories_test
-
-
-def get_subject_assignment(client, policy_id):
- req = client.get("/policies/{}/subject_assignments".format(policy_id))
- subject_assignment = utilities.get_json(req.data)
- return req, subject_assignment
-
-
-def add_subject_assignment(client, policy_id, category_id):
- data = {
- "id": "id1",
- "category_id": category_id,
- "data_id": "data_id1"
- }
- req = client.post("/policies/{}/subject_assignments/{}".format(policy_id, category_id), data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
- subject_assignment = utilities.get_json(req.data)
- return req, subject_assignment
-
-
-def delete_subject_assignment(client, policy_id):
- req = client.delete("/policies/{}/subject_assignments".format(policy_id))
- return req
-
-
-def test_get_subject_assignment():
- policy_id = utilities.get_policy_id()
- client = utilities.register_client()
- req, subject_assignment = get_subject_assignment(client, policy_id)
- assert req.status_code == 200
- assert isinstance(subject_assignment, dict)
- assert "subject_assignments" in subject_assignment
-
-
-def test_add_subject_assignment():
- policy_id = utilities.get_policy_id()
- client = utilities.register_client()
- req, subject_assignment = add_subject_assignment(client, policy_id, "111")
- assert req.status_code == 200
- assert isinstance(subject_assignment, dict)
- value = subject_assignment["subject_assignments"]
- assert "subject_assignments" in subject_assignment
- id = list(value.keys())[0]
- assert value[id]['policy_id'] == policy_id
- assert value[id]['category_id'] == "111"
- assert value[id]['subject_id'] == "id1"
-
-
-def test_delete_subject_assignment():
- client = utilities.register_client()
- policy_id = utilities.get_policy_id()
- success_req = delete_subject_assignment(client, policy_id)
- assert success_req.status_code == 200
-
-# ---------------------------------------------------------------------------
-
-# object_categories_test
-
-
-def get_object_assignment(client, policy_id):
- req = client.get("/policies/{}/object_assignments".format(policy_id))
- object_assignment = utilities.get_json(req.data)
- return req, object_assignment
-
-
-def add_object_assignment(client, policy_id, category_id):
- data = {
- "id": "id1",
- "category_id": category_id,
- "data_id": "data_id1"
- }
- req = client.post("/policies/{}/object_assignments/{}".format(policy_id, category_id), data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
- object_assignment = utilities.get_json(req.data)
- return req, object_assignment
-
-
-def delete_object_assignment(client, policy_id):
- req = client.delete("/policies/{}/object_assignments".format(policy_id))
- return req
-
-
-def test_get_object_assignment():
- policy_id = utilities.get_policy_id()
- client = utilities.register_client()
- req, object_assignment = get_object_assignment(client, policy_id)
- assert req.status_code == 200
- assert isinstance(object_assignment, dict)
- assert "object_assignments" in object_assignment
-
-
-def test_add_object_assignment():
- policy_id = utilities.get_policy_id()
- client = utilities.register_client()
- req, object_assignment = add_object_assignment(client, policy_id, "111")
- assert req.status_code == 200
- assert isinstance(object_assignment, dict)
- value = object_assignment["object_assignments"]
- assert "object_assignments" in object_assignment
- id = list(value.keys())[0]
- assert value[id]['policy_id'] == policy_id
- assert value[id]['category_id'] == "111"
- assert value[id]['object_id'] == "id1"
-
-
-def test_delete_object_assignment():
- client = utilities.register_client()
- policy_id = utilities.get_policy_id()
- success_req = delete_object_assignment(client, policy_id)
- assert success_req.status_code == 200
-
-# ---------------------------------------------------------------------------
-
-# action_categories_test
-
-
-def get_action_assignment(client, policy_id):
- req = client.get("/policies/{}/action_assignments".format(policy_id))
- action_assignment = utilities.get_json(req.data)
- return req, action_assignment
-
-
-def add_action_assignment(client, policy_id, category_id):
- data = {
- "id": "id1",
- "category_id": category_id,
- "data_id": "data_id1"
- }
- req = client.post("/policies/{}/action_assignments/{}".format(policy_id, category_id), data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
- action_assignment = utilities.get_json(req.data)
- return req, action_assignment
-
-
-def delete_action_assignment(client, policy_id):
- req = client.delete("/policies/{}/action_assignments".format(policy_id))
- return req
-
-
-def test_get_action_assignment():
- policy_id = utilities.get_policy_id()
- client = utilities.register_client()
- req, action_assignment = get_action_assignment(client, policy_id)
- assert req.status_code == 200
- assert isinstance(action_assignment, dict)
- assert "action_assignments" in action_assignment
-
-
-def test_add_action_assignment():
- policy_id = utilities.get_policy_id()
- client = utilities.register_client()
- req, action_assignment = add_action_assignment(client, policy_id, "111")
- assert req.status_code == 200
- assert isinstance(action_assignment, dict)
- value = action_assignment["action_assignments"]
- assert "action_assignments" in action_assignment
- id = list(value.keys())[0]
- assert value[id]['policy_id'] == policy_id
- assert value[id]['category_id'] == "111"
- assert value[id]['action_id'] == "id1"
-
-
-def test_delete_action_assignment():
- client = utilities.register_client()
- policy_id = utilities.get_policy_id()
- success_req = delete_action_assignment(client, policy_id)
- assert success_req.status_code == 200
-
-# --------------------------------------------------------------------------- \ No newline at end of file
diff --git a/moon_manager/tests/unit_python/api/test_auth.py b/moon_manager/tests/unit_python/api/test_auth.py
new file mode 100644
index 00000000..ee59bf5e
--- /dev/null
+++ b/moon_manager/tests/unit_python/api/test_auth.py
@@ -0,0 +1,71 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+from falcon import HTTP_200, HTTP_204, HTTP_401
+import hug
+import base64
+from uuid import uuid4
+from helpers import data_builder as builder
+
+
+def test_get_auth():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_manager.api import auth
+ from moon_manager.api import policy
+ headers = {"Authorization": "Basic {}".format(base64.b64encode(b"admin:admin").decode("utf-8"))}
+ req = hug.test.get(auth, 'auth/', headers=headers)
+ assert req.status == HTTP_200
+ key = req.data
+ assert get_api_key_for_user("admin") == req.data
+ headers = {"x-api-key": key}
+ req = hug.test.get(policy, 'policies/', headers=headers)
+ assert req.status == HTTP_200
+
+
+def test_del_auth():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_manager.api import auth
+ from moon_manager.api import policy
+ headers = {"Authorization": "Basic {}".format(base64.b64encode(b"admin:admin").decode("utf-8"))}
+ req = hug.test.get(auth, 'auth/', headers=headers)
+ assert req.status == HTTP_200
+ key = req.data
+ headers = {"x-api-key": key}
+ req = hug.test.delete(auth, 'auth/', headers=headers)
+ assert req.status == HTTP_204
+ req = hug.test.get(policy, 'policies/', headers=headers)
+ assert req.status == HTTP_401
+ assert not get_api_key_for_user("admin")
+
+
+def test_readd_auth():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_manager.api import auth
+ from moon_manager.api import policy
+ headers = {"Authorization": "Basic {}".format(base64.b64encode(b"admin:admin").decode("utf-8"))}
+ req = hug.test.get(auth, 'auth/', headers=headers)
+ assert req.status == HTTP_200
+ key = req.data
+ headers = {"x-api-key": key}
+ req = hug.test.delete(auth, 'auth/', headers=headers)
+ assert req.status == HTTP_204
+ headers = {"Authorization": "Basic {}".format(base64.b64encode(b"admin:admin").decode("utf-8"))}
+ req = hug.test.get(auth, 'auth/', headers=headers)
+ assert req.status == HTTP_200
+ new_key = req.data
+ headers = {"x-api-key": new_key}
+ req = hug.test.get(policy, 'policies/', headers=headers)
+ assert req.status == HTTP_200
+ assert get_api_key_for_user("admin")
+ assert get_api_key_for_user("admin") == new_key
+ assert get_api_key_for_user("admin") != key
+
diff --git a/moon_manager/tests/unit_python/api/test_data.py b/moon_manager/tests/unit_python/api/test_data.py
index 87a80c69..019a8b45 100644
--- a/moon_manager/tests/unit_python/api/test_data.py
+++ b/moon_manager/tests/unit_python/api/test_data.py
@@ -1,46 +1,83 @@
-import api.utilities as utilities
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
import json
+from uuid import uuid4
+
+import hug
+import pytest
+from helpers import data_builder as builder
+from helpers import policy_helper
+from moon_utilities import exceptions
# subject_categories_test
-def get_subject_data(client, policy_id):
- req = client.get("/policies/{}/subject_data".format(policy_id))
- subject_data = utilities.get_json(req.data)
+def get_subject_data(policy_id, category_id=None):
+ from moon_manager.api import data
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ if category_id is None:
+ req = hug.test.get(data, "/policies/{}/subject_data".format(policy_id), headers=auth_headers)
+ else:
+ req = hug.test.get(data, "/policies/{}/subject_data/{}".format(policy_id, category_id), headers=auth_headers)
+ subject_data = req.data
return req, subject_data
-def add_subject_data(client, name, policy_id, category_id):
- data = {
+def add_subject_data(name):
+ from moon_manager.api import data
+ from moon_utilities.auth_functions import get_api_key_for_user
+
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy(
+ subject_category_name="subject_category1" + uuid4().hex,
+ object_category_name="object_category1" + uuid4().hex,
+ action_category_name="action_category1" + uuid4().hex,
+ meta_rule_name="meta_rule_1" + uuid4().hex)
+ body = {
"name": name,
"description": "description of {}".format(name)
}
- req = client.post("/policies/{}/subject_data/{}".format(policy_id, category_id), data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
- subject_data = utilities.get_json(req.data)
+ req = hug.test.post(data, "/policies/{}/subject_data/{}".format(policy_id, subject_category_id),
+ body=json.dumps(body),
+ headers={'Content-Type': 'application/json', "X-Api-Key": get_api_key_for_user("admin")})
+ subject_data = req.data
return req, subject_data
-def delete_subject_data(client, policy_id):
- req = client.delete("/policies/{}/subject_data".format(policy_id))
+def delete_subject_data(policy_id, category_id, data_id):
+ from moon_manager.api import data
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ req = hug.test.delete(data, "/policies/{}/subject_data/{}/{}".format(policy_id, category_id,
+ data_id), headers=auth_headers)
return req
def test_get_subject_data():
- policy_id = utilities.get_policy_id()
- client = utilities.register_client()
- req, subject_data = get_subject_data(client, policy_id)
- assert req.status_code == 200
+ policy = policy_helper.add_policies()
+ policy_id = next(iter(policy))
+ req, subject_data = get_subject_data(policy_id)
+ assert req.status == hug.HTTP_200
assert isinstance(subject_data, dict)
assert "subject_data" in subject_data
def test_add_subject_data():
- policy_id = utilities.get_policy_id()
- client = utilities.register_client()
- req, subject_data = add_subject_data(client, "testuser", policy_id, "111")
- assert req.status_code == 200
+ req, subject_data = add_subject_data("testuser")
+ assert req.status == hug.HTTP_200
assert isinstance(subject_data, dict)
value = subject_data["subject_data"]['data']
assert "subject_data" in subject_data
@@ -49,120 +86,236 @@ def test_add_subject_data():
assert value[id]['description'] == "description of {}".format("testuser")
+def test_add_subject_data_invalid_name():
+ with pytest.raises(exceptions.DataContentError) as exception_info:
+ req, subject_data = add_subject_data(" ")
+ # assert req.status == hug.HTTP_400
+ assert '400: Data Content Error' == str(exception_info.value)
+ with pytest.raises(exceptions.DataContentError) as exception_info:
+ req, subject_data = add_subject_data("")
+ # assert req.status == hug.HTTP_400
+ assert '400: Data Content Error' == str(exception_info.value)
+
+
def test_delete_subject_data():
- client = utilities.register_client()
- policy_id = utilities.get_policy_id()
- success_req = delete_subject_data(client, policy_id)
- assert success_req.status_code == 200
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy()
+ data_id = builder.create_subject_data(policy_id, subject_category_id)
+ success_req = delete_subject_data(policy_id, subject_category_id, data_id)
+ assert success_req.status == hug.HTTP_200
+
+
+def test_add_subject_data_with_forbidden_char_in_user():
+ with pytest.raises(exceptions.ValidationContentError) as exception_info:
+ req, subject_data = add_subject_data("<a>")
+ # assert '400: Invalid Content' == str(exception_info.value)
+ assert "Key: 'name', [Forbidden characters in string]" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == "Key: 'name', [Forbidden characters in string]"
+
+
+def test_delete_subject_data_without_policy_id():
+ success_req = delete_subject_data("", "", "")
+ assert success_req.status == hug.HTTP_405
-# ---------------------------------------------------------------------------
+# ---------------------------------------------------------------------------
# object_categories_test
-def get_object_data(client, policy_id):
- req = client.get("/policies/{}/object_data".format(policy_id))
- object_data = utilities.get_json(req.data)
+def get_object_data(policy_id, category_id=None):
+ from moon_manager.api import data
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ if category_id is None:
+ req = hug.test.get(data, "/policies/{}/object_data".format(policy_id), headers=auth_headers)
+ else:
+ req = hug.test.get(data, "/policies/{}/object_data/{}".format(policy_id, category_id), headers=auth_headers)
+ object_data = req.data
return req, object_data
-def add_object_data(client, name, policy_id, category_id):
- data = {
+def add_object_data(name):
+ from moon_manager.api import data
+ from moon_utilities.auth_functions import get_api_key_for_user
+
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy(
+ subject_category_name="subject_category1" + uuid4().hex,
+ object_category_name="object_category1" + uuid4().hex,
+ action_category_name="action_category1" + uuid4().hex,
+ meta_rule_name="meta_rule_1" + uuid4().hex)
+ body = {
"name": name,
"description": "description of {}".format(name)
}
- req = client.post("/policies/{}/object_data/{}".format(policy_id, category_id), data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
- object_data = utilities.get_json(req.data)
+ req = hug.test.post(data, "/policies/{}/object_data/{}".format(policy_id, object_category_id),
+ body=json.dumps(body), headers={'Content-Type': 'application/json',
+ "X-Api-Key": get_api_key_for_user("admin")})
+ object_data = req.data
return req, object_data
-def delete_object_data(client, policy_id):
- req = client.delete("/policies/{}/object_data".format(policy_id))
+def delete_object_data(policy_id, category_id, data_id):
+ from moon_manager.api import data
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ req = hug.test.delete(data, "/policies/{}/object_data/{}/{}".format(policy_id, category_id,
+ data_id), headers=auth_headers)
return req
def test_get_object_data():
- policy_id = utilities.get_policy_id()
- client = utilities.register_client()
- req, object_data = get_object_data(client, policy_id)
- assert req.status_code == 200
+ policy = policy_helper.add_policies()
+ policy_id = next(iter(policy))
+ req, object_data = get_object_data(policy_id)
+ assert req.status == hug.HTTP_200
assert isinstance(object_data, dict)
assert "object_data" in object_data
def test_add_object_data():
- policy_id = utilities.get_policy_id()
- client = utilities.register_client()
- req, object_data = add_object_data(client, "testuser", policy_id, "111")
- assert req.status_code == 200
+ req, object_data = add_object_data("testuser")
+ assert req.status == hug.HTTP_200
assert isinstance(object_data, dict)
value = object_data["object_data"]['data']
assert "object_data" in object_data
- id = list(value.keys())[0]
- assert value[id]['value']['name'] == "testuser"
- assert value[id]['value']['description'] == "description of {}".format("testuser")
+ _id = list(value.keys())[0]
+ assert value[_id]['name'] == "testuser"
+ assert value[_id]['description'] == "description of {}".format("testuser")
+
+
+def test_add_object_data_invalid_name():
+ with pytest.raises(exceptions.DataContentError) as exception_info:
+ req, object_data = add_object_data(" ")
+ # assert req.status == hug.HTTP_400
+ assert '400: Data Content Error' == str(exception_info.value)
+ with pytest.raises(exceptions.DataContentError):
+ req, object_data = add_object_data("")
+ # assert req.status == hug.HTTP_400
+ assert '400: Data Content Error' == str(exception_info.value)
def test_delete_object_data():
- client = utilities.register_client()
- policy_id = utilities.get_policy_id()
- success_req = delete_object_data(client, policy_id)
- assert success_req.status_code == 200
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy()
+ data_id = builder.create_object_data(policy_id, object_category_id)
+ success_req = delete_object_data(policy_id, data_id, object_category_id)
+ assert success_req.status == hug.HTTP_200
+
+
+def test_add_object_data_with_forbidden_char_in_user():
+ with pytest.raises(exceptions.ValidationContentError) as exception_info:
+ req, subject_data = add_object_data("<a>")
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == "Key: 'name', [Forbidden characters in string]"
+ # assert '400: Invalid Content' == str(exception_info.value)
+ assert "Key: 'name', [Forbidden characters in string]" == str(exception_info.value)
-# ---------------------------------------------------------------------------
+def test_delete_object_data_without_policy_id():
+ success_req = delete_object_data("", "", "")
+ assert success_req.status == hug.HTTP_405
+
+
+# ---------------------------------------------------------------------------
# action_categories_test
-def get_action_data(client, policy_id):
- req = client.get("/policies/{}/action_data".format(policy_id))
- action_data = utilities.get_json(req.data)
+def get_action_data(policy_id, category_id=None):
+ from moon_manager.api import data
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ if category_id is None:
+ req = hug.test.get(data, "/policies/{}/action_data".format(policy_id),
+ headers=auth_headers)
+ else:
+ req = hug.test.get(data, "/policies/{}/action_data/{}".format(policy_id, category_id),
+ headers=auth_headers)
+ action_data = req.data
return req, action_data
-def add_action_data(client, name, policy_id, category_id):
- data = {
+def add_action_data(name):
+ from moon_manager.api import data
+ from moon_utilities.auth_functions import get_api_key_for_user
+
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy(
+ subject_category_name="subject_category1" + uuid4().hex,
+ object_category_name="object_category1" + uuid4().hex,
+ action_category_name="action_category1" + uuid4().hex,
+ meta_rule_name="meta_rule_1" + uuid4().hex)
+ body = {
"name": name,
"description": "description of {}".format(name)
}
- req = client.post("/policies/{}/action_data/{}".format(policy_id, category_id), data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
- action_data = utilities.get_json(req.data)
+ req = hug.test.post(data, "/policies/{}/action_data/{}".format(policy_id, action_category_id),
+ body=json.dumps(body),
+ headers={'Content-Type': 'application/json',
+ "X-Api-Key": get_api_key_for_user("admin")})
+ action_data = req.data
return req, action_data
-def delete_action_data(client, policy_id):
- req = client.delete("/policies/{}/action_data".format(policy_id))
+def delete_action_data(policy_id, categorgy_id, data_id):
+ from moon_manager.api import data
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ req = hug.test.delete(data, "/policies/{}/action_data/{}/{}".format(policy_id, categorgy_id,
+ data_id), headers=auth_headers)
return req
def test_get_action_data():
- policy_id = utilities.get_policy_id()
- client = utilities.register_client()
- req, action_data = get_action_data(client, policy_id)
- assert req.status_code == 200
+ policy = policy_helper.add_policies()
+ policy_id = next(iter(policy))
+ req, action_data = get_action_data(policy_id)
+ assert req.status == hug.HTTP_200
assert isinstance(action_data, dict)
assert "action_data" in action_data
def test_add_action_data():
- policy_id = utilities.get_policy_id()
- client = utilities.register_client()
- req, action_data = add_action_data(client, "testuser", policy_id, "111")
- assert req.status_code == 200
+ req, action_data = add_action_data("testuser")
+ assert req.status == hug.HTTP_200
assert isinstance(action_data, dict)
value = action_data["action_data"]['data']
assert "action_data" in action_data
id = list(value.keys())[0]
- assert value[id]['value']['name'] == "testuser"
- assert value[id]['value']['description'] == "description of {}".format("testuser")
+ assert value[id]['name'] == "testuser"
+ assert value[id]['description'] == "description of {}".format("testuser")
+
+
+def test_add_action_data_invalid_name():
+
+ with pytest.raises(exceptions.DataContentError)as exception_info:
+ req, action_data = add_action_data(" ")
+ # assert req.status == hug.HTTP_400
+ assert '400: Data Content Error' == str(exception_info.value)
+ with pytest.raises(exceptions.DataContentError) as exception_info:
+ req, action_data = add_action_data("")
+ # assert req.status == hug.HTTP_400
+ assert '400: Data Content Error' == str(exception_info.value)
def test_delete_action_data():
- client = utilities.register_client()
- policy_id = utilities.get_policy_id()
- success_req = delete_action_data(client, policy_id)
- assert success_req.status_code == 200
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy()
+ data_id = builder.create_action_data(policy_id, action_category_id)
+ success_req = delete_action_data(policy_id, data_id, action_category_id)
+ assert success_req.status == hug.HTTP_200
+
+
+def test_add_action_data_with_forbidden_char_in_user():
+ with pytest.raises(exceptions.ValidationContentError) as exception_info:
+ req, action_data = add_action_data("<a>")
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == "Key: 'name', [Forbidden characters in string]"
+ # assert '400: Invalid Content' == str(exception_info.value)
+ assert "Key: 'name', [Forbidden characters in string]" == str(exception_info.value)
-# --------------------------------------------------------------------------- \ No newline at end of file
+
+def test_delete_action_data_without_policy_id():
+ success_req = delete_action_data("", "", "")
+ assert success_req.status == hug.HTTP_405
+# ---------------------------------------------------------------------------
diff --git a/moon_manager/tests/unit_python/api/test_json_export.py b/moon_manager/tests/unit_python/api/test_json_export.py
new file mode 100644
index 00000000..8de394c9
--- /dev/null
+++ b/moon_manager/tests/unit_python/api/test_json_export.py
@@ -0,0 +1,321 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+import json
+import api.utilities as utilities
+import helpers.import_export_helper as import_export_helper
+import hug
+
+MODEL_WITHOUT_META_RULES = {"models": [{"name": "test model", "description": "model description", "meta_rules": []}]}
+
+DATA = {"subject_data": [{"name": "test subject data", "description": "subject data description", "policies": [{"name": "test policy"}], "category": {"name": "test subject categories"}}],
+ "object_data": [{"name": "test object data", "description": "object data description", "policies": [{"name": "test policy"}], "category": {"name": "test object categories"}}],
+ "action_data": [{"name": "test action data", "description": "action data description", "policies": [{"name": "test policy"}], "category": {"name": "test action categories"}}]
+ }
+
+META_RULES = {"subject_categories": [{"name": "test subject categories", "description": "subject category description"}],
+ "object_categories": [{"name": "test object categories", "description": "object category description"}],
+ "action_categories": [{"name": "test action categories", "description": "action action description"}],
+ "meta_rules": [{"name": "meta rule", "description": "valid meta rule", "subject_categories": [{"name": "test subject categories"}], "object_categories": [{"name": "test object categories"}], "action_categories": [{"name": "test action categories"}]}]}
+
+
+SUBJECTS_OBJECTS_ACTIONS = {"models": [{"name": "test model", "description": "", "meta_rules": [{"name":"meta rule"}]}],
+ "policies": [{"name": "test policy", "genre": "authz", "description": "policy description", "model": {"name" : "test model"}}],
+ "subjects": [{"name": "testuser", "description": "description of the subject", "extra": {"field_extra_subject": "value extra subject"}, "policies": [{"name": "test policy"}]}],
+ "objects": [{"name": "test object", "description": "description of the object", "extra": {"field_extra_object": "value extra object"}, "policies": [{"name": "test policy"}]}],
+ "actions": [{"name": "test action", "description": "description of the action", "extra": {"field_extra_action": "value extra action"}, "policies": [{"name": "test policy"}]}],
+ **META_RULES
+ }
+
+SUBJECT_OBJECT_ACTION_CATEGORIES = {"subject_categories": [{"name": "test subject categories", "description": "subject category description"}],
+ "object_categories": [{"name": "test object categories", "description": "object category description"}],
+ "action_categories": [{"name": "test action categories", "description": "action category description"}]}
+
+SUBJECT_OBJECT_ACTION_DATA = {**SUBJECTS_OBJECTS_ACTIONS,
+ **DATA
+ }
+POLICIES = {"models": [{"name": "test model", "description": "", "meta_rules": [{"name": "meta rule"}]}],
+ "policies": [{"name": "test policy", "genre": "authz", "description": "policy description", "model": {"name" : "test model"}}],
+ **META_RULES,
+ }
+
+ASSIGNMENTS = {**POLICIES,
+ **DATA,
+ "subjects": [{"name": "testuser", "description": "description of the subject", "extra": {"field_extra_subject": "value extra subject"}, "policies": [{"name": "test policy"}]}],
+ "objects": [{"name": "test object e0", "description": "description of the object", "extra": {"field_extra_object": "value extra object"}, "policies": [{"name": "test policy"}]}],
+ "actions": [{"name": "test action e0", "description": "description of the action", "extra": {"field_extra_action": "value extra action"}, "policies": [{"name": "test policy"}]}],
+ "subject_assignments": [{"subject": {"name": "testuser"}, "category": {"name": "test subject categories"}, "assignments": [{"name": "test subject data"}]}],
+ "object_assignments": [{"object": {"name": "test object e0"}, "category": {"name": "test object categories"}, "assignments": [{"name": "test object data"}]}],
+ "action_assignments": [{"action": {"name": "test action e0"}, "category": {"name": "test action categories"}, "assignments": [{"name": "test action data"}]}]}
+
+RULES = {**POLICIES,
+ **DATA,
+ "subjects": [{"name": "testuser", "description": "description of the subject", "extra": {"field_extra_subject": "value extra subject"}, "policies": [{"name": "test policy"}]}],
+ "objects": [{"name": "test object e1", "description": "description of the object", "extra": {"field_extra_object": "value extra object"}, "policies": [{"name": "test policy"}]}],
+ "actions": [{"name": "test action e1", "description": "description of the action", "extra": {"field_extra_action": "value extra action"}, "policies": [{"name": "test policy"}]}],
+ "subject_assignments": [{"subject": {"name": "testuser"}, "category": {"name": "test subject categories"}, "assignments": [{"name": "test subject data"}]}],
+ "object_assignments": [{"object": {"name": "test object e1"}, "category": {"name": "test object categories"}, "assignments": [{"name": "test object data"}]}],
+ "action_assignments": [{"action": {"name": "test action e1"}, "category": {"name": "test action categories"}, "assignments": [{"name": "test action data"}]}],
+ "rules": [{"meta_rule": {"name": "meta rule"}, "rule": {"subject_data": [{"name": "test "
+ "subject data"}],
+ "object_data": [{"name": "test object data"}],
+ "action_data": [{"name": "test action data"}]}, "policy": {"name":"test policy"}, "instructions": [{"decision": "grant"}], "enabled": True}]
+ }
+
+
+def test_export_models():
+ from moon_manager.api import json_import
+ from moon_manager.api import json_export
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ # import_export_helper.clean_all()
+
+ req = hug.test.post(json_import, "/import", body=json.dumps(
+ MODEL_WITHOUT_META_RULES), headers={'Content-Type': 'application/json', "X-Api-Key":
+ get_api_key_for_user("admin")} )
+ data = utilities.get_json(req.data)
+ assert all(e in data for e in MODEL_WITHOUT_META_RULES.keys())
+
+ req = hug.test.get(json_export, "/export", headers=auth_headers)
+ assert req.status == hug.HTTP_200
+ data = utilities.get_json(req.data)
+
+ assert "content" in data
+ assert "models" in data["content"]
+ assert isinstance(data["content"]["models"], list)
+ assert len(data["content"]["models"]) == 1
+ model = data["content"]["models"][0]
+ assert model["name"] == "test model"
+ assert model["description"] == "model description"
+ assert isinstance(model["meta_rules"], list)
+ assert len(model["meta_rules"]) == 0
+
+
+def test_export_policies():
+ from moon_manager.api import json_import
+ from moon_manager.api import json_export
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ import_export_helper.clean_all()
+ req = hug.test.post(json_import, "/import", body=json.dumps(
+ POLICIES), headers={'Content-Type': 'application/json', "X-Api-Key":
+ get_api_key_for_user("admin")})
+ data = utilities.get_json(req.data)
+ assert all(e in data for e in POLICIES.keys())
+
+ req = hug.test.get(json_export, "/export", headers=auth_headers)
+ assert req.status == hug.HTTP_200
+ data = utilities.get_json(req.data)
+
+ assert "content" in data
+ assert "policies" in data["content"]
+ assert isinstance(data["content"]["policies"], list)
+ assert len(data["content"]["policies"]) == 1
+ policy = data["content"]["policies"][0]
+ assert policy["name"] == "test policy"
+ assert policy["genre"] == "authz"
+ assert policy["description"] == "policy description"
+ assert "model" in policy
+ assert "name" in policy["model"]
+ model = policy["model"]
+ assert model["name"] == "test model"
+
+
+def test_export_subject_object_action():
+ from moon_manager.api import json_import
+ from moon_manager.api import json_export
+ from moon_utilities.auth_functions import get_api_key_for_user
+
+ import_export_helper.clean_all()
+ req = hug.test.post(json_import, "/import", body=json.dumps(
+ SUBJECTS_OBJECTS_ACTIONS) ,headers={'Content-Type': 'application/json', "X-Api-Key":
+ get_api_key_for_user("admin")})
+ data = utilities.get_json(req.data)
+ assert all(e in data for e in SUBJECTS_OBJECTS_ACTIONS.keys())
+
+ req = hug.test.get(json_export, "/export", headers={'Content-Type': 'application/json', "X-Api-Key":
+ get_api_key_for_user("admin")})
+ assert req.status == hug.HTTP_200
+ data = utilities.get_json(req.data)
+
+ assert "content" in data
+ type_elements = ["subject", "object", "action"]
+ for type_element in type_elements:
+ key = type_element + "s"
+ assert key in data["content"]
+ assert isinstance(data["content"][key], list)
+ assert len(data["content"][key]) == 1
+ element = data["content"][key][0]
+ if type_element == "subject":
+ assert element["name"] == "testuser"
+ else:
+ assert element["name"] == "test "+ type_element
+ assert element["description"] == "description of the " + type_element
+ assert "policies" in element
+ assert isinstance(element["policies"], list)
+ assert len(element["policies"]) == 1
+ assert isinstance(element["policies"][0], dict)
+ assert element["policies"][0]["name"] == "test policy"
+ assert isinstance(element["extra"], dict)
+ key_dict = "field_extra_" + type_element
+ value_dict = "value extra " + type_element
+ assert key_dict in element["extra"]
+ assert element["extra"][key_dict] == value_dict
+
+
+def test_export_subject_object_action_categories():
+ from moon_manager.api import json_import
+ from moon_manager.api import json_export
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ import_export_helper.clean_all()
+ req = hug.test.post(json_import, "/import", body=json.dumps(
+ SUBJECT_OBJECT_ACTION_CATEGORIES), headers={'Content-Type': 'application/json', "X-Api-Key":
+ get_api_key_for_user("admin")})
+ data = utilities.get_json(req.data)
+ assert all(e in data for e in SUBJECT_OBJECT_ACTION_CATEGORIES.keys())
+
+ req = hug.test.get(json_export, "/export", headers=auth_headers)
+ assert req.status == hug.HTTP_200
+ data = utilities.get_json(req.data)
+ assert "content" in data
+ type_elements = ["subject", "object", "action"]
+ for type_element in type_elements:
+ key = type_element + "_categories"
+ assert key in data["content"]
+ assert isinstance(data["content"][key], list)
+ assert len(data["content"][key]) == 1
+ category = data["content"][key][0]
+ assert category["name"] == "test " + type_element + " categories"
+ assert category["description"] == type_element + " category description"
+
+
+def test_export_subject_object_action_data():
+ from moon_manager.api import json_import
+ from moon_manager.api import json_export
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ import_export_helper.clean_all()
+ req = hug.test.post(json_import, "/import", body=json.dumps(
+ SUBJECT_OBJECT_ACTION_DATA), headers={'Content-Type': 'application/json', "X-Api-Key":
+ get_api_key_for_user("admin")})
+ data = utilities.get_json(req.data)
+ assert all(e in data for e in SUBJECT_OBJECT_ACTION_DATA.keys())
+
+ req = hug.test.get(json_export, "/export", headers=auth_headers)
+ assert req.status == hug.HTTP_200
+ data = utilities.get_json(req.data)
+ assert "content" in data
+ type_elements = ["subject", "object", "action"]
+ for type_element in type_elements:
+ key = type_element + "_data"
+ assert key in data["content"]
+ assert isinstance(data["content"][key], list)
+ assert len(data["content"][key]) == 1
+ data_elt = data["content"][key][0]
+ assert data_elt["name"] == "test " + type_element + " data"
+ assert data_elt["description"] == type_element + " data description"
+ assert isinstance(data_elt["policy"], dict)
+ assert data_elt["policy"]["name"] == "test policy"
+ assert isinstance(data_elt["category"], dict)
+ assert data_elt["category"]["name"] == "test " + type_element + " categories"
+
+
+def test_export_assignments():
+ from moon_manager.api import json_import
+ from moon_manager.api import json_export
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ import_export_helper.clean_all()
+ req = hug.test.post(json_import, "/import", body=json.dumps(
+ ASSIGNMENTS), headers={'Content-Type': 'application/json', "X-Api-Key":
+ get_api_key_for_user("admin")})
+ data = utilities.get_json(req.data)
+ assert all(e in data for e in ASSIGNMENTS.keys())
+
+ req = hug.test.get(json_export, "/export", headers=auth_headers)
+ assert req.status == hug.HTTP_200
+ data = utilities.get_json(req.data)
+ assert "content" in data
+ type_elements = ["subject", "object", "action"]
+ for type_element in type_elements:
+ key = type_element + "_assignments"
+ assert key in data["content"]
+ assert isinstance(data["content"][key], list)
+ assert len(data["content"][key]) == 1
+ assignment_elt = data["content"][key][0]
+ assert type_element in assignment_elt
+ assert isinstance(assignment_elt[type_element], dict)
+ if type_element == "subject":
+ assert assignment_elt[type_element]["name"] == "testuser"
+ else:
+ assert assignment_elt[type_element]["name"] == "test " + type_element + " e0"
+ assert "category" in assignment_elt
+ assert isinstance(assignment_elt["category"], dict)
+ assert assignment_elt["category"]["name"] == "test " + type_element + " categories"
+ assert "assignments" in assignment_elt
+ assert isinstance(assignment_elt["assignments"], list)
+ assert len(assignment_elt["assignments"]) == 1
+ assert assignment_elt["assignments"][0]["name"] == "test " + type_element + " data"
+
+ import_export_helper.clean_all()
+
+
+def test_export_rules():
+ from moon_manager.api import json_import
+ from moon_manager.api import json_export
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ import_export_helper.clean_all()
+ req = hug.test.post(json_import, "/import", body=json.dumps(
+ RULES), headers={'Content-Type': 'application/json', "X-Api-Key":
+ get_api_key_for_user("admin")})
+ data = utilities.get_json(req.data)
+ assert all(e in data for e in RULES.keys())
+
+ req = hug.test.get(json_export, "/export", headers=auth_headers)
+ assert req.status == hug.HTTP_200
+ data = utilities.get_json(req.data)
+ assert "content" in data
+ assert "rules" in data["content"]
+ assert isinstance(data["content"]["rules"], list)
+ assert len(data["content"]["rules"]) == 1
+ rule = data["content"]["rules"][0]
+ assert "instructions" in rule
+ assert "decision" in rule["instructions"][0]
+ assert rule["instructions"][0]["decision"] == "grant"
+ assert "enabled" in rule
+ assert rule["enabled"]
+ assert "meta_rule" in rule
+ assert rule["meta_rule"]["name"] == "meta rule"
+ assert "policy" in rule
+ assert rule["policy"]["name"] == "test policy"
+ assert "rule" in rule
+ rule = rule["rule"]
+ assert "subject_data" in rule
+ assert isinstance(rule["subject_data"], list)
+ assert len(rule["subject_data"]) == 1
+ assert rule["subject_data"][0]["name"] == "test subject data"
+ assert "object_data" in rule
+ assert isinstance(rule["object_data"], list)
+ assert len(rule["object_data"]) == 1
+ assert rule["object_data"][0]["name"] == "test object data"
+ assert "action_data" in rule
+ assert isinstance(rule["action_data"], list)
+ assert len(rule["action_data"]) == 1
+ assert rule["action_data"][0]["name"] == "test action data"
diff --git a/moon_manager/tests/unit_python/api/test_json_import.py b/moon_manager/tests/unit_python/api/test_json_import.py
new file mode 100644
index 00000000..3195eca3
--- /dev/null
+++ b/moon_manager/tests/unit_python/api/test_json_import.py
@@ -0,0 +1,832 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+import api.test_models as test_models
+import api.test_policies as test_policies
+import api.test_meta_data as test_categories
+import api.test_data as test_data
+import api.test_meta_rules as test_meta_rules
+import api.test_assignement as test_assignments
+import api.test_rules as test_rules
+import helpers.import_export_helper as import_export_helper
+import helpers.policy_helper as policy_helper
+import hug
+import json
+import pytest
+from moon_utilities import exceptions
+
+
+MODEL_WITHOUT_META_RULES = [
+ {"models": [{"name": "test model", "description": "", "meta_rules": []}]},
+ {"models": [{"name": "test model", "description": "new description", "meta_rules": [],
+ "override": True}]},
+ {"models": [{"name": "test model", "description": "description not taken into account",
+ "meta_rules": [], "override": False}]}
+]
+
+POLICIES = [
+ {"policies": [
+ {"name": "test policy", "genre": "authz", "description": "description", "model": {},
+ "mandatory": False}]},
+ {"policies": [{"name": "test policy", "genre": "authz",
+ "description": "new description not taken into account",
+ "model": {"name": "test model"}, "mandatory": True}]},
+ {"policies": [
+ {"name": "test policy", "genre": "not authz ?", "description": "generates an exception",
+ "model": {"name": "test model"}, "override": True}]},
+ {"models": [{"name": "test model", "description": "", "meta_rules": []}], "policies": [
+ {"name": "test policy", "genre": "not authz ?", "description": "changes taken into account",
+ "model": {"name": "test model"}, "override": True}]},
+]
+
+SUBJECTS = [{"subjects": [
+ {"name": "testuser", "description": "description of the subject", "extra": {},
+ "policies": []}]},
+ {"policies": [
+ {"name": "test policy", "genre": "authz", "description": "description", "model": {},
+ "mandatory": False}], "subjects": [
+ {"name": "testuser", "description": "description of the subject", "extra": {},
+ "policies": []}]},
+ {"policies": [
+ {"name": "test other policy", "genre": "authz", "description": "description",
+ "model": {}, "mandatory": True}], "subjects": [
+ {"name": "testuser", "description": "description of the subject", "extra": {},
+ "policies": []}]},
+ {"subjects": [{"name": "testuser", "description": "new description of the subject",
+ "extra": {"email": "new-email@test.com"},
+ "policies": [{"name": "test other policy"}]}]},
+ {"policies": [
+ {"name": "test policy", "genre": "authz", "description": "description", "model": {},
+ "mandatory": False}], "subjects": [
+ {"name": "testuser", "description": "description of the subject", "extra": {},
+ "policies": [{"name": "test policy"}]}]}]
+
+OBJECTS = [
+ {"objects": [{"name": "test object", "description": "description of the object", "extra": {},
+ "policies": []}]},
+ {"policies": [
+ {"name": "test policy", "genre": "authz", "description": "description", "model": {},
+ "mandatory": False}],
+ "objects": [{"name": "test object", "description": "description of the object", "extra": {},
+ "policies": []}]},
+ {"policies": [
+ {"name": "test other policy", "genre": "authz", "description": "description", "model": {},
+ "mandatory": True}],
+ "objects": [{"name": "test object", "description": "description of the object", "extra": {},
+ "policies": []}]},
+ {"objects": [{"name": "test object", "description": "new description of the object",
+ "extra": {"test": "test extra"},
+ "policies": [{"name": "test other policy"}]}]},
+ {"policies": [
+ {"name": "test policy", "genre": "authz", "description": "description", "model": {},
+ "mandatory": False}],
+ "objects": [{"name": "test object", "description": "description of the object", "extra": {},
+ "policies": [{"name": "test policy"}]}]},
+]
+
+ACTIONS = [{"actions": [
+ {"name": "test action", "description": "description of the action", "extra": {},
+ "policies": []}]},
+ {"policies": [
+ {"name": "test policy", "genre": "authz", "description": "description", "model": {},
+ "mandatory": False}], "actions": [
+ {"name": "test action", "description": "description of the action", "extra": {},
+ "policies": []}]},
+ {"policies": [
+ {"name": "test other policy", "genre": "authz", "description": "description",
+ "model": {}, "mandatory": True}], "actions": [
+ {"name": "test action", "description": "description of the action", "extra": {},
+ "policies": []}]},
+ {"actions": [{"name": "test action", "description": "new description of the action",
+ "extra": {"test": "test extra"},
+ "policies": [{"name": "test other policy"}]}]},
+ {"policies": [
+ {"name": "test policy", "genre": "authz", "description": "description", "model": {},
+ "mandatory": False}], "actions": [
+ {"name": "test action", "description": "description of the action", "extra": {},
+ "policies": [{"name": "test policy"}]}]}]
+
+SUBJECT_CATEGORIES = [{"subject_categories": [
+ {"name": "test subject categories", "description": "subject category description"}]},
+ {"subject_categories": [{"name": "test subject categories",
+ "description": "new subject category description"}]}]
+
+OBJECT_CATEGORIES = [{"object_categories": [
+ {"name": "test object categories", "description": "object category description"}]},
+ {"object_categories": [{"name": "test object categories",
+ "description": "new object category description"}]}]
+
+ACTION_CATEGORIES = [{"action_categories": [
+ {"name": "test action categories", "description": "action category description"}]},
+ {"action_categories": [{"name": "test action categories",
+ "description": "new action category description"}]}]
+
+# meta_rules import is needed otherwise the search for data do not work !!!
+PRE_DATA = {"models": [{"name": "test model", "description": "",
+ "meta_rules": [{"name": "good meta rule"},
+ {"name": "other good meta rule"}]}],
+ "policies": [
+ {"name": "test other policy", "genre": "authz", "description": "description",
+ "model": {"name": "test model"}, "mandatory": True}],
+ "subject_categories": [
+ {"name": "test subject categories", "description": "subject category description"},
+ {"name": "other test subject categories",
+ "description": "subject category description"}],
+ "object_categories": [
+ {"name": "test object categories", "description": "object category description"},
+ {"name": "other test object categories",
+ "description": "object category description"}],
+ "action_categories": [
+ {"name": "test action categories", "description": "action category description"},
+ {"name": "other test action categories",
+ "description": "action category description"}],
+ "meta_rules": [{"name": "good meta rule", "description": "valid meta rule",
+ "subject_categories": [{"name": "test subject categories"}],
+ "object_categories": [{"name": "test object categories"}],
+ "action_categories": [{"name": "test action categories"}]},
+ {"name": "other good meta rule", "description": "valid meta rule",
+ "subject_categories": [{"name": "other test subject categories"}],
+ "object_categories": [{"name": "other test object categories"}],
+ "action_categories": [{"name": "other test action categories"}]}]}
+
+SUBJECT_DATA = [{"subject_data": [
+ {"name": "not valid subject data", "description": "", "policies": [{}], "category": {}}]},
+ {"subject_data": [
+ {"name": "not valid subject data", "description": "", "policies": [{}],
+ "category": {"name": "test subject categories"}}]},
+ {"policies": [
+ {"name": "test policy", "genre": "authz", "description": "description",
+ "model": {"name": "test model"}, "mandatory": True}], "subject_data": [
+ {"name": "one valid subject data", "description": "description",
+ "policies": [{}], "category": {"name": "test subject categories"}}]},
+ {"subject_data": [{"name": "valid subject data", "description": "description",
+ "policies": [{"name": "test policy"}],
+ "category": {"name": "test subject categories"}}]},
+ {"subject_data": [{"name": "valid subject data", "description": "new description",
+ "policies": [{"name": "test other policy"}],
+ "category": {"name": "test subject categories"}}]}]
+
+OBJECT_DATA = [{"object_data": [
+ {"name": "not valid object data", "description": "", "policies": [{}], "category": {}}]},
+ {"object_data": [
+ {"name": "not valid object data", "description": "", "policies": [{}],
+ "category": {"name": "test object categories"}}]},
+ {"policies": [{"name": "test policy", "genre": "authz", "description": "description",
+ "model": {"name": "test model"}, "mandatory": True}], "object_data": [
+ {"name": "one valid object data", "description": "description", "policies": [{}],
+ "category": {"name": "test object categories"}}]},
+ {"object_data": [{"name": "valid object data", "description": "description",
+ "policies": [{"name": "test policy"}],
+ "category": {"name": "test object categories"}}]},
+ {"object_data": [{"name": "valid object data", "description": "new description",
+ "policies": [{"name": "test other policy"}],
+ "category": {"name": "test object categories"}}]}]
+
+ACTION_DATA = [{"action_data": [
+ {"name": "not valid action data", "description": "", "policies": [{}], "category": {}}]},
+ {"action_data": [
+ {"name": "not valid action data", "description": "", "policies": [{}],
+ "category": {"name": "test action categories"}}]},
+ {"policies": [{"name": "test policy", "genre": "authz", "description": "description",
+ "model": {"name": "test model"}, "mandatory": True}], "action_data": [
+ {"name": "one valid action data", "description": "description", "policies": [{}],
+ "category": {"name": "test action categories"}}]},
+ {"action_data": [{"name": "valid action data", "description": "description",
+ "policies": [{"name": "test policy"}],
+ "category": {"name": "test action categories"}}]},
+ {"action_data": [{"name": "valid action data", "description": "new description",
+ "policies": [{"name": "test other policy"}],
+ "category": {"name": "test action categories"}}]}]
+
+PRE_META_RULES = {"subject_categories": [
+ {"name": "test subject categories", "description": "subject category description"}],
+ "object_categories": [{"name": "test object categories",
+ "description": "object category description"}],
+ "action_categories": [{"name": "test action categories",
+ "description": "object action description"}]}
+
+META_RULES = [{"meta_rules": [{"name": "bad meta rule", "description": "not valid meta rule",
+ "subject_categories": [{"name": "not valid category"}],
+ "object_categories": [{"name": "test object categories"}],
+ "action_categories": [{"name": "test action categories"}]}]},
+ {"meta_rules": [{"name": "bad meta rule", "description": "not valid meta rule",
+ "subject_categories": [{"name": "test subject categories"}],
+ "object_categories": [{"name": "not valid category"}],
+ "action_categories": [{"name": "test action categories"}]}]},
+ {"meta_rules": [{"name": "bad meta rule", "description": "not valid meta rule",
+ "subject_categories": [{"name": "test subject categories"}],
+ "object_categories": [{"name": "test object categories"}],
+ "action_categories": [{"name": "not valid category"}]}]},
+ {"meta_rules": [{"name": "good meta rule", "description": "valid meta rule",
+ "subject_categories": [{"name": "test subject categories"}],
+ "object_categories": [{"name": "test object categories"}],
+ "action_categories": [{"name": "test action categories"}]}]}]
+
+PRE_ASSIGNMENTS = {"models": [
+ {"name": "test model", "description": "", "meta_rules": [{"name": "good meta rule"}]}],
+ "policies": [
+ {"name": "test policy", "genre": "authz", "description": "description",
+ "model": {"name": "test model"}, "mandatory": True}],
+ "subject_categories": [{"name": "test subject categories",
+ "description": "subject category description"}],
+ "object_categories": [{"name": "test object categories",
+ "description": "object category description"}],
+ "action_categories": [{"name": "test action categories",
+ "description": "object action description"}],
+ "subjects": [{"name": "testuser", "description": "description of the subject",
+ "extra": {}, "policies": [{"name": "test policy"}]}],
+ "objects": [{"name": "test object", "description": "description of the object",
+ "extra": {}, "policies": [{"name": "test policy"}]}],
+ "actions": [{"name": "test action", "description": "description of the action",
+ "extra": {}, "policies": [{"name": "test policy"}]}],
+ "meta_rules": [{"name": "good meta rule", "description": "valid meta rule",
+ "subject_categories": [{"name": "test subject categories"}],
+ "object_categories": [{"name": "test object categories"}],
+ "action_categories": [{"name": "test action categories"}]}],
+ "subject_data": [{"name": "subject data", "description": "test subject data",
+ "policies": [{"name": "test policy"}],
+ "category": {"name": "test subject categories"}}],
+ "object_data": [{"name": "object data", "description": "test object data",
+ "policies": [{"name": "test policy"}],
+ "category": {"name": "test object categories"}}],
+ "action_data": [{"name": "action data", "description": "test action data",
+ "policies": [{"name": "test policy"}],
+ "category": {"name": "test action categories"}}]}
+
+SUBJECT_ASSIGNMENTS = [
+ {"subject_assignments": [
+ {"subject": {"name": "unknown"},
+ "category": {"name": "test subject categories"},
+ "assignments": [{"name": "subject data"}]}],
+ "exception": exceptions.InvalidJson
+ },
+ {"subject_assignments": [
+ {"subject": {"name": "testuser"},
+ "category": {"name": "unknown"},
+ "assignments": [{"name": "subject data"}]}],
+ "exception": exceptions.UnknownName
+ },
+ {"subject_assignments": [
+ {"subject": {"name": "testuser"},
+ "category": {"name": "test subject categories"},
+ "assignments": [{"name": "unknown"}]}],
+ "exception": exceptions.InvalidJson
+ },
+ {"subject_assignments": [
+ {"subject": {"name": "testuser"},
+ "category": {"name": "test subject categories"},
+ "assignments": [{"name": "subject data"}]}],
+ "exception": None
+ }]
+
+OBJECT_ASSIGNMENTS = [
+ {"object_assignments": [
+ {"object": {"name": "unknown"},
+ "category": {"name": "test object categories"},
+ "assignments": [{"name": "object data"}]}],
+ "exception": exceptions.InvalidJson
+ },
+ {"object_assignments": [
+ {"object": {"name": "test object"},
+ "category": {"name": "unknown"},
+ "assignments": [{"name": "object data"}]}],
+ "exception": exceptions.UnknownName
+ },
+ {"object_assignments": [
+ {"object": {"name": "test object"},
+ "category": {"name": "test object categories"},
+ "assignments": [{"name": "unknown"}]}],
+ "exception": exceptions.InvalidJson
+ },
+ {"object_assignments": [
+ {"object": {"name": "test object"},
+ "category": {"name": "test object categories"},
+ "assignments": [{"name": "object data"}]}],
+ "exception": None
+ }]
+
+ACTION_ASSIGNMENTS = [
+ {"action_assignments": [
+ {"action": {"name": "unknown"},
+ "category": {"name": "test action categories"},
+ "assignments": [{"name": "action data"}]}],
+ "exception": exceptions.InvalidJson
+ },
+ {"action_assignments": [
+ {"action": {"name": "test action"},
+ "category": {"name": "unknown"},
+ "assignments": [{"name": "action data"}]}],
+ "exception": exceptions.UnknownName
+ },
+ {"action_assignments": [
+ {"action": {"name": "test action"},
+ "category": {"name": "test action categories"},
+ "assignments": [{"name": "unknown"}]}],
+ "exception": exceptions.InvalidJson
+ },
+ {"action_assignments": [
+ {"action": {"name": "test action"},
+ "category": {"name": "test action categories"},
+ "assignments": [{"name": "action data"}]}],
+ "exception": None
+ }]
+
+RULES = [{"rules": [{"meta_rule": {"name": "unknown meta rule"}, "policy": {"name": "test "
+ "policy"},
+ "instructions": [{"decision": "grant"}], "enabled": True, "rule": {
+ "subject_data": [{"name": "subject data"}], "object_data": [{"name": "object data"}],
+ "action_data": [{"name": "action data"}]}}]},
+ {"rules": [{"meta_rule": {"name": "good meta rule"}, "policy": {"name": "unknown "
+ "policy"},
+ "instructions": [{"decision": "grant"}], "enabled": True, "rule": {
+ "subject_data": [{"name": "subject data"}],
+ "object_data": [{"name": "object data"}],
+ "action_data": [{"name": "action data"}]}}]},
+ {"rules": [{"meta_rule": {"name": "good meta rule"}, "policy": {"name": "test policy"},
+ "instructions": [{"decision": "grant"}], "enabled": True, "rule": {
+ "subject_data": [{"name": "unknown subject data"}],
+ "object_data": [{"name": "object data"}],
+ "action_data": [{"name": "action data"}]}}]},
+ {"rules": [{"meta_rule": {"name": "good meta rule"}, "policy": {"name": "test policy"},
+ "instructions": [{"decision": "grant"}], "enabled": True, "rule": {
+ "subject_data": [{"name": "subject data"}],
+ "object_data": [{"name": "unknown object data"}],
+ "action_data": [{"name": "action data"}]}}]},
+ {"rules": [{"meta_rule": {"name": "good meta rule"}, "policy": {"name": "test policy"},
+ "instructions": [{"decision": "grant"}], "enabled": True, "rule": {
+ "subject_data": [{"name": "subject data"}],
+ "object_data": [{"name": "object data"}],
+ "action_data": [{"name": "unknown action data"}]}}]},
+ {"rules": [{"meta_rule": {"name": "good meta rule"}, "policy": {"name": "test policy"},
+ "instructions": [{"decision": "grant"}], "enabled": True, "rule": {
+ "subject_data": [{"name": "subject data"}],
+ "object_data": [{"name": "object data"}],
+ "action_data": [{"name": "action data"}]}}]}]
+
+
+def test_import_models_without_new_meta_rules():
+ from moon_utilities.auth_functions import get_api_key_for_user
+
+ import_export_helper.clean_all()
+ counter = 0
+ for models_description in MODEL_WITHOUT_META_RULES:
+ from moon_manager.api import json_import
+ req = hug.test.post(json_import, "/import", body=json.dumps(models_description)
+ , headers={'Content-Type': 'application/json', "X-Api-Key":
+ get_api_key_for_user("admin")})
+ data = req.data
+ assert all(e in data for e in models_description.keys())
+ req, models = test_models.get_models()
+ models = models["models"]
+ assert len(list(models.keys())) == 1
+ values = list(models.values())
+ assert values[0]["name"] == "test model"
+ if counter == 0:
+ assert len(values[0]["description"]) == 0
+ if counter == 1 or counter == 2:
+ assert values[0]["description"] == "new description"
+ counter = counter + 1
+ import_export_helper.clean_all()
+
+
+def test_import_policies():
+ from moon_utilities.auth_functions import get_api_key_for_user
+
+ import_export_helper.clean_all()
+ counter = -1
+ for policy_description in POLICIES:
+ counter = counter + 1
+ from moon_manager.api import json_import
+ if counter == 2:
+ with pytest.raises(exceptions.UnknownName):
+ req = hug.test.post(json_import, "/import", body=json.dumps(policy_description),
+ headers={'Content-Type': 'application/json',
+ "X-Api-Key": get_api_key_for_user("admin")})
+ continue
+ else:
+ req = hug.test.post(json_import, "/import", body=json.dumps(policy_description),
+ headers={'Content-Type': 'application/json',
+ "X-Api-Key": get_api_key_for_user("admin")})
+ data = req.data
+ assert all(e in data for e in policy_description.keys())
+
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ req = test_policies.get_policies(auth_headers)
+ policies = req.data
+ policies = policies["policies"]
+ assert len(list(policies.keys())) == 1
+ values = list(policies.values())
+ assert values[0]["name"] == "test policy"
+ if counter < 3:
+ assert values[0]["genre"] == "authz"
+ assert values[0]["description"] == "description"
+ else:
+ assert values[0]["genre"] == "not authz ?"
+ assert values[0]["description"] == "changes taken into account"
+ assert len(values[0]["model_id"]) > 0
+ import_export_helper.clean_all()
+
+
+def test_import_subject_object_action():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ type_elements = ["object", "action"]
+ perimeter_id = None
+
+ for type_element in type_elements:
+ import_export_helper.clean_all()
+ counter = -1
+ # set the getters and the comparison values
+ if type_element == "subject":
+ elements = SUBJECTS
+ clean_method = import_export_helper.clean_subjects
+ name = "testuser"
+ key_extra = "email"
+ value_extra = "new-email@test.com"
+ elif type_element == "object":
+ elements = OBJECTS
+ clean_method = import_export_helper.clean_objects
+ name = "test object"
+ key_extra = "test"
+ value_extra = "test extra"
+ else:
+ elements = ACTIONS
+ clean_method = import_export_helper.clean_actions
+ name = "test action"
+ key_extra = "test"
+ value_extra = "test extra"
+
+ for element in elements:
+ counter = counter + 1
+ if counter == 2 or counter == 4:
+ clean_method()
+
+ from moon_manager.api import perimeter
+ if counter == 3:
+ req = hug.test.patch(perimeter, "/{}s/{}".format(type_element, perimeter_id),
+ body=json.dumps(element["{}s".format(type_element)][0]),
+ headers={'Content-Type': 'application/json',
+ "X-Api-Key": get_api_key_for_user("admin")})
+ elif counter < 2:
+ with pytest.raises(exceptions.PerimeterContentError) as exception_info:
+ req = hug.test.patch(perimeter, "/{}s/{}".format(type_element, perimeter_id),
+ body=json.dumps(element["{}s".format(type_element)][0]),
+ headers={'Content-Type': 'application/json',
+ "X-Api-Key": get_api_key_for_user("admin")})
+ # assert req.status == hug.HTTP_400
+ assert '400: Perimeter content is invalid.' == str(exception_info.value)
+ continue
+ else:
+ from moon_manager.api import json_import
+ req = hug.test.post(json_import, "/import", body=json.dumps(element),
+ headers={'Content-Type': 'application/json',
+ "X-Api-Key": get_api_key_for_user("admin")})
+
+ try:
+ data = req.data
+ except Exception as e:
+ assert False
+ # assert counter < 2 #  this is an expected failure
+ # continue
+
+ if counter != 3:
+ assert any(e in data for e in element["{}s".format(type_element)][0].keys()) #NOTE: logs are skipped for some elements
+
+ from moon_manager.api import perimeter
+ get_elements = hug.test.get(perimeter, "/" + type_element + "s", headers=auth_headers ).data
+ get_elements = get_elements[type_element + "s"]
+
+ perimeter_id = list(get_elements.keys())[0]
+
+ assert len(list(get_elements.keys())) == 1
+ values = list(get_elements.values())
+ assert values[0]["name"] == name
+ if counter == 2 or counter == 4:
+ assert values[0]["description"] == "description of the " + type_element
+ # assert not values[0]["extra"]
+ if counter == 3:
+ assert values[0]["description"] == "new description of the " + type_element
+ assert values[0]["extra"][key_extra] == value_extra
+
+ #  assert len(values[0]["policy_list"]) == 1
+ import_export_helper.clean_all()
+
+
+def test_import_subject_object_action_categories():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ type_elements = ["subject", "object", "action"]
+
+ for type_element in type_elements:
+ import_export_helper.clean_all()
+ counter = -1
+ # set the getters and the comparison values
+ if type_element == "subject":
+ elements = SUBJECT_CATEGORIES
+ get_method = test_categories.get_subject_categories
+ elif type_element == "object":
+ elements = OBJECT_CATEGORIES
+ get_method = test_categories.get_object_categories
+ else:
+ elements = ACTION_CATEGORIES
+ get_method = test_categories.get_action_categories
+
+ for element in elements:
+ from moon_manager.api import json_import
+ req = hug.test.post(json_import, "/import", body=json.dumps(element),
+ headers={'Content-Type': 'application/json',
+ "X-Api-Key": get_api_key_for_user("admin")} )
+ counter = counter + 1
+ data = req.data
+ assert all(e in data for e in element.keys())
+ req, get_elements = get_method()
+ get_elements = get_elements[type_element + "_categories"]
+ assert len(list(get_elements.keys())) == 1
+ values = list(get_elements.values())
+ assert values[0]["name"] == "test " + type_element + " categories"
+ assert values[0]["description"] == type_element + " category description"
+
+
+def test_import_meta_rules():
+ from moon_utilities.auth_functions import get_api_key_for_user
+
+ import_export_helper.clean_all()
+ # import some categories
+ from moon_manager.api import json_import
+ req = hug.test.post(json_import, "/import", body=json.dumps(PRE_META_RULES),
+ headers={'Content-Type': 'application/json',
+ "X-Api-Key": get_api_key_for_user("admin")})
+ data = req.data
+ assert all(e in data for e in PRE_META_RULES.keys())
+
+ counter = -1
+ for meta_rule in META_RULES:
+ counter = counter + 1
+ if counter != 3:
+ with pytest.raises(exceptions.UnknownName) as exception_info:
+ req = hug.test.post(json_import, "/import", body=json.dumps(meta_rule),
+ headers={'Content-Type': 'application/json',
+ "X-Api-Key": get_api_key_for_user("admin")})
+ # assert req.status == hug.HTTP_400
+ assert '400: Unknown Name.' == str(exception_info.value)
+ continue
+ else:
+ req = hug.test.post(json_import, "/import", body=json.dumps(meta_rule),
+ headers={'Content-Type': 'application/json',
+ "X-Api-Key": get_api_key_for_user("admin")})
+ data = req.data
+ assert all(e in data for e in meta_rule.keys())
+ assert req.status == hug.HTTP_200
+
+ req, meta_rules = test_meta_rules.get_meta_rules()
+ meta_rules = meta_rules["meta_rules"]
+ key = list(meta_rules.keys())[0]
+ assert isinstance(meta_rules, dict)
+ assert meta_rules[key]["name"] == "good meta rule"
+ assert meta_rules[key]["description"] == "valid meta rule"
+ assert len(meta_rules[key]["subject_categories"]) == 1
+ assert len(meta_rules[key]["object_categories"]) == 1
+ assert len(meta_rules[key]["action_categories"]) == 1
+
+ subject_category_key = meta_rules[key]["subject_categories"][0]
+ object_category_key = meta_rules[key]["object_categories"][0]
+ action_category_key = meta_rules[key]["action_categories"][0]
+
+ req, sub_cat = test_categories.get_subject_categories()
+ sub_cat = sub_cat["subject_categories"]
+ assert sub_cat[subject_category_key]["name"] == "test subject categories"
+
+ req, ob_cat = test_categories.get_object_categories()
+ ob_cat = ob_cat["object_categories"]
+ assert ob_cat[object_category_key]["name"] == "test object categories"
+
+ req, ac_cat = test_categories.get_action_categories()
+ ac_cat = ac_cat["action_categories"]
+ assert ac_cat[action_category_key]["name"] == "test action categories"
+
+ import_export_helper.clean_all()
+
+
+def test_import_subject_object_action_assignments():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ import_export_helper.clean_all()
+
+ from moon_manager.api import json_import
+ req = hug.test.post(json_import, "/import", body=json.dumps(PRE_ASSIGNMENTS),
+ headers={'Content-Type': 'application/json',
+ "X-Api-Key": get_api_key_for_user("admin")} )
+ data = req.data
+ assert any(e in data for e in PRE_ASSIGNMENTS.keys()) #NOTE: note assignment logs are skipped
+
+ type_elements = ["subject", "object", "action"]
+
+ for type_element in type_elements:
+ counter = -1
+ if type_element == "subject":
+ datas = SUBJECT_ASSIGNMENTS
+ get_method = test_assignments.get_subject_assignment
+ elif type_element == "object":
+ datas = OBJECT_ASSIGNMENTS
+ get_method = test_assignments.get_object_assignment
+ else:
+ datas = ACTION_ASSIGNMENTS
+ get_method = test_assignments.get_action_assignment
+
+ for assignments in datas:
+ counter = counter + 1
+ my_exception = assignments.pop("exception")
+ if my_exception:
+ with pytest.raises(my_exception) as exception_info:
+ req = hug.test.post(json_import, "/import", body=json.dumps(assignments),
+ headers={'Content-Type': 'application/json',
+ "X-Api-Key": get_api_key_for_user("admin")})
+ assert '400:' in str(exception_info.value)
+ else:
+ req = hug.test.post(json_import, "/import", body=json.dumps(assignments),
+ headers={'Content-Type': 'application/json',
+ "X-Api-Key": get_api_key_for_user("admin")})
+ assert len(assignments.keys()) > 0 #NOTE logs for assignments are skipped
+ assert req.status == hug.HTTP_200
+ req = test_policies.get_policies(auth_headers=auth_headers)
+ policies = req.data
+ for policy_key in policies["policies"]:
+ req, get_assignments = get_method(policy_key)
+ get_assignments = get_assignments[type_element + "_assignments"]
+ assert len(get_assignments) == 1
+
+
+def test_import_rules():
+ from moon_utilities.auth_functions import get_api_key_for_user
+
+ import_export_helper.clean_all()
+ from moon_manager.api import json_import
+ req = hug.test.post(json_import, "/import", body=json.dumps(PRE_ASSIGNMENTS),
+ headers={'Content-Type': 'application/json',
+ "X-Api-Key": get_api_key_for_user("admin")})
+ data = req.data
+ assert all(e in data for e in PRE_ASSIGNMENTS.keys())
+
+ counter = -1
+ for rule in RULES:
+ counter = counter + 1
+ from moon_manager.api import json_import
+ if counter < 5:
+ with pytest.raises(exceptions.UnknownName) as exception_info:
+ req = hug.test.post(json_import, "/import", body=json.dumps(rule),
+ headers={'Content-Type': 'application/json',
+ "X-Api-Key": get_api_key_for_user("admin")})
+
+ # assert req.status == hug.HTTP_400
+ assert '400: Unknown Name.' == str(exception_info.value)
+ continue
+ req = hug.test.post(json_import, "/import", body=json.dumps(rule),
+ headers={'Content-Type': 'application/json',
+ "X-Api-Key": get_api_key_for_user("admin")})
+
+ assert req.status == hug.HTTP_200
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ policies = test_policies.get_policies(auth_headers=auth_headers).data
+ for policy in policies['policies']:
+ if policies['policies'][policy]['name'] == rule['rules'][0]['policy']['name']:
+ policy_id = policy
+ break
+
+ req, rules = test_rules.test_get_rules(policy_id)
+ rules = rules["rules"]
+ rules = rules["rules"]
+ assert len(rules) == 1
+ rules = rules[0]
+ assert rules["enabled"]
+ assert rules["instructions"][0]["decision"] == "grant"
+
+ req, meta_rules = test_meta_rules.get_meta_rules()
+ assert meta_rules["meta_rules"][list(meta_rules["meta_rules"].keys())[0]][
+ "name"] == "good meta rule"
+
+
+def test_import_subject_object_action_data():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ type_elements = ["subject", "object", "action"]
+
+ for type_element in type_elements:
+ import_export_helper.clean_all()
+ from moon_manager.api import json_import
+ req = hug.test.post(json_import, "/import", body=json.dumps(PRE_DATA),
+ headers={'Content-Type': 'application/json', "X-Api-Key":
+ get_api_key_for_user("admin")})
+ counter = -1
+ # set the getters and the comparison values
+ if type_element == "subject":
+ elements = SUBJECT_DATA
+ get_method = test_data.get_subject_data
+ get_categories = test_categories.get_subject_categories
+ elif type_element == "object":
+ elements = OBJECT_DATA
+ get_method = test_data.get_object_data
+ get_categories = test_categories.get_object_categories
+ else:
+ elements = ACTION_DATA
+ get_method = test_data.get_action_data
+ get_categories = test_categories.get_action_categories
+
+ for element in elements:
+ from moon_manager.api import json_import
+ counter = counter + 1
+ if counter == 0 or counter == 1:
+ with pytest.raises(exceptions.MissingIdOrName) as exception_info:
+ req = hug.test.post(json_import, "/import", body=json.dumps(element), headers={
+ 'Content-Type': 'application/json', "X-Api-Key": get_api_key_for_user("admin")})
+ # assert req.status == hug.HTTP_400
+ assert '400: Missing ID or Name.' == str(exception_info.value)
+ continue
+ else:
+ req = hug.test.post(json_import, "/import", body=json.dumps(element), headers={
+ 'Content-Type': 'application/json', "X-Api-Key": get_api_key_for_user("admin")})
+ assert req.status == hug.HTTP_200
+ data = req.data
+ assert all(e in data for e in element.keys())
+
+ req = test_policies.get_policies(auth_headers=auth_headers)
+ policies = req.data
+ policies = policies["policies"]
+ req, categories = get_categories()
+ categories = categories[type_element + "_categories"]
+ case_tested = False
+ for policy_key in policies.keys():
+ policy = policies[policy_key]
+ for category_key in categories:
+ req, get_elements = get_method(policy_id=policy_key,
+ category_id=category_key)
+ if len(get_elements[type_element + "_data"]) == 0:
+ continue
+
+ # do this because the backend gives an element with empty data if the policy_key,
+ # category_key couple does not have any data...
+ get_elements = get_elements[type_element + "_data"]
+ if len(get_elements[0]["data"]) == 0:
+ continue
+
+ if policy["name"] == "test policy":
+ assert len(get_elements) == 1
+ el = get_elements[0]
+ assert isinstance(el["data"], dict)
+ if counter == 2:
+ assert len(el["data"].keys()) == 1
+ el = el["data"][list(el["data"].keys())[0]]
+ if "value" in el:
+ el = el["value"]
+ assert el["name"] == "one valid " + type_element + " data"
+ if counter == 3:
+ assert len(el["data"].keys()) == 2
+ el1 = el["data"][list(el["data"].keys())[0]]
+ el2 = el["data"][list(el["data"].keys())[1]]
+ if "value" in el1:
+ el1 = el1["value"]
+ el2 = el2["value"]
+ assert (el1["name"] == "one valid " + type_element + " data" and el2[
+ "name"] == "valid " + type_element + " data") or (el2[
+ "name"] == "one valid " + type_element + " data" and
+ el1[
+ "name"] == "valid " + type_element + " data")
+ assert el1["description"] == "description"
+ assert el2["description"] == "description"
+
+ case_tested = True
+
+ if policy["name"] == "test other policy":
+ if counter == 4:
+ assert len(get_elements) == 1
+ el = get_elements[0]
+ assert isinstance(el["data"], dict)
+ assert len(el["data"].keys()) == 1
+ el = el["data"][list(el["data"].keys())[0]]
+ if "value" in el:
+ el = el["value"]
+ assert el["name"] == "valid " + type_element + " data"
+ assert el["description"] == "new description"
+ case_tested = True
+
+ assert case_tested is True
+
+
+def test_clean():
+ import_export_helper.clean_all()
+ # restore the database as previously
+ policy_helper.add_policies()
diff --git a/moon_manager/tests/unit_python/api/test_keystone.py b/moon_manager/tests/unit_python/api/test_keystone.py
new file mode 100644
index 00000000..5ed08ca7
--- /dev/null
+++ b/moon_manager/tests/unit_python/api/test_keystone.py
@@ -0,0 +1,63 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+
+def create_project(tenant_dict):
+ from moon_manager.pip_driver import InformationManager
+ return InformationManager["subjects"][0].create_project(**tenant_dict)
+
+
+def list_projects():
+ from moon_manager.pip_driver import InformationManager
+ return InformationManager["subjects"][0].get_projects()
+
+
+def create_user(subject_dict):
+ from moon_manager.pip_driver import InformationManager
+ return InformationManager["subjects"][0].add_item(**subject_dict)
+
+
+def test_create_project():
+ tenant_dict = {
+ "description": "test_project",
+ "domain": ['domain_id_1'],
+ "enabled": True,
+ "is_domain": False,
+ "name": 'project_1'
+ }
+ project = create_project(tenant_dict)
+ assert project
+ assert project.get('name') == tenant_dict.get('name')
+
+# TODO TO BE UPDATED
+# def test_create_project_without_name():
+# tenant_dict = {
+# "description": "test_project",
+# "domain_id": ['domain_id_1'],
+# "enabled": True,
+# "is_domain": False,
+# }
+# with pytest.raises(Exception) as exception_info:
+# create_project(tenant_dict)
+# assert '400: Keystone project error' == str(exception_info.value)
+
+
+def test_create_user():
+ subject_dict = {
+ "password": "password",
+ "domain": ['domain_id_1'],
+ "enabled": True,
+ "project": 'test_project',
+ "name": 'user_id_1'
+ }
+ user = create_user(subject_dict)
+ assert user
diff --git a/moon_manager/tests/unit_python/api/test_meta_data.py b/moon_manager/tests/unit_python/api/test_meta_data.py
new file mode 100644
index 00000000..1d37ab70
--- /dev/null
+++ b/moon_manager/tests/unit_python/api/test_meta_data.py
@@ -0,0 +1,370 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+import hug
+import json
+from helpers import data_builder
+from uuid import uuid4
+import pytest
+from moon_utilities import exceptions
+
+# subject_categories_test
+
+
+def get_subject_categories():
+ from moon_manager.api import meta_data
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ req = hug.test.get(meta_data, "/subject_categories", headers=auth_headers )
+ subject_categories = req.data
+ return req, subject_categories
+
+
+def add_subject_categories(name):
+ from moon_manager.api import meta_data
+ from moon_utilities.auth_functions import get_api_key_for_user
+
+ data = {
+ "name": name,
+ "description": "description of {}".format(name)
+ }
+ req = hug.test.post(meta_data, "/subject_categories", body=json.dumps(data),
+ headers={'Content-Type': 'application/json', "X-Api-Key":
+ get_api_key_for_user("admin")})
+
+ subject_categories = req.data
+ return req, subject_categories
+
+
+def delete_subject_categories(name):
+ from moon_manager.api import meta_data
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ request, subject_categories = get_subject_categories()
+ for key, value in subject_categories['subject_categories'].items():
+ if value['name'] == name:
+ return hug.test.delete(meta_data, "/subject_categories/{}".format(key), headers=auth_headers )
+ return hug.test.delete(meta_data, "/subject_categories/{}".format(name), headers=auth_headers )
+
+
+def test_get_subject_categories():
+ req, subject_categories = get_subject_categories()
+ assert req.status == hug.HTTP_200
+ assert isinstance(subject_categories, dict)
+ assert "subject_categories" in subject_categories
+
+
+def test_add_subject_categories():
+ name = "testuser" + uuid4().hex
+ req, subject_categories = add_subject_categories(name)
+ assert req.status == hug.HTTP_200
+ assert isinstance(subject_categories, dict)
+ value = list(subject_categories["subject_categories"].values())[0]
+ assert "subject_categories" in subject_categories
+ assert value['name'] == name
+ assert value['description'] == "description of {}".format(name)
+
+
+def test_add_subject_categories_with_existed_name():
+ name = uuid4().hex
+ req, subject_categories = add_subject_categories(name)
+ assert req.status == hug.HTTP_200
+ with pytest.raises(exceptions.SubjectCategoryExisting) as exception_info:
+ req, subject_categories = add_subject_categories(name)
+ assert '409: Subject Category Existing' == str(exception_info.value)
+ # assert req.status == hug.HTTP_409
+ # assert req.data['message'] == '409: Subject Category Existing'
+
+
+def test_add_subject_categories_name_contain_space():
+ with pytest.raises(exceptions.CategoryNameInvalid) as exception_info:
+ req, subject_categories = add_subject_categories(" ")
+ assert '400: Category Name Invalid' == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data['message'] == '400: Category Name Invalid'
+
+
+def test_add_subject_categories_with_empty_name():
+ with pytest.raises(exceptions.ValidationContentError) as exception_info:
+ req, subject_categories = add_subject_categories("<a>")
+ # assert req.status == hug.HTTP_400
+ # assert req.data['message'] == "Key: 'name', [Forbidden characters in string]"
+ assert "Key: 'name', [Forbidden characters in string]" == str(exception_info.value)
+
+
+def test_add_subject_categories_with_name_contain_space():
+ with pytest.raises(exceptions.ValidationContentError) as exception_info:
+ req, subject_categories = add_subject_categories("test<z>user")
+ assert "Key: 'name', [Forbidden characters in string]" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data['message'] == "Key: 'name', [Forbidden characters in string]"
+
+
+def test_delete_subject_categories():
+ name = "testuser" + uuid4().hex
+ add_subject_categories(name)
+ req = delete_subject_categories(name)
+ assert req.status == hug.HTTP_200
+
+
+def test_delete_subject_categories_without_id():
+ with pytest.raises(exceptions.SubjectCategoryUnknown) as exception_info:
+ req = delete_subject_categories(uuid4().hex)
+ assert "400: Subject Category Unknown" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data['message'] == "400: Subject Category Unknown"
+
+
+# ---------------------------------------------------------------------------
+# object_categories_test
+
+def get_object_categories():
+ from moon_manager.api import meta_data
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ req = hug.test.get(meta_data, "/object_categories", headers=auth_headers )
+ object_categories = req.data
+ return req, object_categories
+
+
+def add_object_categories(name):
+ from moon_manager.api import meta_data
+ from moon_utilities.auth_functions import get_api_key_for_user
+
+ data = {
+ "name": name,
+ "description": "description of {}".format(name)
+ }
+ req = hug.test.post(meta_data, "/object_categories", body=json.dumps(data),
+ headers={'Content-Type': 'application/json', "X-Api-Key":
+ get_api_key_for_user("admin")} )
+ object_categories = req.data
+ return req, object_categories
+
+
+def delete_object_categories(name):
+ from moon_manager.api import meta_data
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ request, object_categories = get_object_categories()
+ for key, value in object_categories['object_categories'].items():
+ if value['name'] == name:
+ return hug.test.delete(meta_data, "/object_categories/{}".format(key),
+ headers=auth_headers )
+ return hug.test.delete(meta_data, "/object_categories/{}".format(name), headers=auth_headers )
+
+
+def test_get_object_categories():
+ req, object_categories = get_object_categories()
+ assert req.status == hug.HTTP_200
+ assert isinstance(object_categories, dict)
+ assert "object_categories" in object_categories
+
+
+def test_add_object_categories():
+ name="testuser"+uuid4().hex
+ req, object_categories = add_object_categories(name)
+ assert req.status == hug.HTTP_200
+ assert isinstance(object_categories, dict)
+ value = list(object_categories["object_categories"].values())[0]
+ assert "object_categories" in object_categories
+ assert value['name'] == name
+ assert value['description'] == "description of {}".format(name)
+
+
+def test_add_object_categories_with_existed_name():
+ name = uuid4().hex
+ req, object_categories = add_object_categories(name)
+ assert req.status == hug.HTTP_200
+ with pytest.raises(exceptions.ObjectCategoryExisting) as exception_info:
+ req, object_categories = add_object_categories(name)
+ assert "409: Object Category Existing" == str(exception_info.value)
+ # assert req.status == hug.HTTP_409
+ # assert req.data['message'] == '409: Object Category Existing'
+
+
+def test_add_object_categories_name_contain_space():
+ with pytest.raises(exceptions.CategoryNameInvalid) as exception_info:
+ req, subject_categories = add_object_categories(" ")
+ assert "400: Category Name Invalid" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data['message'] == '400: Category Name Invalid'
+
+
+def test_add_object_categories_with_empty_name():
+ with pytest.raises(exceptions.ValidationContentError) as exception_info:
+ req, object_categories = add_object_categories("<a>")
+ assert "Key: 'name', [Forbidden characters in string]" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data['message'] == "Key: 'name', [Forbidden characters in string]"
+
+
+def test_add_object_categories_with_name_contain_space():
+ with pytest.raises(exceptions.ValidationContentError) as exception_info:
+ req, object_categories = add_object_categories("test<a>user")
+ assert "Key: 'name', [Forbidden characters in string]" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data['message'] == "Key: 'name', [Forbidden characters in string]"
+
+
+def test_delete_object_categories():
+ name = uuid4().hex
+ add_object_categories(name)
+ req = delete_object_categories(name)
+ assert req.status == hug.HTTP_200
+
+
+def test_delete_object_categories_without_id():
+ with pytest.raises(exceptions.ObjectCategoryUnknown) as exception_info:
+ req = delete_object_categories(uuid4().hex)
+ assert "400: Object Category Unknown" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data['message'] == "400: Object Category Unknown"
+
+
+# ---------------------------------------------------------------------------
+# action_categories_test
+
+def get_action_categories():
+ from moon_manager.api import meta_data
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ req = hug.test.get(meta_data, "/action_categories", headers=auth_headers )
+ action_categories = req.data
+ return req, action_categories
+
+
+def add_action_categories(name):
+ from moon_manager.api import meta_data
+ from moon_utilities.auth_functions import get_api_key_for_user
+
+ data = {
+ "name": name,
+ "description": "description of {}".format(name)
+ }
+ req = hug.test.post(meta_data, "/action_categories", body=json.dumps(data),
+ headers={'Content-Type': 'application/json', "X-Api-Key":
+ get_api_key_for_user("admin")} )
+ action_categories = req.data
+ return req, action_categories
+
+
+def delete_action_categories(name):
+ from moon_manager.api import meta_data
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ request, action_categories = get_action_categories()
+ for key, value in action_categories['action_categories'].items():
+ if value['name'] == name:
+ return hug.test.delete(meta_data, "/action_categories/{}".format(key), headers=auth_headers )
+ return hug.test.delete(meta_data, "/action_categories/{}".format(name), headers=auth_headers )
+
+
+def test_get_action_categories():
+ req, action_categories = get_action_categories()
+ assert req.status == hug.HTTP_200
+ assert isinstance(action_categories, dict)
+ assert "action_categories" in action_categories
+
+
+def test_add_action_categories():
+ name = "testuser" + uuid4().hex
+ req, action_categories = add_action_categories(name)
+ assert req.status == hug.HTTP_200
+ assert isinstance(action_categories, dict)
+ value = list(action_categories["action_categories"].values())[0]
+ assert "action_categories" in action_categories
+ assert value['name'] == name
+ assert value['description'] == "description of {}".format(name)
+
+
+def test_add_action_categories_with_existed_name():
+ name = uuid4().hex
+ req, action_categories = add_action_categories(name)
+ assert req.status == hug.HTTP_200
+ with pytest.raises(exceptions.ActionCategoryExisting) as exception_info:
+ req, action_categories = add_action_categories(name)
+ assert "409: Action Category Existing" == str(exception_info.value)
+ # assert req.status == hug.HTTP_409
+ # assert req.data['message'] == '409: Action Category Existing'
+
+
+def test_add_action_categories_name_contain_space():
+ with pytest.raises(exceptions.CategoryNameInvalid) as exception_info:
+ req, subject_categories = add_action_categories(" ")
+ assert "400: Category Name Invalid" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data['message'] == '400: Category Name Invalid'
+
+
+def test_add_action_categories_with_empty_name():
+ with pytest.raises(exceptions.ValidationContentError) as exception_info:
+ req, action_categories = add_action_categories("<a>")
+ assert "Key: 'name', [Forbidden characters in string]" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data['message'] == "Key: 'name', [Forbidden characters in string]"
+
+
+def test_add_action_categories_with_name_contain_space():
+ with pytest.raises(exceptions.ValidationContentError) as exception_info:
+ req, action_categories = add_action_categories("test<a>user")
+ assert "Key: 'name', [Forbidden characters in string]" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data['message'] == "Key: 'name', [Forbidden characters in string]"
+
+
+def test_delete_action_categories():
+ name = "testuser" + uuid4().hex
+ add_action_categories(name)
+ req = delete_action_categories(name)
+ assert req.status == hug.HTTP_200
+
+
+def test_delete_action_categories_without_id():
+ with pytest.raises(exceptions.ActionCategoryUnknown) as exception_info:
+ req = delete_action_categories(uuid4().hex)
+ assert "400: Action Category Unknown" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data['message'] == "400: Action Category Unknown"
+
+
+def test_delete_data_categories_connected_to_meta_rule():
+ from moon_manager.api import meta_data
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = data_builder.create_new_meta_rule()
+
+ with pytest.raises(exceptions.DeleteSubjectCategoryWithMetaRule) as exception_info:
+ req = hug.test.delete(meta_data, "/subject_categories/{}".format(subject_category_id),
+ headers=auth_headers )
+ assert "400: Subject Category With Meta Rule Error" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data['message'] == '400: Subject Category With Meta Rule Error'
+
+ with pytest.raises(exceptions.DeleteObjectCategoryWithMetaRule) as exception_info:
+ req = hug.test.delete(meta_data, "/object_categories/{}".format(object_category_id), headers=auth_headers)
+ assert "400: Object Category With Meta Rule Error" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data['message'] == '400: Object Category With Meta Rule Error'
+
+ with pytest.raises(exceptions.DeleteActionCategoryWithMetaRule) as exception_info:
+ req = hug.test.delete(meta_data, "/action_categories/{}".format(action_category_id), headers=auth_headers)
+ assert "400: Action Category With Meta Rule Error" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data['message'] == '400: Action Category With Meta Rule Error'
diff --git a/moon_manager/tests/unit_python/api/test_meta_rules.py b/moon_manager/tests/unit_python/api/test_meta_rules.py
new file mode 100644
index 00000000..6c6797f5
--- /dev/null
+++ b/moon_manager/tests/unit_python/api/test_meta_rules.py
@@ -0,0 +1,687 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+import hug
+import api.utilities as utilities
+from helpers import category_helper
+from helpers import data_builder
+from helpers import policy_helper
+from helpers import model_helper
+from helpers import meta_rule_helper
+from uuid import uuid4
+import pytest
+from moon_utilities import exceptions
+
+
+def get_meta_rules():
+ from moon_manager.api import meta_rules
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ req = hug.test.get(meta_rules, "/meta_rules", headers=auth_headers)
+ meta_rules = utilities.get_json(req.data)
+ return req, meta_rules
+
+
+def add_meta_rules(name, data=None):
+ from moon_manager.api import meta_rules
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ if not data:
+ subject_category = category_helper.add_subject_category(
+ value={"name": "subject category name" + uuid4().hex, "description": "description 1"})
+ subject_category_id = list(subject_category.keys())[0]
+ object_category = category_helper.add_object_category(
+ value={"name": "object category name" + uuid4().hex, "description": "description 1"})
+ object_category_id = list(object_category.keys())[0]
+ action_category = category_helper.add_action_category(
+ value={"name": "action category name" + uuid4().hex, "description": "description 1"})
+ action_category_id = list(action_category.keys())[0]
+
+ data = {
+ "name": name,
+ "subject_categories": [subject_category_id],
+ "object_categories": [object_category_id],
+ "action_categories": [action_category_id]
+ }
+ req = hug.test.post(meta_rules, "/meta_rules", body=data,
+ headers=auth_headers)
+ meta_rules = utilities.get_json(req.data)
+ return req, meta_rules
+
+
+def add_meta_rules_without_category_ids(name):
+ from moon_manager.api import meta_rules
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ data = {
+ "name": name + uuid4().hex,
+ "subject_categories": [],
+ "object_categories": [],
+ "action_categories": []
+ }
+ req = hug.test.post(meta_rules, "/meta_rules", body=data,
+ headers=auth_headers)
+ meta_rules = utilities.get_json(req.data)
+ return req, meta_rules
+
+
+def update_meta_rules(name, metaRuleId, data=None):
+ from moon_manager.api import meta_rules
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ if not data:
+ subject_category = category_helper.add_subject_category(
+ value={"name": "subject category name update" + uuid4().hex,
+ "description": "description 1"})
+ subject_category_id = list(subject_category.keys())[0]
+ object_category = category_helper.add_object_category(
+ value={"name": "object category name update" + uuid4().hex,
+ "description": "description 1"})
+ object_category_id = list(object_category.keys())[0]
+ action_category = category_helper.add_action_category(
+ value={"name": "action category name update" + uuid4().hex,
+ "description": "description 1"})
+ action_category_id = list(action_category.keys())[0]
+ data = {
+ "name": name,
+ "subject_categories": [subject_category_id],
+ "object_categories": [object_category_id],
+ "action_categories": [action_category_id]
+ }
+
+ req = hug.test.patch(meta_rules, "/meta_rules/{}".format(metaRuleId), body=data,
+ headers=auth_headers)
+ meta_rules = utilities.get_json(req.data)
+ return req, meta_rules
+
+
+def update_meta_rules_with_categories(name, data=None, meta_rule_id=None):
+ from moon_manager.api import meta_rules
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ if not meta_rule_id:
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = data_builder.create_new_meta_rule()
+ data = {
+ "name": name,
+ "subject_categories": [subject_category_id],
+ "object_categories": [object_category_id],
+ "action_categories": [action_category_id]
+ }
+
+ req = hug.test.patch(meta_rules, "/meta_rules/{}".format(meta_rule_id), body=data,
+ headers=auth_headers)
+ meta_rules = utilities.get_json(req.data)
+ return req, meta_rules
+
+
+def delete_meta_rules(name):
+ from moon_manager.api import meta_rules
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ request, meta_rules_data = get_meta_rules()
+ for key, value in meta_rules_data['meta_rules'].items():
+ if value['name'] == name:
+ return hug.test.delete(meta_rules, "/meta_rules/{}".format(key), headers=auth_headers)
+
+
+def delete_meta_rules_without_id():
+ from moon_manager.api import meta_rules
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ req = hug.test.delete(meta_rules, "/meta_rules/{}".format(""), headers=auth_headers)
+ return req
+
+
+def test_get_meta_rules():
+ req, meta_rules = get_meta_rules()
+ assert req.status == hug.HTTP_200
+ assert isinstance(meta_rules, dict)
+ assert "meta_rules" in meta_rules
+
+
+def test_add_meta_rules():
+ meta_rule_name = uuid4().hex
+ req, meta_rules = add_meta_rules(meta_rule_name)
+ assert req.status == hug.HTTP_200
+ assert isinstance(meta_rules, dict)
+ value = list(meta_rules["meta_rules"].values())[0]
+ assert "meta_rules" in meta_rules
+ assert value['name'] == meta_rule_name
+
+
+def test_add_meta_rules_space_name():
+ with pytest.raises(exceptions.MetaRuleContentError) as exception_info:
+ req, meta_rules = add_meta_rules(" ")
+ assert "400: Meta Rule Error" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == '400: Meta Rule Error'
+
+
+def test_add_meta_rules_empty_name():
+ with pytest.raises(exceptions.MetaRuleContentError) as exception_info:
+ req, meta_rules = add_meta_rules("")
+ assert "400: Meta Rule Error" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == '400: Meta Rule Error'
+
+
+def test_add_two_meta_rules_with_same_categories_combination():
+ meta_rule_name = uuid4().hex
+ req, meta_rules = add_meta_rules(meta_rule_name)
+ data = None
+ assert req.status == hug.HTTP_200
+ for meta_rule_id in meta_rules['meta_rules']:
+ if meta_rules['meta_rules'][meta_rule_id]['name'] == meta_rule_name:
+ data = meta_rules['meta_rules'][meta_rule_id]
+
+ assert data
+ data['name'] = uuid4().hex
+ with pytest.raises(exceptions.MetaRuleExisting) as exception_info:
+ req, meta_rules = add_meta_rules(name=data['name'], data=data)
+ assert "409: Meta Rule Existing" == str(exception_info.value)
+ # assert req.status == hug.HTTP_409
+ # assert req.data["message"] == '409: Meta Rule Existing'
+
+
+def test_add_three_meta_rules_with_different_combination_but_similar_items():
+ meta_rule_name1 = uuid4().hex
+ req, meta_rules = add_meta_rules(meta_rule_name1)
+ assert req.status == hug.HTTP_200
+ for meta_rule_id in meta_rules['meta_rules']:
+ if meta_rules['meta_rules'][meta_rule_id]['name'] == meta_rule_name1:
+ data = meta_rules['meta_rules'][meta_rule_id]
+ break
+
+ meta_rule_name2 = uuid4().hex
+
+ req, meta_rules = add_meta_rules(meta_rule_name2)
+
+ for meta_rule_id in meta_rules['meta_rules']:
+ if meta_rules['meta_rules'][meta_rule_id]['name'] == meta_rule_name2:
+ data['subject_categories'] += meta_rules['meta_rules'][meta_rule_id][
+ 'subject_categories']
+ data['object_categories'] += meta_rules['meta_rules'][meta_rule_id]['object_categories']
+ data['action_categories'] += meta_rules['meta_rules'][meta_rule_id]['action_categories']
+ break
+
+ data['name'] = uuid4().hex
+
+ req, meta_rules = add_meta_rules(name=data['name'], data=data)
+ assert req.status == hug.HTTP_200
+
+
+def test_add_two_meta_rules_with_different_combination_but_similar_items():
+ meta_rule_name1 = uuid4().hex
+ meta_rule_name2 = uuid4().hex
+
+ subject_category = category_helper.add_subject_category(
+ value={"name": "subject category name" + uuid4().hex, "description": "description 1"})
+ subject_category_id1 = list(subject_category.keys())[0]
+
+ object_category = category_helper.add_object_category(
+ value={"name": "object category name" + uuid4().hex, "description": "description 1"})
+ object_category_id1 = list(object_category.keys())[0]
+
+ action_category = category_helper.add_action_category(
+ value={"name": "action category name" + uuid4().hex, "description": "description 1"})
+ action_category_id1 = list(action_category.keys())[0]
+
+ subject_category = category_helper.add_subject_category(
+ value={"name": "subject category name" + uuid4().hex, "description": "description 1"})
+ subject_category_id2 = list(subject_category.keys())[0]
+
+ object_category = category_helper.add_object_category(
+ value={"name": "object category name" + uuid4().hex, "description": "description 1"})
+ object_category_id2 = list(object_category.keys())[0]
+
+ action_category = category_helper.add_action_category(
+ value={"name": "action category name" + uuid4().hex, "description": "description 1"})
+ action_category_id2 = list(action_category.keys())[0]
+
+ data = {
+ "name": meta_rule_name1,
+ "subject_categories": [subject_category_id1, subject_category_id2],
+ "object_categories": [object_category_id1, object_category_id2],
+ "action_categories": [action_category_id1, action_category_id2]
+ }
+ req, meta_rules = add_meta_rules(meta_rule_name1, data=data)
+ assert req.status == hug.HTTP_200
+ data = {
+ "name": meta_rule_name2,
+ "subject_categories": [subject_category_id2],
+ "object_categories": [object_category_id1],
+ "action_categories": [action_category_id2]
+ }
+
+ req, meta_rules = add_meta_rules(meta_rule_name1, data=data)
+ assert req.status == hug.HTTP_200
+
+
+# This test Succeed as it's okay to have empty id in adding meta rule, as it is not attached to model yet
+def test_add_meta_rules_with_empty_subject_in_mid():
+ from moon_manager.api import meta_rules
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ value = meta_rule_helper.get_body_meta_rule_with_empty_category_in_mid('subject')
+ with pytest.raises(exceptions.SubjectCategoryUnknown) as exception_info:
+ req = hug.test.post(meta_rules, "/meta_rules", body=value,
+ headers=auth_headers)
+ # assert req.status == hug.HTTP_200
+ assert str(exception_info.value) == "400: Subject Category Unknown"
+
+
+def test_add_meta_rules_with_empty_object_in_mid():
+ from moon_manager.api import meta_rules
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ value = meta_rule_helper.get_body_meta_rule_with_empty_category_in_mid('object')
+ with pytest.raises(exceptions.ObjectCategoryUnknown) as exception_info:
+ req = hug.test.post(meta_rules, "/meta_rules", body=value,
+ headers=auth_headers)
+ assert str(exception_info.value) == "400: Object Category Unknown"
+
+
+def test_add_meta_rules_with_empty_action_in_mid():
+ from moon_manager.api import meta_rules
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ value = meta_rule_helper.get_body_meta_rule_with_empty_category_in_mid('action')
+ with pytest.raises(exceptions.ActionCategoryUnknown) as exception_info:
+ req = hug.test.post(meta_rules, "/meta_rules", body=value,
+ headers=auth_headers)
+ assert str(exception_info.value) == "400: Action Category Unknown"
+
+
+def test_add_meta_rule_with_existing_name_error():
+ name = uuid4().hex
+ req, meta_rules = add_meta_rules(name)
+ assert req.status == hug.HTTP_200
+ with pytest.raises(exceptions.MetaRuleExisting) as exception_info:
+ req, meta_rules = add_meta_rules(name)
+ assert "409: Meta Rule Existing" == str(exception_info.value)
+ # assert req.status == hug.HTTP_409
+ # assert req.data["message"] == '409: Meta Rule Existing'
+
+
+def test_add_meta_rules_with_forbidden_char_in_name():
+ with pytest.raises(exceptions.ValidationContentError) as exception_info:
+ req, meta_rules = add_meta_rules("<a>")
+ assert "Key: 'name', [Forbidden characters in string]" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == "Key: 'name', [Forbidden characters in string]"
+
+
+def test_add_meta_rules_with_blank_name():
+ with pytest.raises(exceptions.MetaRuleContentError) as exception_info:
+ req, meta_rules = add_meta_rules("")
+ assert "400: Meta Rule Error" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == '400: Meta Rule Error'
+
+
+def test_add_meta_rules_without_subject_categories():
+ name_meta_rule = uuid4().hex
+ req, meta_rules = add_meta_rules_without_category_ids(name_meta_rule)
+ assert req.status == hug.HTTP_200
+
+
+def test_delete_meta_rules():
+ name_meta_rule = uuid4().hex
+ req, meta_rules = add_meta_rules_without_category_ids(name_meta_rule)
+ meta_rule_id = next(iter(meta_rules['meta_rules']))
+ req = delete_meta_rules(meta_rules['meta_rules'][meta_rule_id]['name'])
+ assert req.status == hug.HTTP_200
+
+
+def test_delete_meta_rules_without_id():
+ with pytest.raises(exceptions.MetaRuleUnknown) as exception_info:
+ req = delete_meta_rules_without_id()
+ assert "400: Meta Rule Unknown" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == "400: Meta Rule Unknown"
+
+
+def test_update_meta_rules():
+ name = "testuser" + uuid4().hex
+ req = add_meta_rules(name)
+ meta_rule_id = list(req[1]['meta_rules'])[0]
+ req_update = update_meta_rules(name, meta_rule_id)
+ assert req_update[0].status == hug.HTTP_200
+ delete_meta_rules("testuser")
+ get_meta_rules()
+
+
+def test_update_meta_rules_empty_name():
+ req = add_meta_rules("testuser" + uuid4().hex)
+ meta_rule_id = list(req[1]['meta_rules'])[0]
+ with pytest.raises(exceptions.MetaRuleContentError) as exception_info:
+ req_update = update_meta_rules("", meta_rule_id)
+ assert "400: Meta Rule Error" == str(exception_info.value)
+ # assert req_update[0].status == hug.HTTP_400
+ # assert req_update[1]['message'] == '400: Meta Rule Error'
+
+
+def test_update_meta_rules_space_name():
+ req = add_meta_rules("testuser" + uuid4().hex)
+ meta_rule_id = list(req[1]['meta_rules'])[0]
+ with pytest.raises(exceptions.MetaRuleContentError) as exception_info:
+ req_update = update_meta_rules(" ", meta_rule_id)
+ assert "400: Meta Rule Error" == str(exception_info.value)
+ # assert req_update[0].status == hug.HTTP_400
+ # assert req_update[1]['message'] == '400: Meta Rule Error'
+
+
+def test_update_meta_rule_with_combination_existed():
+ meta_rule_name1 = uuid4().hex
+ req, meta_rules = add_meta_rules(meta_rule_name1)
+ meta_rule_id1 = next(iter(meta_rules['meta_rules']))
+ data1 = meta_rules['meta_rules'][meta_rule_id1]
+
+ meta_rule_name2 = uuid4().hex
+ req, meta_rules = add_meta_rules(meta_rule_name2)
+ meta_rule_id2 = next(iter(meta_rules['meta_rules']))
+ data2 = meta_rules['meta_rules'][meta_rule_id2]
+ data1['name'] = data2['name']
+ with pytest.raises(exceptions.MetaRuleExisting) as exception_info:
+ req_update = update_meta_rules(name=meta_rule_name2, metaRuleId=meta_rule_id2,
+ data=data1)
+ assert "409: Meta Rule Existing" == str(exception_info.value)
+ # assert req_update[0].status == hug.HTTP_409
+ # assert req_update[1]['message'] == '409: Meta Rule Existing'
+
+
+def test_update_meta_rule_with_different_combination_but_same_data():
+ meta_rule_name1 = uuid4().hex
+ subject_category = category_helper.add_subject_category(
+ value={"name": "subject category name" + uuid4().hex, "description": "description 1"})
+ subject_category_id1 = list(subject_category.keys())[0]
+ object_category = category_helper.add_object_category(
+ value={"name": "object category name" + uuid4().hex, "description": "description 1"})
+ object_category_id1 = list(object_category.keys())[0]
+ action_category = category_helper.add_action_category(
+ value={"name": "action category name" + uuid4().hex, "description": "description 1"})
+ action_category_id1 = list(action_category.keys())[0]
+ subject_category = category_helper.add_subject_category(
+ value={"name": "subject category name" + uuid4().hex, "description": "description 1"})
+ subject_category_id2 = list(subject_category.keys())[0]
+ object_category = category_helper.add_object_category(
+ value={"name": "object category name" + uuid4().hex, "description": "description 1"})
+ object_category_id2 = list(object_category.keys())[0]
+ action_category = category_helper.add_action_category(
+ value={"name": "action category name" + uuid4().hex, "description": "description 1"})
+ action_category_id2 = list(action_category.keys())[0]
+
+ data = {
+ "name": meta_rule_name1,
+ "subject_categories": [subject_category_id1, subject_category_id2],
+ "object_categories": [object_category_id1, object_category_id2],
+ "action_categories": [action_category_id1, action_category_id2]
+ }
+ req, meta_rules = add_meta_rules(meta_rule_name1, data=data)
+ assert req.status == hug.HTTP_200
+
+ meta_rule_name2 = uuid4().hex
+ req, meta_rules = add_meta_rules(meta_rule_name2)
+ meta_rule_id2 = next(iter(meta_rules['meta_rules']))
+ data2 = {
+ "name": meta_rule_name2,
+ "subject_categories": [subject_category_id1, subject_category_id2],
+ "object_categories": [object_category_id1],
+ "action_categories": [action_category_id1, action_category_id2]
+ }
+
+ req_update = update_meta_rules(name=meta_rule_name2, metaRuleId=meta_rule_id2,
+ data=data2)
+ assert req_update[0].status == hug.HTTP_200
+
+
+def test_update_meta_rules_without_id():
+ with pytest.raises(exceptions.MetaRuleUnknown) as exception_info:
+ req_update = update_meta_rules("testuser", "")
+ assert "400: Meta Rule Unknown" == str(exception_info.value)
+ # assert req_update[0].status == hug.HTTP_400
+ # assert req_update[0].data["message"] == "400: Meta Rule Unknown"
+
+
+def test_update_meta_rules_without_name():
+ with pytest.raises(exceptions.ValidationContentError) as exception_info:
+ req_update = update_meta_rules("<br/>", "1234567")
+ assert "Key: 'name', [Forbidden characters in string]" == str(exception_info.value)
+ # assert req_update[0].status == hug.HTTP_400
+ # assert req_update[0].data["message"] == "Key: 'name', [Forbidden characters in string]"
+
+
+def test_update_meta_rules_without_categories():
+ req_update = update_meta_rules_with_categories("testuser" + uuid4().hex)
+ assert req_update[0].status == hug.HTTP_200
+
+
+def test_update_meta_rules_with_empty_categories():
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = data_builder.create_new_meta_rule()
+ data = {
+ "name": "testuser",
+ "subject_categories": [""],
+ "object_categories": [""],
+ "action_categories": [""]
+ }
+ req_update = update_meta_rules_with_categories("testuser", data=data,
+ meta_rule_id=meta_rule_id)
+ assert req_update[0].status == hug.HTTP_200
+ # assert "400: Subject Category Unknown" == str(exception_info.value)
+ # assert req_update[0].status == hug.HTTP_400
+ # assert req_update[1]['message'] == '400: Subject Category Unknown'
+
+
+def test_update_meta_rules_with_blank_subject_categories():
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = data_builder.create_new_meta_rule()
+ data = {
+ "name": "testuser1",
+ "subject_categories": [],
+ "object_categories": [object_category_id],
+ "action_categories": [action_category_id]
+ }
+ req_update = update_meta_rules_with_categories("testuser", data=data,
+ meta_rule_id=meta_rule_id)
+
+ assert req_update[0].status == hug.HTTP_200
+
+
+def test_update_meta_rules_with_blank_object_categories():
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = data_builder.create_new_meta_rule()
+ data = {
+ "name": "testuser1",
+ "subject_categories": [subject_category_id],
+ "object_categories": [],
+ "action_categories": [action_category_id]
+ }
+ req_update = update_meta_rules_with_categories("testuser", data=data,
+ meta_rule_id=meta_rule_id)
+
+ assert req_update[0].status == hug.HTTP_200
+
+
+def test_update_meta_rules_with_blank_action_categories():
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = data_builder.create_new_meta_rule()
+ data = {
+ "name": "testuser1",
+ "subject_categories": [subject_category_id],
+ "object_categories": [object_category_id],
+ "action_categories": []
+ }
+ req_update = update_meta_rules_with_categories("testuser", data=data,
+ meta_rule_id=meta_rule_id)
+
+ assert req_update[0].status == hug.HTTP_200
+
+
+def test_update_meta_rules_with_empty_subject_category():
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = data_builder.create_new_meta_rule()
+ data = {
+ "name": "testuser",
+ "subject_categories": [""],
+ "object_categories": [object_category_id],
+ "action_categories": [action_category_id]
+ }
+ req_update = update_meta_rules_with_categories("testuser", data=data,
+ meta_rule_id=meta_rule_id)
+ assert req_update[0].status == hug.HTTP_200
+ # assert "400: Subject Category Unknown" == str(exception_info.value)
+ # assert req_update[0].status == hug.HTTP_400
+ # assert req_update[1]['message'] == '400: Subject Category Unknown'
+
+
+def test_update_meta_rules_with_empty_action_category():
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = data_builder.create_new_meta_rule()
+ data = {
+ "name": "testuser",
+ "subject_categories": [subject_category_id],
+ "object_categories": [object_category_id],
+ "action_categories": [""]
+ }
+ req_update = update_meta_rules_with_categories("testuser", data=data,
+ meta_rule_id=meta_rule_id)
+ assert req_update[0].status == hug.HTTP_200
+ # assert "400: Action Category Unknown" == str(exception_info.value)
+ # assert req_update[0].status == hug.HTTP_400
+ # assert req_update[1]['message'] == '400: Action Category Unknown'
+
+
+def test_update_meta_rules_with_empty_object_category():
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = data_builder.create_new_meta_rule()
+ data = {
+ "name": "testuser",
+ "subject_categories": [subject_category_id],
+ "object_categories": [""],
+ "action_categories": [action_category_id]
+ }
+ req_update = update_meta_rules_with_categories("testuser", data=data,
+ meta_rule_id=meta_rule_id)
+
+ assert req_update[0].status == hug.HTTP_200
+ # assert "400: Object Category Unknown" == str(exception_info.value)
+ # assert req_update[0].status == hug.HTTP_400
+ # assert req_update[1]['message'] == '400: Object Category Unknown'
+
+
+def test_update_meta_rules_with_categories_and_one_empty():
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = data_builder.create_new_meta_rule()
+ data = {
+ "name": "testuser",
+ "subject_categories": [subject_category_id, ""],
+ "object_categories": [object_category_id, ""],
+ "action_categories": [action_category_id, ""]
+ }
+ with pytest.raises(exceptions.SubjectCategoryUnknown) as exception_info:
+ req_update = update_meta_rules_with_categories("testuser", data=data,
+ meta_rule_id=meta_rule_id)
+ assert "400: Subject Category Unknown" == str(exception_info.value)
+ # assert req_update[0].status == hug.HTTP_400
+ # assert req_update[1]['message'] == '400: Subject Category Unknown'
+
+
+def test_add_one_meta_rules_with_different_combination_but_similar_items():
+ meta_rule_name1 = uuid4().hex
+
+ subject_category = category_helper.add_subject_category(
+ value={"name": "subject category name" + uuid4().hex, "description": "description 1"})
+ subject_category_id1 = list(subject_category.keys())[0]
+
+ object_category = category_helper.add_object_category(
+ value={"name": "object category name" + uuid4().hex, "description": "description 1"})
+ object_category_id1 = list(object_category.keys())[0]
+
+ action_category = category_helper.add_action_category(
+ value={"name": "action category name" + uuid4().hex, "description": "description 1"})
+ action_category_id1 = list(action_category.keys())[0]
+
+ subject_category = category_helper.add_subject_category(
+ value={"name": "subject category name" + uuid4().hex, "description": "description 1"})
+ subject_category_id2 = list(subject_category.keys())[0]
+
+ object_category = category_helper.add_object_category(
+ value={"name": "object category name" + uuid4().hex, "description": "description 1"})
+ object_category_id2 = list(object_category.keys())[0]
+
+ action_category = category_helper.add_action_category(
+ value={"name": "action category name" + uuid4().hex, "description": "description 1"})
+ action_category_id2 = list(action_category.keys())[0]
+
+ data = {
+ "name": meta_rule_name1,
+ "subject_categories": [subject_category_id1, subject_category_id2],
+ "object_categories": [object_category_id1, object_category_id2],
+ "action_categories": [action_category_id1, action_category_id2]
+ }
+ req, meta_rules = add_meta_rules(meta_rule_name1, data=data)
+ assert req.status == hug.HTTP_200
+
+ value = {
+ "name": "name_model",
+ "description": "test",
+ "meta_rules": [next(iter(meta_rules['meta_rules']))]
+ }
+ mode_id = next(iter(model_helper.add_model(value=value)))
+
+ value = {
+ "name": "test_policy" + uuid4().hex,
+ "model_id": mode_id,
+ "genre": "authz",
+ "description": "test",
+ }
+
+ policy_id = next(iter(policy_helper.add_policies(value=value)))
+
+ data_id_1 = data_builder.create_subject_data(policy_id, subject_category_id1)
+ data_id_2 = data_builder.create_subject_data(policy_id, subject_category_id2)
+ data_id_3 = data_builder.create_object_data(policy_id, object_category_id2)
+ data_id_4 = data_builder.create_object_data(policy_id, object_category_id1)
+ data_id_5 = data_builder.create_action_data(policy_id, action_category_id1)
+ data_id_5 = data_builder.create_action_data(policy_id, action_category_id2)
+
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from falcon import HTTP_200, HTTP_400, HTTP_405, HTTP_409
+
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import policy
+
+ req = hug.test.delete(policy, "policies/{}".format(policy_id), headers=auth_headers)
+ assert req.status == HTTP_200
+
+
+def test_update_meta_rules_with_blank_action_categories_assigned_to_used_model():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_manager.api import meta_rules
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ policies_list = policy_helper.add_policies_with_model()
+ policy_id = list(policies_list.keys())[0]
+ model_id = policies_list[policy_id]['model_id']
+ models_list = model_helper.get_models(model_id=model_id)
+ meta_rule_id = models_list[model_id]["meta_rules"][0]
+ meta_rules_list = meta_rule_helper.get_meta_rules(meta_rule_id=meta_rule_id);
+ data = meta_rules_list[meta_rule_id]
+
+ data["action_categories"] = []
+
+ with pytest.raises(exceptions.MetaRuleUpdateError) as exception_info:
+ hug.test.patch(meta_rules, "/meta_rules/{}".format(meta_rule_id), body=data,
+ headers=auth_headers)
+ assert "400: Meta_Rule Update Error" == str(exception_info.value)
diff --git a/moon_manager/tests/unit_python/api/test_models.py b/moon_manager/tests/unit_python/api/test_models.py
index 3c205d1d..569fe1b4 100644
--- a/moon_manager/tests/unit_python/api/test_models.py
+++ b/moon_manager/tests/unit_python/api/test_models.py
@@ -1,67 +1,475 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
import json
-import api.utilities as utilities
+import hug
+import pytest
+from moon_utilities import exceptions
+from helpers import data_builder as builder
+from helpers import policy_helper
+from helpers import model_helper
+from uuid import uuid4
+
+
+def get_models():
+ from moon_manager.api import models
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ req = hug.test.get(models, "/models", headers=auth_headers)
+ models = req.data
+ return req, models
+
+
+def add_models(name, headers, data=None, ):
+ from moon_manager.api import models
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = \
+ builder.create_new_meta_rule()
+ if not data:
+ data = {
+ "name": name,
+ "description": "description of {}".format(name),
+ "meta_rules": [meta_rule_id]
+ }
+ headers['Content-Type'] = 'application/json'
+ req = hug.test.post(models, "/models", body=json.dumps(data),
+ headers=headers)
+ models = req.data
+ return req, models
+
+
+def update_model(name, model_id, headers):
+ from moon_manager.api import models
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = \
+ builder.create_new_meta_rule()
+ data = {
+ "name": name,
+ "description": "description of {}".format(name),
+ "meta_rules": [meta_rule_id]
+ }
+ headers['Content-Type'] = 'application/json'
+ req = hug.test.patch(models, "/models/{}".format(model_id), body=json.dumps(data),
+ headers=headers)
+ if req.status == hug.HTTP_405:
+ return req
+ models = req.data
+ return req, models
+
+
+def add_model_without_meta_rules_ids(name, headers):
+ from moon_manager.api import models
+ data = {
+ "name": name,
+ "description": "description of {}".format(name),
+ "meta_rules": []
+ }
+ headers['Content-Type'] = 'application/json'
+ req = hug.test.post(models, "/models", body=json.dumps(data),
+ headers=headers)
+ models = req.data
+ return req, models
-def get_models(client):
- req = client.get("/models")
- models = utilities.get_json(req.data)
+def add_model_with_empty_meta_rule_id(name, headers):
+ from moon_manager.api import models
+ data = {
+ "name": name,
+ "description": "description of {}".format(name),
+ "meta_rules": [""]
+ }
+ headers['Content-Type'] = 'application/json'
+ req = hug.test.post(models, "/models", body=json.dumps(data),
+ headers=headers)
+ models = req.data
return req, models
-def add_models(client, name):
+def update_model_without_meta_rules_ids(model_id, headers):
+ from moon_manager.api import models
+ name = "model_id" + uuid4().hex
data = {
"name": name,
"description": "description of {}".format(name),
- "meta_rules": ["meta_rule_id1", "meta_rule_id2"]
+ "meta_rules": []
}
- req = client.post("/models", data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
- models = utilities.get_json(req.data)
+ headers['Content-Type'] = 'application/json'
+ req = hug.test.patch(models, "/models/{}".format(model_id), body=json.dumps(data),
+ headers=headers)
+ models = req.data
return req, models
-def delete_models(client, name):
- request, models = get_models(client)
+def delete_models(name, headers):
+ request, models = get_models()
for key, value in models['models'].items():
if value['name'] == name:
- req = client.delete("/models/{}".format(key))
+ from moon_manager.api import models
+ req = hug.test.delete(models, "/models/{}".format(key), headers=headers)
break
return req
-def delete_models_without_id(client):
- req = client.delete("/models/{}".format(""))
+def delete_models_without_id(headers):
+ from moon_manager.api import models
+ req = hug.test.delete(models, "/models/{}".format(""), headers=headers)
return req
+def clean_models(headers):
+ req, models = get_models()
+ for key, value in models['models'].items():
+ print(key)
+ print(value)
+ from moon_manager.api import models
+ hug.test.delete(models, "/models/{}".format(key), headers=headers)
+
+
+def test_delete_model_assigned_to_policy():
+ policy_name = "testuser" + uuid4().hex
+ req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex)
+ model_id = list(req.keys())[0]
+ data = {
+ "name": policy_name,
+ "description": "description of {}".format(policy_name),
+ "model_id": model_id,
+ "genre": "genre"
+ }
+ from moon_manager.api import policy
+ from moon_manager.api import models
+ from moon_utilities.auth_functions import get_api_key_for_user
+ headers = {"X-Api-Key": get_api_key_for_user("admin"), 'Content-Type': 'application/json'}
+ hug.test.post(policy, "/policies", body=json.dumps(data), headers=headers)
+ with pytest.raises(exceptions.DeleteModelWithPolicy) as exception_info:
+ req = hug.test.delete(models, "/models/{}".format(model_id), headers={"X-Api-Key":
+ get_api_key_for_user("admin")})
+ assert "400: Model With Policy Error" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == '400: Model With Policy Error'
+
+
def test_get_models():
- client = utilities.register_client()
- req, models= get_models(client)
- assert req.status_code == 200
+ req, models = get_models()
+ assert req.status == hug.HTTP_200
assert isinstance(models, dict)
assert "models" in models
def test_add_models():
- client = utilities.register_client()
- req, models = add_models(client, "testuser")
- assert req.status_code == 200
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ clean_models(headers=auth_headers)
+ req, models = add_models("testuser", auth_headers)
+ assert req.status == hug.HTTP_200
assert isinstance(models, dict)
- value = list(models["models"].values())[0]
+ model_id = list(models["models"])[0]
assert "models" in models
- assert value['name'] == "testuser"
- assert value["description"] == "description of {}".format("testuser")
- assert value["meta_rules"][0] == "meta_rule_id1"
+ assert models['models'][model_id]['name'] == "testuser"
+ assert models['models'][model_id]["description"] == "description of {}".format("testuser")
+
+
+def test_add_models_with_meta_rule_has_blank_subject():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ clean_models(headers=auth_headers)
+ name = "testuser1"
+ from moon_manager.api import models
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = \
+ builder.create_new_meta_rule(empty="subject")
+ data = {
+ "name": name,
+ "description": "description of {}".format(name),
+ "meta_rules": [meta_rule_id]
+ }
+ auth_headers['Content-Type'] = 'application/json'
+ req = hug.test.post(models, "/models", body=json.dumps(data),
+ headers=auth_headers)
+ assert req.status == hug.HTTP_200
+
+
+def test_add_models_with_meta_rule_has_blank_object():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ clean_models(headers=auth_headers)
+ name = "testuser1"
+ from moon_manager.api import models
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = \
+ builder.create_new_meta_rule(empty="object")
+ data = {
+ "name": name,
+ "description": "description of {}".format(name),
+ "meta_rules": [meta_rule_id]
+ }
+ auth_headers['Content-Type'] = 'application/json'
+ req = hug.test.post(models, "/models", body=json.dumps(data),
+ headers=auth_headers)
+ assert req.status == hug.HTTP_200
+
+
+def test_add_models_with_meta_rule_has_blank_action():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ clean_models(headers=auth_headers)
+ name = "testuser1"
+ from moon_manager.api import models
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = \
+ builder.create_new_meta_rule(empty="action")
+ data = {
+ "name": name,
+ "description": "description of {}".format(name),
+ "meta_rules": [meta_rule_id]
+ }
+ auth_headers['Content-Type'] = 'application/json'
+ req = hug.test.post(models, "/models", body=json.dumps(data),
+ headers=auth_headers)
+ assert req.status == hug.HTTP_200
def test_delete_models():
- client = utilities.register_client()
- req = delete_models(client, "testuser")
- assert req.status_code == 200
+ name = uuid4().hex + "testuser"
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ add_models(name, auth_headers)
+ req = delete_models(name, headers=auth_headers)
+ assert req.status == hug.HTTP_200
+
+
+def test_update_models_with_assigned_policy():
+ from moon_manager.api import models
+ model = model_helper.add_model(model_id="mls_model_id" + uuid4().hex)
+ model_id = list(model.keys())[0]
+ value = {
+ "name": "test_policy" + uuid4().hex,
+ "model_id": model_id,
+ "description": "test",
+ }
+ policy_helper.add_policies(value=value)
+ data = {
+ "name": "model_" + uuid4().hex,
+ "description": "description of model_2",
+ "meta_rules": []
+ }
+ from moon_utilities.auth_functions import get_api_key_for_user
+ headers = {"X-Api-Key": get_api_key_for_user("admin"), 'Content-Type': 'application/json'}
+ with pytest.raises(exceptions.DeleteModelWithPolicy) as exception_info:
+ req = hug.test.patch(models, "/models/{}".format(model_id), body=json.dumps(data),
+ headers=headers)
+ assert "400: Model With Policy Error" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == "400: Model With Policy Error"
+
+
+def test_update_models_with_no_assigned_policy():
+ from moon_manager.api import models
+ model = model_helper.add_model(model_id="mls_model_id" + uuid4().hex)
+ model_id = list(model.keys())[0]
+ data = {
+ "name": "model_" + uuid4().hex,
+ "description": "description of model_2",
+ "meta_rules": []
+ }
+ from moon_utilities.auth_functions import get_api_key_for_user
+ headers = {"X-Api-Key": get_api_key_for_user("admin"), 'Content-Type': 'application/json'}
+ req = hug.test.patch(models, "/models/{}".format(model_id), body=json.dumps(data),
+ headers=headers)
+ assert req.status == hug.HTTP_200
+
+
+def test_update_models_without_meta_rule_key():
+ from moon_manager.api import models
+ model = model_helper.add_model(model_id="mls_model_id" + uuid4().hex)
+ model_id = list(model.keys())[0]
+
+ data = {
+ "name": "model_" + uuid4().hex,
+ "description": "description of model_2",
+ }
+ from moon_utilities.auth_functions import get_api_key_for_user
+ headers = {"X-Api-Key": get_api_key_for_user("admin"), 'Content-Type': 'application/json'}
+ with pytest.raises(exceptions.MetaRuleUnknown) as exception_info:
+ req = hug.test.patch(models, "/models/{}".format(model_id), body=json.dumps(data),
+ headers=headers)
+ assert "400: Meta Rule Unknown" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == "400: Meta Rule Unknown"
def test_delete_models_without_id():
- client = utilities.register_client()
- req = delete_models_without_id(client)
- assert req.status_code == 500
+ from moon_utilities.auth_functions import get_api_key_for_user
+ headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ req = delete_models_without_id(headers=headers)
+ assert req.status == hug.HTTP_405
+
+
+def test_add_model_with_empty_name():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ clean_models(headers=auth_headers)
+ with pytest.raises(exceptions.ValidationContentError) as exception_info:
+ req, models = add_models("<br/>", headers=auth_headers)
+ assert "Key: 'name', [Forbidden characters in string]" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == "Key: 'name', [Forbidden characters in string]"
+
+
+def test_add_model_with_name_contain_space():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ clean_models(headers=auth_headers)
+ with pytest.raises(exceptions.ValidationContentError) as exception_info:
+ req, models = add_models("test<br>user", headers=auth_headers)
+ assert "Key: 'name', [Forbidden characters in string]" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == "Key: 'name', [Forbidden characters in string]"
+
+
+def test_add_model_with_name_space():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ clean_models(headers=auth_headers)
+ with pytest.raises(exceptions.ModelContentError) as exception_info:
+ req, models = add_models(" ", headers=auth_headers)
+ assert "400: Model Unknown" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == '400: Model Unknown'
+
+
+def test_add_model_with_empty_meta_rule_id():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ clean_models(headers=auth_headers)
+ with pytest.raises(exceptions.MetaRuleUnknown) as exception_info:
+ req, meta_rules = add_model_with_empty_meta_rule_id("testuser", headers=auth_headers)
+ assert "400: Meta Rule Unknown" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == '400: Meta Rule Unknown'
+
+def test_add_model_with_existed_name():
+ name = uuid4().hex
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ clean_models(headers=auth_headers)
+ req, models = add_models(name, headers=auth_headers)
+ assert req.status == hug.HTTP_200
+ with pytest.raises(exceptions.ModelExisting) as exception_info:
+ req, models = add_models(name, headers=auth_headers)
+ assert "409: Model Error" == str(exception_info.value)
+ # assert req.status == hug.HTTP_409
+ # assert req.data["message"] == '409: Model Error'
+
+
+def test_add_model_with_existed_meta_rules_list():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ clean_models(headers=auth_headers)
+ name = uuid4().hex
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = \
+ builder.create_new_meta_rule()
+ data = {
+ "name": name,
+ "description": "description of {}".format(name),
+ "meta_rules": [meta_rule_id]
+ }
+ name = uuid4().hex
+ req, models = add_models(name=name, headers=auth_headers, data=data)
+ assert req.status == hug.HTTP_200
+ data = {
+ "name": name,
+ "description": "description of {}".format(name),
+ "meta_rules": [meta_rule_id]
+ }
+ with pytest.raises(exceptions.ModelExisting) as exception_info:
+ req, models = add_models(name=name, headers=auth_headers, data=data)
+ assert "409: Model Error" == str(exception_info.value)
+ # assert req.status == hug.HTTP_409
+ # assert req.data["message"] == '409: Model Error'
+
+
+def test_add_model_without_meta_rules():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ clean_models(headers=auth_headers)
+ req, meta_rules = add_model_without_meta_rules_ids("testuser", headers=auth_headers)
+ assert req.status == hug.HTTP_200
+
+
+def test_update_model():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ clean_models(headers=auth_headers)
+ req = add_models("testuser", headers=auth_headers)
+ model_id = list(req[1]['models'])[0]
+ req_update = update_model("testuser", model_id, headers=auth_headers)
+ assert req_update[0].status == hug.HTTP_200
+ model_id = list(req_update[1]["models"])[0]
+ assert req_update[1]["models"][model_id]["meta_rules"][0] is not None
+ delete_models("testuser", headers=auth_headers)
+
+
+def test_update_model_name_with_space():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ clean_models(headers=auth_headers)
+ req = add_models("testuser", headers=auth_headers)
+ model_id = list(req[1]['models'])[0]
+ with pytest.raises(exceptions.ModelContentError) as exception_info:
+ req_update = update_model(" ", model_id, headers=auth_headers)
+ assert "400: Model Unknown" == str(exception_info.value)
+ # assert req_update[0].status == hug.HTTP_400
+ # assert req_update[1]["message"] == '400: Model Unknown'
+
+
+def test_update_model_with_empty_name():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ clean_models(headers=auth_headers)
+ req = add_models("testuser", headers=auth_headers)
+ model_id = list(req[1]['models'])[0]
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ with pytest.raises(exceptions.ModelContentError) as exception_info:
+ req_update = update_model("", model_id, headers=auth_headers)
+ assert "400: Model Unknown" == str(exception_info.value)
+ # assert req_update[0].status == hug.HTTP_400
+ # assert req_update[1]['message'] == '400: Model Unknown'
+
+
+def test_update_meta_rules_without_id():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ clean_models(headers=auth_headers)
+ req_update = update_model("testuser", "", headers=auth_headers)
+ assert req_update.status == hug.HTTP_405
+
+
+def test_update_meta_rules_without_name():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ with pytest.raises(exceptions.ValidationContentError) as exception_info:
+ req_update = update_model("<a></a>", "1234567", headers=auth_headers)
+ assert "Key: 'name', [Forbidden characters in string]" == str(exception_info.value)
+ # assert req_update[0].status == hug.HTTP_400
+ # assert req_update[0].data["message"] == "Key: 'name', [Forbidden characters in string]"
+
+
+def test_update_meta_rules_without_meta_rules():
+ value = {
+ "name": "mls_model_id" + uuid4().hex,
+ "description": "test",
+ "meta_rules": []
+ }
+ model = model_helper.add_model(value=value)
+ model_id = list(model.keys())[0]
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ req_update = update_model_without_meta_rules_ids(model_id, headers=auth_headers)
+ assert req_update[0].status == hug.HTTP_200
diff --git a/moon_manager/tests/unit_python/api/test_nova.py b/moon_manager/tests/unit_python/api/test_nova.py
new file mode 100644
index 00000000..10118cc3
--- /dev/null
+++ b/moon_manager/tests/unit_python/api/test_nova.py
@@ -0,0 +1,58 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+
+def create_project(tenant_dict):
+ from moon_manager.pip_driver import InformationManager
+ return InformationManager["objects"][0].create_project(**tenant_dict)
+
+
+def list_projects():
+ from moon_manager.pip_driver import InformationManager
+ return InformationManager["objects"][0].get_projects()
+
+
+def list_objects():
+ from moon_manager.pip_driver import InformationManager
+ print(f"IM : {InformationManager['objects'][0].driver.__dict__}")
+ return InformationManager["objects"][0].get_items()
+
+
+def test_create_project():
+ tenant_dict = {
+ "description": "test_project",
+ "domain": ['domain_id_1'],
+ "enabled": True,
+ "is_domain": False,
+ "name": 'project_1'
+ }
+ project = create_project(tenant_dict)
+ assert project
+ assert project.get('name') == tenant_dict.get('name')
+
+
+def test_list_objects():
+ objects = list_objects()
+ assert objects
+ assert objects["servers"][0].get('name') == "vm1"
+
+# TODO TO BE UPDATED
+# def test_create_project_without_name():
+# tenant_dict = {
+# "description": "test_project",
+# "domain_id": ['domain_id_1'],
+# "enabled": True,
+# "is_domain": False,
+# }
+# with pytest.raises(Exception) as exception_info:
+# create_project(tenant_dict)
+# assert '400: Keystone project error' == str(exception_info.value)
diff --git a/moon_manager/tests/unit_python/api/test_pdp.py b/moon_manager/tests/unit_python/api/test_pdp.py
index a2d0cb5a..32b75726 100644
--- a/moon_manager/tests/unit_python/api/test_pdp.py
+++ b/moon_manager/tests/unit_python/api/test_pdp.py
@@ -1,62 +1,479 @@
-import json
-import api.utilities as utilities
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+from falcon import HTTP_200, HTTP_400, HTTP_405
+import hug
import pytest
+from moon_utilities import exceptions
+from uuid import uuid4
+from helpers import data_builder as builder
+
+
+def test_get_pdp():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import pdp
+ req = hug.test.get(pdp, 'pdp/', headers=auth_headers)
+ assert req.status == HTTP_200
+ assert isinstance(req.data, dict)
+ assert "pdps" in req.data
+
+def test_add_pdp_invalid_security_pipeline(mocker):
+ from moon_manager.api import pdp
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ mocker.patch('moon_manager.plugins.pyorchestrator.get_server_url',
+ return_value="http://127.0.0.1:20000")
+ mocker.patch("subprocess.Popen", return_value=True)
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy(
+ subject_category_name="subject_category1" + uuid4().hex,
+ object_category_name="object_category1" + uuid4().hex,
+ action_category_name="action_category1" + uuid4().hex,
+ meta_rule_name="meta_rule_1" + uuid4().hex,
+ model_name="model1" + uuid4().hex)
+ data_no_pipeline = {
+ "name": "testuser" + uuid4().hex,
+ "security_pipeline": [],
+ "vim_project_id": "vim_project_id",
+ "description": "description of testuser"
+ }
+ data_no_project_no_pipeline = {
+ "name": "testuser" + uuid4().hex,
+ "security_pipeline": [],
+ "vim_project_id": None,
+ "description": "description of testuser"
+ }
+ data_no_project = {
+ "name": "testuser" + uuid4().hex,
+ "security_pipeline": [policy_id],
+ "vim_project_id": None,
+ "description": "description of testuser"
+ }
+
+ req = hug.test.post(pdp, "pdp/", data_no_project_no_pipeline, headers=auth_headers)
+ assert req.status == HTTP_200
+
+ with pytest.raises(exceptions.PdpContentError) as exception_info:
+ req = hug.test.post(pdp, "pdp/", data_no_pipeline, headers=auth_headers)
+ assert "400: Pdp Error" == str(exception_info.value)
+
+ with pytest.raises(exceptions.PdpContentError) as exception_info:
+ req = hug.test.post(pdp, "pdp/", data_no_project, headers=auth_headers)
+ assert "400: Pdp Error" == str(exception_info.value)
+
+def test_update_pdp_invalid_security_pipeline(mocker):
+ from moon_manager.api import pdp
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ mocker.patch('moon_manager.plugins.pyorchestrator.get_server_url',
+ return_value="http://127.0.0.1:20000")
+ mocker.patch("subprocess.Popen", return_value=True)
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy(
+ subject_category_name="subject_category1" + uuid4().hex,
+ object_category_name="object_category1" + uuid4().hex,
+ action_category_name="action_category1" + uuid4().hex,
+ meta_rule_name="meta_rule_1" + uuid4().hex,
+ model_name="model1" + uuid4().hex)
+ data_no_pipeline = {
+ "name": "testuser" + uuid4().hex,
+ "security_pipeline": [],
+ "vim_project_id": "vim_project_id",
+ "description": "description of testuser"
+ }
+ data_no_project_no_pipeline = {
+ "name": "testuser" + uuid4().hex,
+ "security_pipeline": [],
+ "vim_project_id": None,
+ "description": "description of testuser"
+ }
+ data_no_project = {
+ "name": "testuser" + uuid4().hex,
+ "security_pipeline": [policy_id],
+ "vim_project_id": None,
+ "description": "description of testuser"
+ }
+
+ data_valid = {
+ "name": "testuser" + uuid4().hex,
+ "security_pipeline": [policy_id],
+ "vim_project_id": "vim_project_id",
+ "description": "description of testuser"
+ }
+ req = hug.test.post(pdp, "pdp/", data_valid, headers=auth_headers)
+ assert req.status == HTTP_200
+ pip_id = list(req.data['pdps'])[0]
+
+ req = hug.test.patch(pdp, "pdp/{}".format(pip_id), data_no_project_no_pipeline, headers=auth_headers)
+ assert req.status == HTTP_200
+ with pytest.raises(exceptions.PdpContentError) as exception_info:
+ req = hug.test.patch(pdp, "pdp/{}".format(pip_id), data_no_pipeline, headers=auth_headers)
+ assert "400: Pdp Error" == str(exception_info.value)
-def get_pdp(client):
- req = client.get("/pdp")
- pdp = utilities.get_json(req.data)
- return req, pdp
+ with pytest.raises(exceptions.PdpContentError) as exception_info:
+ req = hug.test.patch(pdp, "pdp/{}".format(pip_id), data_no_project, headers=auth_headers)
+ assert "400: Pdp Error" == str(exception_info.value)
+
+def test_add_pdp(mocker):
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import pdp
+ mocker.patch('moon_manager.plugins.pyorchestrator.get_server_url',
+ return_value="http://127.0.0.1:20000")
+ mocker.patch("subprocess.Popen", return_value=True)
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy(
+ subject_category_name="subject_category1" + uuid4().hex,
+ object_category_name="object_category1" + uuid4().hex,
+ action_category_name="action_category1" + uuid4().hex,
+ meta_rule_name="meta_rule_1" + uuid4().hex,
+ model_name="model1" + uuid4().hex)
+ data = {
+ "name": "testuser" + uuid4().hex,
+ "security_pipeline": [policy_id],
+ "vim_project_id": "vim_project_id",
+ "description": "description of testuser"
+ }
+ req = hug.test.post(pdp, "pdp/", data, headers=auth_headers)
+ assert req.status == HTTP_200
+ assert isinstance(req.data, dict)
+ found = False
+ assert "pdps" in req.data
+ for value in req.data["pdps"].values():
+ if value['name'] == data['name']:
+ found = True
+ assert value["description"] == "description of {}".format("testuser")
+ assert value["vim_project_id"] == "vim_project_id"
+ break
+ assert found
-def add_pdp(client, data):
- req = client.post("/pdp", data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
- pdp = utilities.get_json(req.data)
- return req, pdp
+def test_add_pdp_name_existed(mocker):
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import pdp
+ mocker.patch('moon_manager.plugins.pyorchestrator.get_server_url',
+ return_value="http://127.0.0.1:20000")
+ mocker.patch("subprocess.Popen", return_value=True)
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id1 = builder.create_new_policy(
+ subject_category_name="subject_category1" + uuid4().hex,
+ object_category_name="object_category1" + uuid4().hex,
+ action_category_name="action_category1" + uuid4().hex,
+ meta_rule_name="meta_rule_1" + uuid4().hex,
+ model_name="model1" + uuid4().hex)
+ name = "testuser" + uuid4().hex
+ data = {
+ "name": name,
+ "security_pipeline": [policy_id1],
+ "vim_project_id": "vim_project_id",
+ "description": "description of testuser"
+ }
+ req = hug.test.post(pdp, "pdp/", data, headers=auth_headers)
+ assert req.status == HTTP_200
-def delete_pdp(client, key):
- req = client.delete("/pdp/{}".format(key))
- return req
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id2 = builder.create_new_policy(
+ subject_category_name="subject_category1" + uuid4().hex,
+ object_category_name="object_category1" + uuid4().hex,
+ action_category_name="action_category1" + uuid4().hex,
+ meta_rule_name="meta_rule_1" + uuid4().hex,
+ model_name="model1" + uuid4().hex)
+ data = {
+ "name": name,
+ "security_pipeline": [policy_id2],
+ "vim_project_id": "vim_project_id" + uuid4().hex,
+ "description": "description of testuser" + uuid4().hex
+ }
+ with pytest.raises(exceptions.PdpExisting) as exception_info:
+ req = hug.test.post(pdp, "pdp/", data, headers=auth_headers)
+ assert "409: Pdp Error" == str(exception_info.value)
+ # assert req.status == hug.HTTP_409
+ # assert req.data['message'] == '409: Pdp Error'
-def delete_pdp_without_id(client):
- req = client.delete("/pdp/{}".format(""))
- return req
+def test_add_pdp_policy_used(mocker):
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import pdp
+ mocker.patch('moon_manager.plugins.pyorchestrator.get_server_url',
+ return_value="http://127.0.0.1:20000")
+ mocker.patch("subprocess.Popen", return_value=True)
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id1 = builder.create_new_policy(
+ subject_category_name="subject_category1" + uuid4().hex,
+ object_category_name="object_category1" + uuid4().hex,
+ action_category_name="action_category1" + uuid4().hex,
+ meta_rule_name="meta_rule_1" + uuid4().hex,
+ model_name="model1" + uuid4().hex)
-def test_get_pdp():
- client = utilities.register_client()
- req, pdp = get_pdp(client)
- assert req.status_code == 200
- assert isinstance(pdp, dict)
- assert "pdps" in pdp
+ data = {
+ "name": "testuser" + uuid4().hex,
+ "security_pipeline": [policy_id1],
+ "vim_project_id": "vim_project_id",
+ "description": "description of testuser"
+ }
+ req = hug.test.post(pdp, "pdp/", data, headers=auth_headers)
+ assert req.status == HTTP_200
+
+ name_uuid = "testuser" + uuid4().hex
+ data = {
+ "name": name_uuid,
+ "security_pipeline": [policy_id1],
+ "vim_project_id": "vim_project_id " + name_uuid,
+ "description": "description of testuser " + name_uuid
+ }
+ with pytest.raises(exceptions.PdpInUse) as exception_info:
+ req = hug.test.post(pdp, "pdp/", data, headers=auth_headers)
+ assert "400: Pdp Inuse" == str(exception_info.value)
+ # assert req.status == hug.HTTP_409
+ # assert req.data['message'] == '409: Pdp Conflict'
-def test_add_pdp():
+
+def test_delete_pdp(mocker):
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import pdp
+ mocker.patch('moon_manager.plugins.pyorchestrator.get_server_url',
+ return_value="http://127.0.0.1:20000")
+ mocker.patch("subprocess.Popen", return_value=True)
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy(
+ subject_category_name="subject_category1" + uuid4().hex,
+ object_category_name="object_category1" + uuid4().hex,
+ action_category_name="action_category1" + uuid4().hex,
+ meta_rule_name="meta_rule_1" + uuid4().hex,
+ model_name="model1" + uuid4().hex)
data = {
+ "name": "testuser" + uuid4().hex,
+ "security_pipeline": [policy_id],
+ "vim_project_id": "vim_project_id",
+ "description": "description of testuser"
+ }
+ req = hug.test.post(pdp, "pdp/", data, headers=auth_headers)
+ assert req.status == HTTP_200
+ assert isinstance(req.data, dict)
+ req = hug.test.get(pdp, 'pdp/', headers=auth_headers)
+ success_req = None
+ for key, value in req.data['pdps'].items():
+ if value['name'] == data['name']:
+ success_req = hug.test.delete(pdp, 'pdp/{}'.format(key), headers=auth_headers)
+ break
+ assert success_req
+ assert success_req.status == HTTP_200
+
+
+# Fixme: should re-enabled the input validation for those tests
+# def test_add_pdp_with_forbidden_char_in_user():
+# data = {
+# "name": "<a>",
+# "security_pipeline": ["policy_id_1", "policy_id_2"],
+# "vim_project_id": "vim_project_id",
+# "description": "description of testuser"
+# }
+# req = hug.test.post(pdp, "pdp/", data)
+# assert req.status == HTTP_400
+# print(req.data)
+# assert req.data["message"] == "Key: 'name', [Forbidden characters in string]"
+#
+#
+# def test_add_pdp_with_forbidden_char_in_keystone():
+# data = {
+# "name": "testuser",
+# "security_pipeline": ["policy_id_1", "policy_id_2"],
+# "vim_project_id": "<a>",
+# "description": "description of testuser"
+# }
+# req = hug.test.post(pdp, "pdp/", data)
+# assert req.status == 400
+# assert req.data["message"] == "Key: 'vim_project_id', [Forbidden characters in string]"
+
+
+def test_update_pdp(mocker):
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import pdp
+ mocker.patch('moon_manager.plugins.pyorchestrator.get_server_url',
+ return_value="http://127.0.0.1:20000")
+ mocker.patch("subprocess.Popen", return_value=True)
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy(
+ subject_category_name="subject_category1" + uuid4().hex,
+ object_category_name="object_category1" + uuid4().hex,
+ action_category_name="action_category1" + uuid4().hex,
+ meta_rule_name="meta_rule_1" + uuid4().hex,
+ model_name="model1" + uuid4().hex)
+ data_add = {
"name": "testuser",
+ "security_pipeline": [policy_id],
+ "vim_project_id": "vim_project_id",
+ "description": "description of testuser"
+ }
+
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id_update = builder.create_new_policy(
+ subject_category_name="subject_category1" + uuid4().hex,
+ object_category_name="object_category1" + uuid4().hex,
+ action_category_name="action_category1" + uuid4().hex,
+ meta_rule_name="meta_rule_1" + uuid4().hex,
+ model_name="model1" + uuid4().hex)
+ data_update = {
+ "name": "testuser_updated",
+ "security_pipeline": [policy_id_update],
+ "vim_project_id": "vim_project_id_update",
+ "description": "description of testuser_updated"
+ }
+ req = hug.test.post(pdp, "pdp/", data_add, headers=auth_headers)
+ pdp_id = list(req.data['pdps'])[0]
+ req_update = hug.test.patch(pdp, "pdp/{}".format(pdp_id), data_update, headers=auth_headers)
+ assert req_update.status == HTTP_200
+ value = list(req_update.data["pdps"].values())[0]
+ assert value["vim_project_id"] == data_update["vim_project_id"]
+ assert value["description"] == data_update["description"]
+ assert value["name"] == data_update['name']
+ assert value["security_pipeline"] == data_update['security_pipeline']
+ req = hug.test.get(pdp, 'pdp/', headers=auth_headers)
+ for key, value in req.data['pdps'].items():
+ if value['name'] == "testuser":
+ hug.test.delete(pdp, 'pdp/{}'.format(key), headers=auth_headers)
+ break
+
+
+def test_update_pdp_without_id(mocker):
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import pdp
+ mocker.patch('moon_manager.plugins.pyorchestrator.get_server_url',
+ return_value="http://127.0.0.1:20000")
+ mocker.patch("subprocess.Popen", return_value=True)
+ req = hug.test.patch(pdp, "pdp/", "testuser", headers=auth_headers)
+ assert req.status == HTTP_405
+ # assert req.data["message"] == 'Invalid Key :name not found'
+
+
+def test_update_pdp_without_user(mocker):
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import pdp
+ mocker.patch('moon_manager.plugins.pyorchestrator.get_server_url',
+ return_value="http://127.0.0.1:20000")
+ mocker.patch("subprocess.Popen", return_value=True)
+ data = {
+ "name": "",
"security_pipeline": ["policy_id_1", "policy_id_2"],
- "keystone_project_id": "keystone_project_id",
+ "vim_project_id": "vim_project_id",
"description": "description of testuser"
}
- client = utilities.register_client()
- req, pdp = add_pdp(client, data)
- assert req.status_code == 200
- assert isinstance(pdp, dict)
- value = list(pdp["pdps"].values())[0]
- assert "pdps" in pdp
- assert value['name'] == "testuser"
- assert value["description"] == "description of {}".format("testuser")
- assert value["keystone_project_id"] == "keystone_project_id"
+ req = hug.test.patch(pdp, "pdp/<a>", data, headers=auth_headers)
+ assert req.status == HTTP_400
+ print(req.data)
+ assert req.data["errors"] == {'uuid': 'Invalid UUID provided'}
-def test_delete_pdp():
- client = utilities.register_client()
- request, pdp = get_pdp(client)
- for key, value in pdp['pdps'].items():
- if value['name'] == "testuser":
- success_req = delete_pdp(client, key)
+def test_update_pdp_name_existed(mocker):
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import pdp
+ mocker.patch('moon_manager.plugins.pyorchestrator.get_server_url',
+ return_value="http://127.0.0.1:20000")
+ mocker.patch("subprocess.Popen", return_value=True)
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id1 = builder.create_new_policy(
+ subject_category_name="subject_category1" + uuid4().hex,
+ object_category_name="object_category1" + uuid4().hex,
+ action_category_name="action_category1" + uuid4().hex,
+ meta_rule_name="meta_rule_1" + uuid4().hex,
+ model_name="model1" + uuid4().hex)
+ uuid1 = uuid4().hex
+ data1 = {
+ "name": "testuser1" + uuid1,
+ "security_pipeline": [policy_id1],
+ "vim_project_id": "vim_project_id" + uuid1,
+ "description": "description of testuser1" + uuid1
+ }
+ req = hug.test.post(pdp, "pdp/", data1, headers=auth_headers)
+ assert req.status == HTTP_200
+
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id2 = builder.create_new_policy(
+ subject_category_name="subject_category1" + uuid4().hex,
+ object_category_name="object_category1" + uuid4().hex,
+ action_category_name="action_category1" + uuid4().hex,
+ meta_rule_name="meta_rule_1" + uuid4().hex,
+ model_name="model1" + uuid4().hex)
+
+ uuid2 = uuid4().hex
+ data2 = {
+ "name": "testuser2" + uuid2,
+ "security_pipeline": [policy_id2],
+ "vim_project_id": "vim_project_id" + uuid2,
+ "description": "description of testuser2" + uuid2
+ }
+ req = hug.test.post(pdp, "pdp/", data2, headers=auth_headers)
+ pdp_id = list(req.data['pdps'])[0]
+ for item in list(req.data['pdps']):
+ if req.data['pdps'][item]['name']==data2['name']:
+ pdp_id=item
+ break
+ data2['name'] = data1['name']
+ with pytest.raises(exceptions.PdpExisting) as exception_info:
+ req_update = hug.test.patch(pdp, "pdp/{}".format(pdp_id), data2, headers=auth_headers)
+ # assert req_update.data['message'] == '409: Pdp Error'
+ assert "409: Pdp Error" == str(exception_info.value)
+
+
+
+def test_update_pdp_policy_used(mocker):
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import pdp
+ mocker.patch('moon_manager.plugins.pyorchestrator.get_server_url',
+ return_value="http://127.0.0.1:20000")
+ mocker.patch("subprocess.Popen", return_value=True)
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id1 = builder.create_new_policy(
+ subject_category_name="subject_category1" + uuid4().hex,
+ object_category_name="object_category1" + uuid4().hex,
+ action_category_name="action_category1" + uuid4().hex,
+ meta_rule_name="meta_rule_1" + uuid4().hex,
+ model_name="model1" + uuid4().hex)
+ uuid1 = uuid4().hex
+ data1 = {
+ "name": "testuser1" + uuid1,
+ "security_pipeline": [policy_id1],
+ "vim_project_id": "vim_project_id" + uuid1,
+ "description": "description of testuser1" + uuid1
+ }
+ req = hug.test.post(pdp, "pdp/", data1, headers=auth_headers)
+ assert req.status == HTTP_200
+
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id2 = builder.create_new_policy(
+ subject_category_name="subject_category1" + uuid4().hex,
+ object_category_name="object_category1" + uuid4().hex,
+ action_category_name="action_category1" + uuid4().hex,
+ meta_rule_name="meta_rule_1" + uuid4().hex,
+ model_name="model1" + uuid4().hex)
+
+ uuid2 = uuid4().hex
+ data2 = {
+ "name": "testuser2" + uuid2,
+ "security_pipeline": [policy_id2],
+ "vim_project_id": "vim_project_id" + uuid2,
+ "description": "description of testuser2" + uuid2
+ }
+ req = hug.test.post(pdp, "pdp/", data2, headers=auth_headers)
+ pdp_id = list(req.data['pdps'])[0]
+ for item in list(req.data['pdps']):
+ if req.data['pdps'][item]['name']==data2['name']:
+ pdp_id=item
break
- assert success_req.status_code == 200
+ data2['security_pipeline'] = data1['security_pipeline']
+
+ with pytest.raises(exceptions.PdpInUse) as exception_info:
+ req_update = hug.test.patch(pdp, "pdp/{}".format(pdp_id), data2, headers=auth_headers)
+ assert "400: Pdp Inuse" == str(exception_info.value)
+ # assert req_update.data['message'] == '409: Pdp Conflict'
+
+
diff --git a/moon_manager/tests/unit_python/api/test_perimeter.py b/moon_manager/tests/unit_python/api/test_perimeter.py
index db09780f..c741adf7 100644
--- a/moon_manager/tests/unit_python/api/test_perimeter.py
+++ b/moon_manager/tests/unit_python/api/test_perimeter.py
@@ -1,157 +1,1304 @@
-# import moon_manager
-# import moon_manager.api
-import json
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+import hug
import api.utilities as utilities
+from helpers import data_builder as builder
+import helpers.policy_helper as policy_helper
+from uuid import uuid4
+import pytest
+from moon_utilities import exceptions
-def get_subjects(client):
- req = client.get("/subjects")
- assert req.status_code == 200
+def get_subjects(subject_id=None):
+ from moon_manager.api import perimeter
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ if not subject_id:
+ req = hug.test.get(perimeter, 'subjects/', headers=auth_headers)
+ else:
+ req = hug.test.get(perimeter, 'subjects/{}'.format(subject_id), headers=auth_headers)
subjects = utilities.get_json(req.data)
+ return req, subjects
+
+
+def add_subjects(policy_id, name, perimeter_id=None, data=None, auth_headers=None):
+ from moon_manager.api import perimeter
+ if not data:
+ name = name + uuid4().hex
+ data = {
+ "name": name,
+ "description": "description of {}".format(name),
+ "password": "password for {}".format(name),
+ "email": "{}@moon".format(name)
+ }
+ if not perimeter_id:
+ req = hug.test.post(perimeter, "/policies/{}/subjects".format(policy_id),
+ body=data, headers=auth_headers)
+ else:
+ req = hug.test.post(perimeter, "/policies/{}/subjects/{}".format(policy_id, perimeter_id),
+ body=data, headers=auth_headers)
+ subjects = utilities.get_json(req.data)
+ return req, subjects
+
+
+def delete_subjects_without_perimeter_id(auth_headers=None):
+ from moon_manager.api import perimeter
+ req = hug.test.delete(perimeter, "/subjects/{}".format(""), headers=auth_headers)
+ return req
+
+
+def test_perimeter_get_subject():
+ req, subjects = get_subjects()
+ assert req.status == hug.HTTP_200
assert isinstance(subjects, dict)
assert "subjects" in subjects
- return subjects
-def add_subjects(client, name):
+def test_perimeter_add_subject():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ policies = policy_helper.add_policies()
+ policy_id = list(policies.keys())[0]
+
+ req, subjects = add_subjects(policy_id, "testuser", auth_headers=auth_headers)
+ value = list(subjects["subjects"].values())[0]
+ assert req.status == hug.HTTP_200
+ assert value["name"]
+ assert value["email"]
+
+
+def test_perimeter_add_same_subject_perimeter_id_with_new_policy_id():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ name = "testuser"
+ perimeter_id = uuid4().hex
+ data = {
+ "name": name + uuid4().hex,
+ "description": "description of {}".format(name),
+ "password": "password for {}".format(name),
+ "email": "{}@moon".format(name)
+ }
+ add_subjects(policy_id1, data['name'], perimeter_id=perimeter_id, data=data,
+ auth_headers=auth_headers)
+ policies2 = policy_helper.add_policies()
+ policy_id2 = list(policies2.keys())[0]
+ req, subjects = add_subjects(policy_id2, data['name'],
+ perimeter_id=perimeter_id, data=data,
+ auth_headers=auth_headers)
+ value = list(subjects["subjects"].values())[0]
+ assert req.status == hug.HTTP_200
+ assert value["name"]
+ assert value["email"]
+ assert len(value['policy_list']) == 2
+ assert policy_id1 in value['policy_list']
+ assert policy_id2 in value['policy_list']
+
+
+def test_perimeter_add_same_subject_perimeter_id_with_different_name():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ perimeter_id = uuid4().hex
+ add_subjects(policy_id1, "testuser", perimeter_id=perimeter_id, auth_headers=auth_headers)
+ policies2 = policy_helper.add_policies()
+ policy_id2 = list(policies2.keys())[0]
+ with pytest.raises(exceptions.PerimeterContentError) as exception_info:
+ req, subjects = add_subjects(policy_id2, "testuser", perimeter_id=perimeter_id,
+ auth_headers=auth_headers)
+ assert "400: Perimeter content is invalid." == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == '400: Perimeter content is invalid.'
+
+
+def test_perimeter_add_same_subject_name_with_new_policy_id():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ perimeter_id = uuid4().hex
+ name = "testuser" + uuid4().hex
data = {
"name": name,
"description": "description of {}".format(name),
"password": "password for {}".format(name),
"email": "{}@moon".format(name)
}
- req = client.post("/subjects", data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
- assert req.status_code == 200
- subjects = utilities.get_json(req.data)
- assert isinstance(subjects, dict)
- key = list(subjects["subjects"].keys())[0]
+ req, subjects = add_subjects(policy_id1, None, perimeter_id=perimeter_id, data=data,
+ auth_headers=auth_headers)
+ policies2 = policy_helper.add_policies()
+ policy_id2 = list(policies2.keys())[0]
value = list(subjects["subjects"].values())[0]
- assert "subjects" in subjects
- assert key == "1111111111111"
- assert value['id'] == "1111111111111"
- assert value['name'] == name
- assert value["description"] == "description of {}".format(name)
- assert value["email"] == "{}@moon".format(name)
- return subjects
+ data = {
+ "name": value['name'],
+ "description": "description of {}".format(value['name']),
+ "password": "password for {}".format(value['name']),
+ "email": "{}@moon".format(value['name'])
+ }
+ req, subjects = add_subjects(policy_id2, None, data=data, auth_headers=auth_headers)
+ value = list(subjects["subjects"].values())[0]
+ assert req.status == hug.HTTP_200
+ assert value["name"]
+ assert value["email"]
+ assert len(value['policy_list']) == 2
+ assert policy_id1 in value['policy_list']
+ assert policy_id2 in value['policy_list']
-def add_subjects_without_name(client, name):
+def test_perimeter_add_same_subject_name_with_same_policy_id():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ perimeter_id = uuid4().hex
+ name = "testuser" + uuid4().hex
data = {
"name": name,
"description": "description of {}".format(name),
"password": "password for {}".format(name),
"email": "{}@moon".format(name)
}
- req = client.post("/subjects", data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
- assert req.status_code == 500
+ req, subjects = add_subjects(policy_id1, None, perimeter_id=perimeter_id,
+ data=data, auth_headers=auth_headers)
+ value = list(subjects["subjects"].values())[0]
+ data = {
+ "name": value['name'],
+ "description": "description of {}".format(value['name']),
+ "password": "password for {}".format(value['name']),
+ "email": "{}@moon".format(value['name'])
+ }
+ with pytest.raises(exceptions.PolicyExisting) as exception_info:
+ req, subjects = add_subjects(policy_id1, None, data=data, auth_headers=auth_headers)
+ assert "409: Policy Already Exists" == str(exception_info.value)
+ # assert req.status == hug.HTTP_409
+ # assert req.data["message"] == '409: Policy Already Exists'
-def delete_subject(client, name):
- subjects = get_subjects(client)
- for key, value in subjects['subjects'].items():
- if value['name'] == name:
- req = client.delete("/subjects/{}".format(key))
- assert req.status_code == 200
- break
- subjects = get_subjects(client)
- assert name not in [x['name'] for x in subjects["subjects"].values()]
+def test_perimeter_add_same_subject_perimeter_id_with_existed_policy_id_in_list():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ policies = policy_helper.add_policies()
+ policy_id = list(policies.keys())[0]
+ name = "testuser" + uuid4().hex
+ data = {
+ "name": name + uuid4().hex,
+ "description": "description of {}".format(name),
+ "password": "password for {}".format(name),
+ "email": "{}@moon".format(name)
+ }
+ subj_id = "b34e5a29-5494-4cc5-9356-daa244b8c254"
+ req, subjects = get_subjects(subj_id)
+ if subjects['subjects']:
+ for __policy_id in subjects['subjects'][subj_id]['policy_list']:
+ req = hug.test.delete(perimeter,
+ "/policies/{}/subjects/{}".format(__policy_id, subj_id),
+ headers=auth_headers)
+ req, subjects = add_subjects(policy_id, name, data=data, auth_headers=auth_headers)
+ perimeter_id = list(subjects["subjects"].values())[0]['id']
+ with pytest.raises(exceptions.PolicyExisting) as exception_info:
+ req, subjects = add_subjects(policy_id, name, perimeter_id=perimeter_id, data=data,
+ auth_headers=auth_headers)
+ assert "409: Policy Already Exists" == str(exception_info.value)
+ # assert req.status == hug.HTTP_409
+ # assert req.data["message"] == '409: Policy Already Exists'
+
+
+def test_perimeter_add_subject_invalid_policy_id():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ policies = policy_helper.add_policies()
+ policy_id = list(policies.keys())[0]
+ name = "testuser"
+ data = {
+ "name": name + uuid4().hex,
+ "description": "description of {}".format(name),
+ "password": "password for {}".format(name),
+ "email": "{}@moon".format(name)
+ }
+ with pytest.raises(exceptions.PolicyUnknown) as exception_info:
+ req, subjects = add_subjects( policy_id + "0", "testuser", data, auth_headers=auth_headers)
+ assert "400: Policy Unknown" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == '400: Policy Unknown'
+
+
+def test_perimeter_add_subject_blank_data():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ policies = policy_helper.add_policies()
+ policy_id = list(policies.keys())[0]
+ with pytest.raises(exceptions.ValidationKeyError) as exception_info:
+ req = hug.test.post(perimeter, "/policies/{}/subjects".format(policy_id), body={'test':"aa"},
+ headers=auth_headers)
+ assert "Invalid Key :name not found" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == 'Invalid Key :name not found'
+
+
+def test_perimeter_add_subject_with_forbidden_char_in_name():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ data = {
+ "name": "<a>",
+ "description": "description of {}".format(""),
+ "password": "password for {}".format(""),
+ "email": "{}@moon".format("")
+ }
+ subj_id = "a34e5a29-5494-4cc5-9356-daa244b8c888"
+ with pytest.raises(exceptions.ValidationContentError) as exception_info:
+ req = hug.test.post(perimeter, "/policies/{}/subjects".format(subj_id), body=data,
+ headers=auth_headers)
+ assert "Key: 'name', [Forbidden characters in string]" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == "Key: 'name', [Forbidden characters in string]"
+
+
+def test_perimeter_update_subject_name():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ policies = policy_helper.add_policies()
+ policy_id = list(policies.keys())[0]
+ req, subjects = add_subjects(policy_id, "testuser", auth_headers=auth_headers)
+ value1 = list(subjects["subjects"].values())[0]
+ perimeter_id = value1['id']
+ data = {
+ 'name': value1['name'] + "update"
+ }
+ req = hug.test.patch(perimeter, "/subjects/{}".format(perimeter_id), body=data,
+ headers=auth_headers)
+ subjects = utilities.get_json(req.data)
+ value2 = list(subjects["subjects"].values())[0]
+ assert req.status == hug.HTTP_200
+ assert value1['name'] + 'update' == value2['name']
+ assert value1['id'] == value2['id']
+ assert value1['description'] == value2['description']
+
+
+def test_perimeter_update_subject_description():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ policies = policy_helper.add_policies()
+ policy_id = list(policies.keys())[0]
+ req, subjects = add_subjects(policy_id, "testuser", auth_headers=auth_headers)
+ value1 = list(subjects["subjects"].values())[0]
+ perimeter_id = value1['id']
+ data = {
+ 'description': value1['description'] + "update",
+ }
+ req = hug.test.patch(perimeter, "/subjects/{}".format(perimeter_id), body=data,
+ headers=auth_headers)
+ subjects = utilities.get_json(req.data)
+ value2 = list(subjects["subjects"].values())[0]
+
+ assert req.status == hug.HTTP_200
+ assert value1['name'] == value2['name']
+ assert value1['id'] == value2['id']
+ assert value1['description'] + 'update' == value2['description']
+
+def test_perimeter_update_subject_description_and_name():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ policies = policy_helper.add_policies()
+ policy_id = list(policies.keys())[0]
-def test_subject():
- client = utilities.register_client()
- get_subjects(client)
- add_subjects(client, "testuser")
- add_subjects_without_name(client, "")
- delete_subject(client, "testuser")
+ req, subjects = add_subjects(policy_id, "testuser", auth_headers=auth_headers)
+ value1 = list(subjects["subjects"].values())[0]
+ perimeter_id = value1['id']
+ data = {
+ 'description': value1['description'] + "update",
+ 'name': value1['name'] + "update"
+ }
+ from moon_manager.api import perimeter
+ req = hug.test.patch(perimeter, "/subjects/{}".format(perimeter_id), body=data,
+ headers=auth_headers)
+ subjects = utilities.get_json(req.data)
+ value2 = list(subjects["subjects"].values())[0]
+
+ assert req.status == hug.HTTP_200
+ assert value1['name'] + 'update' == value2['name']
+ assert value1['id'] == value2['id']
+ assert value1['description'] + 'update' == value2['description']
+
+
+def test_perimeter_update_subject_wrong_id():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ name = 'testuser' + uuid4().hex
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ data = {
+ "name": name,
+ "description": "description of {}".format('testuser'),
+ }
+ req, subjects = add_subjects(policy_id=policy_id1, name='testuser', data=data,
+ auth_headers=auth_headers)
+ value1 = list(subjects["subjects"].values())[0]
+ perimeter_id = value1['id']
+ data = {
+ 'name': value1['name'] + "update",
+ 'description': value1['description'] + "update"
+ }
+ with pytest.raises(exceptions.PerimeterContentError) as exception_info:
+ req = hug.test.patch(perimeter, "/subjects/{}".format(perimeter_id + "wrong"),
+ body=data, headers=auth_headers)
+ assert "400: Perimeter content is invalid." == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == '400: Perimeter content is invalid.'
-def get_objects(client):
- req = client.get("/objects")
- assert req.status_code == 200
+def test_perimeter_update_subject_name_with_existed_one():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ name1 = 'testuser' + uuid4().hex
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ perimeter_id1 = uuid4().hex
+ req, subjects = add_subjects(policy_id=policy_id1, name=name1,
+ perimeter_id=perimeter_id1, auth_headers=auth_headers)
+ value1 = list(subjects["subjects"].values())[0]
+ perimeter_id2 = uuid4().hex
+ name2 = 'testuser' + uuid4().hex
+ req, subjects = add_subjects(policy_id=policy_id1, name=name2,
+ perimeter_id=perimeter_id2, auth_headers=auth_headers)
+ data = {
+ 'name': value1['name'],
+ }
+ with pytest.raises(exceptions.SubjectExisting) as exception_info:
+ req = hug.test.patch(perimeter, "/subjects/{}".format(perimeter_id2), body=data,
+ headers=auth_headers)
+ assert "409: Subject Existing" == str(exception_info.value)
+ # assert req.status == hug.HTTP_409
+
+
+def test_perimeter_delete_subject():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ policies = policy_helper.add_policies()
+ policy_id = list(policies.keys())[0]
+ req, subjects = add_subjects(policy_id, "testuser", auth_headers=auth_headers)
+ subject_id = list(subjects["subjects"].values())[0]["id"]
+ req = hug.test.delete(perimeter, "/policies/{}/subjects/{}".format(policy_id, subject_id),
+ headers=auth_headers)
+ assert req.status == hug.HTTP_200
+
+
+def test_perimeter_delete_subjects_without_perimeter_id():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ with pytest.raises(exceptions.SubjectUnknown) as exception_info:
+ req = delete_subjects_without_perimeter_id(auth_headers)
+ assert "400: Subject Unknown" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == "400: Subject Unknown"
+
+
+def get_objects():
+ from moon_manager.api import perimeter
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ req = hug.test.get(perimeter, "/objects", headers=auth_headers)
objects = utilities.get_json(req.data)
+ return req, objects
+
+
+def add_objects(name, policyId=None, data=None, perimeter_id=None, auth_headers=None):
+ from moon_manager.api import perimeter
+ if not policyId:
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policyId = builder.create_new_policy(
+ subject_category_name="subject_category1" + uuid4().hex,
+ object_category_name="object_category1" + uuid4().hex,
+ action_category_name="action_category1" + uuid4().hex,
+ meta_rule_name="meta_rule_1" + uuid4().hex,
+ model_name="model1" + uuid4().hex)
+ if not data:
+ data = {
+ "name": name + uuid4().hex,
+ "description": "description of {}".format(name),
+ }
+ if not perimeter_id:
+ req = hug.test.post(perimeter, "/policies/{}/objects/".format(policyId), body=data,
+ headers=auth_headers)
+ else:
+ req = hug.test.post(perimeter, "/policies/{}/objects/{}".format(policyId, perimeter_id),
+ body=data, headers=auth_headers)
+
+ objects = utilities.get_json(req.data)
+ return req, objects
+
+
+def delete_objects_without_perimeter_id(auth_headers=None):
+ from moon_manager.api import perimeter
+ req = hug.test.delete(perimeter, "/objects/{}".format(""), headers=auth_headers)
+ return req
+
+
+def test_perimeter_get_object():
+
+ req, objects = get_objects()
+ assert req.status == hug.HTTP_200
assert isinstance(objects, dict)
assert "objects" in objects
- return objects
-def add_objects(client, name):
+def test_perimeter_add_object():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ req, objects = add_objects("testuser", auth_headers=auth_headers)
+ value = list(objects["objects"].values())[0]
+ assert req.status == hug.HTTP_200
+ assert value['name']
+
+
+def test_perimeter_add_object_with_wrong_policy_id():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ with pytest.raises(exceptions.PolicyUnknown) as exception_info:
+ req, objects = add_objects("testuser", policyId='wrong', auth_headers=auth_headers)
+ assert "400: Policy Unknown" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == '400: Policy Unknown'
+
+
+def test_perimeter_add_object_with_policy_id_none():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ data = {
+ "name": "testuser" + uuid4().hex,
+ "description": "description of {}".format("testuser"),
+ }
+ with pytest.raises(exceptions.PolicyUnknown) as exception_info:
+ req = hug.test.post(perimeter, "/policies/{}/objects/".format(None), body=data,
+ headers=auth_headers)
+ assert "400: Policy Unknown" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == '400: Policy Unknown'
+
+
+def test_perimeter_add_same_object_name_with_new_policy_id():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ req, objects = add_objects("testuser", auth_headers=auth_headers)
+ value1 = list(objects["objects"].values())[0]
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ data = {
+ "name": value1['name'],
+ "description": "description of {}".format('testuser'),
+ }
+ req, objects = add_objects('testuser', policyId=policy_id1, data=data, auth_headers=auth_headers)
+ value2 = list(objects["objects"].values())[0]
+ assert req.status == hug.HTTP_200
+ assert value1['id'] == value2['id']
+ assert value1['name'] == value2['name']
+
+
+def test_perimeter_add_same_object_perimeter_id_with_new_policy_id():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ req, objects = add_objects( "testuser", auth_headers=auth_headers)
+ value1 = list(objects["objects"].values())[0]
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ data = {
+ "name": value1['name'],
+ "description": "description of {}".format('testuser'),
+ }
+ req, objects = add_objects('testuser', policyId=policy_id1, data=data,
+ perimeter_id=value1['id'],auth_headers=auth_headers)
+ value2 = list(objects["objects"].values())[0]
+ assert req.status == hug.HTTP_200
+ assert value1['id'] == value2['id']
+ assert value1['name'] == value2['name']
+
+
+def test_perimeter_add_same_object_perimeter_id_with_different_name():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ req, objects = add_objects( "testuser", auth_headers=auth_headers)
+ value1 = list(objects["objects"].values())[0]
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ data = {
+ "name": value1['name'] + 'different',
+ "description": "description of {}".format('testuser'),
+ }
+ with pytest.raises(exceptions.PerimeterContentError) as exception_info:
+ req, objects = add_objects('testuser', policyId=policy_id1, data=data,
+ perimeter_id=value1['id'], auth_headers=auth_headers)
+ assert "400: Perimeter content is invalid." == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == '400: Perimeter content is invalid.'
+
+
+def test_perimeter_add_same_object_name_with_same_policy_id():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ name = 'testuser' + uuid4().hex
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
data = {
"name": name,
- "description": "description of {}".format(name),
+ "description": "description of {}".format('testuser'),
}
- req = client.post("/objects", data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
- assert req.status_code == 200
- objects = utilities.get_json(req.data)
- assert isinstance(objects, dict)
- key = list(objects["objects"].keys())[0]
+ req, objects = add_objects('testuser', policyId=policy_id1, data=data, auth_headers=auth_headers)
value = list(objects["objects"].values())[0]
- assert "objects" in objects
- assert value['name'] == name
- assert value["description"] == "description of {}".format(name)
- return objects
+ assert req.status == hug.HTTP_200
+ with pytest.raises(exceptions.PolicyExisting) as exception_info:
+ req, objects = add_objects('testuser', policyId=policy_id1, data=data, auth_headers=auth_headers)
+ assert "409: Policy Already Exists" == str(exception_info.value)
+ # assert req.status == hug.HTTP_409
+ # assert req.data["message"] == '409: Policy Already Exists'
-def delete_objects(client, name):
- objects = get_objects(client)
- for key, value in objects['objects'].items():
- if value['name'] == name:
- req = client.delete("/objects/{}".format(key))
- assert req.status_code == 200
- break
- objects = get_objects(client)
- assert name not in [x['name'] for x in objects["objects"].values()]
+def test_perimeter_add_same_object_perimeter_id_with_existed_policy_id_in_list():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ name = 'testuser' + uuid4().hex
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ data = {
+ "name": name,
+ "description": "description of {}".format('testuser'),
+ }
+ req, objects = add_objects( 'testuser', policyId=policy_id1, data=data,
+ auth_headers=auth_headers)
+ value = list(objects["objects"].values())[0]
+ with pytest.raises(exceptions.PolicyExisting) as exception_info:
+ req, objects = add_objects('testuser', policyId=policy_id1, data=data,
+ perimeter_id=value['id'], auth_headers=auth_headers)
+ assert "409: Policy Already Exists" == str(exception_info.value)
+ # assert req.status == hug.HTTP_409
+ # assert req.data["message"] == '409: Policy Already Exists'
-def test_objects():
- client = utilities.register_client()
- get_objects(client)
- add_objects(client, "testuser")
- delete_objects(client, "testuser")
+def test_perimeter_update_object_name():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ name = 'testuser' + uuid4().hex
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ data = {
+ "name": name,
+ "description": "description of {}".format('testuser'),
+ }
+ req, objects = add_objects('testuser', policyId=policy_id1, data=data,
+ auth_headers=auth_headers)
-def get_actions(client):
- req = client.get("/actions")
- assert req.status_code == 200
- actions = utilities.get_json(req.data)
- assert isinstance(actions, dict)
- assert "actions" in actions
- return actions
+ value1 = list(objects["objects"].values())[0]
+ perimeter_id = value1['id']
+ data = {
+ 'name': value1['name'] + "update"
+ }
+ req = hug.test.patch(perimeter, "/objects/{}".format(perimeter_id), body=data,
+ headers=auth_headers)
+
+ objects = utilities.get_json(req.data)
+ value2 = list(objects["objects"].values())[0]
+
+ assert req.status == hug.HTTP_200
+ assert value1['name'] + 'update' == value2['name']
+ assert value1['id'] == value2['id']
+ assert value1['description'] == value2['description']
-def add_actions(client, name):
+def test_perimeter_update_object_description():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ name = 'testuser' + uuid4().hex
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
data = {
"name": name,
- "description": "description of {}".format(name),
+ "description": "description of {}".format('testuser'),
+ }
+ req, objects = add_objects('testuser', policyId=policy_id1, data=data,
+ auth_headers=auth_headers)
+
+ value1 = list(objects["objects"].values())[0]
+ perimeter_id = value1['id']
+ data = {
+ 'description': value1['description'] + "update"
+ }
+ req = hug.test.patch(perimeter, "/objects/{}".format(perimeter_id), body=data,
+ headers=auth_headers)
+
+ objects = utilities.get_json(req.data)
+ value2 = list(objects["objects"].values())[0]
+
+ assert req.status == hug.HTTP_200
+ assert value1['name'] == value2['name']
+ assert value1['id'] == value2['id']
+ assert value1['description'] + 'update' == value2['description']
+
+
+def test_perimeter_update_object_description_and_name():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ name = 'testuser' + uuid4().hex
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ data = {
+ "name": name,
+ "description": "description of {}".format('testuser'),
+ }
+ req, objects = add_objects('testuser', policyId=policy_id1, data=data,
+ auth_headers=auth_headers)
+
+ value1 = list(objects["objects"].values())[0]
+ perimeter_id = value1['id']
+ data = {
+ 'name': value1['name'] + "update",
+ 'description': value1['description'] + "update"
+ }
+ req = hug.test.patch(perimeter, "/objects/{}".format(perimeter_id), body=data,
+ headers=auth_headers)
+
+ objects = utilities.get_json(req.data)
+ value2 = list(objects["objects"].values())[0]
+ assert req.status == hug.HTTP_200
+ assert value1['name'] + 'update' == value2['name']
+ assert value1['id'] == value2['id']
+ assert value1['description'] + 'update' == value2['description']
+
+
+def test_perimeter_update_object_wrong_id():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ name = 'testuser' + uuid4().hex
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ data = {
+ "name": name,
+ "description": "description of {}".format('testuser'),
+ }
+ req, objects = add_objects('testuser', policyId=policy_id1, data=data,
+ auth_headers=auth_headers)
+
+ value1 = list(objects["objects"].values())[0]
+ perimeter_id = value1['id']
+ data = {
+ 'name': value1['name'] + "update",
+ 'description': value1['description'] + "update"
+ }
+ with pytest.raises(exceptions.PerimeterContentError) as exception_info:
+ req = hug.test.patch(perimeter, "/objects/{}".format(perimeter_id + "wrong"), body=data,
+ headers=auth_headers)
+ assert "400: Perimeter content is invalid." == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+
+
+def test_perimeter_update_object_name_with_existed_one():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ name = 'testuser' + uuid4().hex
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ data1 = {
+ "name": name,
+ "description": "description of {}".format('testuser'),
+ }
+ req, objects = add_objects('testuser', policyId=policy_id1, data=data1,
+ auth_headers=auth_headers)
+ value1 = list(objects["objects"].values())[0]
+
+ name = 'testuser' + uuid4().hex
+
+ data2 = {
+ "name": name,
+ "description": "description of {}".format('testuser'),
+ }
+ req, objects = add_objects('testuser', policyId=policy_id1, data=data2,
+ auth_headers=auth_headers)
+
+ value2 = list(objects["objects"].values())[0]
+ perimeter_id2 = value2['id']
+
+ data3 = {
+ 'name': value1['name']
}
- req = client.post("/actions", data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
- assert req.status_code == 200
+ with pytest.raises(exceptions.ObjectExisting) as exception_info:
+ req = hug.test.patch(perimeter, "/objects/{}".format(perimeter_id2), body=data3,
+ headers=auth_headers)
+ assert "409: Object Existing" == str(exception_info.value)
+ # assert req.status == hug.HTTP_409
+ # assert req.data["message"] == '409: Object Existing'
+
+
+def test_perimeter_add_object_without_name():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ data = {
+ "name": "<br/>",
+ "description": "description of {}".format(""),
+ }
+ with pytest.raises(exceptions.ValidationContentError) as exception_info:
+ req = hug.test.post(perimeter, "/policies/{}/objects/".format(
+ "a34e5a29-5494-4cc5-9356-daa244b8c888"),
+ body=data, headers=auth_headers)
+ assert "Key: 'name', [Forbidden characters in string]" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == "Key: 'name', [Forbidden characters in string]"
+
+
+def test_perimeter_add_object_blank_data():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ with pytest.raises(exceptions.ValidationKeyError) as exception_info:
+ req = hug.test.post(perimeter, "/policies/{}/objects/".format(
+ "a34e5a29-5494-4cc5-9356-daa244b8c888"),
+ body={}, headers=auth_headers)
+ assert "Invalid Key :name not found" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == 'Invalid Key :name not found'
+
+
+def test_perimeter_add_object_with_name_contain_spaces():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ data = {
+ "name": "test<a>user",
+ "description": "description of {}".format("test user"),
+ }
+ with pytest.raises(exceptions.ValidationContentError) as exception_info:
+ req = hug.test.post(perimeter, "/policies/{}/objects/".format(
+ "a34e5a29-5494-4cc5-9356-daa244b8c888"), body=data,
+ headers=auth_headers)
+ assert "Key: 'name', [Forbidden characters in string]" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == "Key: 'name', [Forbidden characters in string]"
+
+
+def test_perimeter_add_object_with_name_space():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ data = {
+ "name": " ",
+ "description": "description of {}".format("test user"),
+ }
+ with pytest.raises(exceptions.PerimeterContentError) as exception_info:
+ req = hug.test.post(perimeter, "/policies/{}/objects/".format(
+ "a34e5a29-5494-4cc5-9356-daa244b8c888"),
+ body =data, headers=auth_headers)
+ assert "400: Perimeter content is invalid." == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == '400: Perimeter content is invalid.'
+
+
+def test_perimeter_delete_object():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ policies = policy_helper.add_policies()
+ policy_id = list(policies.keys())[0]
+ object_id = builder.create_object(policy_id)
+ req = hug.test.delete(perimeter, "/policies/{}/objects/{}".format(policy_id, object_id), headers=auth_headers)
+
+ assert req.status == hug.HTTP_200
+
+
+def test_perimeter_delete_objects_without_perimeter_id():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ with pytest.raises(exceptions.ObjectUnknown) as exception_info:
+ req = delete_objects_without_perimeter_id(auth_headers=auth_headers)
+ assert "400: Object Unknown" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == "400: Object Unknown"
+
+
+def get_actions():
+ from moon_manager.api import perimeter
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ req = hug.test.get(perimeter, "/actions", headers=auth_headers)
+ actions = utilities.get_json(req.data)
+ return req, actions
+
+
+def add_actions(name, policy_id=None, data=None, perimeter_id=None, auth_headers=None):
+ from moon_manager.api import perimeter
+ if not policy_id:
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy(
+ subject_category_name="subject_category1" + uuid4().hex,
+ object_category_name="object_category1" + uuid4().hex,
+ action_category_name="action_category1" + uuid4().hex,
+ meta_rule_name="meta_rule_1" + uuid4().hex,
+ model_name="model1" + uuid4().hex)
+
+ if not data:
+ data = {
+ "name": name + uuid4().hex,
+ "description": "description of {}".format(name),
+ }
+ if not perimeter_id:
+ req = hug.test.post(perimeter, "/policies/{}/actions/".format(policy_id), body=data,
+ headers=auth_headers)
+ else:
+ req = hug.test.post(perimeter, "/policies/{}/actions/{}".format(policy_id, perimeter_id),
+ body=data, headers=auth_headers)
+
actions = utilities.get_json(req.data)
+ return req, actions
+
+
+def delete_actions_without_perimeter_id(auth_headers=None):
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ req = hug.test.delete(perimeter, "/actions/{}".format(""), headers=auth_headers)
+ return req
+
+
+def test_perimeter_get_actions():
+
+ req, actions = get_actions()
+
+ assert req.status == hug.HTTP_200
assert isinstance(actions, dict)
- key = list(actions["actions"].keys())[0]
- value = list(actions["actions"].values())[0]
assert "actions" in actions
- assert value['name'] == name
- assert value["description"] == "description of {}".format(name)
- return actions
-
-
-def delete_actions(client, name):
- actions = get_actions(client)
- for key, value in actions['actions'].items():
- if value['name'] == name:
- req = client.delete("/actions/{}".format(key))
- assert req.status_code == 200
- break
- actions = get_actions(client)
- assert name not in [x['name'] for x in actions["actions"].values()]
-
-
-def test_actions():
- client = utilities.register_client()
- get_actions(client)
- add_actions(client, "testuser")
- delete_actions(client, "testuser")
+
+
+def test_perimeter_add_actions():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ req, actions = add_actions("testuser", auth_headers=auth_headers)
+ value = list(actions["actions"].values())[0]
+ assert req.status == hug.HTTP_200
+ assert value['name']
+
+
+def test_perimeter_add_action_with_wrong_policy_id():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ with pytest.raises(exceptions.PolicyUnknown) as exception_info:
+ req, actions = add_actions("testuser", policy_id="wrong", auth_headers=auth_headers)
+ assert "400: Policy Unknown" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == '400: Policy Unknown'
+
+
+def test_perimeter_add_action_with_policy_id_none():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ data = {
+ "name": "testuser" + uuid4().hex,
+ "description": "description of {}".format("testuser"),
+ }
+ with pytest.raises(exceptions.PolicyUnknown) as exception_info:
+ req = hug.test.post(perimeter, "/policies/{}/actions/".format(None), body=data,
+ headers=auth_headers)
+ assert "400: Policy Unknown" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == '400: Policy Unknown'
+
+
+def test_perimeter_add_same_action_name_with_new_policy_id():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ req, action = add_actions("testuser", auth_headers=auth_headers)
+ value1 = list(action["actions"].values())[0]
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ data = {
+ "name": value1['name'],
+ "description": "description of {}".format('testuser'),
+ }
+ req, action = add_actions('testuser', policy_id=policy_id1, data=data,
+ auth_headers=auth_headers)
+ value2 = list(action["actions"].values())[0]
+ assert req.status == hug.HTTP_200
+ assert value1['id'] == value2['id']
+ assert value1['name'] == value2['name']
+
+
+def test_perimeter_add_same_action_perimeter_id_with_new_policy_id():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ req, action = add_actions("testuser", auth_headers=auth_headers)
+ value1 = list(action["actions"].values())[0]
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ data = {
+ "name": value1['name'],
+ "description": "description of {}".format('testuser'),
+ }
+ req, action = add_actions('testuser', policy_id=policy_id1, data=data,
+ perimeter_id=value1['id'], auth_headers=auth_headers)
+ value2 = list(action["actions"].values())[0]
+
+ assert req.status == hug.HTTP_200
+ assert value1['id'] == value2['id']
+ assert value1['name'] == value2['name']
+
+
+def test_perimeter_add_same_action_perimeter_id_with_different_name():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ req, action = add_actions("testuser", auth_headers=auth_headers)
+ value1 = list(action["actions"].values())[0]
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ data = {
+ "name": value1['name'] + 'different',
+ "description": "description of {}".format('testuser'),
+ }
+ with pytest.raises(exceptions.PerimeterContentError) as exception_info:
+ req, action = add_actions('testuser', policy_id=policy_id1, data=data,
+ perimeter_id=value1['id'], auth_headers=auth_headers)
+ assert "400: Perimeter content is invalid." == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == '400: Perimeter content is invalid.'
+
+
+def test_perimeter_add_same_action_name_with_same_policy_id():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ req, action = add_actions("testuser", policy_id=policy_id1, auth_headers=auth_headers)
+ value1 = list(action["actions"].values())[0]
+ data = {
+ "name": value1['name'],
+ "description": "description of {}".format('testuser'),
+ }
+ with pytest.raises(exceptions.PolicyExisting) as exception_info:
+ req, action = add_actions('testuser', policy_id=policy_id1, data=data,
+ auth_headers=auth_headers)
+ assert "409: Policy Already Exists" == str(exception_info.value)
+ # assert req.status == hug.HTTP_409
+ # assert req.data["message"] == '409: Policy Already Exists'
+
+
+def test_perimeter_add_same_action_perimeter_id_with_existed_policy_id_in_list():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ req, action = add_actions("testuser", policy_id=policy_id1, auth_headers=auth_headers)
+ value1 = list(action["actions"].values())[0]
+ data = {
+ "name": value1['name'],
+ "description": "description of {}".format('testuser'),
+ }
+ with pytest.raises(exceptions.PolicyExisting) as exception_info:
+ req, action = add_actions('testuser', policy_id=policy_id1, data=data,
+ perimeter_id=value1['id'], auth_headers=auth_headers)
+ assert "409: Policy Already Exists" == str(exception_info.value)
+ # assert req.status == hug.HTTP_409
+ # assert req.data["message"] == '409: Policy Already Exists'
+
+
+def test_perimeter_add_actions_without_name():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ data = {
+ "name": "<a>",
+ "description": "description of {}".format(""),
+ }
+ with pytest.raises(exceptions.ValidationContentError) as exception_info:
+ req = hug.test.post(perimeter, "/policies/{}/actions".format(
+ "a34e5a29-5494-4cc5-9356-daa244b8c888"),
+ body=data, headers=auth_headers)
+ assert "Key: 'name', [Forbidden characters in string]" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == "Key: 'name', [Forbidden characters in string]"
+
+
+def test_perimeter_add_actions_with_name_contain_spaces():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ data = {
+ "name": "test<a>user",
+ "description": "description of {}".format("test user"),
+ }
+ with pytest.raises(exceptions.ValidationContentError) as exception_info:
+ req = hug.test.post(perimeter, "/policies/{}/actions".format(
+ "a34e5a29-5494-4cc5-9356-daa244b8c888"),
+ body=data, headers=auth_headers)
+ assert "Key: 'name', [Forbidden characters in string]" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == "Key: 'name', [Forbidden characters in string]"
+
+
+def test_add_subjects_without_policy_id():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ data = {
+ "name": "testuser",
+ "description": "description of {}".format("test user"),
+ }
+ with pytest.raises(exceptions.PolicyUnknown) as exception_info:
+ req = hug.test.post(perimeter, "/policies/{}/subjects".format(
+ "a34e5a29-5494-4cc5-9356-daa244b8c888"),
+ body=data, headers=auth_headers)
+ assert "400: Policy Unknown" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == "400: Policy Unknown"
+
+
+def test_add_objects_without_policy_id():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ data = {
+ "name": "testuser",
+ "description": "description of {}".format("test user"),
+ }
+ with pytest.raises(exceptions.PolicyUnknown) as exception_info:
+ req = hug.test.post(perimeter, "/policies/{}/objects".format(
+ "a34e5a29-5494-4cc5-9356-daa244b8c888"),
+ body=data, headers=auth_headers)
+ assert "400: Policy Unknown" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == "400: Policy Unknown"
+
+
+def test_add_action_without_policy_id():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ data = {
+ "name": "testuser",
+ "description": "description of {}".format("test user"),
+ }
+ with pytest.raises(exceptions.PolicyUnknown) as exception_info:
+ req = hug.test.post(perimeter, "/policies/{}/actions".format(
+ "a34e5a29-5494-4cc5-9356-daa244b8c888"), body=data,
+ headers=auth_headers)
+ assert "400: Policy Unknown" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == "400: Policy Unknown"
+
+
+def test_perimeter_update_action_name():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ req, actions = add_actions("testuser", auth_headers=auth_headers)
+ value1 = list(actions["actions"].values())[0]
+ perimeter_id = value1['id']
+ data = {
+ 'name': value1['name'] + "update"
+ }
+ req = hug.test.patch(perimeter, "/actions/{}".format(perimeter_id), body=data,
+ headers=auth_headers)
+ subjects = utilities.get_json(req.data)
+ value2 = list(subjects["actions"].values())[0]
+
+ assert req.status == hug.HTTP_200
+ assert value1['name'] + 'update' == value2['name']
+ assert value1['id'] == value2['id']
+ assert value1['description'] == value2['description']
+
+
+def test_perimeter_update_actions_description():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ req, actions = add_actions("testuser", auth_headers=auth_headers)
+ value1 = list(actions["actions"].values())[0]
+ perimeter_id = value1['id']
+ data = {
+ 'description': value1['description'] + "update"
+ }
+ req = hug.test.patch(perimeter, "/actions/{}".format(perimeter_id), body=data,
+ headers=auth_headers)
+ subjects = utilities.get_json(req.data)
+ value2 = list(subjects["actions"].values())[0]
+
+ assert req.status == hug.HTTP_200
+ assert value1['name'] == value2['name']
+ assert value1['id'] == value2['id']
+ assert value1['description'] + 'update' == value2['description']
+
+
+def test_perimeter_update_actions_description_and_name():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ req, actions = add_actions("testuser", auth_headers=auth_headers)
+ value1 = list(actions["actions"].values())[0]
+ perimeter_id = value1['id']
+ data = {
+ 'name': value1['name'] + "update",
+ 'description': value1['description'] + "update"
+ }
+ req = hug.test.patch(perimeter, "/actions/{}".format(perimeter_id), body=data,
+ headers=auth_headers)
+ subjects = utilities.get_json(req.data)
+ value2 = list(subjects["actions"].values())[0]
+
+ assert req.status == hug.HTTP_200
+ assert value1['name'] + 'update' == value2['name']
+ assert value1['id'] == value2['id']
+ assert value1['description'] + 'update' == value2['description']
+
+
+def test_perimeter_update_action_wrong_id():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ req, actions = add_actions("testuser", auth_headers=auth_headers)
+ value1 = list(actions["actions"].values())[0]
+ perimeter_id = value1['id']
+ data = {
+ 'name': value1['name'] + "update",
+ 'description': value1['description'] + "update"
+ }
+ with pytest.raises(exceptions.PerimeterContentError) as exception_info:
+ req = hug.test.patch(perimeter, "/actions/{}".format(perimeter_id + "wrong"), body=data,
+ headers=auth_headers)
+ assert "400: Perimeter content is invalid." == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == '400: Perimeter content is invalid.'
+
+
+def test_perimeter_update_action_name_with_existed_one():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ req, actions = add_actions("testuser", auth_headers=auth_headers)
+ value1 = list(actions["actions"].values())[0]
+ req, actions = add_actions("testuser", auth_headers=auth_headers)
+ value2 = list(actions["actions"].values())[0]
+ perimeter_id2 = value2['id']
+ data = {
+ 'name': value1['name'],
+ }
+ with pytest.raises(exceptions.ActionExisting) as exception_info:
+ req = hug.test.patch(perimeter, "/actions/{}".format(perimeter_id2), body=data,
+ headers=auth_headers)
+ assert "409: Action Existing" == str(exception_info.value)
+ # assert req.status == hug.HTTP_409
+ # assert req.data["message"] == '409: Action Existing'
+
+
+def test_perimeter_delete_actions():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ policies = policy_helper.add_policies()
+ policy_id = list(policies.keys())[0]
+ action_id = builder.create_action(policy_id)
+ req = hug.test.delete(perimeter, "/policies/{}/actions/{}".format(policy_id, action_id),
+ headers=auth_headers)
+
+
+ assert req.status == hug.HTTP_200
+
+def test_delete_subject_assigned_to_policy():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ from moon_manager.db_driver import PolicyManager
+
+ policies = policy_helper.add_policies()
+ policy_id = list(policies.keys())[0]
+ subject_id = builder.create_subject(policy_id)
+ PolicyManager.delete_policy(moon_user_id="admin", policy_id=policy_id)
+ PolicyManager.delete_subject(moon_user_id="admin", policy_id=None ,perimeter_id=subject_id)
+
+ req = hug.test.get(perimeter, "subjects/{}".format(subject_id), headers=auth_headers)
+ assert req.data['subjects'] == {}
+
+
+def test_delete_subject_without_policy():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ policies = policy_helper.add_policies()
+ policy_id = list(policies.keys())[0]
+
+ subject_id = builder.create_subject(policy_id)
+
+ with pytest.raises(exceptions.PolicyUnknown) as exception_info:
+ req = hug.test.delete(perimeter, "/subjects/{}".format(subject_id), headers=auth_headers)
+ assert "400: Policy Unknown" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == "400: Policy Unknown"
+
+
+def test_delete_objects_without_policy():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ policies = policy_helper.add_policies()
+ policy_id = list(policies.keys())[0]
+
+ object_id = builder.create_object(policy_id)
+
+ with pytest.raises(exceptions.PolicyUnknown) as exception_info:
+ req = hug.test.delete(perimeter, "/objects/{}".format(object_id), headers=auth_headers)
+
+ assert "400: Policy Unknown" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == "400: Policy Unknown"
+
+
+def test_delete_actions_without_policy():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import perimeter
+ policies = policy_helper.add_policies()
+ policy_id = list(policies.keys())[0]
+
+ action_id = builder.create_action(policy_id)
+
+ with pytest.raises(exceptions.PolicyUnknown) as exception_info:
+ req = hug.test.delete(perimeter, "/actions/{}".format(action_id), headers=auth_headers)
+ assert "400: Policy Unknown" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == "400: Policy Unknown"
+
+
+def test_perimeter_delete_actions_without_perimeter_id():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ with pytest.raises(exceptions.ActionUnknown) as exception_info:
+ req = delete_actions_without_perimeter_id(auth_headers=auth_headers)
+ assert "400: Action Unknown" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == "400: Action Unknown"
diff --git a/moon_manager/tests/unit_python/api/test_perimeter_examples.py b/moon_manager/tests/unit_python/api/test_perimeter_examples.py
new file mode 100644
index 00000000..0598629c
--- /dev/null
+++ b/moon_manager/tests/unit_python/api/test_perimeter_examples.py
@@ -0,0 +1,55 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+import hug
+import json
+
+
+# def test_local_perimeter_get_subject():
+# from moon_manager.api import perimeter
+# subjects = perimeter.Subjects.get()
+# assert isinstance(subjects, dict)
+# assert "subjects" in subjects
+
+
+
+# def test_http_perimeter_post_subject():
+# from moon_manager.api import perimeter
+# result = hug.test.post(perimeter, 'subjects/b34e5a2954944cc59356daa244b8c254',
+# body={'name': 'ha'},
+# headers={'Content-Type': 'application/json'})
+# assert result.status == hug.HTTP_200
+# assert isinstance(result.data, dict)
+# assert "subjects" in result.data
+#
+#
+# def test_http_perimeter_get_subject_2():
+# from moon_manager.api import perimeter
+# result = hug.test.get(perimeter, 'subjects/b34e5a29-5494-4cc5-9356-daa244b8c254')
+# assert result.status == hug.HTTP_200
+# assert isinstance(result.data, dict)
+# assert "subjects" in result.data
+#
+# def test_http_perimeter_get_subject_3():
+# from moon_manager.api import perimeter
+# result = hug.test.get(perimeter, 'policies/b34e5a29-5494-4cc5-9356-daa244b8c254/subjects/')
+# assert result.status == hug.HTTP_200
+# assert isinstance(result.data, dict)
+# assert "subjects" in result.data
+#
+#
+# def test_http_perimeter_get_subject_4():
+# from moon_manager.api import perimeter
+# result = hug.test.get(perimeter, 'policies/b34e5a29-5494-4cc5-9356-daa244b8c254/subjects/b34e5a29-5494-4cc5-9356-daa244b8c254')
+# assert result.status == hug.HTTP_200
+# assert isinstance(result.data, dict)
+# assert "subjects" in result.data
diff --git a/moon_manager/tests/unit_python/api/test_policies.py b/moon_manager/tests/unit_python/api/test_policies.py
index 4d4e387e..a07ba725 100644
--- a/moon_manager/tests/unit_python/api/test_policies.py
+++ b/moon_manager/tests/unit_python/api/test_policies.py
@@ -1,69 +1,424 @@
-import json
-import api.utilities as utilities
+# Software Name: MOON
+# Version: 5.4
-def get_policies(client):
- req = client.get("/policies")
- policies = utilities.get_json(req.data)
- return req, policies
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
-def add_policies(client, name):
+
+from falcon import HTTP_200, HTTP_400, HTTP_405, HTTP_409
+import hug
+from uuid import uuid4
+import pytest
+from moon_utilities import exceptions
+from helpers import model_helper
+from helpers import policy_helper
+
+
+def get_policies(auth_headers):
+ from moon_manager.api import policy
+ req = hug.test.get(policy, "policies", headers=auth_headers)
+ return req
+
+
+def add_policies(name, auth_headers):
+ from moon_manager.api import policy
+ req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex)
+ model_id = list(req.keys())[0]
data = {
"name": name,
"description": "description of {}".format(name),
- "model_id": "modelId",
+ "model_id": model_id,
"genre": "genre"
}
- req = client.post("/policies", data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
- policies = utilities.get_json(req.data)
- return req, policies
-
-
-def delete_policies(client, name):
- request, policies = get_policies(client)
- for key, value in policies['policies'].items():
- if value['name'] == name:
- req = client.delete("/policies/{}".format(key))
- break
+ req = hug.test.post(policy, "policies", data, headers=auth_headers)
return req
-def delete_policies_without_id(client):
- req = client.delete("/policies/{}".format(""))
+def delete_policies_without_id(auth_headers):
+ from moon_manager.api import policy
+ req = hug.test.delete(policy, "policies/{}".format(""), headers=auth_headers)
return req
def test_get_policies():
- client = utilities.register_client()
- req, policies = get_policies(client)
- assert req.status_code == 200
- assert isinstance(policies, dict)
- assert "policies" in policies
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ req = get_policies(auth_headers=auth_headers)
+ assert req.status == HTTP_200
+ assert isinstance(req.data, dict)
+ assert "policies" in req.data
def test_add_policies():
- client = utilities.register_client()
- req, policies = add_policies(client, "testuser")
- assert req.status_code == 200
- assert isinstance(policies, dict)
- value = list(policies["policies"].values())[0]
- assert "policies" in policies
- assert value['name'] == "testuser"
- assert value["description"] == "description of {}".format("testuser")
- assert value["model_id"] == "modelId"
- assert value["genre"] == "genre"
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ policy_name = "testuser" + uuid4().hex
+ req = add_policies(policy_name, auth_headers=auth_headers)
+ assert req.status == HTTP_200
+ assert isinstance(req.data, dict)
+ value = list(req.data["policies"].values())[0]
+ assert "policies" in req.data
+ assert value['name'] == policy_name
+ assert value["description"] == "description of {}".format(policy_name)
+
+
+def test_add_policies_without_model():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import policy
+ policy_name = "testuser" + uuid4().hex
+ data = {
+ "name": policy_name,
+ "description": "description of {}".format(policy_name),
+ "model_id": "",
+ "genre": "genre"
+ }
+ req = hug.test.post(policy, "policies/", data, headers=auth_headers)
+
+ assert req.status == HTTP_200
+
+
+def test_add_policies_with_same_name():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ name = uuid4().hex
+ policy_name = name
+ req = add_policies(policy_name, auth_headers=auth_headers)
+ assert req.status == HTTP_200
+ assert isinstance(req.data, dict)
+ value = list(req.data["policies"].values())[0]
+ assert "policies" in req.data
+ assert value['name'] == policy_name
+ assert value["description"] == "description of {}".format(policy_name)
+ with pytest.raises(exceptions.PolicyExisting) as exception_info:
+ req = add_policies(policy_name, auth_headers=auth_headers)
+ assert "409: Policy Already Exists" == str(exception_info.value)
+ # assert req.status == HTTP_409
+ # assert req.data["message"] == '409: Policy Already Exists'
+
+
+def test_add_policy_with_empty_name():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ policy_name = ""
+ with pytest.raises(exceptions.PolicyContentError) as exception_info:
+ req = add_policies(policy_name, auth_headers=auth_headers)
+ assert "400: Policy Content Error" == str(exception_info.value)
+ # assert req.status == HTTP_400
+ # assert req.data["message"] == '400: Policy Content Error'
+
+
+def test_add_policy_with_model_has_no_meta_rule():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_manager.api import policy
+
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ policy_name = "testuser" + uuid4().hex
+ req = model_helper.add_model_without_meta_rule()
+ model_id = list(req.keys())[0]
+ data = {
+ "name": policy_name,
+ "description": "description of {}".format(policy_name),
+ "model_id": model_id,
+ "genre": "genre"
+ }
+ with pytest.raises(exceptions.MetaRuleUnknown) as exception_info:
+ hug.test.post(policy, "policies/", data, headers=auth_headers)
+ assert "400: Meta Rule Unknown" == str(exception_info.value)
+
+
+def test_add_policy_with_model_has_blank_subject_meta_rule():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ from moon_manager.api import policy
+
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ policy_name = "testuser" + uuid4().hex
+ req = model_helper.add_model_with_blank_subject_meta_rule()
+ model_id = list(req.keys())[0]
+ data = {
+ "name": policy_name,
+ "description": "description of {}".format(policy_name),
+ "model_id": model_id,
+ "genre": "genre"
+ }
+ with pytest.raises(exceptions.MetaRuleContentError) as exception_info:
+ hug.test.post(policy, "policies/", data, headers=auth_headers)
+ assert "400: Meta Rule Error" == str(exception_info.value)
+
+
+
+# FIXME: uncomment when model API is re-inserted
+# def test_update_policies_with_model():
+# from moon_manager.api import policy
+# policy_name = "testuser" + uuid4().hex
+# data = {
+# "name": policy_name,
+# "description": "description of {}".format(policy_name),
+# "model_id": "",
+# "genre": "genre"
+# }
+# req = hug.test.post(policy, "policies/", data)
+# policy_id = next(iter(req.data['policies']))
+# req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex)
+# model_id = list(req.data.keys())[0]
+# data = {
+# "name": policy_name + "-2",
+# "description": "description of {}".format(policy_name),
+# "model_id": model_id,
+# "genre": "genre"
+# }
+# req = hug.test.patch("policies/{}".format(policy_id), data)
+# assert req.status == HTTP_200
+# assert req.data['policies'][policy_id]['name'] == policy_name + '-2'
+
+
+def test_update_policies_name_success():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import policy
+ policy_name = "testuser" + uuid4().hex
+ req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex)
+ model_id = list(req.keys())[0]
+ data = {
+ "name": policy_name,
+ "description": "description of {}".format(policy_name),
+ "model_id": model_id,
+ "genre": "genre"
+ }
+ req = hug.test.post(policy, "policies/", data, headers=auth_headers)
+ policy_id = next(iter(req.data['policies']))
+
+ data = {
+ "name": policy_name + "-2",
+ "description": "description of {}".format(policy_name),
+ "model_id": model_id,
+ "genre": "genre"
+ }
+ req = hug.test.patch(policy, "policies/{}".format(policy_id), data, headers=auth_headers)
+ assert req.status == HTTP_200
+ assert req.data['policies'][policy_id]['name'] == policy_name + '-2'
+
+
+def test_update_blank_policies_with_model():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import policy
+ policy_name = uuid4().hex
+ data = {
+ "name": policy_name,
+ "description": "description of {}".format(policy_name),
+ "model_id": "",
+ "genre": "genre"
+ }
+ req = hug.test.post(policy, "policies/", data, headers=auth_headers)
+ policy_id = next(iter(req.data['policies']))
+ req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex)
+ model_id = list(req.keys())[0]
+ data = {
+ "name": policy_name,
+ "description": "description of {}".format(policy_name),
+ "model_id": model_id,
+ "genre": "genre"
+ }
+ req = hug.test.patch(policy, "policies/{}".format(policy_id), data, headers=auth_headers)
+ assert req.status == HTTP_200
+
+
+def test_update_policies_model_unused():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import policy
+ policy_name = uuid4().hex
+ req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex)
+ model_id = list(req.keys())[0]
+ data = {
+ "name": policy_name,
+ "description": "description of {}".format(policy_name),
+ "model_id": model_id,
+ "genre": "genre"
+ }
+ req = hug.test.post(policy, "policies/", data, headers=auth_headers)
+ policy_id = next(iter(req.data['policies']))
+ req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex)
+ model_id = list(req.keys())[0]
+ data = {
+ "name": policy_name,
+ "description": "description of {}".format(policy_name),
+ "model_id": model_id,
+ "genre": "genre"
+ }
+
+ with pytest.raises(exceptions.PolicyUpdateError) as exception_info:
+ req = hug.test.patch(policy, "policies/{}".format(policy_id), data, headers=auth_headers)
+ assert "400: Policy update error" == str(exception_info.value)
+
+
+# FIXME: uncomment when model API is re-inserted
+# def test_update_policy_name_with_existed_one():
+# from moon_manager.api import policy
+# policy_name1 = "testuser" + uuid4().hex
+# req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex)
+# model_id = list(req.keys())[0]
+# data = {
+# "name": policy_name1,
+# "description": "description of {}".format(policy_name1),
+# "model_id": model_id,
+# "genre": "genre"
+# }
+# req = hug.test.post(policy, "policies/", data)
+# policy_id1 = next(iter(req.data['policies']))
+#
+# policy_name2 = "testuser" + uuid4().hex
+# eq = model_helper.add_model(model_id="mls_model_id" + uuid4().hex)
+# model_id = list(req.data.keys())[0]
+# data = {
+# "name": policy_name2,
+# "description": "description of {}".format(policy_name2),
+# "model_id": model_id,
+# "genre": "genre"
+# }
+# req = hug.test.post(policy, "policies/", data)
+# policy_id2 = next(iter(req.data['policies']))
+#
+# data = {
+# "name": policy_name1,
+# "description": "description of {}".format(policy_name1),
+# "model_id": model_id,
+# "genre": "genre"
+# }
+# req = hug.test.patch(policy, "policies/{}".format(policy_id2), data)
+# assert req.status == HTTP_409
+# assert req.data["message"] == '409: Policy Already Exists'
+
+
+def test_update_policies_with_empty_name():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import policy
+ policy_name = "testuser" + uuid4().hex
+ req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex)
+ model_id = list(req.keys())[0]
+ data = {
+ "name": policy_name,
+ "description": "description of {}".format(policy_name),
+ "model_id": model_id,
+ "genre": "genre"
+ }
+ req = hug.test.post(policy, "policies/", data, headers=auth_headers)
+ policy_id = next(iter(req.data['policies']))
+
+ data = {
+ "name": "",
+ "description": "description of {}".format(policy_name),
+ "model_id": model_id,
+ "genre": "genre"
+ }
+ with pytest.raises(exceptions.PolicyContentError) as exception_info:
+ req = hug.test.patch(policy, "policies/{}".format(policy_id), data, headers=auth_headers)
+ assert "400: Policy Content Error" == str(exception_info.value)
+ # assert req.status == HTTP_400
+ # assert req.data["message"] == '400: Policy Content Error'
+
+
+def test_update_policies_with_blank_model():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import policy
+ policy_name = "testuser" + uuid4().hex
+ req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex)
+ model_id = list(req.keys())[0]
+ data = {
+ "name": policy_name,
+ "description": "description of {}".format(policy_name),
+ "model_id": model_id,
+ "genre": "genre"
+ }
+ req = hug.test.post(policy, "policies/", data, headers=auth_headers)
+ policy_id = next(iter(req.data['policies']))
+
+ data = {
+ "name": policy_name,
+ "description": "description of {}".format(policy_name),
+ "model_id": "",
+ "genre": "genre"
+ }
+
+ with pytest.raises(exceptions.PolicyUpdateError) as exception_info:
+ req = hug.test.patch(policy, "policies/{}".format(policy_id), data, headers=auth_headers)
+ assert "400: Policy update error" == str(exception_info.value)
+
+
+# FIXME: uncomment when model API is re-inserted
+# def test_update_policies_connected_to_rules_with_blank_model():
+# from moon_manager.api import policy
+# req, rules, policy_id = data_builder.add_rules()
+# req = hug.test.get(policy, "policies")
+# for policy_obj_id in req.data['policies']:
+# if policy_obj_id == policy_id:
+# policy = req.data['policies'][policy_obj_id]
+# policy['model_id'] = ''
+# req = hug.test.patch("/policies/{}".format(policy_id), req.data)
+# assert req.status == HTTP_400
+# assert req.data["message"] == '400: Policy update error'
def test_delete_policies():
- client = utilities.register_client()
- req = delete_policies(client, "testuser")
- assert req.status_code == 200
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import policy
+ _policy = policy_helper.add_policies()
+ policy_id = list(_policy.keys())[0]
+ req = hug.test.delete(policy, "policies/{}".format(policy_id), headers=auth_headers)
+ assert req.status == HTTP_200
-def test_delete_policies_without_id():
- client = utilities.register_client()
- req = delete_policies_without_id(client)
- assert req.status_code == 500
+# FIXME: uncomment when rule API is re-inserted
+# def test_delete_policy_with_dependencies_rule():
+# from moon_manager.api import policy
+# req, rules, policy_id = data_builder.add_rules()
+# req = hug.test.delete(policy, "policies/{}".format(policy_id))
+# assert req.status == HTTP_400
+# assert req.data["message"] == '400: Policy With Rule Error'
+
+# FIXME: uncomment when perimeter API is re-inserted
+# def test_delete_policy_with_dependencies_subject_data():
+# from moon_manager.api import policy
+# req, rules, policy_id = data_builder.add_rules()
+# req = hug.test.delete(policy, "policies/{}/rules/{}".format(policy_id, next(iter(rules['rules']))))
+# assert req.status == HTTP_200
+# req = hug.test.delete(policy, "policies/{}".format(policy_id))
+# assert req.status == HTTP_400
+# assert req.data["message"] == '400: Policy With Data Error'
+
+
+# FIXME: uncomment when perimeter API is re-inserted
+# def test_delete_policy_with_dependencies_perimeter():
+# from moon_manager.api import policy
+# _policy = policy_helper.add_policies()
+# policy_id = next(iter(_policy))
+#
+# data = {
+# "name": 'testuser'+uuid4().hex,
+# "description": "description of {}".format(uuid4().hex),
+# "password": "password for {}".format(uuid4().hex),
+# "email": "{}@moon".format(uuid4().hex)
+# }
+# req = hug.test.post(policy, "policies/{}/subjects".format(policy_id), data)
+#
+# assert req.status == HTTP_200
+# req = hug.test.delete(policy, "policies/{}".format(policy_id))
+# assert req.status == HTTP_400
+# assert req.data["message"] == '400: Policy With Perimeter Error'
+
+
+def test_delete_policies_without_id():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ req = delete_policies_without_id(auth_headers=auth_headers)
+ assert req.status == HTTP_405
diff --git a/moon_manager/tests/unit_python/api/test_rules.py b/moon_manager/tests/unit_python/api/test_rules.py
index 86a3d390..2bb7a96f 100644
--- a/moon_manager/tests/unit_python/api/test_rules.py
+++ b/moon_manager/tests/unit_python/api/test_rules.py
@@ -1,58 +1,333 @@
-import api.utilities as utilities
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+import pytest
+from moon_utilities import exceptions
import json
+from helpers import data_builder as builder
+from helpers import policy_helper
+from helpers import rules_helper
+import hug
+
+def get_rules(policy_id):
+ from moon_manager.api import rules
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
-def get_rules(client, policy_id):
- req = client.get("/policies/{}/rules".format(policy_id))
- rules = utilities.get_json(req.data)
+ req = hug.test.get(rules, "/policies/{}/rules".format(policy_id), headers=auth_headers)
+ rules = req.data
return req, rules
-def add_rules(client, policy_id):
+def add_rules_without_policy_id(headers):
+ from moon_manager.api import rules
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = builder.create_new_meta_rule()
data = {
- "meta_rule_id": "meta_rule_id1",
+ "meta_rule_id": meta_rule_id,
+ "rule": [subject_category_id, object_category_id, action_category_id],
+ "instructions": [
+ {"decision": "grant"},
+ ],
+ "enabled": True
+ }
+ headers['Content-Type'] = 'application/json'
+ req = hug.test.post(rules, "/policies/{}/rules".format(None), body=json.dumps(data),
+ headers=headers)
+ rules = req.data
+ return req, rules
+
+
+def add_rules_without_meta_rule_id(policy_id, headers):
+ from moon_manager.api import rules
+ data = {
+ "meta_rule_id": "",
"rule": ["subject_data_id2", "object_data_id2", "action_data_id2"],
- "instructions": (
+ "instructions": [
{"decision": "grant"},
- ),
+ ],
"enabled": True
}
- req = client.post("/policies/{}/rules".format(policy_id), data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
- rules = utilities.get_json(req.data)
+ headers['Content-Type'] = 'application/json'
+ req = hug.test.post(rules, "/policies/{}/rules".format(policy_id), body=json.dumps(data),
+ headers=headers)
+ rules = req.data
return req, rules
-def delete_rules(client, policy_id, meta_rule_id):
- req = client.delete("/policies/{}/rules/{}".format(policy_id, meta_rule_id))
+def add_rules_without_rule(policy_id, headers):
+ from moon_manager.api import rules
+ data = {
+ "meta_rule_id": "meta_rule_id1",
+ "instructions": [
+ {"decision": "grant"},
+ ],
+ "enabled": True
+ }
+ headers['Content-Type'] = 'application/json'
+ req = hug.test.post(rules, "/policies/{}/rules".format(policy_id), body=json.dumps(data),
+ headers=headers)
+ rules = req.data
+ return req, rules
+
+
+def delete_rules(policy_id, meta_rule_id, headers):
+ from moon_manager.api import rules
+ req = hug.test.delete(rules, "/policies/{}/rules/{}".format(policy_id, meta_rule_id),
+ headers=headers)
+ return req
+
+
+def update_rule(policy_id, rule_id, instructions, headers):
+ from moon_manager.api import rules
+ req = hug.test.patch(rules, "/policies/{}/rules/{}".format(policy_id, rule_id),
+ headers=headers,
+ body=instructions)
return req
-def test_get_rules():
- policy_id = utilities.get_policy_id()
- client = utilities.register_client()
- req, rules = get_rules(client, policy_id)
- assert req.status_code == 200
+def test_add_rules_with_invalid_decision_instructions():
+ from moon_manager.api import rules
+
+ auth_headers = rules_helper.get_headers()
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy()
+
+ data = {
+ "meta_rule_id": meta_rule_id,
+ "rule": [subject_category_id, object_category_id, action_category_id],
+ "instructions": [
+ {"decision": "invalid"},
+ ],
+ "enabled": True
+ }
+
+ with pytest.raises(exceptions.RuleContentError) as exception_info:
+ hug.test.post(rules, "/policies/{}/rules".format(policy_id), body=json.dumps(data),
+ headers=auth_headers)
+ assert "400: Rule Error" == str(exception_info.value)
+
+
+def test_add_rules_with_meta_rule_not_linked_with_policy_model():
+ from moon_manager.api import rules
+
+ auth_headers = rules_helper.get_headers()
+ policy_id = builder.create_new_policy()[-1]
+ meta_rule_id = builder.create_new_meta_rule()[-1]
+
+ data = {
+ "meta_rule_id": meta_rule_id,
+ "rule": ["subject_data_id2", "object_data_id2", "action_data_id2"],
+ "instructions": [
+ {"decision": "grant"},
+ ],
+ "enabled": True
+ }
+
+ with pytest.raises(exceptions.MetaRuleNotLinkedWithPolicyModel) as exception_info:
+ hug.test.post(rules, "/policies/{}/rules".format(policy_id), body=json.dumps(data),
+ headers=auth_headers)
+ assert "400: MetaRule Not Linked With Model - Policy" == str(exception_info.value)
+
+
+def test_add_rules_with_invalid_rule():
+ from moon_manager.api import rules
+
+ auth_headers = rules_helper.get_headers()
+
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy()
+ sub_data_id = builder.create_subject_data(policy_id, subject_category_id)
+ obj_data_id = builder.create_object_data(policy_id, object_category_id)
+ act_data_id = builder.create_action_data(policy_id, action_category_id)
+
+ data = {
+ "meta_rule_id": meta_rule_id,
+ "rule": [obj_data_id, sub_data_id, act_data_id],
+ "instructions": [
+ {"decision": "grant"},
+ ],
+ "enabled": True
+ }
+
+ with pytest.raises(exceptions.RuleContentError) as exception_info:
+ hug.test.post(rules, "/policies/{}/rules".format(policy_id), body=json.dumps(data),
+ headers=auth_headers)
+ assert "400: Rule Error" == str(exception_info.value)
+
+
+def test_add_rules_with_no_given_decision_instructions(policy_id=None):
+ from moon_manager.api import rules
+
+ auth_headers = rules_helper.get_headers()
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy()
+ sub_data_id = builder.create_subject_data(policy_id, subject_category_id)
+ obj_data_id = builder.create_object_data(policy_id, object_category_id)
+ act_data_id = builder.create_action_data(policy_id, action_category_id)
+
+ data = {
+ "meta_rule_id": meta_rule_id,
+ "rule": [sub_data_id, obj_data_id, act_data_id],
+ "instructions": [],
+ "enabled": True
+ }
+
+ req = hug.test.post(rules, "/policies/{}/rules".format(policy_id), body=json.dumps(data),
+ headers=auth_headers)
+
+ assert req.status == hug.HTTP_200
+
+ default_instruction = {"decision": "grant"}
+ rules = req.data['rules']
+ rule_id = next(iter(req.data['rules']))
+ assert rules[rule_id]["instructions"][0] == default_instruction
+
+
+def test_get_rules(policy_id=None):
+ if policy_id == None:
+ policy = policy_helper.add_policies()
+ policy_id = next(iter(policy))
+
+ req, rules = get_rules(policy_id)
+ assert req.status == hug.HTTP_200
assert isinstance(rules, dict)
assert "rules" in rules
+ return req, rules
def test_add_rules():
- policy_id = utilities.get_policy_id()
- client = utilities.register_client()
- req, rules = add_rules(client, policy_id)
- assert req.status_code == 200
- assert isinstance(rules, dict)
- value = rules["rules"]
- assert "rules" in rules
- id = list(value.keys())[0]
- assert value[id]["meta_rule_id"] == "meta_rule_id1"
+ req, rules, policy = builder.add_rules()
+ assert req.status == hug.HTTP_200
+
+
+def test_add_rules_without_policy_id():
+ from moon_manager.api import rules
+
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = builder.create_new_meta_rule()
+ data = {
+ "meta_rule_id": meta_rule_id,
+ "rule": [subject_category_id, object_category_id, action_category_id],
+ "instructions": [
+ {"decision": "grant"},
+ ],
+ "enabled": True
+ }
+
+ headers = rules_helper.get_headers()
+ with pytest.raises(exceptions.PolicyUnknown) as exception_info:
+ req = hug.test.post(rules, "/policies/{}/rules".format(None), body=json.dumps(data),
+ headers=headers)
+ assert "400: Policy Unknown" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == "400: Policy Unknown"
+
+
+#
+# def test_add_rules_without_meta_rule_id():
+# policy_id = utilities.get_policy_id()
+# client = utilities.register_client()
+# req, rules = add_rules_without_meta_rule_id(client, policy_id)
+# assert req.status == 400
+# assert json.loads(req.data)["message"] == "Key: 'meta_rule_id', [Empty String]"
+
+def test_add_rules_without_rule():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ policy = policy_helper.add_policies()
+ policy_id = next(iter(policy))
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ with pytest.raises(exceptions.ValidationKeyError) as exception_info:
+ req, rules = add_rules_without_rule(policy_id, headers=auth_headers)
+ assert "Invalid Key :rule not found" == str(exception_info.value)
+ # assert req.status == hug.HTTP_400
+ # assert req.data["message"] == 'Invalid Key :rule not found'
-def test_delete_rules():
- client = utilities.register_client()
- policy_id = utilities.get_policy_id()
- req, added_rules = get_rules(client, policy_id)
- id = added_rules["rules"]['rules'][0]['id']
- rules = delete_rules(client, policy_id, id)
- assert rules.status_code == 200
+
+def test_update_rule_without_body():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ req, rules, policy_id = builder.add_rules()
+ rule_id = list(rules['rules'].keys())[0]
+
+ req = update_rule(policy_id, rule_id, instructions=None, headers=auth_headers)
+
+ assert req.status == hug.HTTP_400
+
+
+def test_update_rule_without_instructions_in_body():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ data = {"instruction": [ # faute de frappe
+ {"decision": "deny"},
+ ]}
+
+ req, rules, policy_id = builder.add_rules()
+ rule_id = list(rules['rules'].keys())[0]
+
+ req = update_rule(policy_id, rule_id, instructions=None, headers=auth_headers)
+
+ assert req.status == hug.HTTP_400
+
+
+def test_update_rule():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+
+ req, rules, policy_id = builder.add_rules()
+ rule_id = list(rules['rules'].keys())[0]
+
+ data = {"instructions": [
+ {"decision": "deny"},
+ ]}
+ req = update_rule(policy_id, rule_id, data, headers=auth_headers)
+
+ rules = get_rules(policy_id)[1]['rules']['rules']
+
+ rule = None
+ for rule_ in rules:
+ if rule_['id'] == rule_id:
+ rule = rule_
+ break
+
+ assert req.status == hug.HTTP_200 and rule['instructions'][0]['decision'] == "deny"
+
+
+def test_delete_rules_with_invalid_parameters():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ req = delete_rules("", "", headers=auth_headers)
+ assert req.status == hug.HTTP_405
+
+
+def test_delete_rules_without_policy_id():
+ from moon_manager.api import rules
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy()
+ sub_data_id = builder.create_subject_data(policy_id, subject_category_id)
+ obj_data_id = builder.create_object_data(policy_id, object_category_id)
+ act_data_id = builder.create_action_data(policy_id, action_category_id)
+ data = {
+ "meta_rule_id": meta_rule_id,
+ "rule": [sub_data_id, obj_data_id, act_data_id],
+ "instructions": [
+ {"decision": "grant"},
+ ],
+ "enabled": True
+ }
+ hug.test.post(rules, "/policies/{}/rules".format(policy_id), body=json.dumps(data),
+ headers={'Content-Type': 'application/json',
+ "X-Api-Key": get_api_key_for_user("admin")})
+ req, added_rules = get_rules(policy_id)
+ id = list(added_rules["rules"]["rules"])[0]["id"]
+ rules = delete_rules(None, id, headers=auth_headers)
+ assert rules.status == hug.HTTP_200
diff --git a/moon_manager/tests/unit_python/api/test_slaves.py b/moon_manager/tests/unit_python/api/test_slaves.py
new file mode 100644
index 00000000..29d5e62e
--- /dev/null
+++ b/moon_manager/tests/unit_python/api/test_slaves.py
@@ -0,0 +1,90 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+from falcon import HTTP_200, HTTP_400, HTTP_405
+import hug
+from uuid import uuid4
+from helpers import data_builder as builder
+
+
+def test_get_slaves():
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import slave
+ req = hug.test.get(slave, 'slaves/', headers=auth_headers)
+ assert req.status == HTTP_200
+ assert isinstance(req.data, dict)
+ assert "slaves" in req.data
+ for slave in req.data.get("slaves"):
+ assert "name" in slave
+ assert "description" in slave
+ assert "status" in slave
+ assert "server_ip" in slave
+ assert "port" in slave
+
+
+def test_add_slave(mocker):
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import slave
+ mocker.patch('moon_manager.plugins.pyorchestrator.get_server_url',
+ return_value="http://127.0.0.1:10000")
+ mocker.patch("subprocess.Popen", return_value=True)
+ data = {
+ "name": "test_slave_" + uuid4().hex,
+ "description": "description of test_slave"
+ }
+ req = hug.test.post(slave, "slave/", data, headers=auth_headers)
+ assert req.status == HTTP_200
+ assert isinstance(req.data, dict)
+ found = False
+ assert "slaves" in req.data
+ for value in req.data["slaves"].values():
+ assert "name" in value
+ assert "description" in value
+ assert "api_key" in value
+ assert "process" in value
+ assert "log" in value
+ assert "extra" in value
+ if value['name'] == data['name']:
+ found = True
+ assert value["description"] == "description of test_slave"
+ assert "port" in value.get("extra")
+ assert "status" in value.get("extra")
+ assert "server_ip" in value.get("extra")
+ break
+ assert found
+
+
+def test_delete_slave(mocker):
+ from moon_utilities.auth_functions import get_api_key_for_user
+ auth_headers = {"X-Api-Key": get_api_key_for_user("admin")}
+ from moon_manager.api import slave
+ mocker.patch('moon_manager.plugins.pyorchestrator.get_server_url',
+ return_value="http://127.0.0.1:10000")
+ mocker.patch("subprocess.Popen", return_value=True)
+ data = {
+ "name": "test_slave_" + uuid4().hex,
+ "description": "description of test_slave"
+ }
+ req = hug.test.post(slave, "slave/", data, headers=auth_headers)
+ assert req.status == HTTP_200
+ assert isinstance(req.data, dict)
+ req = hug.test.get(slave, 'slaves/', headers=auth_headers)
+ success_req = None
+ for key, value in req.data['slaves'].items():
+ if value['name'] == data['name']:
+ success_req = hug.test.delete(slave, 'slave/{}'.format(key), headers=auth_headers)
+ break
+ assert success_req
+ assert success_req.status == HTTP_200
+
diff --git a/moon_manager/tests/unit_python/api/utilities.py b/moon_manager/tests/unit_python/api/utilities.py
index 66ca30c5..baf59a51 100644
--- a/moon_manager/tests/unit_python/api/utilities.py
+++ b/moon_manager/tests/unit_python/api/utilities.py
@@ -1,26 +1,16 @@
-import json
-
+# Software Name: MOON
-def get_json(data):
- return json.loads(data.decode("utf-8"))
+# Version: 5.4
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
-def register_client():
- import moon_manager.server
- server = moon_manager.server.create_server()
- client = server.app.test_client()
- return client
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
-def get_policy_id():
- import api.test_policies as policies
- client = register_client()
- policy_id = ''
- req, policy = policies.get_policies(client)
- for id in policy['policies']:
- if id:
- policy_id = id
- break
- if not policy_id:
- policies.add_policies(client, "testuser")
- return policy_id
+import json
+from uuid import uuid4
+def get_json(data):
+ return data;#json.loads(data.decode("utf-8"))