aboutsummaryrefslogtreecommitdiffstats
path: root/moon_manager/tests
diff options
context:
space:
mode:
authorAsteroide <thomas.duval@orange.com>2018-10-05 15:01:17 +0000
committerGerrit Code Review <gerrit@opnfv.org>2018-10-05 15:01:17 +0000
commitcbea4e360e9bfaa9698cf7c61c83c96a1ba89b8c (patch)
treea8bf6a7bfb06605ed5bfab77570afbe1e46cff4b /moon_manager/tests
parenta3f68df52836676b23ac0f5e3d8c40c957ee80a7 (diff)
parent2e35a7e46f0929438c1c206e3116caa829f07dc6 (diff)
Merge "Update code to 4.6 official version"
Diffstat (limited to 'moon_manager/tests')
-rw-r--r--moon_manager/tests/unit_python/api/import_export_utilities.py36
-rw-r--r--moon_manager/tests/unit_python/api/test_assignement.py280
-rw-r--r--moon_manager/tests/unit_python/api/test_data.py84
-rw-r--r--moon_manager/tests/unit_python/api/test_import.py20
-rw-r--r--moon_manager/tests/unit_python/api/test_meta_data.py116
-rw-r--r--moon_manager/tests/unit_python/api/test_meta_rules.py370
-rw-r--r--moon_manager/tests/unit_python/api/test_pdp.py57
-rw-r--r--moon_manager/tests/unit_python/api/test_perimeter.py935
-rw-r--r--moon_manager/tests/unit_python/api/test_policies.py287
-rw-r--r--moon_manager/tests/unit_python/api/test_rules.py47
-rw-r--r--moon_manager/tests/unit_python/api/test_unit_models.py226
-rw-r--r--moon_manager/tests/unit_python/conftest.py22
-rw-r--r--moon_manager/tests/unit_python/helpers/data_builder.py79
-rw-r--r--moon_manager/tests/unit_python/helpers/data_helper.py6
-rw-r--r--moon_manager/tests/unit_python/helpers/model_helper.py7
-rw-r--r--moon_manager/tests/unit_python/helpers/policy_helper.py4
-rw-r--r--moon_manager/tests/unit_python/requirements.txt4
17 files changed, 2181 insertions, 399 deletions
diff --git a/moon_manager/tests/unit_python/api/import_export_utilities.py b/moon_manager/tests/unit_python/api/import_export_utilities.py
index 12cb208e..2ee2627d 100644
--- a/moon_manager/tests/unit_python/api/import_export_utilities.py
+++ b/moon_manager/tests/unit_python/api/import_export_utilities.py
@@ -9,7 +9,7 @@ import api.test_perimeter as test_perimeter
import api.test_meta_data as test_categories
import api.test_data as test_data
import api.test_meta_rules as test_meta_rules
-import api.test_assignemnt as test_assignments
+import api.test_assignement as test_assignments
import api.test_rules as test_rules
import logging
@@ -38,7 +38,6 @@ def clean_subjects(client):
logger.info("subjects policy_keys {}".format(policy_keys))
for policy_key in policy_keys:
client.delete("/policies/{}/subjects/{}".format(policy_key, key))
- client.delete("/subjects/{}".format(key))
def clean_objects(client):
@@ -50,11 +49,11 @@ def clean_objects(client):
logger.info("objects policy_keys {}".format(policy_keys))
for policy_key in policy_keys:
client.delete("/policies/{}/objects/{}".format(policy_key, key))
- client.delete("/objects/{}".format(key))
def clean_actions(client):
actions = test_perimeter.get_actions(client)
+ actions = test_perimeter.get_actions(client)
logger.info("actions {}".format(actions))
for key in actions[1]["actions"]:
action = actions[1]["actions"][key]
@@ -62,7 +61,6 @@ def clean_actions(client):
logger.info("action policy_keys {}".format(policy_keys))
for policy_key in policy_keys:
client.delete("/policies/{}/actions/{}".format(policy_key, key))
- client.delete("/actions/{}".format(key))
def clean_subject_categories(client):
@@ -92,25 +90,33 @@ def clean_subject_data(client):
for policy_key in policies["policies"]:
req, data = test_data.get_subject_data(client, policy_id=policy_key)
logger.info("============= data {}".format(data))
- for key in data["subject_data"]:
- logger.info("============= Deleting {}/{}".format(policy_key, key))
- client.delete("/policies/{}/subject_data/{}".format(policy_key, key))
+ for data_item in data["subject_data"]:
+ if data_item["data"]:
+ for data_id in data_item["data"]:
+ logger.info("============= Deleting {}/{}".format(policy_key, data_id))
+ client.delete("/policies/{}/subject_data/{}/{}".format(policy_key, data_item['category_id'], data_id))
def clean_object_data(client):
req, policies = test_policies.get_policies(client)
for policy_key in policies["policies"]:
req, data = test_data.get_object_data(client, policy_id=policy_key)
- for key in data["object_data"]:
- client.delete("/policies/{}/object_data/{}".format(policy_key, key))
+ for data_item in data["object_data"]:
+ if data_item["data"]:
+ for data_id in data_item["data"]:
+ logger.info("============= object_data {}/{}".format(policy_key, data_id))
+ client.delete("/policies/{}/object_data/{}/{}".format(policy_key, data_item['category_id'], data_id))
def clean_action_data(client):
req, policies = test_policies.get_policies(client)
for policy_key in policies["policies"]:
req, data = test_data.get_action_data(client, policy_id=policy_key)
- for key in data["action_data"]:
- client.delete("/policies/{}/action_data/{}".format(policy_key, key))
+ for data_item in data["action_data"]:
+ if data_item["data"]:
+ for data_id in data_item["data"]:
+ logger.info("============= action_data {}/{}".format(policy_key, data_id))
+ client.delete("/policies/{}/action_data/{}/{}".format(policy_key, data_item['category_id'], data_id))
def clean_meta_rule(client):
@@ -165,10 +171,9 @@ def clean_rules(client):
req, policies = test_policies.get_policies(client)
for policy_key in policies["policies"]:
req, rules = test_rules.get_rules(client, policy_key)
- rules = rules["rules"]
- rules = rules["rules"]
+ rules = rules["rules"]["rules"]
for rule_key in rules:
- client.delete("/policies/{}/rules/{}".format(policy_key, rule_key))
+ req = client.delete("/policies/{}/rules/{}".format(policy_key, rule_key["id"]))
def clean_all(client):
@@ -178,7 +183,6 @@ def clean_all(client):
clean_object_assignments(client)
clean_action_assignments(client)
- clean_meta_rule(client)
clean_subject_data(client)
clean_object_data(client)
@@ -192,5 +196,7 @@ def clean_all(client):
clean_object_categories(client)
clean_action_categories(client)
+
clean_policies(client)
clean_models(client)
+ clean_meta_rule(client) \ No newline at end of file
diff --git a/moon_manager/tests/unit_python/api/test_assignement.py b/moon_manager/tests/unit_python/api/test_assignement.py
new file mode 100644
index 00000000..b56fb420
--- /dev/null
+++ b/moon_manager/tests/unit_python/api/test_assignement.py
@@ -0,0 +1,280 @@
+import api.utilities as utilities
+import json
+from helpers import data_builder as builder
+from uuid import uuid4
+
+
+# subject_categories_test
+
+
+def get_subject_assignment(client, policy_id):
+ req = client.get("/policies/{}/subject_assignments".format(policy_id))
+ subject_assignment = utilities.get_json(req.data)
+ return req, subject_assignment
+
+
+def add_subject_assignment(client):
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy(
+ subject_category_name="subject_category1" + uuid4().hex,
+ object_category_name="object_category1" + uuid4().hex,
+ action_category_name="action_category1" + uuid4().hex,
+ meta_rule_name="meta_rule_1" + uuid4().hex)
+ subject_id = builder.create_subject(policy_id)
+ data_id = builder.create_subject_data(policy_id=policy_id, category_id=subject_category_id)
+
+ data = {
+ "id": subject_id,
+ "category_id": subject_category_id,
+ "data_id": data_id
+ }
+ req = client.post("/policies/{}/subject_assignments".format(policy_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ subject_assignment = utilities.get_json(req.data)
+ return req, subject_assignment
+
+
+def add_subject_assignment_without_cat_id(client):
+
+ data = {
+ "id": "subject_id",
+ "category_id": "",
+ "data_id": "data_id"
+ }
+ req = client.post("/policies/{}/subject_assignments".format("1111"), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ subject_assignment = utilities.get_json(req.data)
+ return req, subject_assignment
+
+
+def delete_subject_assignment(client, policy_id, sub_id, cat_id,data_id):
+ req = client.delete("/policies/{}/subject_assignments/{}/{}/{}".format(policy_id, sub_id, cat_id,data_id))
+ return req
+
+
+def test_add_subject_assignment():
+ client = utilities.register_client()
+ req, subject_assignment = add_subject_assignment(client)
+ assert req.status_code == 200
+ assert isinstance(subject_assignment, dict)
+ assert "subject_assignments" in subject_assignment
+
+
+# def test_add_subject_assignment_without_cat_id():
+# client = utilities.register_client()
+# req, subject_assignment = add_subject_assignment_without_cat_id(client)
+# assert req.status_code == 400
+# assert json.loads(req.data)["message"] == "Key: 'category_id', [Empty String]"
+
+
+def test_get_subject_assignment():
+ client = utilities.register_client()
+ policy_id = builder.get_policy_id_with_subject_assignment()
+ req, subject_assignment = get_subject_assignment(client, policy_id)
+ assert req.status_code == 200
+ assert isinstance(subject_assignment, dict)
+ assert "subject_assignments" in subject_assignment
+
+
+def test_delete_subject_assignment():
+ client = utilities.register_client()
+ policy_id = builder.get_policy_id_with_subject_assignment()
+ req, subject_assignment = get_subject_assignment(client, policy_id)
+ value = subject_assignment["subject_assignments"]
+ _id = list(value.keys())[0]
+ success_req = delete_subject_assignment(client,
+ policy_id,
+ value[_id]['subject_id'],
+ value[_id]['category_id'],
+ value[_id]['assignments'][0])
+ assert success_req.status_code == 200
+
+
+def test_delete_subject_assignment_without_policy_id():
+ client = utilities.register_client()
+ success_req = delete_subject_assignment(client, "", "id1", "111", "data_id1")
+ assert success_req.status_code == 404
+
+
+# ---------------------------------------------------------------------------
+# object_categories_test
+
+
+def get_object_assignment(client, policy_id):
+ req = client.get("/policies/{}/object_assignments".format(policy_id))
+ object_assignment = utilities.get_json(req.data)
+ return req, object_assignment
+
+
+def add_object_assignment(client):
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy(
+ subject_category_name="subject_category1" + uuid4().hex,
+ object_category_name="object_category1" + uuid4().hex,
+ action_category_name="action_category1" + uuid4().hex,
+ meta_rule_name="meta_rule_1" + uuid4().hex)
+ object_id = builder.create_object(policy_id)
+ data_id = builder.create_object_data(policy_id=policy_id, category_id=object_category_id)
+
+ data = {
+ "id": object_id,
+ "category_id": object_category_id,
+ "data_id": data_id
+ }
+
+ req = client.post("/policies/{}/object_assignments".format(policy_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ object_assignment = utilities.get_json(req.data)
+ return req, object_assignment
+
+
+def add_object_assignment_without_cat_id(client):
+
+ data = {
+ "id": "object_id",
+ "category_id": "",
+ "data_id": "data_id"
+ }
+ req = client.post("/policies/{}/object_assignments".format("1111"), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ object_assignment = utilities.get_json(req.data)
+ return req, object_assignment
+
+
+def delete_object_assignment(client, policy_id, obj_id, cat_id, data_id):
+ req = client.delete("/policies/{}/object_assignments/{}/{}/{}".format(policy_id, obj_id, cat_id, data_id))
+ return req
+
+
+def test_get_object_assignment():
+ policy_id = builder.get_policy_id_with_object_assignment()
+ client = utilities.register_client()
+ req, object_assignment = get_object_assignment(client, policy_id)
+ assert req.status_code == 200
+ assert isinstance(object_assignment, dict)
+ assert "object_assignments" in object_assignment
+
+
+def test_add_object_assignment():
+ client = utilities.register_client()
+ req, object_assignment = add_object_assignment(client)
+ assert req.status_code == 200
+ assert "object_assignments" in object_assignment
+
+
+# def test_add_object_assignment_without_cat_id():
+# client = utilities.register_client()
+# req, object_assignment = add_object_assignment_without_cat_id(client)
+# assert req.status_code == 400
+# assert json.loads(req.data)["message"] == "Key: 'category_id', [Empty String]"
+
+
+def test_delete_object_assignment():
+ client = utilities.register_client()
+ policy_id = builder.get_policy_id_with_object_assignment()
+ req, object_assignment = get_object_assignment(client, policy_id)
+ value = object_assignment["object_assignments"]
+ _id = list(value.keys())[0]
+ success_req = delete_object_assignment(client,
+ policy_id,
+ value[_id]['object_id'],
+ value[_id]['category_id'],
+ value[_id]['assignments'][0])
+ assert success_req.status_code == 200
+
+
+def test_delete_object_assignment_without_policy_id():
+ client = utilities.register_client()
+ success_req = delete_object_assignment(client, "", "id1", "111", "data_id1")
+ assert success_req.status_code == 404
+
+
+# ---------------------------------------------------------------------------
+# action_categories_test
+
+
+def get_action_assignment(client, policy_id):
+ req = client.get("/policies/{}/action_assignments".format(policy_id))
+ action_assignment = utilities.get_json(req.data)
+ return req, action_assignment
+
+
+def add_action_assignment(client):
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy(
+ subject_category_name="subject_category1" + uuid4().hex,
+ object_category_name="object_category1" + uuid4().hex,
+ action_category_name="action_category1" + uuid4().hex,
+ meta_rule_name="meta_rule_1" + uuid4().hex)
+ action_id = builder.create_action(policy_id)
+ data_id = builder.create_action_data(policy_id=policy_id, category_id=action_category_id)
+
+ data = {
+ "id": action_id,
+ "category_id": action_category_id,
+ "data_id": data_id
+ }
+ req = client.post("/policies/{}/action_assignments".format(policy_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ action_assignment = utilities.get_json(req.data)
+ return req, action_assignment
+
+
+def add_action_assignment_without_cat_id(client):
+
+ data = {
+ "id": "action_id",
+ "category_id": "",
+ "data_id": "data_id"
+ }
+ req = client.post("/policies/{}/action_assignments".format("1111"), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ action_assignment = utilities.get_json(req.data)
+ return req, action_assignment
+
+
+def delete_action_assignment(client, policy_id, action_id, cat_id, data_id):
+ req = client.delete("/policies/{}/action_assignments/{}/{}/{}".format(policy_id, action_id, cat_id, data_id))
+ return req
+
+
+def test_get_action_assignment():
+ policy_id = builder.get_policy_id_with_action_assignment()
+ client = utilities.register_client()
+ req, action_assignment = get_action_assignment(client, policy_id)
+ assert req.status_code == 200
+ assert isinstance(action_assignment, dict)
+ assert "action_assignments" in action_assignment
+
+
+def test_add_action_assignment():
+ client = utilities.register_client()
+ req, action_assignment = add_action_assignment(client)
+ assert req.status_code == 200
+ assert "action_assignments" in action_assignment
+
+
+# def test_add_action_assignment_without_cat_id():
+# client = utilities.register_client()
+# req, action_assignment = add_action_assignment_without_cat_id(client)
+# assert req.status_code == 400
+# assert json.loads(req.data)["message"] == "Key: 'category_id', [Empty String]"
+
+
+def test_delete_action_assignment():
+ client = utilities.register_client()
+ policy_id = builder.get_policy_id_with_action_assignment()
+ req, action_assignment = get_action_assignment(client, policy_id)
+ value = action_assignment["action_assignments"]
+ id = list(value.keys())[0]
+ success_req = delete_action_assignment(client,
+ policy_id,
+ value[id]['action_id'],
+ value[id]['category_id'],
+ value[id]['assignments'][0])
+ assert success_req.status_code == 200
+
+
+def test_delete_action_assignment_without_policy_id():
+ client = utilities.register_client()
+ success_req = delete_action_assignment(client, "", "id1", "111", "data_id1")
+ assert success_req.status_code == 404
+
+# ---------------------------------------------------------------------------
diff --git a/moon_manager/tests/unit_python/api/test_data.py b/moon_manager/tests/unit_python/api/test_data.py
index ff0856af..433f69e6 100644
--- a/moon_manager/tests/unit_python/api/test_data.py
+++ b/moon_manager/tests/unit_python/api/test_data.py
@@ -36,8 +36,8 @@ def add_subject_data(client, name):
return req, subject_data
-def delete_subject_data(client, policy_id):
- req = client.delete("/policies/{}/subject_data".format(policy_id))
+def delete_subject_data(client, policy_id, category_id, data_id):
+ req = client.delete("/policies/{}/subject_data/{}/{}".format(policy_id,category_id,data_id))
return req
@@ -65,31 +65,24 @@ def test_add_subject_data():
def test_delete_subject_data():
client = utilities.register_client()
subject_category_id, object_category_id, action_category_id, meta_rule_id,policy_id = builder.create_new_policy()
- success_req = delete_subject_data(client, policy_id)
+ data_id = builder.create_subject_data(policy_id,subject_category_id)
+ success_req = delete_subject_data(client, policy_id, subject_category_id, data_id )
assert success_req.status_code == 200
-def test_add_subject_data_with_empty_user():
+def test_add_subject_data_with_forbidden_char_in_user():
client = utilities.register_client()
- req, subject_data = add_subject_data(client, "")
+ req, subject_data = add_subject_data(client, "<a>")
assert req.status_code == 400
- assert json.loads(req.data)["message"] == "Key: 'name', [Empty String]"
-
-
-def test_add_subject_data_with_user_contain_space():
- client = utilities.register_client()
- req, subject_data = add_subject_data(client, "test user")
- assert req.status_code == 400
- assert json.loads(req.data)["message"] == "Key: 'name', [String contains space]"
+ assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]"
def test_delete_subject_data_without_policy_id():
client = utilities.register_client()
- success_req = delete_subject_data(client, "")
+ success_req = delete_subject_data(client, "", "", "")
assert success_req.status_code == 404
# ---------------------------------------------------------------------------
-
# object_categories_test
@@ -118,8 +111,8 @@ def add_object_data(client, name):
return req, object_data
-def delete_object_data(client, policy_id):
- req = client.delete("/policies/{}/object_data".format(policy_id))
+def delete_object_data(client, policy_id, category_id, data_id):
+ req = client.delete("/policies/{}/object_data/{}/{}".format(policy_id, category_id, data_id))
return req
@@ -139,42 +132,34 @@ def test_add_object_data():
assert isinstance(object_data, dict)
value = object_data["object_data"]['data']
assert "object_data" in object_data
- id = list(value.keys())[0]
- print("-----------------------")
- print(id)
- print(value[id])
- print("-----------------------")
- assert value[id]['name'] == "testuser"
- assert value[id]['description'] == "description of {}".format("testuser")
+ _id = list(value.keys())[0]
+ assert value[_id]['name'] == "testuser"
+ assert value[_id]['description'] == "description of {}".format("testuser")
def test_delete_object_data():
client = utilities.register_client()
- policy_id = utilities.get_policy_id()
- success_req = delete_object_data(client, policy_id)
- assert success_req.status_code == 200
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy()
+ data_id = builder.create_object_data(policy_id, object_category_id)
-def test_add_object_data_with_empty_user():
- client = utilities.register_client()
- req, subject_data = add_object_data(client, "")
- assert req.status_code == 400
- assert json.loads(req.data)["message"] == "Key: 'name', [Empty String]"
+ success_req = delete_object_data(client, policy_id, data_id, object_category_id)
+ assert success_req.status_code == 200
-def test_add_object_data_with_user_contain_space():
+def test_add_object_data_with_forbidden_char_in_user():
client = utilities.register_client()
- req, object_data = add_object_data(client, "test user")
+ req, subject_data = add_object_data(client, "<a>")
assert req.status_code == 400
- assert json.loads(req.data)["message"] == "Key: 'name', [String contains space]"
+ assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]"
def test_delete_object_data_without_policy_id():
client = utilities.register_client()
- success_req = delete_object_data(client, "")
+ success_req = delete_object_data(client, "", "", "")
assert success_req.status_code == 404
-# ---------------------------------------------------------------------------
+# ---------------------------------------------------------------------------
# action_categories_test
@@ -203,8 +188,8 @@ def add_action_data(client, name):
return req, action_data
-def delete_action_data(client, policy_id):
- req = client.delete("/policies/{}/action_data".format(policy_id))
+def delete_action_data(client, policy_id, categorgy_id, data_id):
+ req = client.delete("/policies/{}/action_data/{}/{}".format(policy_id, categorgy_id, data_id))
return req
@@ -231,27 +216,24 @@ def test_add_action_data():
def test_delete_action_data():
client = utilities.register_client()
- policy_id = utilities.get_policy_id()
- success_req = delete_action_data(client, policy_id)
- assert success_req.status_code == 200
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy()
+ data_id = builder.create_action_data(policy_id, action_category_id)
-def test_add_action_data_with_empty_user():
- client = utilities.register_client()
- req, action_data = add_action_data(client, "")
- assert req.status_code == 400
- assert json.loads(req.data)["message"] == "Key: 'name', [Empty String]"
+ success_req = delete_action_data(client, policy_id, data_id, action_category_id)
+
+ assert success_req.status_code == 200
-def test_add_action_data_with_user_contain_space():
+def test_add_action_data_with_forbidden_char_in_user():
client = utilities.register_client()
- req, action_data = add_action_data(client, "test user")
+ req, action_data = add_action_data(client, "<a>")
assert req.status_code == 400
- assert json.loads(req.data)["message"] == "Key: 'name', [String contains space]"
+ assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]"
def test_delete_action_data_without_policy_id():
client = utilities.register_client()
- success_req = delete_action_data(client, "")
+ success_req = delete_action_data(client, "", "", "")
assert success_req.status_code == 404
# ---------------------------------------------------------------------------
diff --git a/moon_manager/tests/unit_python/api/test_import.py b/moon_manager/tests/unit_python/api/test_import.py
index f1ab8251..af5f753a 100644
--- a/moon_manager/tests/unit_python/api/test_import.py
+++ b/moon_manager/tests/unit_python/api/test_import.py
@@ -9,7 +9,7 @@ import api.test_policies as test_policies
import api.test_meta_data as test_categories
import api.test_data as test_data
import api.test_meta_rules as test_meta_rules
-import api.test_assignemnt as test_assignments
+import api.test_assignement as test_assignments
import api.test_rules as test_rules
import api.import_export_utilities as import_export_utilities
@@ -42,7 +42,8 @@ OBJECTS = [
"objects": [{"name": "test object", "description": "description of the object", "extra": {}, "policies": []}]},
{"policies": [{"name": "test other policy", "genre": "authz", "description": "description", "model": {}, "mandatory": True}],
"objects": [{"name": "test object", "description": "description of the object", "extra": {}, "policies": []}]},
- {"objects": [{"name": "test object", "description": "new description of the object", "extra": {"test": "test extra"},
+ {"objects": [{"name": "test object", "description": "new description of the object",
+ "extra": {"test": "test extra"},
"policies": [{"name": "test other policy"}]}]},
{"policies": [{"name": "test policy", "genre": "authz", "description": "description", "model": {}, "mandatory": False}],
"objects": [{"name": "test object", "description": "description of the object", "extra": {}, "policies": [{"name": "test policy"}]}]},
@@ -225,7 +226,14 @@ def test_import_subject_object_action():
if counter == 2 or counter == 4:
clean_method(client)
- req = client.post("/import", content_type='application/json', data=json.dumps(element))
+
+ if counter == 3:
+ req = client.patch("/{}s/{}".format(type_element,perimeter_id), content_type='application/json',
+ data=json.dumps(
+ element["{}s".format(type_element)][0]))
+ else :
+ req = client.post("/import", content_type='application/json',
+ data=json.dumps(element))
if counter < 2:
assert req.status_code == 500
continue
@@ -237,10 +245,13 @@ def test_import_subject_object_action():
#assert counter < 2 #  this is an expected failure
#continue
- assert data == "Import ok !"
+ if counter != 3:
+ assert data == "Import ok !"
get_elements = utilities.get_json(client.get("/"+type_element + "s").data)
get_elements = get_elements[type_element + "s"]
+ perimeter_id = list(get_elements.keys())[0]
+
assert len(list(get_elements.keys())) == 1
values = list(get_elements.values())
assert values[0]["name"] == name
@@ -338,6 +349,7 @@ def test_import_meta_rules():
def test_import_subject_object_action_assignments():
client = utilities.register_client()
import_export_utilities.clean_all(client)
+
req = client.post("/import", content_type='application/json', data=json.dumps(PRE_ASSIGNMENTS))
data = utilities.get_json(req.data)
assert data == "Import ok !"
diff --git a/moon_manager/tests/unit_python/api/test_meta_data.py b/moon_manager/tests/unit_python/api/test_meta_data.py
index 4cb86913..e6cb0833 100644
--- a/moon_manager/tests/unit_python/api/test_meta_data.py
+++ b/moon_manager/tests/unit_python/api/test_meta_data.py
@@ -1,7 +1,10 @@
import json
import api.utilities as utilities
+from helpers import data_builder
+from uuid import uuid4
-#subject_categories_test
+
+# subject_categories_test
def get_subject_categories(client):
@@ -52,18 +55,35 @@ def test_add_subject_categories():
assert value['description'] == "description of {}".format("testuser")
-def test_add_subject_categories_with_empty_user():
+def test_add_subject_categories_with_existed_name():
+ client = utilities.register_client()
+ name = uuid4().hex
+ req, subject_categories = add_subject_categories(client, name)
+ assert req.status_code == 200
+ req, subject_categories = add_subject_categories(client, name)
+ assert req.status_code == 409
+ assert json.loads(req.data)["message"] == '409: Subject Category Existing'
+
+
+def test_add_subject_categories_name_contain_space():
+ client = utilities.register_client()
+ req, subject_categories = add_subject_categories(client, " ")
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Category Name Invalid'
+
+
+def test_add_subject_categories_with_empty_name():
client = utilities.register_client()
- req, subject_categories = add_subject_categories(client, "")
+ req, subject_categories = add_subject_categories(client, "<a>")
assert req.status_code == 400
- assert json.loads(req.data)["message"] == "Key: 'name', [Empty String]"
+ assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]"
-def test_add_subject_categories_with_user_contain_space():
+def test_add_subject_categories_with_name_contain_space():
client = utilities.register_client()
- req, subject_categories = add_subject_categories(client, "test user")
+ req, subject_categories = add_subject_categories(client, "test<z>user")
assert req.status_code == 400
- assert json.loads(req.data)["message"] == "Key: 'name', [String contains space]"
+ assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]"
def test_delete_subject_categories():
@@ -79,8 +99,8 @@ def test_delete_subject_categories_without_id():
assert json.loads(req.data)["message"] == "400: Subject Category Unknown"
-#---------------------------------------------------------------------------
-#object_categories_test
+# ---------------------------------------------------------------------------
+# object_categories_test
def get_object_categories(client):
req = client.get("/object_categories")
@@ -130,18 +150,35 @@ def test_add_object_categories():
assert value['description'] == "description of {}".format("testuser")
-def test_add_object_categories_with_empty_user():
+def test_add_object_categories_with_existed_name():
+ client = utilities.register_client()
+ name = uuid4().hex
+ req, object_categories = add_object_categories(client, name)
+ assert req.status_code == 200
+ req, object_categories = add_object_categories(client, name)
+ assert req.status_code == 409
+ assert json.loads(req.data)["message"] == '409: Object Category Existing'
+
+
+def test_add_object_categories_name_contain_space():
+ client = utilities.register_client()
+ req, subject_categories = add_object_categories(client, " ")
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Category Name Invalid'
+
+
+def test_add_object_categories_with_empty_name():
client = utilities.register_client()
- req, object_categories = add_object_categories(client, "")
+ req, object_categories = add_object_categories(client, "<a>")
assert req.status_code == 400
- assert json.loads(req.data)["message"] == "Key: 'name', [Empty String]"
+ assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]"
-def test_add_object_categories_with_user_contain_space():
+def test_add_object_categories_with_name_contain_space():
client = utilities.register_client()
- req, object_categories = add_object_categories(client, "test user")
+ req, object_categories = add_object_categories(client, "test<a>user")
assert req.status_code == 400
- assert json.loads(req.data)["message"] == "Key: 'name', [String contains space]"
+ assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]"
def test_delete_object_categories():
@@ -157,8 +194,8 @@ def test_delete_object_categories_without_id():
assert json.loads(req.data)["message"] == "400: Object Category Unknown"
-#---------------------------------------------------------------------------
-#action_categories_test
+# ---------------------------------------------------------------------------
+# action_categories_test
def get_action_categories(client):
req = client.get("/action_categories")
@@ -208,18 +245,35 @@ def test_add_action_categories():
assert value['description'] == "description of {}".format("testuser")
-def test_add_action_categories_with_empty_user():
+def test_add_action_categories_with_existed_name():
client = utilities.register_client()
- req, action_categories = add_action_categories(client, "")
+ name = uuid4().hex
+ req, action_categories = add_action_categories(client, name)
+ assert req.status_code == 200
+ req, action_categories = add_action_categories(client, name)
+ assert req.status_code == 409
+ assert json.loads(req.data)["message"] == '409: Action Category Existing'
+
+
+def test_add_action_categories_name_contain_space():
+ client = utilities.register_client()
+ req, subject_categories = add_action_categories(client, " ")
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Category Name Invalid'
+
+
+def test_add_action_categories_with_empty_name():
+ client = utilities.register_client()
+ req, action_categories = add_action_categories(client, "<a>")
assert req.status_code == 400
- assert json.loads(req.data)["message"] == "Key: 'name', [Empty String]"
+ assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]"
-def test_add_action_categories_with_user_contain_space():
+def test_add_action_categories_with_name_contain_space():
client = utilities.register_client()
- req, action_categories = add_action_categories(client, "test user")
+ req, action_categories = add_action_categories(client, "test<a>user")
assert req.status_code == 400
- assert json.loads(req.data)["message"] == "Key: 'name', [String contains space]"
+ assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]"
def test_delete_action_categories():
@@ -233,3 +287,19 @@ def test_delete_action_categories_without_id():
req = delete_action_categories_without_id(client)
assert req.status_code == 400
assert json.loads(req.data)["message"] == "400: Action Category Unknown"
+
+
+def test_delete_data_categories_connected_to_meta_rule():
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = data_builder.create_new_meta_rule()
+ client = utilities.register_client()
+ req = client.delete("/subject_categories/{}".format(subject_category_id))
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Subject Category With Meta Rule Error'
+
+ req = client.delete("/object_categories/{}".format(object_category_id))
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Object Category With Meta Rule Error'
+
+ req = client.delete("/action_categories/{}".format(action_category_id))
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Action Category With Meta Rule Error'
diff --git a/moon_manager/tests/unit_python/api/test_meta_rules.py b/moon_manager/tests/unit_python/api/test_meta_rules.py
index 80d648b4..634f19da 100644
--- a/moon_manager/tests/unit_python/api/test_meta_rules.py
+++ b/moon_manager/tests/unit_python/api/test_meta_rules.py
@@ -1,6 +1,7 @@
import json
import api.utilities as utilities
from helpers import category_helper
+from helpers import data_builder
from uuid import uuid4
@@ -10,32 +11,36 @@ def get_meta_rules(client):
return req, meta_rules
-def add_meta_rules(client, name):
- subject_category = category_helper.add_subject_category(value={"name": "subject category name"+uuid4().hex, "description": "description 1"})
- subject_category_id = list(subject_category.keys())[0]
- object_category = category_helper.add_object_category(value={"name": "object category name"+ uuid4().hex, "description": "description 1"})
- object_category_id = list(object_category.keys())[0]
- action_category = category_helper.add_action_category(value={"name": "action category name"+uuid4().hex, "description": "description 1"})
- action_category_id = list(action_category.keys())[0]
-
- data = {
- "name": name,
- "subject_categories": [subject_category_id],
- "object_categories": [object_category_id],
- "action_categories": [action_category_id]
- }
+def add_meta_rules(client, name, data=None):
+ if not data:
+ subject_category = category_helper.add_subject_category(
+ value={"name": "subject category name" + uuid4().hex, "description": "description 1"})
+ subject_category_id = list(subject_category.keys())[0]
+ object_category = category_helper.add_object_category(
+ value={"name": "object category name" + uuid4().hex, "description": "description 1"})
+ object_category_id = list(object_category.keys())[0]
+ action_category = category_helper.add_action_category(
+ value={"name": "action category name" + uuid4().hex, "description": "description 1"})
+ action_category_id = list(action_category.keys())[0]
+
+ data = {
+ "name": name,
+ "subject_categories": [subject_category_id],
+ "object_categories": [object_category_id],
+ "action_categories": [action_category_id]
+ }
req = client.post("/meta_rules", data=json.dumps(data),
headers={'Content-Type': 'application/json'})
meta_rules = utilities.get_json(req.data)
return req, meta_rules
-def add_meta_rules_without_subject_category_ids(client, name):
+def add_meta_rules_without_category_ids(client, name):
data = {
- "name": name,
+ "name": name + uuid4().hex,
"subject_categories": [],
- "object_categories": ["object_category_id1"],
- "action_categories": ["action_category_id1"]
+ "object_categories": [],
+ "action_categories": []
}
req = client.post("/meta_rules", data=json.dumps(data),
headers={'Content-Type': 'application/json'})
@@ -43,37 +48,45 @@ def add_meta_rules_without_subject_category_ids(client, name):
return req, meta_rules
-def update_meta_rules(client, name, metaRuleId):
- subject_category = category_helper.add_subject_category(
- value={"name": "subject category name update" + uuid4().hex, "description": "description 1"})
- subject_category_id = list(subject_category.keys())[0]
- object_category = category_helper.add_object_category(
- value={"name": "object category name update" + uuid4().hex, "description": "description 1"})
- object_category_id = list(object_category.keys())[0]
- action_category = category_helper.add_action_category(
- value={"name": "action category name update" + uuid4().hex, "description": "description 1"})
- action_category_id = list(action_category.keys())[0]
- data = {
- "name": name,
- "subject_categories": [subject_category_id],
- "object_categories": [object_category_id],
- "action_categories": [action_category_id]
- }
+def update_meta_rules(client, name, metaRuleId, data=None):
+ if not data:
+ subject_category = category_helper.add_subject_category(
+ value={"name": "subject category name update" + uuid4().hex,
+ "description": "description 1"})
+ subject_category_id = list(subject_category.keys())[0]
+ object_category = category_helper.add_object_category(
+ value={"name": "object category name update" + uuid4().hex,
+ "description": "description 1"})
+ object_category_id = list(object_category.keys())[0]
+ action_category = category_helper.add_action_category(
+ value={"name": "action category name update" + uuid4().hex,
+ "description": "description 1"})
+ action_category_id = list(action_category.keys())[0]
+ data = {
+ "name": name,
+ "subject_categories": [subject_category_id],
+ "object_categories": [object_category_id],
+ "action_categories": [action_category_id]
+ }
+
req = client.patch("/meta_rules/{}".format(metaRuleId), data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
+ headers={'Content-Type': 'application/json'})
meta_rules = utilities.get_json(req.data)
return req, meta_rules
-def update_meta_rules_without_subject_category_ids(client, name):
- data = {
- "name": name,
- "subject_categories": [],
- "object_categories": ["object_category_id1"],
- "action_categories": ["action_category_id1"]
- }
- req = client.post("/meta_rules", data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
+def update_meta_rules_with_categories(client, name, data=None, meta_rule_id=None):
+ if not meta_rule_id:
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = data_builder.create_new_meta_rule()
+ data = {
+ "name": name,
+ "subject_categories": [subject_category_id],
+ "object_categories": [object_category_id],
+ "action_categories": [action_category_id]
+ }
+
+ req = client.patch("/meta_rules/{}".format(meta_rule_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
meta_rules = utilities.get_json(req.data)
return req, meta_rules
@@ -82,9 +95,7 @@ def delete_meta_rules(client, name):
request, meta_rules = get_meta_rules(client)
for key, value in meta_rules['meta_rules'].items():
if value['name'] == name:
- req = client.delete("/meta_rules/{}".format(key))
- break
- return req
+ return client.delete("/meta_rules/{}".format(key))
def delete_meta_rules_without_id(client):
@@ -102,38 +113,143 @@ def test_get_meta_rules():
def test_add_meta_rules():
client = utilities.register_client()
- req, meta_rules = add_meta_rules(client, "testuser")
+ meta_rule_name = uuid4().hex
+ req, meta_rules = add_meta_rules(client, meta_rule_name)
assert req.status_code == 200
assert isinstance(meta_rules, dict)
value = list(meta_rules["meta_rules"].values())[0]
assert "meta_rules" in meta_rules
- assert value['name'] == "testuser"
+ assert value['name'] == meta_rule_name
-def test_add_meta_rules_with_empty_user():
+def test_add_two_meta_rules_with_same_categories_combination():
client = utilities.register_client()
- req, meta_rules = add_meta_rules(client, "")
+ meta_rule_name = uuid4().hex
+ req, meta_rules = add_meta_rules(client, meta_rule_name)
+ assert req.status_code == 200
+ for meta_rule_id in meta_rules['meta_rules']:
+ if meta_rules['meta_rules'][meta_rule_id]['name'] == meta_rule_name:
+ data = meta_rules['meta_rules'][meta_rule_id]
+
+ data['name'] = uuid4().hex
+ req, meta_rules = add_meta_rules(client, name=data['name'], data=data)
+ assert req.status_code == 409
+ assert json.loads(req.data)["message"] == '409: Meta Rule Existing'
+
+
+def test_add_three_meta_rules_with_different_combination_but_similar_items():
+ client = utilities.register_client()
+ meta_rule_name1 = uuid4().hex
+ req, meta_rules = add_meta_rules(client, meta_rule_name1)
+ assert req.status_code == 200
+ for meta_rule_id in meta_rules['meta_rules']:
+ if meta_rules['meta_rules'][meta_rule_id]['name'] == meta_rule_name1:
+ data = meta_rules['meta_rules'][meta_rule_id]
+ break
+
+ meta_rule_name2 = uuid4().hex
+
+ req, meta_rules = add_meta_rules(client, meta_rule_name2)
+
+ for meta_rule_id in meta_rules['meta_rules']:
+ if meta_rules['meta_rules'][meta_rule_id]['name'] == meta_rule_name2:
+ data['subject_categories'] += meta_rules['meta_rules'][meta_rule_id][
+ 'subject_categories']
+ data['object_categories'] += meta_rules['meta_rules'][meta_rule_id]['object_categories']
+ data['action_categories'] += meta_rules['meta_rules'][meta_rule_id]['action_categories']
+ break
+
+ data['name'] = uuid4().hex
+
+ req, meta_rules = add_meta_rules(client, name=data['name'], data=data)
+ assert req.status_code == 200
+
+
+def test_add_two_meta_rules_with_different_combination_but_similar_items():
+ client = utilities.register_client()
+ meta_rule_name1 = uuid4().hex
+ meta_rule_name2 = uuid4().hex
+
+ subject_category = category_helper.add_subject_category(
+ value={"name": "subject category name" + uuid4().hex, "description": "description 1"})
+ subject_category_id1 = list(subject_category.keys())[0]
+
+ object_category = category_helper.add_object_category(
+ value={"name": "object category name" + uuid4().hex, "description": "description 1"})
+ object_category_id1 = list(object_category.keys())[0]
+
+ action_category = category_helper.add_action_category(
+ value={"name": "action category name" + uuid4().hex, "description": "description 1"})
+ action_category_id1 = list(action_category.keys())[0]
+
+ subject_category = category_helper.add_subject_category(
+ value={"name": "subject category name" + uuid4().hex, "description": "description 1"})
+ subject_category_id2 = list(subject_category.keys())[0]
+
+ object_category = category_helper.add_object_category(
+ value={"name": "object category name" + uuid4().hex, "description": "description 1"})
+ object_category_id2 = list(object_category.keys())[0]
+
+ action_category = category_helper.add_action_category(
+ value={"name": "action category name" + uuid4().hex, "description": "description 1"})
+ action_category_id2 = list(action_category.keys())[0]
+
+ data = {
+ "name": meta_rule_name1,
+ "subject_categories": [subject_category_id1, subject_category_id2],
+ "object_categories": [object_category_id1, object_category_id2],
+ "action_categories": [action_category_id1, action_category_id2]
+ }
+ req, meta_rules = add_meta_rules(client, meta_rule_name1, data=data)
+ assert req.status_code == 200
+ data = {
+ "name": meta_rule_name2,
+ "subject_categories": [subject_category_id2],
+ "object_categories": [object_category_id1],
+ "action_categories": [action_category_id2]
+ }
+
+ req, meta_rules = add_meta_rules(client, meta_rule_name1, data=data)
+ assert req.status_code == 200
+
+
+def test_add_meta_rule_with_existing_name_error():
+ client = utilities.register_client()
+ name = uuid4().hex
+ req, meta_rules = add_meta_rules(client, name)
+ assert req.status_code == 200
+ req, meta_rules = add_meta_rules(client, name)
+ assert req.status_code == 409
+ assert json.loads(req.data)["message"] == '409: Meta Rule Existing'
+
+
+def test_add_meta_rules_with_forbidden_char_in_name():
+ client = utilities.register_client()
+ req, meta_rules = add_meta_rules(client, "<a>")
assert req.status_code == 400
- assert json.loads(req.data)["message"] == "Key: 'name', [Empty String]"
+ assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]"
-def test_add_meta_rules_with_user_contain_space():
+def test_add_meta_rules_with_blank_name():
client = utilities.register_client()
- req, meta_rules = add_meta_rules(client, "test user")
+ req, meta_rules = add_meta_rules(client, "")
assert req.status_code == 400
- assert json.loads(req.data)["message"] == "Key: 'name', [String contains space]"
+ assert json.loads(req.data)["message"] == '400: Meta Rule Error'
def test_add_meta_rules_without_subject_categories():
client = utilities.register_client()
- req, meta_rules = add_meta_rules_without_subject_category_ids(client, "testuser")
- assert req.status_code == 400
- assert json.loads(req.data)["message"] == "Key: 'subject_categories', [Empty Container]"
+ name_meta_rule = uuid4().hex
+ req, meta_rules = add_meta_rules_without_category_ids(client, name_meta_rule)
+ assert req.status_code == 200
def test_delete_meta_rules():
client = utilities.register_client()
- req = delete_meta_rules(client, "testuser")
+ name_meta_rule = uuid4().hex
+ req, meta_rules = add_meta_rules_without_category_ids(client, name_meta_rule)
+ meta_rule_id = next(iter(meta_rules['meta_rules']))
+ req = delete_meta_rules(client, meta_rules['meta_rules'][meta_rule_id]['name'])
assert req.status_code == 200
@@ -154,6 +270,70 @@ def test_update_meta_rules():
get_meta_rules(client)
+def test_update_meta_rule_with_combination_existed():
+ client = utilities.register_client()
+ meta_rule_name1 = uuid4().hex
+ req, meta_rules = add_meta_rules(client, meta_rule_name1)
+ meta_rule_id1 = next(iter(meta_rules['meta_rules']))
+ data1 = meta_rules['meta_rules'][meta_rule_id1]
+
+ meta_rule_name2 = uuid4().hex
+ req, meta_rules = add_meta_rules(client, meta_rule_name2)
+ meta_rule_id2 = next(iter(meta_rules['meta_rules']))
+ data2 = meta_rules['meta_rules'][meta_rule_id2]
+ data1['name'] = data2['name']
+ req_update = update_meta_rules(client, name=meta_rule_name2, metaRuleId=meta_rule_id2,
+ data=data1)
+ assert req_update[0].status_code == 409
+ assert req_update[1]['message']== '409: Meta Rule Existing'
+
+
+def test_update_meta_rule_with_different_combination_but_same_data():
+ client = utilities.register_client()
+ meta_rule_name1 = uuid4().hex
+ subject_category = category_helper.add_subject_category(
+ value={"name": "subject category name" + uuid4().hex, "description": "description 1"})
+ subject_category_id1 = list(subject_category.keys())[0]
+ object_category = category_helper.add_object_category(
+ value={"name": "object category name" + uuid4().hex, "description": "description 1"})
+ object_category_id1 = list(object_category.keys())[0]
+ action_category = category_helper.add_action_category(
+ value={"name": "action category name" + uuid4().hex, "description": "description 1"})
+ action_category_id1 = list(action_category.keys())[0]
+ subject_category = category_helper.add_subject_category(
+ value={"name": "subject category name" + uuid4().hex, "description": "description 1"})
+ subject_category_id2 = list(subject_category.keys())[0]
+ object_category = category_helper.add_object_category(
+ value={"name": "object category name" + uuid4().hex, "description": "description 1"})
+ object_category_id2 = list(object_category.keys())[0]
+ action_category = category_helper.add_action_category(
+ value={"name": "action category name" + uuid4().hex, "description": "description 1"})
+ action_category_id2 = list(action_category.keys())[0]
+
+ data = {
+ "name": meta_rule_name1,
+ "subject_categories": [subject_category_id1, subject_category_id2],
+ "object_categories": [object_category_id1, object_category_id2],
+ "action_categories": [action_category_id1, action_category_id2]
+ }
+ req, meta_rules = add_meta_rules(client, meta_rule_name1, data=data)
+ assert req.status_code == 200
+
+ meta_rule_name2 = uuid4().hex
+ req, meta_rules = add_meta_rules(client, meta_rule_name2)
+ meta_rule_id2 = next(iter(meta_rules['meta_rules']))
+ data2 = {
+ "name": meta_rule_name2,
+ "subject_categories": [subject_category_id1, subject_category_id2],
+ "object_categories": [object_category_id1],
+ "action_categories": [action_category_id1,action_category_id2]
+ }
+
+ req_update = update_meta_rules(client, name=meta_rule_name2, metaRuleId=meta_rule_id2,
+ data=data2)
+ assert req_update[0].status_code == 200
+
+
def test_update_meta_rules_without_id():
client = utilities.register_client()
req_update = update_meta_rules(client, "testuser", "")
@@ -161,15 +341,75 @@ def test_update_meta_rules_without_id():
assert json.loads(req_update[0].data)["message"] == "400: Meta Rule Unknown"
-def test_update_meta_rules_without_user():
+def test_update_meta_rules_without_name():
client = utilities.register_client()
- req_update = update_meta_rules(client, "", "")
+ req_update = update_meta_rules(client, "<br/>", "1234567")
assert req_update[0].status_code == 400
- assert json.loads(req_update[0].data)["message"] == "Key: 'name', [Empty String]"
+ assert json.loads(req_update[0].data)[
+ "message"] == "Key: 'name', [Forbidden characters in string]"
+
+
+def test_update_meta_rules_without_categories():
+ client = utilities.register_client()
+ req_update = update_meta_rules_with_categories(client, "testuser")
+ assert req_update[0].status_code == 200
-def test_update_meta_rules_without_subject_categories():
+def test_update_meta_rules_with_empty_categories():
client = utilities.register_client()
- req_update = update_meta_rules_without_subject_category_ids(client, "testuser")
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = data_builder.create_new_meta_rule()
+ data = {
+ "name": "testuser",
+ "subject_categories": [""],
+ "object_categories": [""],
+ "action_categories": [""]
+ }
+ req_update = update_meta_rules_with_categories(client, "testuser", data=data,
+ meta_rule_id=meta_rule_id)
+ assert req_update[0].status_code == 400
+ assert req_update[1]['message'] == '400: Subject Category Unknown'
+
+
+def test_update_meta_rules_with_empty_action_category():
+ client = utilities.register_client()
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = data_builder.create_new_meta_rule()
+ data = {
+ "name": "testuser",
+ "subject_categories": [subject_category_id],
+ "object_categories": [object_category_id],
+ "action_categories": [""]
+ }
+ req_update = update_meta_rules_with_categories(client, "testuser", data=data,
+ meta_rule_id=meta_rule_id)
+ assert req_update[0].status_code == 400
+ assert req_update[1]['message'] == '400: Action Category Unknown'
+
+
+def test_update_meta_rules_with_empty_object_category():
+ client = utilities.register_client()
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = data_builder.create_new_meta_rule()
+ data = {
+ "name": "testuser",
+ "subject_categories": [subject_category_id],
+ "object_categories": [""],
+ "action_categories": [action_category_id]
+ }
+ req_update = update_meta_rules_with_categories(client, "testuser", data=data,
+ meta_rule_id=meta_rule_id)
+ assert req_update[0].status_code == 400
+ assert req_update[1]['message'] == '400: Object Category Unknown'
+
+
+def test_update_meta_rules_with_categories_and_one_empty():
+ client = utilities.register_client()
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = data_builder.create_new_meta_rule()
+ data = {
+ "name": "testuser",
+ "subject_categories": [subject_category_id, ""],
+ "object_categories": [object_category_id, ""],
+ "action_categories": [action_category_id, ""]
+ }
+ req_update = update_meta_rules_with_categories(client, "testuser", data=data,
+ meta_rule_id=meta_rule_id)
assert req_update[0].status_code == 400
- assert json.loads(req_update[0].data)["message"] == "Key: 'subject_categories', [Empty Container]"
+ assert req_update[1]['message'] == '400: Subject Category Unknown'
diff --git a/moon_manager/tests/unit_python/api/test_pdp.py b/moon_manager/tests/unit_python/api/test_pdp.py
index 1ac9b84f..53a87b21 100644
--- a/moon_manager/tests/unit_python/api/test_pdp.py
+++ b/moon_manager/tests/unit_python/api/test_pdp.py
@@ -69,16 +69,18 @@ def test_add_pdp():
def test_delete_pdp():
client = utilities.register_client()
request, pdp = get_pdp(client)
+ success_req = None
for key, value in pdp['pdps'].items():
if value['name'] == "testuser":
success_req = delete_pdp(client, key)
break
+ assert success_req
assert success_req.status_code == 200
-def test_add_pdp_with_empty_user():
+def test_add_pdp_with_forbidden_char_in_user():
data = {
- "name": "",
+ "name": "<a>",
"security_pipeline": ["policy_id_1", "policy_id_2"],
"keystone_project_id": "keystone_project_id",
"description": "description of testuser"
@@ -86,46 +88,20 @@ def test_add_pdp_with_empty_user():
client = utilities.register_client()
req, models = add_pdp(client, data)
assert req.status_code == 400
- assert json.loads(req.data)["message"] == "Key: 'name', [Empty String]"
-
-
-def test_add_pdp_with_user_contain_space():
- data = {
- "name": "test user",
- "security_pipeline": ["policy_id_1", "policy_id_2"],
- "keystone_project_id": "keystone_project_id",
- "description": "description of testuser"
- }
- client = utilities.register_client()
- req, models = add_pdp(client, data)
- assert req.status_code == 400
- assert json.loads(req.data)["message"] == "Key: 'name', [String contains space]"
-
-
-def test_add_pdp_without_security_pipeline():
- data = {
- "name": "testuser",
- "security_pipeline": [],
- "keystone_project_id": "keystone_project_id",
- "description": "description of testuser"
- }
- client = utilities.register_client()
- req, meta_rules = add_pdp(client, data)
- assert req.status_code == 400
- assert json.loads(req.data)["message"] == "Key: 'security_pipeline', [Empty Container]"
+ assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]"
-def test_add_pdp_without_keystone():
+def test_add_pdp_with_forbidden_char_in_keystone():
data = {
"name": "testuser",
"security_pipeline": ["policy_id_1", "policy_id_2"],
- "keystone_project_id": "",
+ "keystone_project_id": "<a>",
"description": "description of testuser"
}
client = utilities.register_client()
req, meta_rules = add_pdp(client, data)
assert req.status_code == 400
- assert json.loads(req.data)["message"] == "Key: 'keystone_project_id', [Empty String]"
+ assert json.loads(req.data)["message"] == "Key: 'keystone_project_id', [Forbidden characters in string]"
def test_update_pdp():
@@ -183,19 +159,6 @@ def test_update_pdp_without_user():
"description": "description of testuser"
}
client = utilities.register_client()
- req_update = update_pdp(client, data, "")
- assert req_update[0].status_code == 400
- assert json.loads(req_update[0].data)["message"] == "Key: 'name', [Empty String]"
-
-
-def test_update_pdp_without_security_pipeline():
- data = {
- "name": "testuser",
- "security_pipeline": [],
- "keystone_project_id": "keystone_project_id",
- "description": "description of testuser"
- }
- client = utilities.register_client()
- req_update = update_pdp(client, data, "")
+ req_update = update_pdp(client, data, "<a>")
assert req_update[0].status_code == 400
- assert json.loads(req_update[0].data)["message"] == "Key: 'security_pipeline', [Empty Container]" \ No newline at end of file
+ assert json.loads(req_update[0].data)["message"] == "Forbidden characters in string"
diff --git a/moon_manager/tests/unit_python/api/test_perimeter.py b/moon_manager/tests/unit_python/api/test_perimeter.py
index 322d90c6..ff7b09d7 100644
--- a/moon_manager/tests/unit_python/api/test_perimeter.py
+++ b/moon_manager/tests/unit_python/api/test_perimeter.py
@@ -3,6 +3,7 @@
import json
import api.utilities as utilities
from helpers import data_builder as builder
+import helpers.policy_helper as policy_helper
from uuid import uuid4
@@ -12,33 +13,27 @@ def get_subjects(client):
return req, subjects
-def add_subjects(client, name):
- subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy(
- subject_category_name="subject_category1" + uuid4().hex,
- object_category_name="object_category1" + uuid4().hex,
- action_category_name="action_category1" + uuid4().hex,
- meta_rule_name="meta_rule_1" + uuid4().hex,
- model_name="model1" + uuid4().hex)
- data = {
- "name": name + uuid4().hex,
- "description": "description of {}".format(name),
- "password": "password for {}".format(name),
- "email": "{}@moon".format(name)
- }
- req = client.post("/policies/{}/subjects".format(policy_id), data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
+def add_subjects(client, policy_id, name, perimeter_id=None, data=None):
+ if not data:
+ name = name + uuid4().hex
+ data = {
+ "name": name,
+ "description": "description of {}".format(name),
+ "password": "password for {}".format(name),
+ "email": "{}@moon".format(name)
+ }
+ if not perimeter_id:
+ req = client.post("/policies/{}/subjects".format(policy_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ else:
+ req = client.post("/policies/{}/subjects/{}".format(policy_id, perimeter_id),
+ data=json.dumps(
+ data),
+ headers={'Content-Type': 'application/json'})
subjects = utilities.get_json(req.data)
return req, subjects
-def delete_subject(client):
- subjects = get_subjects(client)
- value = subjects[1]['subjects']
- id = list(value.keys())[0]
- policy_id = builder.get_policy_id_with_subject_assignment()
- return client.delete("/policies/{}/subjects/{}".format(policy_id, id))
-
-
def delete_subjects_without_perimeter_id(client):
req = client.delete("/subjects/{}".format(""))
return req
@@ -54,18 +49,166 @@ def test_perimeter_get_subject():
def test_perimeter_add_subject():
client = utilities.register_client()
- req, subjects = add_subjects(client, "testuser")
+ policies = policy_helper.add_policies()
+ policy_id = list(policies.keys())[0]
+
+ req, subjects = add_subjects(client, policy_id, "testuser")
value = list(subjects["subjects"].values())[0]
assert req.status_code == 200
- assert "subjects" in subjects
- assert value["name"] is not None
- assert value["email"] is not None
+ assert value["name"]
+ assert value["email"]
+
+
+def test_perimeter_add_same_subject_perimeter_id_with_new_policy_id():
+ client = utilities.register_client()
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ name = "testuser"
+ perimeter_id = uuid4().hex
+ data = {
+ "name": name + uuid4().hex,
+ "description": "description of {}".format(name),
+ "password": "password for {}".format(name),
+ "email": "{}@moon".format(name)
+ }
+ add_subjects(client, policy_id1, data['name'], perimeter_id=perimeter_id, data=data)
+ policies2 = policy_helper.add_policies()
+ policy_id2 = list(policies2.keys())[0]
+ req, subjects = add_subjects(client, policy_id2, data['name'],
+ perimeter_id=perimeter_id, data=data)
+ value = list(subjects["subjects"].values())[0]
+ assert req.status_code == 200
+ assert value["name"]
+ assert value["email"]
+ assert len(value['policy_list']) == 2
+ assert policy_id1 in value['policy_list']
+ assert policy_id2 in value['policy_list']
+
+
+def test_perimeter_add_same_subject_perimeter_id_with_different_name():
+ client = utilities.register_client()
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ perimeter_id = uuid4().hex
+ add_subjects(client, policy_id1, "testuser", perimeter_id=perimeter_id)
+ policies2 = policy_helper.add_policies()
+ policy_id2 = list(policies2.keys())[0]
+ req, subjects = add_subjects(client, policy_id2, "testuser", perimeter_id=perimeter_id)
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Perimeter content is invalid.'
+
+
+def test_perimeter_add_same_subject_name_with_new_policy_id():
+ client = utilities.register_client()
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ perimeter_id = uuid4().hex
+ name = "testuser" + uuid4().hex
+ data = {
+ "name": name,
+ "description": "description of {}".format(name),
+ "password": "password for {}".format(name),
+ "email": "{}@moon".format(name)
+ }
+ req, subjects = add_subjects(client, policy_id1, None, perimeter_id=perimeter_id,
+ data=data)
+ policies2 = policy_helper.add_policies()
+ policy_id2 = list(policies2.keys())[0]
+ value = list(subjects["subjects"].values())[0]
+ data = {
+ "name": value['name'],
+ "description": "description of {}".format(value['name']),
+ "password": "password for {}".format(value['name']),
+ "email": "{}@moon".format(value['name'])
+ }
+ req, subjects = add_subjects(client, policy_id2, None, data=data)
+ value = list(subjects["subjects"].values())[0]
+ assert req.status_code == 200
+ assert value["name"]
+ assert value["email"]
+ assert len(value['policy_list']) == 2
+ assert policy_id1 in value['policy_list']
+ assert policy_id2 in value['policy_list']
+
+
+def test_perimeter_add_same_subject_name_with_same_policy_id():
+ client = utilities.register_client()
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ perimeter_id = uuid4().hex
+ name = "testuser" + uuid4().hex
+ data = {
+ "name": name,
+ "description": "description of {}".format(name),
+ "password": "password for {}".format(name),
+ "email": "{}@moon".format(name)
+ }
+ req, subjects = add_subjects(client, policy_id1, None, perimeter_id=perimeter_id,
+ data=data)
+ value = list(subjects["subjects"].values())[0]
+ data = {
+ "name": value['name'],
+ "description": "description of {}".format(value['name']),
+ "password": "password for {}".format(value['name']),
+ "email": "{}@moon".format(value['name'])
+ }
+ req, subjects = add_subjects(client, policy_id1, None, data=data)
+ assert req.status_code == 409
+ assert json.loads(req.data)["message"] == '409: Policy Already Exists'
+
+
+def test_perimeter_add_same_subject_perimeter_id_with_existed_policy_id_in_list():
+ client = utilities.register_client()
+ policies = policy_helper.add_policies()
+ policy_id = list(policies.keys())[0]
+ name = "testuser" + uuid4().hex
+ data = {
+ "name": name,
+ "description": "description of {}".format(name),
+ "password": "password for {}".format(name),
+ "email": "{}@moon".format(name)
+ }
+ req, subjects = add_subjects(client, policy_id, name, data=data)
+ perimeter_id = list(subjects["subjects"].values())[0]['id']
+ req, subjects = add_subjects(client, policy_id, name, perimeter_id=perimeter_id, data=data)
+ assert req.status_code == 409
+ assert json.loads(req.data)["message"] == '409: Policy Already Exists'
+
+
+def test_perimeter_add_subject_invalid_policy_id():
+ client = utilities.register_client()
+ policies = policy_helper.add_policies()
+ policy_id = list(policies.keys())[0]
+ name = "testuser"
+ data = {
+ "name": name + uuid4().hex,
+ "description": "description of {}".format(name),
+ "password": "password for {}".format(name),
+ "email": "{}@moon".format(name)
+ }
+ req, subjects = add_subjects(client, policy_id + "0", "testuser", data)
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Policy Unknown'
+
+
+def test_perimeter_add_subject_policy_id_none():
+ client = utilities.register_client()
+ name = "testuser"
+ data = {
+ "name": name + uuid4().hex,
+ "description": "description of {}".format(name),
+ "password": "password for {}".format(name),
+ "email": "{}@moon".format(name)
+ }
+ req, subjects = add_subjects(client, None, "testuser", data)
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Policy Unknown'
-def test_perimeter_add_subject_without_name():
+def test_perimeter_add_subject_with_forbidden_char_in_name():
client = utilities.register_client()
data = {
- "name": "",
+ "name": "<a>",
"description": "description of {}".format(""),
"password": "password for {}".format(""),
"email": "{}@moon".format("")
@@ -73,26 +216,121 @@ def test_perimeter_add_subject_without_name():
req = client.post("/policies/{}/subjects".format("111"), data=json.dumps(data),
headers={'Content-Type': 'application/json'})
assert req.status_code == 400
- assert json.loads(req.data)["message"] == "Key: 'name', [Empty String]"
+ assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]"
-def test_perimeter_add_subject_with_name_contain_spaces():
+def test_perimeter_update_subject_name():
client = utilities.register_client()
+ policies = policy_helper.add_policies()
+ policy_id = list(policies.keys())[0]
+ req, subjects = add_subjects(client, policy_id, "testuser")
+ value1 = list(subjects["subjects"].values())[0]
+ perimeter_id = value1['id']
data = {
- "name": "test user",
- "description": "description of {}".format("test user"),
- "password": "password for {}".format("test user"),
- "email": "{}@moon".format("test user")
+ 'name': value1['name'] + "update"
}
- req = client.post("/policies/{}/subjects".format("111"), data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
+ req = client.patch("/subjects/{}".format(perimeter_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ subjects = utilities.get_json(req.data)
+ value2 = list(subjects["subjects"].values())[0]
+ assert req.status_code == 200
+ assert value1['name'] + 'update' == value2['name']
+ assert value1['id'] == value2['id']
+ assert value1['description'] == value2['description']
+
+
+def test_perimeter_update_subject_description():
+ client = utilities.register_client()
+ policies = policy_helper.add_policies()
+ policy_id = list(policies.keys())[0]
+ req, subjects = add_subjects(client, policy_id, "testuser")
+ value1 = list(subjects["subjects"].values())[0]
+ perimeter_id = value1['id']
+ data = {
+ 'description': value1['description'] + "update",
+ }
+ req = client.patch("/subjects/{}".format(perimeter_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ subjects = utilities.get_json(req.data)
+ value2 = list(subjects["subjects"].values())[0]
+ assert req.status_code == 200
+ assert value1['name'] == value2['name']
+ assert value1['id'] == value2['id']
+ assert value1['description'] + 'update' == value2['description']
+
+
+def test_perimeter_update_subject_description_and_name():
+ client = utilities.register_client()
+ policies = policy_helper.add_policies()
+ policy_id = list(policies.keys())[0]
+
+ req, subjects = add_subjects(client, policy_id, "testuser")
+ value1 = list(subjects["subjects"].values())[0]
+ perimeter_id = value1['id']
+ data = {
+ 'description': value1['description'] + "update",
+ 'name': value1['name'] + "update"
+ }
+ req = client.patch("/subjects/{}".format(perimeter_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ subjects = utilities.get_json(req.data)
+ value2 = list(subjects["subjects"].values())[0]
+ assert req.status_code == 200
+ assert value1['name'] + 'update' == value2['name']
+ assert value1['id'] == value2['id']
+ assert value1['description'] + 'update' == value2['description']
+
+
+def test_perimeter_update_subject_wrong_id():
+ client = utilities.register_client()
+ name = 'testuser' + uuid4().hex
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ data = {
+ "name": name,
+ "description": "description of {}".format('testuser'),
+ }
+ req, subjects = add_subjects(client, policy_id=policy_id1, name='testuser', data=data)
+ value1 = list(subjects["subjects"].values())[0]
+ perimeter_id = value1['id']
+ data = {
+ 'name': value1['name'] + "update",
+ 'description': value1['description'] + "update"
+ }
+ req = client.patch("/subjects/{}".format(perimeter_id + "wrong"), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
assert req.status_code == 400
- assert json.loads(req.data)["message"] == "Key: 'name', [String contains space]"
+ assert json.loads(req.data)["message"] == '400: Perimeter content is invalid.'
+
+
+def test_perimeter_update_subject_name_with_existed_one():
+ client = utilities.register_client()
+ name1 = 'testuser' + uuid4().hex
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ perimeter_id1 = uuid4().hex
+ req, subjects = add_subjects(client, policy_id=policy_id1, name=name1,
+ perimeter_id=perimeter_id1)
+ value1 = list(subjects["subjects"].values())[0]
+ perimeter_id2 = uuid4().hex
+ name2 = 'testuser' + uuid4().hex
+ req, subjects = add_subjects(client, policy_id=policy_id1, name=name2,
+ perimeter_id=perimeter_id2)
+ data = {
+ 'name': value1['name'],
+ }
+ req = client.patch("/subjects/{}".format(perimeter_id2), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ assert req.status_code == 409
def test_perimeter_delete_subject():
client = utilities.register_client()
- req = delete_subject(client)
+ policies = policy_helper.add_policies()
+ policy_id = list(policies.keys())[0]
+ req, subjects = add_subjects(client, policy_id, "testuser")
+ subject_id = list(subjects["subjects"].values())[0]["id"]
+ req = client.delete("/policies/{}/subjects/{}".format(policy_id, subject_id))
assert req.status_code == 200
@@ -109,31 +347,30 @@ def get_objects(client):
return req, objects
-def add_objects(client, name):
- subject_category_id, object_category_id, action_category_id, meta_rule_id, policyId = builder.create_new_policy(
- subject_category_name="subject_category1" + uuid4().hex,
- object_category_name="object_category1" + uuid4().hex,
- action_category_name="action_category1" + uuid4().hex,
- meta_rule_name="meta_rule_1" + uuid4().hex,
- model_name="model1" + uuid4().hex)
- data = {
- "name": name + uuid4().hex,
- "description": "description of {}".format(name),
- }
- req = client.post("/policies/{}/objects/".format(policyId), data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
+def add_objects(client, name, policyId=None, data=None, perimeter_id=None):
+ if not policyId:
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policyId = builder.create_new_policy(
+ subject_category_name="subject_category1" + uuid4().hex,
+ object_category_name="object_category1" + uuid4().hex,
+ action_category_name="action_category1" + uuid4().hex,
+ meta_rule_name="meta_rule_1" + uuid4().hex,
+ model_name="model1" + uuid4().hex)
+ if not data:
+ data = {
+ "name": name + uuid4().hex,
+ "description": "description of {}".format(name),
+ }
+ if not perimeter_id:
+ req = client.post("/policies/{}/objects/".format(policyId), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ else:
+ req = client.post("/policies/{}/objects/{}".format(policyId, perimeter_id),
+ data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
objects = utilities.get_json(req.data)
return req, objects
-def delete_object(client):
- objects = get_objects(client)
- value = objects[1]['objects']
- id = list(value.keys())[0]
- policy_id = builder.get_policy_id_with_object_assignment()
- return client.delete("/policies/{}/objects/{}".format(policy_id, id))
-
-
def delete_objects_without_perimeter_id(client):
req = client.delete("/objects/{}".format(""))
return req
@@ -152,37 +389,279 @@ def test_perimeter_add_object():
req, objects = add_objects(client, "testuser")
value = list(objects["objects"].values())[0]
assert req.status_code == 200
- assert "objects" in objects
- assert value['name'] is not None
+ assert value['name']
+
+
+def test_perimeter_add_object_with_wrong_policy_id():
+ client = utilities.register_client()
+ req, objects = add_objects(client, "testuser", policyId='wrong')
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Policy Unknown'
+
+
+def test_perimeter_add_object_with_policy_id_none():
+ client = utilities.register_client()
+ data = {
+ "name": "testuser" + uuid4().hex,
+ "description": "description of {}".format("testuser"),
+ }
+ req = client.post("/policies/{}/objects/".format(None), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Policy Unknown'
+
+
+def test_perimeter_add_same_object_name_with_new_policy_id():
+ client = utilities.register_client()
+ req, objects = add_objects(client, "testuser")
+ value1 = list(objects["objects"].values())[0]
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ data = {
+ "name": value1['name'],
+ "description": "description of {}".format('testuser'),
+ }
+ req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data)
+ value2 = list(objects["objects"].values())[0]
+ assert req.status_code == 200
+ assert value1['id'] == value2['id']
+ assert value1['name'] == value2['name']
+
+
+def test_perimeter_add_same_object_perimeter_id_with_new_policy_id():
+ client = utilities.register_client()
+ req, objects = add_objects(client, "testuser")
+ value1 = list(objects["objects"].values())[0]
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ data = {
+ "name": value1['name'],
+ "description": "description of {}".format('testuser'),
+ }
+ req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data,
+ perimeter_id=value1['id'])
+ value2 = list(objects["objects"].values())[0]
+ assert req.status_code == 200
+ assert value1['id'] == value2['id']
+ assert value1['name'] == value2['name']
+
+
+def test_perimeter_add_same_object_perimeter_id_with_different_name():
+ client = utilities.register_client()
+ req, objects = add_objects(client, "testuser")
+ value1 = list(objects["objects"].values())[0]
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ data = {
+ "name": value1['name'] + 'different',
+ "description": "description of {}".format('testuser'),
+ }
+ req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data,
+ perimeter_id=value1['id'])
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Perimeter content is invalid.'
+
+
+def test_perimeter_add_same_object_name_with_same_policy_id():
+ client = utilities.register_client()
+ name = 'testuser' + uuid4().hex
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ data = {
+ "name": name,
+ "description": "description of {}".format('testuser'),
+ }
+ req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data)
+ value = list(objects["objects"].values())[0]
+ assert req.status_code == 200
+ req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data)
+ assert req.status_code == 409
+ assert json.loads(req.data)["message"] == '409: Policy Already Exists'
+
+
+def test_perimeter_add_same_object_perimeter_id_with_existed_policy_id_in_list():
+ client = utilities.register_client()
+ name = 'testuser' + uuid4().hex
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ data = {
+ "name": name,
+ "description": "description of {}".format('testuser'),
+ }
+ req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data)
+ value = list(objects["objects"].values())[0]
+ req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data,
+ perimeter_id=value['id'])
+ assert req.status_code == 409
+ assert json.loads(req.data)["message"] == '409: Policy Already Exists'
+
+
+def test_perimeter_update_object_name():
+ client = utilities.register_client()
+ name = 'testuser' + uuid4().hex
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ data = {
+ "name": name,
+ "description": "description of {}".format('testuser'),
+ }
+ req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data)
+
+ value1 = list(objects["objects"].values())[0]
+ perimeter_id = value1['id']
+ data = {
+ 'name': value1['name'] + "update"
+ }
+ req = client.patch("/objects/{}".format(perimeter_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+
+ objects = utilities.get_json(req.data)
+ value2 = list(objects["objects"].values())[0]
+ assert req.status_code == 200
+ assert value1['name'] + 'update' == value2['name']
+ assert value1['id'] == value2['id']
+ assert value1['description'] == value2['description']
+
+
+def test_perimeter_update_object_description():
+ client = utilities.register_client()
+ name = 'testuser' + uuid4().hex
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ data = {
+ "name": name,
+ "description": "description of {}".format('testuser'),
+ }
+ req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data)
+
+ value1 = list(objects["objects"].values())[0]
+ perimeter_id = value1['id']
+ data = {
+ 'description': value1['description'] + "update"
+ }
+ req = client.patch("/objects/{}".format(perimeter_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+
+ objects = utilities.get_json(req.data)
+ value2 = list(objects["objects"].values())[0]
+ assert req.status_code == 200
+ assert value1['name'] == value2['name']
+ assert value1['id'] == value2['id']
+ assert value1['description'] + 'update' == value2['description']
+
+
+def test_perimeter_update_object_description_and_name():
+ client = utilities.register_client()
+ name = 'testuser' + uuid4().hex
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ data = {
+ "name": name,
+ "description": "description of {}".format('testuser'),
+ }
+ req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data)
+
+ value1 = list(objects["objects"].values())[0]
+ perimeter_id = value1['id']
+ data = {
+ 'name': value1['name'] + "update",
+ 'description': value1['description'] + "update"
+ }
+ req = client.patch("/objects/{}".format(perimeter_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+
+ objects = utilities.get_json(req.data)
+ value2 = list(objects["objects"].values())[0]
+ assert req.status_code == 200
+ assert value1['name'] + 'update' == value2['name']
+ assert value1['id'] == value2['id']
+ assert value1['description'] + 'update' == value2['description']
+
+
+def test_perimeter_update_object_wrong_id():
+ client = utilities.register_client()
+ name = 'testuser' + uuid4().hex
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ data = {
+ "name": name,
+ "description": "description of {}".format('testuser'),
+ }
+ req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data)
+
+ value1 = list(objects["objects"].values())[0]
+ perimeter_id = value1['id']
+ data = {
+ 'name': value1['name'] + "update",
+ 'description': value1['description'] + "update"
+ }
+ req = client.patch("/objects/{}".format(perimeter_id + "wrong"), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ assert req.status_code == 400
+
+
+def test_perimeter_update_object_name_with_existed_one():
+ client = utilities.register_client()
+ name = 'testuser' + uuid4().hex
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ data1 = {
+ "name": name,
+ "description": "description of {}".format('testuser'),
+ }
+ req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data1)
+ value1 = list(objects["objects"].values())[0]
+
+ name = 'testuser' + uuid4().hex
+
+ data2 = {
+ "name": name,
+ "description": "description of {}".format('testuser'),
+ }
+ req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data2)
+
+ value2 = list(objects["objects"].values())[0]
+ perimeter_id2 = value2['id']
+
+ data3 = {
+ 'name': value1['name']
+ }
+ req = client.patch("/objects/{}".format(perimeter_id2), data=json.dumps(data3),
+ headers={'Content-Type': 'application/json'})
+ assert req.status_code == 409
+ assert json.loads(req.data)["message"] == '409: Object Existing'
def test_perimeter_add_object_without_name():
client = utilities.register_client()
data = {
- "name": "",
+ "name": "<br/>",
"description": "description of {}".format(""),
}
req = client.post("/policies/{}/objects/".format("111"), data=json.dumps(data),
headers={'Content-Type': 'application/json'})
assert req.status_code == 400
- assert json.loads(req.data)["message"] == "Key: 'name', [Empty String]"
+ assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]"
def test_perimeter_add_object_with_name_contain_spaces():
client = utilities.register_client()
data = {
- "name": "test user",
+ "name": "test<a>user",
"description": "description of {}".format("test user"),
}
req = client.post("/policies/{}/objects/".format("111"), data=json.dumps(data),
headers={'Content-Type': 'application/json'})
assert req.status_code == 400
- assert json.loads(req.data)["message"] == "Key: 'name', [String contains space]"
+ assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]"
def test_perimeter_delete_object():
client = utilities.register_client()
- req = delete_object(client)
+ policies = policy_helper.add_policies()
+ policy_id = list(policies.keys())[0]
+ object_id = builder.create_object(policy_id)
+ req = client.delete("/policies/{}/objects/{}".format(policy_id, object_id))
assert req.status_code == 200
@@ -199,29 +678,30 @@ def get_actions(client):
return req, actions
-def add_actions(client, name):
- subject_category_id, object_category_id, action_category_id, meta_rule_id, policyId = builder.create_new_policy(
- subject_category_name="subject_category1" + uuid4().hex,
- object_category_name="object_category1" + uuid4().hex,
- action_category_name="action_category1" + uuid4().hex,
- meta_rule_name="meta_rule_1" + uuid4().hex,
- model_name="model1" + uuid4().hex)
- data = {
- "name": name + uuid4().hex,
- "description": "description of {}".format(name),
- }
- req = client.post("/policies/{}/actions".format(policyId), data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
- actions = utilities.get_json(req.data)
- return req, actions
+def add_actions(client, name, policy_id=None, data=None, perimeter_id=None):
+ if not policy_id:
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy(
+ subject_category_name="subject_category1" + uuid4().hex,
+ object_category_name="object_category1" + uuid4().hex,
+ action_category_name="action_category1" + uuid4().hex,
+ meta_rule_name="meta_rule_1" + uuid4().hex,
+ model_name="model1" + uuid4().hex)
+ if not data:
+ data = {
+ "name": name + uuid4().hex,
+ "description": "description of {}".format(name),
+ }
+ if not perimeter_id:
+ req = client.post("/policies/{}/actions/".format(policy_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ else:
+ req = client.post("/policies/{}/actions/{}".format(policy_id, perimeter_id),
+ data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
-def delete_actions(client):
- actions = get_actions(client)
- value = actions[1]['actions']
- id = list(value.keys())[0]
- policy_id = builder.get_policy_id_with_action_assignment()
- return client.delete("/policies/{}/actions/{}".format(policy_id, id))
+ actions = utilities.get_json(req.data)
+ return req, actions
def delete_actions_without_perimeter_id(client):
@@ -242,40 +722,305 @@ def test_perimeter_add_actions():
req, actions = add_actions(client, "testuser")
value = list(actions["actions"].values())[0]
assert req.status_code == 200
- assert "actions" in actions
- assert value['name'] is not None
+ assert value['name']
+
+
+def test_perimeter_add_action_with_wrong_policy_id():
+ client = utilities.register_client()
+ req, actions = add_actions(client, "testuser", policy_id="wrong")
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Policy Unknown'
+
+
+def test_perimeter_add_action_with_policy_id_none():
+ client = utilities.register_client()
+ data = {
+ "name": "testuser" + uuid4().hex,
+ "description": "description of {}".format("testuser"),
+ }
+ req = client.post("/policies/{}/actions/".format(None), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Policy Unknown'
+
+
+def test_perimeter_add_same_action_name_with_new_policy_id():
+ client = utilities.register_client()
+ req, action = add_actions(client, "testuser")
+ value1 = list(action["actions"].values())[0]
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ data = {
+ "name": value1['name'],
+ "description": "description of {}".format('testuser'),
+ }
+ req, action = add_actions(client, 'testuser', policy_id=policy_id1, data=data)
+ value2 = list(action["actions"].values())[0]
+ assert req.status_code == 200
+ assert value1['id'] == value2['id']
+ assert value1['name'] == value2['name']
+
+
+def test_perimeter_add_same_action_perimeter_id_with_new_policy_id():
+ client = utilities.register_client()
+ req, action = add_actions(client, "testuser")
+ value1 = list(action["actions"].values())[0]
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ data = {
+ "name": value1['name'],
+ "description": "description of {}".format('testuser'),
+ }
+ req, action = add_actions(client, 'testuser', policy_id=policy_id1, data=data,
+ perimeter_id=value1['id'])
+ value2 = list(action["actions"].values())[0]
+ assert req.status_code == 200
+ assert value1['id'] == value2['id']
+ assert value1['name'] == value2['name']
+
+
+def test_perimeter_add_same_action_perimeter_id_with_different_name():
+ client = utilities.register_client()
+ req, action = add_actions(client, "testuser")
+ value1 = list(action["actions"].values())[0]
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ data = {
+ "name": value1['name'] + 'different',
+ "description": "description of {}".format('testuser'),
+ }
+ req, action = add_actions(client, 'testuser', policy_id=policy_id1, data=data,
+ perimeter_id=value1['id'])
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Perimeter content is invalid.'
+
+
+def test_perimeter_add_same_action_name_with_same_policy_id():
+ client = utilities.register_client()
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ req, action = add_actions(client, "testuser", policy_id=policy_id1)
+ value1 = list(action["actions"].values())[0]
+ data = {
+ "name": value1['name'],
+ "description": "description of {}".format('testuser'),
+ }
+ req, action = add_actions(client, 'testuser', policy_id=policy_id1, data=data)
+ assert req.status_code == 409
+ assert json.loads(req.data)["message"] == '409: Policy Already Exists'
+
+
+def test_perimeter_add_same_action_perimeter_id_with_existed_policy_id_in_list():
+ client = utilities.register_client()
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ req, action = add_actions(client, "testuser", policy_id=policy_id1)
+ value1 = list(action["actions"].values())[0]
+ data = {
+ "name": value1['name'],
+ "description": "description of {}".format('testuser'),
+ }
+ req, action = add_actions(client, 'testuser', policy_id=policy_id1, data=data,
+ perimeter_id=value1['id'])
+ assert req.status_code == 409
+ assert json.loads(req.data)["message"] == '409: Policy Already Exists'
def test_perimeter_add_actions_without_name():
client = utilities.register_client()
data = {
- "name": "",
+ "name": "<a>",
"description": "description of {}".format(""),
}
req = client.post("/policies/{}/actions".format("111"), data=json.dumps(data),
headers={'Content-Type': 'application/json'})
assert req.status_code == 400
- assert json.loads(req.data)["message"] == "Key: 'name', [Empty String]"
+ assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]"
def test_perimeter_add_actions_with_name_contain_spaces():
client = utilities.register_client()
data = {
- "name": "test user",
+ "name": "test<a>user",
+ "description": "description of {}".format("test user"),
+ }
+ req = client.post("/policies/{}/actions".format("111"), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]"
+
+
+def test_add_subjects_without_policy_id():
+ client = utilities.register_client()
+ data = {
+ "name": "testuser",
+ "description": "description of {}".format("test user"),
+ }
+ req = client.post("/policies/{}/subjects".format("111"), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "400: Policy Unknown"
+
+
+def test_add_objects_without_policy_id():
+ client = utilities.register_client()
+ data = {
+ "name": "testuser",
+ "description": "description of {}".format("test user"),
+ }
+ req = client.post("/policies/{}/objects".format("111"), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "400: Policy Unknown"
+
+
+def test_add_action_without_policy_id():
+ client = utilities.register_client()
+ data = {
+ "name": "testuser",
"description": "description of {}".format("test user"),
}
req = client.post("/policies/{}/actions".format("111"), data=json.dumps(data),
headers={'Content-Type': 'application/json'})
assert req.status_code == 400
- assert json.loads(req.data)["message"] == "Key: 'name', [String contains space]"
+ assert json.loads(req.data)["message"] == "400: Policy Unknown"
+
+
+def test_perimeter_update_action_name():
+ client = utilities.register_client()
+ req, actions = add_actions(client, "testuser")
+ value1 = list(actions["actions"].values())[0]
+ perimeter_id = value1['id']
+ data = {
+ 'name': value1['name'] + "update"
+ }
+ req = client.patch("/actions/{}".format(perimeter_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ subjects = utilities.get_json(req.data)
+ value2 = list(subjects["actions"].values())[0]
+ assert req.status_code == 200
+ assert value1['name'] + 'update' == value2['name']
+ assert value1['id'] == value2['id']
+ assert value1['description'] == value2['description']
+
+
+def test_perimeter_update_actions_description():
+ client = utilities.register_client()
+ req, actions = add_actions(client, "testuser")
+ value1 = list(actions["actions"].values())[0]
+ perimeter_id = value1['id']
+ data = {
+ 'description': value1['description'] + "update"
+ }
+ req = client.patch("/actions/{}".format(perimeter_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ subjects = utilities.get_json(req.data)
+ value2 = list(subjects["actions"].values())[0]
+ assert req.status_code == 200
+ assert value1['name'] == value2['name']
+ assert value1['id'] == value2['id']
+ assert value1['description'] + 'update' == value2['description']
+
+
+def test_perimeter_update_actions_description_and_name():
+ client = utilities.register_client()
+ req, actions = add_actions(client, "testuser")
+ value1 = list(actions["actions"].values())[0]
+ perimeter_id = value1['id']
+ data = {
+ 'name': value1['name'] + "update",
+ 'description': value1['description'] + "update"
+ }
+ req = client.patch("/actions/{}".format(perimeter_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ subjects = utilities.get_json(req.data)
+ value2 = list(subjects["actions"].values())[0]
+ assert req.status_code == 200
+ assert value1['name'] + 'update' == value2['name']
+ assert value1['id'] == value2['id']
+ assert value1['description'] + 'update' == value2['description']
+
+
+def test_perimeter_update_action_wrong_id():
+ client = utilities.register_client()
+ req, actions = add_actions(client, "testuser")
+ value1 = list(actions["actions"].values())[0]
+ perimeter_id = value1['id']
+ data = {
+ 'name': value1['name'] + "update",
+ 'description': value1['description'] + "update"
+ }
+ req = client.patch("/actions/{}".format(perimeter_id + "wrong"), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Perimeter content is invalid.'
+
+
+def test_perimeter_update_action_name_with_existed_one():
+ client = utilities.register_client()
+ req, actions = add_actions(client, "testuser")
+ value1 = list(actions["actions"].values())[0]
+ req, actions = add_actions(client, "testuser")
+ value2 = list(actions["actions"].values())[0]
+ perimeter_id2 = value2['id']
+ data = {
+ 'name': value1['name'],
+ }
+ req = client.patch("/actions/{}".format(perimeter_id2), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ assert req.status_code == 409
+ assert json.loads(req.data)["message"] == '409: Action Existing'
def test_perimeter_delete_actions():
client = utilities.register_client()
- req = delete_actions(client)
+
+ policies = policy_helper.add_policies()
+ policy_id = list(policies.keys())[0]
+ action_id = builder.create_action(policy_id)
+ req = client.delete("/policies/{}/actions/{}".format(policy_id, action_id))
assert req.status_code == 200
+def test_delete_subject_without_policy():
+ client = utilities.register_client()
+
+ policies = policy_helper.add_policies()
+ policy_id = list(policies.keys())[0]
+
+ action_id = builder.create_action(policy_id)
+
+ req = client.delete("/subjects/{}".format(action_id))
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "400: Policy Unknown"
+
+
+def test_delete_objects_without_policy():
+ client = utilities.register_client()
+
+ policies = policy_helper.add_policies()
+ policy_id = list(policies.keys())[0]
+
+ action_id = builder.create_action(policy_id)
+
+ req = client.delete("/objects/{}".format(action_id))
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "400: Policy Unknown"
+
+
+def test_delete_actions_without_policy():
+ client = utilities.register_client()
+
+ policies = policy_helper.add_policies()
+ policy_id = list(policies.keys())[0]
+
+ action_id = builder.create_action(policy_id)
+
+ req = client.delete("/actions/{}".format(action_id))
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "400: Policy Unknown"
+
+
def test_perimeter_delete_actions_without_perimeter_id():
client = utilities.register_client()
req = delete_actions_without_perimeter_id(client)
diff --git a/moon_manager/tests/unit_python/api/test_policies.py b/moon_manager/tests/unit_python/api/test_policies.py
index cd50f4c7..76161d53 100644
--- a/moon_manager/tests/unit_python/api/test_policies.py
+++ b/moon_manager/tests/unit_python/api/test_policies.py
@@ -7,6 +7,8 @@ import json
from uuid import uuid4
import api.utilities as utilities
from helpers import model_helper
+from helpers import policy_helper
+from helpers import data_builder
def get_policies(client):
@@ -16,7 +18,7 @@ def get_policies(client):
def add_policies(client, name):
- req = model_helper.add_model(model_id="mls_model_id"+uuid4().hex)
+ req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex)
model_id = list(req.keys())[0]
data = {
"name": name,
@@ -30,14 +32,6 @@ def add_policies(client, name):
return req, policies
-def delete_policies(client, name):
- request, policies = get_policies(client)
- for key, value in policies['policies'].items():
- req = client.delete("/policies/{}".format(key))
- break
- return req
-
-
def delete_policies_without_id(client):
req = client.delete("/policies/{}".format(""))
return req
@@ -63,15 +57,286 @@ def test_add_policies():
assert value["description"] == "description of {}".format(policy_name)
+def test_add_policies_without_model():
+ policy_name = "testuser" + uuid4().hex
+ client = utilities.register_client()
+ data = {
+ "name": policy_name,
+ "description": "description of {}".format(policy_name),
+ "model_id": "",
+ "genre": "genre"
+ }
+ req = client.post("/policies/", data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+
+ assert req.status_code == 200
+
+
+def test_add_policies_with_same_name():
+ name = uuid4().hex
+ policy_name = name
+ client = utilities.register_client()
+ req, policies = add_policies(client, policy_name)
+ assert req.status_code == 200
+ assert isinstance(policies, dict)
+ value = list(policies["policies"].values())[0]
+ assert "policies" in policies
+ assert value['name'] == policy_name
+ assert value["description"] == "description of {}".format(policy_name)
+ client = utilities.register_client()
+ req, policies = add_policies(client, policy_name)
+ assert req.status_code == 409
+ assert json.loads(req.data)["message"] == '409: Policy Already Exists'
+
+
+def test_add_policy_with_empty_name():
+ policy_name = ""
+ client = utilities.register_client()
+ req, policies = add_policies(client, policy_name)
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Policy Content Error'
+
+
+def test_update_policies_with_model():
+ policy_name = "testuser" + uuid4().hex
+ client = utilities.register_client()
+ data = {
+ "name": policy_name,
+ "description": "description of {}".format(policy_name),
+ "model_id": "",
+ "genre": "genre"
+ }
+ req = client.post("/policies/", data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ policy_id = next(iter(utilities.get_json(req.data)['policies']))
+ req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex)
+ model_id = list(req.keys())[0]
+ data = {
+ "name": policy_name + "-2",
+ "description": "description of {}".format(policy_name),
+ "model_id": model_id,
+ "genre": "genre"
+ }
+ req = client.patch("/policies/{}".format(policy_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ assert req.status_code == 200
+ assert json.loads(req.data)['policies'][policy_id]['name'] == policy_name + '-2'
+
+
+def test_update_policies_name_success():
+ policy_name = "testuser" + uuid4().hex
+ client = utilities.register_client()
+ req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex)
+ model_id = list(req.keys())[0]
+ data = {
+ "name": policy_name,
+ "description": "description of {}".format(policy_name),
+ "model_id": model_id,
+ "genre": "genre"
+ }
+ req = client.post("/policies/", data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ policy_id = next(iter(utilities.get_json(req.data)['policies']))
+
+ data = {
+ "name": policy_name + "-2",
+ "description": "description of {}".format(policy_name),
+ "model_id": model_id,
+ "genre": "genre"
+ }
+ req = client.patch("/policies/{}".format(policy_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ assert req.status_code == 200
+ assert json.loads(req.data)['policies'][policy_id]['name'] == policy_name + '-2'
+
+
+def test_update_policies_model_unused():
+ policy_name = uuid4().hex
+ client = utilities.register_client()
+ req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex)
+ model_id = list(req.keys())[0]
+ data = {
+ "name": policy_name,
+ "description": "description of {}".format(policy_name),
+ "model_id": model_id,
+ "genre": "genre"
+ }
+ req = client.post("/policies/", data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ policy_id = next(iter(utilities.get_json(req.data)['policies']))
+ req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex)
+ model_id = list(req.keys())[0]
+ data = {
+ "name": policy_name,
+ "description": "description of {}".format(policy_name),
+ "model_id": model_id,
+ "genre": "genre"
+ }
+ req = client.patch("/policies/{}".format(policy_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ assert req.status_code == 200
+
+
+def test_update_policy_name_with_existed_one():
+ policy_name1 = "testuser" + uuid4().hex
+ client = utilities.register_client()
+ req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex)
+ model_id = list(req.keys())[0]
+ data = {
+ "name": policy_name1,
+ "description": "description of {}".format(policy_name1),
+ "model_id": model_id,
+ "genre": "genre"
+ }
+ req = client.post("/policies/", data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ policy_id1 = next(iter(utilities.get_json(req.data)['policies']))
+
+ policy_name2 = "testuser" + uuid4().hex
+ client = utilities.register_client()
+ req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex)
+ model_id = list(req.keys())[0]
+ data = {
+ "name": policy_name2,
+ "description": "description of {}".format(policy_name2),
+ "model_id": model_id,
+ "genre": "genre"
+ }
+ req = client.post("/policies/", data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ policy_id2 = next(iter(utilities.get_json(req.data)['policies']))
+
+ data = {
+ "name": policy_name1,
+ "description": "description of {}".format(policy_name1),
+ "model_id": model_id,
+ "genre": "genre"
+ }
+ req = client.patch("/policies/{}".format(policy_id2), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ assert req.status_code == 409
+ assert json.loads(req.data)["message"] == '409: Policy Already Exists'
+
+
+def test_update_policies_with_empty_name():
+ policy_name = "testuser" + uuid4().hex
+ client = utilities.register_client()
+ req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex)
+ model_id = list(req.keys())[0]
+ data = {
+ "name": policy_name,
+ "description": "description of {}".format(policy_name),
+ "model_id": model_id,
+ "genre": "genre"
+ }
+ req = client.post("/policies/", data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ policy_id = next(iter(utilities.get_json(req.data)['policies']))
+
+ data = {
+ "name": "",
+ "description": "description of {}".format(policy_name),
+ "model_id": model_id,
+ "genre": "genre"
+ }
+ req = client.patch("/policies/{}".format(policy_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Policy Content Error'
+
+
+def test_update_policies_with_blank_model():
+ policy_name = "testuser" + uuid4().hex
+ client = utilities.register_client()
+ req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex)
+ model_id = list(req.keys())[0]
+ data = {
+ "name": policy_name,
+ "description": "description of {}".format(policy_name),
+ "model_id": model_id,
+ "genre": "genre"
+ }
+ req = client.post("/policies/", data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ policy_id = next(iter(utilities.get_json(req.data)['policies']))
+
+ data = {
+ "name": policy_name,
+ "description": "description of {}".format(policy_name),
+ "model_id": "",
+ "genre": "genre"
+ }
+
+ req = client.patch("/policies/{}".format(policy_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ assert req.status_code == 200
+
+
+def test_update_policies_connected_to_rules_with_blank_model():
+ client = utilities.register_client()
+ req, rules, policy_id = data_builder.add_rules(client)
+ req = client.get("/policies")
+ data = utilities.get_json(req.data)
+ for policy_obj_id in data['policies']:
+ if policy_obj_id == policy_id:
+ policy = data['policies'][policy_obj_id]
+ policy['model_id'] = ''
+ req = client.patch("/policies/{}".format(policy_id), data=json.dumps(policy),
+ headers={'Content-Type': 'application/json'})
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Policy update error'
+
+
def test_delete_policies():
client = utilities.register_client()
- req = delete_policies(client, "testuser")
+
+ policy = policy_helper.add_policies()
+ policy_id = list(policy.keys())[0]
+
+ req = client.delete("/policies/{}".format(policy_id))
assert req.status_code == 200
+def test_delete_policy_with_dependencies_rule():
+ client = utilities.register_client()
+ req, rules, policy_id = data_builder.add_rules(client)
+ req = client.delete("/policies/{}".format(policy_id))
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Policy With Rule Error'
+
+
+def test_delete_policy_with_dependencies_subject_data():
+ client = utilities.register_client()
+ req, rules, policy_id = data_builder.add_rules(client)
+ req = client.delete("/policies/{}/rules/{}".format(policy_id, next(iter(rules['rules']))))
+ assert req.status_code == 200
+ req = client.delete("/policies/{}".format(policy_id))
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Policy With Data Error'
+
+
+def test_delete_policy_with_dependencies_perimeter():
+ client = utilities.register_client()
+ policy = policy_helper.add_policies()
+ policy_id = next(iter(policy))
+
+ data = {
+ "name": 'testuser'+uuid4().hex,
+ "description": "description of {}".format(uuid4().hex),
+ "password": "password for {}".format(uuid4().hex),
+ "email": "{}@moon".format(uuid4().hex)
+ }
+ req = client.post("/policies/{}/subjects".format(policy_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+
+ assert req.status_code == 200
+ req = client.delete("/policies/{}".format(policy_id))
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Policy With Perimeter Error'
+
+
def test_delete_policies_without_id():
client = utilities.register_client()
req = delete_policies_without_id(client)
assert req.status_code == 400
assert json.loads(req.data)["message"] == '400: Policy Unknown'
-
diff --git a/moon_manager/tests/unit_python/api/test_rules.py b/moon_manager/tests/unit_python/api/test_rules.py
index af1501e4..a3c21839 100644
--- a/moon_manager/tests/unit_python/api/test_rules.py
+++ b/moon_manager/tests/unit_python/api/test_rules.py
@@ -11,31 +11,11 @@ def get_rules(client, policy_id):
return req, rules
-def add_rules(client):
- sub_id, obj_id, act_id, meta_rule_id, policy_id = builder.create_new_policy("sub_cat" + uuid4().hex,
- "obj_cat" + uuid4().hex,
- "act_cat" + uuid4().hex)
- sub_data_id = builder.create_subject_data(policy_id, sub_id)
- obj_data_id = builder.create_object_data(policy_id, obj_id)
- act_data_id = builder.create_action_data(policy_id, act_id)
- data = {
- "meta_rule_id": meta_rule_id,
- "rule": [sub_data_id, obj_data_id, act_data_id],
- "instructions": (
- {"decision": "grant"},
- ),
- "enabled": True
- }
- req = client.post("/policies/{}/rules".format(policy_id), data=json.dumps(data),
- headers={'Content-Type': 'application/json'})
- rules = utilities.get_json(req.data)
- return req, rules
-
-
def add_rules_without_policy_id(client):
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = builder.create_new_meta_rule()
data = {
- "meta_rule_id": "meta_rule_id",
- "rule": ["sub_data_id", "obj_data_id", "act_data_id"],
+ "meta_rule_id": meta_rule_id,
+ "rule": [subject_category_id, object_category_id, action_category_id],
"instructions": (
{"decision": "grant"},
),
@@ -93,7 +73,7 @@ def test_get_rules():
def test_add_rules():
client = utilities.register_client()
- req, rules = add_rules(client, )
+ req, rules, policy = builder.add_rules(client, )
assert req.status_code == 200
@@ -103,13 +83,13 @@ def test_add_rules_without_policy_id():
assert req.status_code == 400
assert json.loads(req.data)["message"] == "400: Policy Unknown"
-
-def test_add_rules_without_meta_rule_id():
- policy_id = utilities.get_policy_id()
- client = utilities.register_client()
- req, rules = add_rules_without_meta_rule_id(client, policy_id)
- assert req.status_code == 400
- assert json.loads(req.data)["message"] == "Key: 'meta_rule_id', [Empty String]"
+#
+# def test_add_rules_without_meta_rule_id():
+# policy_id = utilities.get_policy_id()
+# client = utilities.register_client()
+# req, rules = add_rules_without_meta_rule_id(client, policy_id)
+# assert req.status_code == 400
+# assert json.loads(req.data)["message"] == "Key: 'meta_rule_id', [Empty String]"
def test_add_rules_without_rule():
@@ -122,8 +102,9 @@ def test_add_rules_without_rule():
def test_delete_rules_with_invalid_parameters():
client = utilities.register_client()
- rules = delete_rules(client, "", "")
- assert rules.status_code == 404
+ req = delete_rules(client, "", "")
+ assert req.status_code == 404
+ # assert json.loads(req.data)["message"] == 'Invalid Key :rule not found'
def test_delete_rules_without_policy_id():
diff --git a/moon_manager/tests/unit_python/api/test_unit_models.py b/moon_manager/tests/unit_python/api/test_unit_models.py
index d754b976..6e93ed28 100644
--- a/moon_manager/tests/unit_python/api/test_unit_models.py
+++ b/moon_manager/tests/unit_python/api/test_unit_models.py
@@ -6,6 +6,8 @@
import json
import api.utilities as utilities
from helpers import data_builder as builder
+from helpers import policy_helper
+from helpers import model_helper
from uuid import uuid4
@@ -15,16 +17,15 @@ def get_models(client):
return req, models
-def add_models(client, name):
- subject_category_id, object_category_id, action_category_id, meta_rule_id = builder.create_new_meta_rule(
- subject_category_name="subject_category"+uuid4().hex,
- object_category_name="object_category"+uuid4().hex, action_category_name="action_category"+uuid4().hex,
- meta_rule_name="meta_rule" + uuid4().hex)
- data = {
- "name": name,
- "description": "description of {}".format(name),
- "meta_rules": [meta_rule_id]
- }
+def add_models(client, name, data=None):
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = builder.create_new_meta_rule()
+
+ if not data:
+ data = {
+ "name": name,
+ "description": "description of {}".format(name),
+ "meta_rules": [meta_rule_id]
+ }
req = client.post("/models", data=json.dumps(data),
headers={'Content-Type': 'application/json'})
models = utilities.get_json(req.data)
@@ -32,10 +33,7 @@ def add_models(client, name):
def update_model(client, name, model_id):
- subject_category_id, object_category_id, action_category_id, meta_rule_id = builder.create_new_meta_rule(
- subject_category_name="subject_category" + uuid4().hex,
- object_category_name="object_category" + uuid4().hex, action_category_name="action_category" + uuid4().hex,
- meta_rule_name="meta_rule" + uuid4().hex)
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = builder.create_new_meta_rule()
data = {
"name": name,
@@ -60,13 +58,26 @@ def add_model_without_meta_rules_ids(client, name):
return req, models
-def update_model_without_meta_rules_ids(client, name):
+def add_model_with_empty_meta_rule_id(client, name):
+ data = {
+ "name": name,
+ "description": "description of {}".format(name),
+ "meta_rules": [""]
+ }
+ req = client.post("/models", data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ models = utilities.get_json(req.data)
+ return req, models
+
+
+def update_model_without_meta_rules_ids(client, model_id):
+ name = "model_id" + uuid4().hex
data = {
"name": name,
"description": "description of {}".format(name),
"meta_rules": []
}
- req = client.patch("/models", data=json.dumps(data),
+ req = client.patch("/models/{}".format(model_id), data=json.dumps(data),
headers={'Content-Type': 'application/json'})
models = utilities.get_json(req.data)
return req, models
@@ -86,6 +97,24 @@ def delete_models_without_id(client):
return req
+def test_delete_model_assigned_to_policy():
+ policy_name = "testuser" + uuid4().hex
+ client = utilities.register_client()
+ req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex)
+ model_id = list(req.keys())[0]
+ data = {
+ "name": policy_name,
+ "description": "description of {}".format(policy_name),
+ "model_id": model_id,
+ "genre": "genre"
+ }
+ req = client.post("/policies", data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ req = client.delete("/models/{}".format(model_id))
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Model With Policy Error'
+
+
def clean_models():
client = utilities.register_client()
req, models = get_models(client)
@@ -121,6 +150,64 @@ def test_delete_models():
assert req.status_code == 200
+def test_update_models_with_assigned_policy():
+ client = utilities.register_client()
+
+ model = model_helper.add_model(model_id="mls_model_id" + uuid4().hex)
+ model_id = list(model.keys())[0]
+ value = {
+ "name": "test_policy" + uuid4().hex,
+ "model_id": model_id,
+ "description": "test",
+ }
+ policy = policy_helper.add_policies(value=value)
+ data = {
+ "name": "model_" + uuid4().hex,
+ "description": "description of model_2",
+ "meta_rules": []
+ }
+ req = client.patch("/models/{}".format(model_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "400: Model With Policy Error"
+
+
+def test_update_models_with_no_assigned_policy():
+ client = utilities.register_client()
+
+ model = model_helper.add_model(model_id="mls_model_id" + uuid4().hex)
+ model_id = list(model.keys())[0]
+
+ data = {
+ "name": "model_" + uuid4().hex,
+ "description": "description of model_2",
+ "meta_rules": []
+ }
+ req = client.patch("/models/{}".format(model_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+
+ assert req.status_code == 200
+
+
+def test_add_models_with_meta_rule_key():
+ client = utilities.register_client()
+
+ model = model_helper.add_model(model_id="mls_model_id" + uuid4().hex)
+ model_id = list(model.keys())[0]
+
+ data = {
+ "name": "model_" + uuid4().hex,
+ "description": "description of model_2",
+
+ }
+ req = client.patch("/models/{}".format(model_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "Invalid Key :meta_rules not found"
+
+
def test_delete_models_without_id():
client = utilities.register_client()
req = delete_models_without_id(client)
@@ -128,28 +215,80 @@ def test_delete_models_without_id():
assert json.loads(req.data)["message"] == "400: Model Unknown"
-def test_add_model_with_empty_user():
+def test_add_model_with_empty_name():
+ clean_models()
+ client = utilities.register_client()
+ req, models = add_models(client, "<br/>")
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]"
+
+
+def test_add_model_with_name_contain_space():
+ clean_models()
+ client = utilities.register_client()
+ req, models = add_models(client, "test<br>user")
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]"
+
+
+def test_add_model_with_name_space():
clean_models()
client = utilities.register_client()
- req, models = add_models(client, "")
+ req, models = add_models(client, " ")
assert req.status_code == 400
- assert json.loads(req.data)["message"] == "Key: 'name', [Empty String]"
+ assert json.loads(req.data)["message"] == '400: Model Unknown'
-def test_add_model_with_user_contain_space():
+def test_add_model_with_empty_meta_rule_id():
clean_models()
client = utilities.register_client()
- req, models = add_models(client, "test user")
+ req, meta_rules = add_model_with_empty_meta_rule_id(client, "testuser")
assert req.status_code == 400
- assert json.loads(req.data)["message"] == "Key: 'name', [String contains space]"
+ assert json.loads(req.data)["message"] == '400: Meta Rule Unknown'
+
+
+def test_add_model_with_existed_name():
+ clean_models()
+ client = utilities.register_client()
+ name = uuid4().hex
+ req, models = add_models(client, name)
+ assert req.status_code == 200
+ req, models = add_models(client, name)
+ assert req.status_code == 409
+ assert json.loads(req.data)["message"] == '409: Model Error'
+
+
+def test_add_model_with_existed_meta_rules_list():
+ clean_models()
+ client = utilities.register_client()
+ name = uuid4().hex
+
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = builder.create_new_meta_rule()
+ data = {
+ "name": name,
+ "description": "description of {}".format(name),
+ "meta_rules": [meta_rule_id]
+ }
+ name = uuid4().hex
+ req, models = add_models(client=client, name=name, data=data)
+ assert req.status_code == 200
+
+ data = {
+ "name": name,
+ "description": "description of {}".format(name),
+ "meta_rules": [meta_rule_id]
+ }
+ req, models = add_models(client=client, name=name, data=data)
+ assert req.status_code == 409
+ assert json.loads(req.data)["message"] == '409: Model Error'
def test_add_model_without_meta_rules():
clean_models()
client = utilities.register_client()
req, meta_rules = add_model_without_meta_rules_ids(client, "testuser")
- assert req.status_code == 400
- assert json.loads(req.data)["message"] == "Key: 'meta_rules', [Empty Container]"
+ assert req.status_code == 200
+ # assert json.loads(req.data)["message"] == "Key: 'meta_rules', [Empty Container]"
def test_update_model():
@@ -164,6 +303,26 @@ def test_update_model():
delete_models(client, "testuser")
+def test_update_model_name_with_space():
+ clean_models()
+ client = utilities.register_client()
+ req = add_models(client, "testuser")
+ model_id = list(req[1]['models'])[0]
+ req_update = update_model(client, " ", model_id)
+ assert req_update[0].status_code == 400
+ assert req_update[1]["message"] == '400: Model Unknown'
+
+
+def test_update_model_with_empty_name():
+ clean_models()
+ client = utilities.register_client()
+ req = add_models(client, "testuser")
+ model_id = list(req[1]['models'])[0]
+ req_update = update_model(client, "", model_id)
+ assert req_update[0].status_code == 400
+ assert req_update[1]['message'] == '400: Model Unknown'
+
+
def test_update_meta_rules_without_id():
clean_models()
client = utilities.register_client()
@@ -172,15 +331,22 @@ def test_update_meta_rules_without_id():
assert json.loads(req_update[0].data)["message"] == "400: Model Unknown"
-def test_update_meta_rules_without_user():
+def test_update_meta_rules_without_name():
client = utilities.register_client()
- req_update = update_model(client, "", "")
+ req_update = update_model(client, "<a></a>", "1234567")
assert req_update[0].status_code == 400
- assert json.loads(req_update[0].data)["message"] == "Key: 'name', [Empty String]"
+ assert json.loads(req_update[0].data)[
+ "message"] == "Key: 'name', [Forbidden characters in string]"
def test_update_meta_rules_without_meta_rules():
+ value = {
+ "name": "mls_model_id" + uuid4().hex,
+ "description": "test",
+ "meta_rules": []
+ }
+ model = model_helper.add_model(value=value)
+ model_id = list(model.keys())[0]
client = utilities.register_client()
- req_update = update_model_without_meta_rules_ids(client, "testuser")
- assert req_update[0].status_code == 400
- assert json.loads(req_update[0].data)["message"] == "Key: 'meta_rules', [Empty Container]"
+ req_update = update_model_without_meta_rules_ids(client, model_id)
+ assert req_update[0].status_code == 200
diff --git a/moon_manager/tests/unit_python/conftest.py b/moon_manager/tests/unit_python/conftest.py
index d9899231..90a27e54 100644
--- a/moon_manager/tests/unit_python/conftest.py
+++ b/moon_manager/tests/unit_python/conftest.py
@@ -153,6 +153,24 @@ PODS = {
}
}
+SLAVES = {
+ "slaves": [
+ {
+ "context":
+ {
+ "cluster": "kubernetes",
+ "user": "kubernetes-admin"
+ },
+ "name": "kubernetes-admin@kubernetes",
+ "configured": True,
+ "wrapper_name": "mywrapper",
+ "ip": "NC",
+ "port": 31002,
+ "internal_port": 8080
+ }
+ ]
+}
+
def get_b64_conf(component=None):
if component in CONF:
@@ -211,6 +229,10 @@ def no_requests(monkeypatch):
json=PODS
)
m.register_uri(
+ 'GET', 'http://localhost/slaves',
+ json=SLAVES
+ )
+ m.register_uri(
'DELETE', 'http://orchestrator:8083/pods/{}'.format(list([PODS['pods'].keys()])[0]),
headers={"content-type": "application/json"}
)
diff --git a/moon_manager/tests/unit_python/helpers/data_builder.py b/moon_manager/tests/unit_python/helpers/data_builder.py
index 2a7c5979..91808cbe 100644
--- a/moon_manager/tests/unit_python/helpers/data_builder.py
+++ b/moon_manager/tests/unit_python/helpers/data_builder.py
@@ -10,6 +10,7 @@ from helpers import model_helper
from .meta_rule_helper import *
import api.utilities as utilities
import json
+from uuid import uuid4
def create_subject_category(name):
@@ -60,31 +61,57 @@ def create_pdp(policies_ids):
return value
-def create_new_policy(subject_category_name="subjectCategory", object_category_name="objectCategory",
- action_category_name="actionCategory",
- model_name="test_model" + uuid4().hex, policy_name="policy_1" + uuid4().hex,
- meta_rule_name="meta_rule1" + uuid4().hex):
+def create_new_policy(subject_category_name=None, object_category_name=None,
+ action_category_name=None, model_name=None, policy_name=None,
+ meta_rule_name=None):
+ if not subject_category_name:
+ subject_category_name = "subjectCategory_" + uuid4().hex
+ if not object_category_name:
+ object_category_name = "objectCategory_" + uuid4().hex
+ if not action_category_name:
+ action_category_name = "actionCategory_" + uuid4().hex
+
+ if not meta_rule_name:
+ meta_rule_name = "meta_rule_" + uuid4().hex
+
+ if not model_name:
+ model_name = "model_name_" + uuid4().hex
+ if not policy_name:
+ policy_name = "policy_name_" + uuid4().hex
+
subject_category_id, object_category_id, action_category_id, meta_rule_id = create_new_meta_rule(
subject_category_name=subject_category_name + uuid4().hex,
object_category_name=object_category_name + uuid4().hex,
- action_category_name=action_category_name + uuid4().hex, meta_rule_name=meta_rule_name + uuid4().hex)
- model = model_helper.add_model(value=create_model(meta_rule_id, model_name))
+ action_category_name=action_category_name + uuid4().hex,
+ meta_rule_name=meta_rule_name + uuid4().hex
+ )
+
+ model = model_helper.add_model(value=create_model(meta_rule_id, model_name + uuid4().hex))
model_id = list(model.keys())[0]
- value = create_policy(model_id, policy_name)
+ value = create_policy(model_id, policy_name + uuid4().hex)
policy = add_policies(value=value)
assert policy
policy_id = list(policy.keys())[0]
return subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id
-def create_new_meta_rule(subject_category_name="subjectCategory", object_category_name="objectCategory",
- action_category_name="actionCategory",
- meta_rule_name="meta_rule1" + uuid4().hex):
+def create_new_meta_rule(subject_category_name=None, object_category_name=None,
+ action_category_name=None, meta_rule_name=None):
+ if not subject_category_name:
+ subject_category_name = "subjectCategory_" + uuid4().hex
+ if not object_category_name:
+ object_category_name = "objectCategory_" + uuid4().hex
+ if not action_category_name:
+ action_category_name = "actionCategory_" + uuid4().hex
+
+ if not meta_rule_name:
+ meta_rule_name = "meta_rule_" + uuid4().hex
+
subject_category_id = create_subject_category(subject_category_name)
object_category_id = create_object_category(object_category_name)
action_category_id = create_action_category(action_category_name)
value = {"name": meta_rule_name,
- "algorithm": "name of the meta rule algorithm",
+ "description": "name of the meta rule algorithm",
"subject_categories": [subject_category_id],
"object_categories": [object_category_id],
"action_categories": [action_category_id]
@@ -125,7 +152,8 @@ def create_subject_data(policy_id, category_id):
"name": "subject-security-level",
"description": {"low": "", "medium": "", "high": ""},
}
- subject_data = add_subject_data(policy_id=policy_id, category_id=category_id, value=value).get('data')
+ subject_data = add_subject_data(policy_id=policy_id, category_id=category_id, value=value).get(
+ 'data')
assert subject_data
return list(subject_data.keys())[0]
@@ -135,7 +163,8 @@ def create_object_data(policy_id, category_id):
"name": "object-security-level",
"description": {"low": "", "medium": "", "high": ""},
}
- object_data = add_object_data(policy_id=policy_id, category_id=category_id, value=value).get('data')
+ object_data = add_object_data(policy_id=policy_id, category_id=category_id, value=value).get(
+ 'data')
return list(object_data.keys())[0]
@@ -144,7 +173,8 @@ def create_action_data(policy_id, category_id):
"name": "action-type",
"description": {"vm-action": "", "storage-action": "", },
}
- action_data = add_action_data(policy_id=policy_id, category_id=category_id, value=value).get('data')
+ action_data = add_action_data(policy_id=policy_id, category_id=category_id, value=value).get(
+ 'data')
return list(action_data.keys())[0]
@@ -207,3 +237,24 @@ def get_policy_id_with_action_assignment():
client.post("/policies/{}/action_assignments".format(policy_id), data=json.dumps(data),
headers={'Content-Type': 'application/json'})
return policy_id
+
+
+def add_rules(client):
+ sub_id, obj_id, act_id, meta_rule_id, policy_id = create_new_policy("sub_cat" + uuid4().hex,
+ "obj_cat" + uuid4().hex,
+ "act_cat" + uuid4().hex)
+ sub_data_id = create_subject_data(policy_id, sub_id)
+ obj_data_id = create_object_data(policy_id, obj_id)
+ act_data_id = create_action_data(policy_id, act_id)
+ data = {
+ "meta_rule_id": meta_rule_id,
+ "rule": [sub_data_id, obj_data_id, act_data_id],
+ "instructions": (
+ {"decision": "grant"},
+ ),
+ "enabled": True
+ }
+ req = client.post("/policies/{}/rules".format(policy_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ rules = utilities.get_json(req.data)
+ return req, rules, policy_id
diff --git a/moon_manager/tests/unit_python/helpers/data_helper.py b/moon_manager/tests/unit_python/helpers/data_helper.py
index da6b9376..e1c05640 100644
--- a/moon_manager/tests/unit_python/helpers/data_helper.py
+++ b/moon_manager/tests/unit_python/helpers/data_helper.py
@@ -16,7 +16,7 @@ def add_action_data(policy_id, data_id=None, category_id=None, value=None):
def delete_action_data(policy_id, data_id):
from python_moondb.core import PolicyManager
- PolicyManager.delete_action_data("", policy_id, data_id)
+ PolicyManager.delete_action_data("", policy_id=policy_id, data_id=data_id)
def get_object_data(policy_id, data_id=None, category_id=None):
@@ -31,7 +31,7 @@ def add_object_data(policy_id, data_id=None, category_id=None, value=None):
def delete_object_data(policy_id, data_id):
from python_moondb.core import PolicyManager
- PolicyManager.delete_object_data("", policy_id, data_id)
+ PolicyManager.delete_object_data("", policy_id=policy_id, data_id=data_id)
def get_subject_data(policy_id, data_id=None, category_id=None):
@@ -46,7 +46,7 @@ def add_subject_data(policy_id, data_id=None, category_id=None, value=None):
def delete_subject_data(policy_id, data_id):
from python_moondb.core import PolicyManager
- PolicyManager.delete_subject_data("", policy_id, data_id)
+ PolicyManager.delete_subject_data("", policy_id=policy_id, data_id=data_id)
def get_actions(policy_id, perimeter_id=None):
diff --git a/moon_manager/tests/unit_python/helpers/model_helper.py b/moon_manager/tests/unit_python/helpers/model_helper.py
index d2ffb85b..73808e03 100644
--- a/moon_manager/tests/unit_python/helpers/model_helper.py
+++ b/moon_manager/tests/unit_python/helpers/model_helper.py
@@ -15,11 +15,8 @@ def get_models(model_id=None):
def add_model(model_id=None, value=None):
from python_moondb.core import ModelManager
if not value:
- subject_category_id, object_category_id, action_category_id, meta_rule_id = builder.create_new_meta_rule(
- subject_category_name="subject_category1"+uuid4().hex,
- object_category_name="object_category1"+uuid4().hex,
- action_category_name="action_category1"+uuid4().hex)
- name = "MLS" if model_id is None else "MLS " + model_id
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = builder.create_new_meta_rule()
+ name = "MLS"+uuid4().hex if model_id is None else "MLS " + model_id
value = {
"name": name,
"description": "test",
diff --git a/moon_manager/tests/unit_python/helpers/policy_helper.py b/moon_manager/tests/unit_python/helpers/policy_helper.py
index c932ee3a..eddd0b8d 100644
--- a/moon_manager/tests/unit_python/helpers/policy_helper.py
+++ b/moon_manager/tests/unit_python/helpers/policy_helper.py
@@ -3,6 +3,8 @@
# license which can be found in the file 'LICENSE' in this package distribution
# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
+from uuid import uuid4
+
def get_policies():
from python_moondb.core import PolicyManager
return PolicyManager.get_policies("admin")
@@ -12,7 +14,7 @@ def add_policies(policy_id=None, value=None):
from python_moondb.core import PolicyManager
if not value:
value = {
- "name": "test_policy",
+ "name": "test_policy"+ uuid4().hex,
"model_id": "",
"genre": "authz",
"description": "test",
diff --git a/moon_manager/tests/unit_python/requirements.txt b/moon_manager/tests/unit_python/requirements.txt
index 6c6e5bb8..d6f190e4 100644
--- a/moon_manager/tests/unit_python/requirements.txt
+++ b/moon_manager/tests/unit_python/requirements.txt
@@ -1,5 +1,5 @@
flask
flask_cors
flask_restful
-python_moondb
-python_moonutilities
+python_moondb==1.2.20
+python_moonutilities==1.4.20