aboutsummaryrefslogtreecommitdiffstats
path: root/old/moon_manager/tests/unit_python
diff options
context:
space:
mode:
Diffstat (limited to 'old/moon_manager/tests/unit_python')
-rw-r--r--old/moon_manager/tests/unit_python/api/import_export_utilities.py202
-rw-r--r--old/moon_manager/tests/unit_python/api/meta_data_test.py238
-rw-r--r--old/moon_manager/tests/unit_python/api/meta_rules_test.py162
-rw-r--r--old/moon_manager/tests/unit_python/api/test_assignement.py280
-rw-r--r--old/moon_manager/tests/unit_python/api/test_assignemnt.py270
-rw-r--r--old/moon_manager/tests/unit_python/api/test_data.py239
-rw-r--r--old/moon_manager/tests/unit_python/api/test_export.py282
-rw-r--r--old/moon_manager/tests/unit_python/api/test_import.py510
-rw-r--r--old/moon_manager/tests/unit_python/api/test_meta_data.py305
-rw-r--r--old/moon_manager/tests/unit_python/api/test_meta_rules.py415
-rw-r--r--old/moon_manager/tests/unit_python/api/test_pdp.py164
-rw-r--r--old/moon_manager/tests/unit_python/api/test_perimeter.py1028
-rw-r--r--old/moon_manager/tests/unit_python/api/test_policies.py342
-rw-r--r--old/moon_manager/tests/unit_python/api/test_rules.py129
-rw-r--r--old/moon_manager/tests/unit_python/api/test_unit_models.py352
-rw-r--r--old/moon_manager/tests/unit_python/api/utilities.py26
-rw-r--r--old/moon_manager/tests/unit_python/conftest.py254
-rw-r--r--old/moon_manager/tests/unit_python/helpers/__init__.py0
-rw-r--r--old/moon_manager/tests/unit_python/helpers/assignment_helper.py49
-rw-r--r--old/moon_manager/tests/unit_python/helpers/category_helper.py40
-rw-r--r--old/moon_manager/tests/unit_python/helpers/data_builder.py260
-rw-r--r--old/moon_manager/tests/unit_python/helpers/data_helper.py99
-rw-r--r--old/moon_manager/tests/unit_python/helpers/meta_rule_helper.py49
-rw-r--r--old/moon_manager/tests/unit_python/helpers/model_helper.py48
-rw-r--r--old/moon_manager/tests/unit_python/helpers/pdp_helper.py23
-rw-r--r--old/moon_manager/tests/unit_python/helpers/policy_helper.py63
-rw-r--r--old/moon_manager/tests/unit_python/requirements.txt5
27 files changed, 5834 insertions, 0 deletions
diff --git a/old/moon_manager/tests/unit_python/api/import_export_utilities.py b/old/moon_manager/tests/unit_python/api/import_export_utilities.py
new file mode 100644
index 00000000..2ee2627d
--- /dev/null
+++ b/old/moon_manager/tests/unit_python/api/import_export_utilities.py
@@ -0,0 +1,202 @@
+# Copyright 2018 Open Platform for NFV Project, Inc. and its contributors
+# This software is distributed under the terms and conditions of the 'Apache-2.0'
+# license which can be found in the file 'LICENSE' in this package distribution
+# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
+
+import api.test_unit_models as test_models
+import api.test_policies as test_policies
+import api.test_perimeter as test_perimeter
+import api.test_meta_data as test_categories
+import api.test_data as test_data
+import api.test_meta_rules as test_meta_rules
+import api.test_assignement as test_assignments
+import api.test_rules as test_rules
+import logging
+
+logger = logging.getLogger("moon.manager.test.api." + __name__)
+
+
+def clean_models(client):
+ req, models = test_models.get_models(client)
+ for key in models["models"]:
+ client.delete("/models/{}".format(key))
+
+
+def clean_policies(client):
+ req, policies = test_policies.get_policies(client)
+ for key in policies["policies"]:
+ req = client.delete("/policies/{}".format(key))
+ assert req.status_code == 200
+
+
+def clean_subjects(client):
+ subjects = test_perimeter.get_subjects(client)
+ logger.info("subjects {}".format(subjects))
+ for key in subjects[1]["subjects"]:
+ subject = subjects[1]["subjects"][key]
+ policy_keys = subject["policy_list"]
+ logger.info("subjects policy_keys {}".format(policy_keys))
+ for policy_key in policy_keys:
+ client.delete("/policies/{}/subjects/{}".format(policy_key, key))
+
+
+def clean_objects(client):
+ objects = test_perimeter.get_objects(client)
+ logger.info("objects {}".format(objects))
+ for key in objects[1]["objects"]:
+ object_ = objects[1]["objects"][key]
+ policy_keys = object_["policy_list"]
+ logger.info("objects policy_keys {}".format(policy_keys))
+ for policy_key in policy_keys:
+ client.delete("/policies/{}/objects/{}".format(policy_key, key))
+
+
+def clean_actions(client):
+ actions = test_perimeter.get_actions(client)
+ actions = test_perimeter.get_actions(client)
+ logger.info("actions {}".format(actions))
+ for key in actions[1]["actions"]:
+ action = actions[1]["actions"][key]
+ policy_keys = action["policy_list"]
+ logger.info("action policy_keys {}".format(policy_keys))
+ for policy_key in policy_keys:
+ client.delete("/policies/{}/actions/{}".format(policy_key, key))
+
+
+def clean_subject_categories(client):
+ req, categories = test_categories.get_subject_categories(client)
+ logger.info(categories)
+ for key in categories["subject_categories"]:
+ client.delete("/subject_categories/{}".format(key))
+
+
+def clean_object_categories(client):
+ req, categories = test_categories.get_object_categories(client)
+ logger.info(categories)
+ for key in categories["object_categories"]:
+ client.delete("/object_categories/{}".format(key))
+
+
+def clean_action_categories(client):
+ req, categories = test_categories.get_action_categories(client)
+ logger.info(categories)
+ for key in categories["action_categories"]:
+ client.delete("/action_categories/{}".format(key))
+
+
+def clean_subject_data(client):
+ req, policies = test_policies.get_policies(client)
+ logger.info("clean_subject_data on {}".format(policies))
+ for policy_key in policies["policies"]:
+ req, data = test_data.get_subject_data(client, policy_id=policy_key)
+ logger.info("============= data {}".format(data))
+ for data_item in data["subject_data"]:
+ if data_item["data"]:
+ for data_id in data_item["data"]:
+ logger.info("============= Deleting {}/{}".format(policy_key, data_id))
+ client.delete("/policies/{}/subject_data/{}/{}".format(policy_key, data_item['category_id'], data_id))
+
+
+def clean_object_data(client):
+ req, policies = test_policies.get_policies(client)
+ for policy_key in policies["policies"]:
+ req, data = test_data.get_object_data(client, policy_id=policy_key)
+ for data_item in data["object_data"]:
+ if data_item["data"]:
+ for data_id in data_item["data"]:
+ logger.info("============= object_data {}/{}".format(policy_key, data_id))
+ client.delete("/policies/{}/object_data/{}/{}".format(policy_key, data_item['category_id'], data_id))
+
+
+def clean_action_data(client):
+ req, policies = test_policies.get_policies(client)
+ for policy_key in policies["policies"]:
+ req, data = test_data.get_action_data(client, policy_id=policy_key)
+ for data_item in data["action_data"]:
+ if data_item["data"]:
+ for data_id in data_item["data"]:
+ logger.info("============= action_data {}/{}".format(policy_key, data_id))
+ client.delete("/policies/{}/action_data/{}/{}".format(policy_key, data_item['category_id'], data_id))
+
+
+def clean_meta_rule(client):
+ req, meta_rules = test_meta_rules.get_meta_rules(client)
+ meta_rules = meta_rules["meta_rules"]
+ for meta_rule_key in meta_rules:
+ logger.info("clean_meta_rule.meta_rule_key={}".format(meta_rule_key))
+ logger.info("clean_meta_rule.meta_rule={}".format(meta_rules[meta_rule_key]))
+ client.delete("/meta_rules/{}".format(meta_rule_key))
+
+
+def clean_subject_assignments(client):
+ req, policies = test_policies.get_policies(client)
+ for policy_key in policies["policies"]:
+ req, assignments = test_assignments.get_subject_assignment(client, policy_key)
+ for key in assignments["subject_assignments"]:
+ subject_key = assignments["subject_assignments"][key]["subject_id"]
+ cat_key = assignments["subject_assignments"][key]["category_id"]
+ data_keys = assignments["subject_assignments"][key]["assignments"]
+ for data_key in data_keys:
+ client.delete("/policies/{}/subject_assignments/{}/{}/{}".format(policy_key, subject_key,
+ cat_key, data_key))
+
+
+def clean_object_assignments(client):
+ req, policies = test_policies.get_policies(client)
+ for policy_key in policies["policies"]:
+ req, assignments = test_assignments.get_object_assignment(client, policy_key)
+ for key in assignments["object_assignments"]:
+ object_key = assignments["object_assignments"][key]["object_id"]
+ cat_key = assignments["object_assignments"][key]["category_id"]
+ data_keys = assignments["object_assignments"][key]["assignments"]
+ for data_key in data_keys:
+ client.delete("/policies/{}/object_assignments/{}/{}/{}".format(policy_key, object_key,
+ cat_key, data_key))
+
+
+def clean_action_assignments(client):
+ req, policies = test_policies.get_policies(client)
+ for policy_key in policies["policies"]:
+ req, assignments = test_assignments.get_action_assignment(client, policy_key)
+ for key in assignments["action_assignments"]:
+ action_key = assignments["action_assignments"][key]["action_id"]
+ cat_key = assignments["action_assignments"][key]["category_id"]
+ data_keys = assignments["action_assignments"][key]["assignments"]
+ for data_key in data_keys:
+ client.delete("/policies/{}/action_assignments/{}/{}/{}".format(policy_key, action_key,
+ cat_key, data_key))
+
+
+def clean_rules(client):
+ req, policies = test_policies.get_policies(client)
+ for policy_key in policies["policies"]:
+ req, rules = test_rules.get_rules(client, policy_key)
+ rules = rules["rules"]["rules"]
+ for rule_key in rules:
+ req = client.delete("/policies/{}/rules/{}".format(policy_key, rule_key["id"]))
+
+
+def clean_all(client):
+ clean_rules(client)
+
+ clean_subject_assignments(client)
+ clean_object_assignments(client)
+ clean_action_assignments(client)
+
+
+ clean_subject_data(client)
+ clean_object_data(client)
+ clean_action_data(client)
+
+ clean_actions(client)
+ clean_objects(client)
+ clean_subjects(client)
+
+ clean_subject_categories(client)
+ clean_object_categories(client)
+ clean_action_categories(client)
+
+
+ clean_policies(client)
+ clean_models(client)
+ clean_meta_rule(client) \ No newline at end of file
diff --git a/old/moon_manager/tests/unit_python/api/meta_data_test.py b/old/moon_manager/tests/unit_python/api/meta_data_test.py
new file mode 100644
index 00000000..8609f0b5
--- /dev/null
+++ b/old/moon_manager/tests/unit_python/api/meta_data_test.py
@@ -0,0 +1,238 @@
+import json
+import api.utilities as utilities
+
+#subject_categories_test
+
+
+def get_subject_categories(client):
+ req = client.get("/subject_categories")
+ subject_categories = utilities.get_json(req.data)
+ return req, subject_categories
+
+
+def add_subject_categories(client, name):
+ data = {
+ "name": name,
+ "description": "description of {}".format(name)
+ }
+ req = client.post("/subject_categories", data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ subject_categories = utilities.get_json(req.data)
+ return req, subject_categories
+
+
+def delete_subject_categories(client, name):
+ request, subject_categories = get_subject_categories(client)
+ for key, value in subject_categories['subject_categories'].items():
+ if value['name'] == name:
+ req = client.delete("/subject_categories/{}".format(key))
+ break
+ return req
+
+
+def delete_subject_categories_without_id(client):
+ req = client.delete("/subject_categories/{}".format(""))
+ return req
+
+
+def test_get_subject_categories():
+ client = utilities.register_client()
+ req, subject_categories = get_subject_categories(client)
+ assert req.status_code == 200
+ assert isinstance(subject_categories, dict)
+ assert "subject_categories" in subject_categories
+
+
+def test_add_subject_categories():
+ client = utilities.register_client()
+ req, subject_categories = add_subject_categories(client, "testuser")
+ assert req.status_code == 200
+ assert isinstance(subject_categories, dict)
+ value = list(subject_categories["subject_categories"].values())[0]
+ assert "subject_categories" in subject_categories
+ assert value['name'] == "testuser"
+ assert value['description'] == "description of {}".format("testuser")
+
+
+def test_add_subject_categories_with_empty_user():
+ client = utilities.register_client()
+ req, subject_categories = add_subject_categories(client, "")
+ assert req.status_code == 500
+ assert json.loads(req.data)["message"] == "Empty String"
+
+
+def test_add_subject_categories_with_user_contain_space():
+ client = utilities.register_client()
+ req, subject_categories = add_subject_categories(client, "test user")
+ assert req.status_code == 500
+ assert json.loads(req.data)["message"] == "String contains space"
+
+
+def test_delete_subject_categories():
+ client = utilities.register_client()
+ req = delete_subject_categories(client, "testuser")
+ assert req.status_code == 200
+
+
+def test_delete_subject_categories_without_id():
+ client = utilities.register_client()
+ req = delete_subject_categories_without_id(client)
+ assert req.status_code == 500
+
+
+#---------------------------------------------------------------------------
+#object_categories_test
+
+def get_object_categories(client):
+ req = client.get("/object_categories")
+ object_categories = utilities.get_json(req.data)
+ return req, object_categories
+
+
+def add_object_categories(client, name):
+ data = {
+ "name": name,
+ "description": "description of {}".format(name)
+ }
+ req = client.post("/object_categories", data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ object_categories = utilities.get_json(req.data)
+ return req, object_categories
+
+
+def delete_object_categories(client, name):
+ request, object_categories = get_object_categories(client)
+ for key, value in object_categories['object_categories'].items():
+ if value['name'] == name:
+ req = client.delete("/object_categories/{}".format(key))
+ break
+ return req
+
+
+def delete_object_categories_without_id(client):
+ req = client.delete("/object_categories/{}".format(""))
+ return req
+
+
+def test_get_object_categories():
+ client = utilities.register_client()
+ req, object_categories = get_object_categories(client)
+ assert req.status_code == 200
+ assert isinstance(object_categories, dict)
+ assert "object_categories" in object_categories
+
+
+def test_add_object_categories():
+ client = utilities.register_client()
+ req, object_categories = add_object_categories(client, "testuser")
+ assert req.status_code == 200
+ assert isinstance(object_categories, dict)
+ value = list(object_categories["object_categories"].values())[0]
+ assert "object_categories" in object_categories
+ assert value['name'] == "testuser"
+ assert value['description'] == "description of {}".format("testuser")
+
+
+def test_add_object_categories_with_empty_user():
+ client = utilities.register_client()
+ req, object_categories = add_object_categories(client, "")
+ assert req.status_code == 500
+ assert json.loads(req.data)["message"] == "Empty String"
+
+
+def test_add_object_categories_with_user_contain_space():
+ client = utilities.register_client()
+ req, object_categories = add_object_categories(client, "test user")
+ assert req.status_code == 500
+ assert json.loads(req.data)["message"] == "String contains space"
+
+
+def test_delete_object_categories():
+ client = utilities.register_client()
+ req = delete_object_categories(client, "testuser")
+ assert req.status_code == 200
+
+
+def test_delete_object_categories_without_id():
+ client = utilities.register_client()
+ req = delete_object_categories_without_id(client)
+ assert req.status_code == 500
+
+
+#---------------------------------------------------------------------------
+#action_categories_test
+
+def get_action_categories(client):
+ req = client.get("/action_categories")
+ action_categories = utilities.get_json(req.data)
+ return req, action_categories
+
+
+def add_action_categories(client, name):
+ data = {
+ "name": name,
+ "description": "description of {}".format(name)
+ }
+ req = client.post("/action_categories", data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ action_categories = utilities.get_json(req.data)
+ return req, action_categories
+
+
+def delete_action_categories(client, name):
+ request, action_categories = get_action_categories(client)
+ for key, value in action_categories['action_categories'].items():
+ if value['name'] == name:
+ req = client.delete("/action_categories/{}".format(key))
+ break
+ return req
+
+
+def delete_action_categories_without_id(client):
+ req = client.delete("/action_categories/{}".format(""))
+ return req
+
+
+def test_get_action_categories():
+ client = utilities.register_client()
+ req, action_categories = get_action_categories(client)
+ assert req.status_code == 200
+ assert isinstance(action_categories, dict)
+ assert "action_categories" in action_categories
+
+
+def test_add_action_categories():
+ client = utilities.register_client()
+ req, action_categories = add_action_categories(client, "testuser")
+ assert req.status_code == 200
+ assert isinstance(action_categories, dict)
+ value = list(action_categories["action_categories"].values())[0]
+ assert "action_categories" in action_categories
+ assert value['name'] == "testuser"
+ assert value['description'] == "description of {}".format("testuser")
+
+
+def test_add_action_categories_with_empty_user():
+ client = utilities.register_client()
+ req, action_categories = add_action_categories(client, "")
+ assert req.status_code == 500
+ assert json.loads(req.data)["message"] == "Empty String"
+
+
+def test_add_action_categories_with_user_contain_space():
+ client = utilities.register_client()
+ req, action_categories = add_action_categories(client, "test user")
+ assert req.status_code == 500
+ assert json.loads(req.data)["message"] == "String contains space"
+
+
+def test_delete_action_categories():
+ client = utilities.register_client()
+ req = delete_action_categories(client, "testuser")
+ assert req.status_code == 200
+
+
+def test_delete_action_categories_without_id():
+ client = utilities.register_client()
+ req = delete_action_categories_without_id(client)
+ assert req.status_code == 500
diff --git a/old/moon_manager/tests/unit_python/api/meta_rules_test.py b/old/moon_manager/tests/unit_python/api/meta_rules_test.py
new file mode 100644
index 00000000..a87c16f3
--- /dev/null
+++ b/old/moon_manager/tests/unit_python/api/meta_rules_test.py
@@ -0,0 +1,162 @@
+import json
+import api.utilities as utilities
+
+
+def get_meta_rules(client):
+ req = client.get("/meta_rules")
+ meta_rules = utilities.get_json(req.data)
+ return req, meta_rules
+
+
+def add_meta_rules(client, name):
+ data = {
+ "name": name,
+ "subject_categories": ["subject_category_id1",
+ "subject_category_id2"],
+ "object_categories": ["object_category_id1"],
+ "action_categories": ["action_category_id1"]
+ }
+ req = client.post("/meta_rules", data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ meta_rules = utilities.get_json(req.data)
+ return req, meta_rules
+
+
+def add_meta_rules_without_subject_category_ids(client, name):
+ data = {
+ "name": name,
+ "subject_categories": [],
+ "object_categories": ["object_category_id1"],
+ "action_categories": ["action_category_id1"]
+ }
+ req = client.post("/meta_rules", data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ meta_rules = utilities.get_json(req.data)
+ return req, meta_rules
+
+
+def update_meta_rules(client, name, metaRuleId):
+ data = {
+ "name": name,
+ "subject_categories": ["subject_category_id1_update",
+ "subject_category_id2_update"],
+ "object_categories": ["object_category_id1_update"],
+ "action_categories": ["action_category_id1_update"]
+ }
+ req = client.patch("/meta_rules/{}".format(metaRuleId), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ meta_rules = utilities.get_json(req.data)
+ return req, meta_rules
+
+
+def update_meta_rules_without_subject_category_ids(client, name):
+ data = {
+ "name": name,
+ "subject_categories": [],
+ "object_categories": ["object_category_id1"],
+ "action_categories": ["action_category_id1"]
+ }
+ req = client.post("/meta_rules", data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ meta_rules = utilities.get_json(req.data)
+ return req, meta_rules
+
+
+def delete_meta_rules(client, name):
+ request, meta_rules = get_meta_rules(client)
+ for key, value in meta_rules['meta_rules'].items():
+ if value['name'] == name:
+ req = client.delete("/meta_rules/{}".format(key))
+ break
+ return req
+
+
+def delete_meta_rules_without_id(client):
+ req = client.delete("/meta_rules/{}".format(""))
+ return req
+
+
+def test_get_meta_rules():
+ client = utilities.register_client()
+ req, meta_rules = get_meta_rules(client)
+ assert req.status_code == 200
+ assert isinstance(meta_rules, dict)
+ assert "meta_rules" in meta_rules
+
+
+def test_add_meta_rules():
+ client = utilities.register_client()
+ req, meta_rules = add_meta_rules(client, "testuser")
+ assert req.status_code == 200
+ assert isinstance(meta_rules, dict)
+ value = list(meta_rules["meta_rules"].values())[0]
+ assert "meta_rules" in meta_rules
+ assert value['name'] == "testuser"
+ assert value["subject_categories"][0] == "subject_category_id1"
+ assert value["object_categories"][0] == "object_category_id1"
+ assert value["action_categories"][0] == "action_category_id1"
+
+
+def test_add_meta_rules_with_empty_user():
+ client = utilities.register_client()
+ req, meta_rules = add_meta_rules(client, "")
+ assert req.status_code == 500
+ assert json.loads(req.data)["message"] == "Empty String"
+
+
+def test_add_meta_rules_with_user_contain_space():
+ client = utilities.register_client()
+ req, meta_rules = add_meta_rules(client, "test user")
+ assert req.status_code == 500
+ assert json.loads(req.data)["message"] == "String contains space"
+
+
+def test_add_meta_rules_without_subject_categories():
+ client = utilities.register_client()
+ req, meta_rules = add_meta_rules_without_subject_category_ids(client, "testuser")
+ assert req.status_code == 500
+ assert json.loads(req.data)["message"] == 'Empty Container'
+
+
+def test_delete_meta_rules():
+ client = utilities.register_client()
+ req = delete_meta_rules(client, "testuser")
+ assert req.status_code == 200
+
+
+def test_delete_meta_rules_without_id():
+ client = utilities.register_client()
+ req = delete_meta_rules_without_id(client)
+ assert req.status_code == 500
+
+
+def test_update_meta_rules():
+ client = utilities.register_client()
+ req = add_meta_rules(client, "testuser")
+ meta_rule_id = list(req[1]['meta_rules'])[0]
+ req_update = update_meta_rules(client, "testuser", meta_rule_id)
+ assert req_update[0].status_code == 200
+ value = list(req_update[1]["meta_rules"].values())[0]
+ assert value["subject_categories"][0] == "subject_category_id1_update"
+ delete_meta_rules(client, "testuser")
+ get_meta_rules(client)
+
+
+def test_update_meta_rules_without_id():
+ client = utilities.register_client()
+ req_update = update_meta_rules(client, "testuser", "")
+ assert req_update[0].status_code == 500
+
+
+def test_update_meta_rules_without_user():
+ client = utilities.register_client()
+ req_update = update_meta_rules(client, "", "")
+ assert req_update[0].status_code == 500
+ assert json.loads(req_update[0].data)["message"] == "Empty String"
+
+
+def test_update_meta_rules_without_subject_categories():
+ client = utilities.register_client()
+ req_update = update_meta_rules_without_subject_category_ids(client, "testuser")
+ assert req_update[0].status_code == 500
+ assert json.loads(req_update[0].data)["message"] == "Empty Container"
diff --git a/old/moon_manager/tests/unit_python/api/test_assignement.py b/old/moon_manager/tests/unit_python/api/test_assignement.py
new file mode 100644
index 00000000..b56fb420
--- /dev/null
+++ b/old/moon_manager/tests/unit_python/api/test_assignement.py
@@ -0,0 +1,280 @@
+import api.utilities as utilities
+import json
+from helpers import data_builder as builder
+from uuid import uuid4
+
+
+# subject_categories_test
+
+
+def get_subject_assignment(client, policy_id):
+ req = client.get("/policies/{}/subject_assignments".format(policy_id))
+ subject_assignment = utilities.get_json(req.data)
+ return req, subject_assignment
+
+
+def add_subject_assignment(client):
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy(
+ subject_category_name="subject_category1" + uuid4().hex,
+ object_category_name="object_category1" + uuid4().hex,
+ action_category_name="action_category1" + uuid4().hex,
+ meta_rule_name="meta_rule_1" + uuid4().hex)
+ subject_id = builder.create_subject(policy_id)
+ data_id = builder.create_subject_data(policy_id=policy_id, category_id=subject_category_id)
+
+ data = {
+ "id": subject_id,
+ "category_id": subject_category_id,
+ "data_id": data_id
+ }
+ req = client.post("/policies/{}/subject_assignments".format(policy_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ subject_assignment = utilities.get_json(req.data)
+ return req, subject_assignment
+
+
+def add_subject_assignment_without_cat_id(client):
+
+ data = {
+ "id": "subject_id",
+ "category_id": "",
+ "data_id": "data_id"
+ }
+ req = client.post("/policies/{}/subject_assignments".format("1111"), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ subject_assignment = utilities.get_json(req.data)
+ return req, subject_assignment
+
+
+def delete_subject_assignment(client, policy_id, sub_id, cat_id,data_id):
+ req = client.delete("/policies/{}/subject_assignments/{}/{}/{}".format(policy_id, sub_id, cat_id,data_id))
+ return req
+
+
+def test_add_subject_assignment():
+ client = utilities.register_client()
+ req, subject_assignment = add_subject_assignment(client)
+ assert req.status_code == 200
+ assert isinstance(subject_assignment, dict)
+ assert "subject_assignments" in subject_assignment
+
+
+# def test_add_subject_assignment_without_cat_id():
+# client = utilities.register_client()
+# req, subject_assignment = add_subject_assignment_without_cat_id(client)
+# assert req.status_code == 400
+# assert json.loads(req.data)["message"] == "Key: 'category_id', [Empty String]"
+
+
+def test_get_subject_assignment():
+ client = utilities.register_client()
+ policy_id = builder.get_policy_id_with_subject_assignment()
+ req, subject_assignment = get_subject_assignment(client, policy_id)
+ assert req.status_code == 200
+ assert isinstance(subject_assignment, dict)
+ assert "subject_assignments" in subject_assignment
+
+
+def test_delete_subject_assignment():
+ client = utilities.register_client()
+ policy_id = builder.get_policy_id_with_subject_assignment()
+ req, subject_assignment = get_subject_assignment(client, policy_id)
+ value = subject_assignment["subject_assignments"]
+ _id = list(value.keys())[0]
+ success_req = delete_subject_assignment(client,
+ policy_id,
+ value[_id]['subject_id'],
+ value[_id]['category_id'],
+ value[_id]['assignments'][0])
+ assert success_req.status_code == 200
+
+
+def test_delete_subject_assignment_without_policy_id():
+ client = utilities.register_client()
+ success_req = delete_subject_assignment(client, "", "id1", "111", "data_id1")
+ assert success_req.status_code == 404
+
+
+# ---------------------------------------------------------------------------
+# object_categories_test
+
+
+def get_object_assignment(client, policy_id):
+ req = client.get("/policies/{}/object_assignments".format(policy_id))
+ object_assignment = utilities.get_json(req.data)
+ return req, object_assignment
+
+
+def add_object_assignment(client):
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy(
+ subject_category_name="subject_category1" + uuid4().hex,
+ object_category_name="object_category1" + uuid4().hex,
+ action_category_name="action_category1" + uuid4().hex,
+ meta_rule_name="meta_rule_1" + uuid4().hex)
+ object_id = builder.create_object(policy_id)
+ data_id = builder.create_object_data(policy_id=policy_id, category_id=object_category_id)
+
+ data = {
+ "id": object_id,
+ "category_id": object_category_id,
+ "data_id": data_id
+ }
+
+ req = client.post("/policies/{}/object_assignments".format(policy_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ object_assignment = utilities.get_json(req.data)
+ return req, object_assignment
+
+
+def add_object_assignment_without_cat_id(client):
+
+ data = {
+ "id": "object_id",
+ "category_id": "",
+ "data_id": "data_id"
+ }
+ req = client.post("/policies/{}/object_assignments".format("1111"), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ object_assignment = utilities.get_json(req.data)
+ return req, object_assignment
+
+
+def delete_object_assignment(client, policy_id, obj_id, cat_id, data_id):
+ req = client.delete("/policies/{}/object_assignments/{}/{}/{}".format(policy_id, obj_id, cat_id, data_id))
+ return req
+
+
+def test_get_object_assignment():
+ policy_id = builder.get_policy_id_with_object_assignment()
+ client = utilities.register_client()
+ req, object_assignment = get_object_assignment(client, policy_id)
+ assert req.status_code == 200
+ assert isinstance(object_assignment, dict)
+ assert "object_assignments" in object_assignment
+
+
+def test_add_object_assignment():
+ client = utilities.register_client()
+ req, object_assignment = add_object_assignment(client)
+ assert req.status_code == 200
+ assert "object_assignments" in object_assignment
+
+
+# def test_add_object_assignment_without_cat_id():
+# client = utilities.register_client()
+# req, object_assignment = add_object_assignment_without_cat_id(client)
+# assert req.status_code == 400
+# assert json.loads(req.data)["message"] == "Key: 'category_id', [Empty String]"
+
+
+def test_delete_object_assignment():
+ client = utilities.register_client()
+ policy_id = builder.get_policy_id_with_object_assignment()
+ req, object_assignment = get_object_assignment(client, policy_id)
+ value = object_assignment["object_assignments"]
+ _id = list(value.keys())[0]
+ success_req = delete_object_assignment(client,
+ policy_id,
+ value[_id]['object_id'],
+ value[_id]['category_id'],
+ value[_id]['assignments'][0])
+ assert success_req.status_code == 200
+
+
+def test_delete_object_assignment_without_policy_id():
+ client = utilities.register_client()
+ success_req = delete_object_assignment(client, "", "id1", "111", "data_id1")
+ assert success_req.status_code == 404
+
+
+# ---------------------------------------------------------------------------
+# action_categories_test
+
+
+def get_action_assignment(client, policy_id):
+ req = client.get("/policies/{}/action_assignments".format(policy_id))
+ action_assignment = utilities.get_json(req.data)
+ return req, action_assignment
+
+
+def add_action_assignment(client):
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy(
+ subject_category_name="subject_category1" + uuid4().hex,
+ object_category_name="object_category1" + uuid4().hex,
+ action_category_name="action_category1" + uuid4().hex,
+ meta_rule_name="meta_rule_1" + uuid4().hex)
+ action_id = builder.create_action(policy_id)
+ data_id = builder.create_action_data(policy_id=policy_id, category_id=action_category_id)
+
+ data = {
+ "id": action_id,
+ "category_id": action_category_id,
+ "data_id": data_id
+ }
+ req = client.post("/policies/{}/action_assignments".format(policy_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ action_assignment = utilities.get_json(req.data)
+ return req, action_assignment
+
+
+def add_action_assignment_without_cat_id(client):
+
+ data = {
+ "id": "action_id",
+ "category_id": "",
+ "data_id": "data_id"
+ }
+ req = client.post("/policies/{}/action_assignments".format("1111"), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ action_assignment = utilities.get_json(req.data)
+ return req, action_assignment
+
+
+def delete_action_assignment(client, policy_id, action_id, cat_id, data_id):
+ req = client.delete("/policies/{}/action_assignments/{}/{}/{}".format(policy_id, action_id, cat_id, data_id))
+ return req
+
+
+def test_get_action_assignment():
+ policy_id = builder.get_policy_id_with_action_assignment()
+ client = utilities.register_client()
+ req, action_assignment = get_action_assignment(client, policy_id)
+ assert req.status_code == 200
+ assert isinstance(action_assignment, dict)
+ assert "action_assignments" in action_assignment
+
+
+def test_add_action_assignment():
+ client = utilities.register_client()
+ req, action_assignment = add_action_assignment(client)
+ assert req.status_code == 200
+ assert "action_assignments" in action_assignment
+
+
+# def test_add_action_assignment_without_cat_id():
+# client = utilities.register_client()
+# req, action_assignment = add_action_assignment_without_cat_id(client)
+# assert req.status_code == 400
+# assert json.loads(req.data)["message"] == "Key: 'category_id', [Empty String]"
+
+
+def test_delete_action_assignment():
+ client = utilities.register_client()
+ policy_id = builder.get_policy_id_with_action_assignment()
+ req, action_assignment = get_action_assignment(client, policy_id)
+ value = action_assignment["action_assignments"]
+ id = list(value.keys())[0]
+ success_req = delete_action_assignment(client,
+ policy_id,
+ value[id]['action_id'],
+ value[id]['category_id'],
+ value[id]['assignments'][0])
+ assert success_req.status_code == 200
+
+
+def test_delete_action_assignment_without_policy_id():
+ client = utilities.register_client()
+ success_req = delete_action_assignment(client, "", "id1", "111", "data_id1")
+ assert success_req.status_code == 404
+
+# ---------------------------------------------------------------------------
diff --git a/old/moon_manager/tests/unit_python/api/test_assignemnt.py b/old/moon_manager/tests/unit_python/api/test_assignemnt.py
new file mode 100644
index 00000000..22c727af
--- /dev/null
+++ b/old/moon_manager/tests/unit_python/api/test_assignemnt.py
@@ -0,0 +1,270 @@
+import api.utilities as utilities
+import json
+from helpers import data_builder as builder
+from uuid import uuid4
+
+
+# subject_categories_test
+
+
+def get_subject_assignment(client, policy_id):
+ req = client.get("/policies/{}/subject_assignments".format(policy_id))
+ subject_assignment = utilities.get_json(req.data)
+ return req, subject_assignment
+
+
+def add_subject_assignment(client):
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy(
+ subject_category_name="subject_category1" + uuid4().hex,
+ object_category_name="object_category1" + uuid4().hex,
+ action_category_name="action_category1" + uuid4().hex,
+ meta_rule_name="meta_rule_1" + uuid4().hex)
+ subject_id = builder.create_subject(policy_id)
+ data_id = builder.create_subject_data(policy_id=policy_id, category_id=subject_category_id)
+
+ data = {
+ "id": subject_id,
+ "category_id": subject_category_id,
+ "data_id": data_id
+ }
+ req = client.post("/policies/{}/subject_assignments".format(policy_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ subject_assignment = utilities.get_json(req.data)
+ return req, subject_assignment
+
+
+def add_subject_assignment_without_cat_id(client):
+
+ data = {
+ "id": "subject_id",
+ "category_id": "",
+ "data_id": "data_id"
+ }
+ req = client.post("/policies/{}/subject_assignments".format("1111"), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ subject_assignment = utilities.get_json(req.data)
+ return req, subject_assignment
+
+
+def delete_subject_assignment(client, policy_id, sub_id, cat_id,data_id):
+ req = client.delete("/policies/{}/subject_assignments/{}/{}/{}".format(policy_id, sub_id, cat_id,data_id))
+ return req
+
+
+def test_add_subject_assignment():
+ client = utilities.register_client()
+ req, subject_assignment = add_subject_assignment(client)
+ assert req.status_code == 200
+ assert isinstance(subject_assignment, dict)
+ assert "subject_assignments" in subject_assignment
+
+
+def test_add_subject_assignment_without_cat_id():
+ client = utilities.register_client()
+ req, subject_assignment = add_subject_assignment_without_cat_id(client)
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "Key: 'category_id', [Empty String]"
+
+
+def test_get_subject_assignment():
+ client = utilities.register_client()
+ policy_id = builder.get_policy_id_with_subject_assignment()
+ req, subject_assignment = get_subject_assignment(client, policy_id)
+ assert req.status_code == 200
+ assert isinstance(subject_assignment, dict)
+ assert "subject_assignments" in subject_assignment
+
+
+def test_delete_subject_assignment():
+ client = utilities.register_client()
+ policy_id = builder.get_policy_id_with_subject_assignment()
+ req, subject_assignment = get_subject_assignment(client, policy_id)
+ value = subject_assignment["subject_assignments"]
+ id = list(value.keys())[0]
+ success_req = delete_subject_assignment(client, policy_id, value[id]['subject_id'], value[id]['category_id'],value[id]['assignments'][0])
+ assert success_req.status_code == 200
+
+
+def test_delete_subject_assignment_without_policy_id():
+ client = utilities.register_client()
+ success_req = delete_subject_assignment(client, "", "id1", "111" ,"data_id1")
+ assert success_req.status_code == 404
+
+
+# ---------------------------------------------------------------------------
+
+# object_categories_test
+
+
+def get_object_assignment(client, policy_id):
+ req = client.get("/policies/{}/object_assignments".format(policy_id))
+ object_assignment = utilities.get_json(req.data)
+ return req, object_assignment
+
+
+def add_object_assignment(client):
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy(
+ subject_category_name="subject_category1" + uuid4().hex,
+ object_category_name="object_category1" + uuid4().hex,
+ action_category_name="action_category1" + uuid4().hex,
+ meta_rule_name="meta_rule_1" + uuid4().hex)
+ object_id = builder.create_object(policy_id)
+ data_id = builder.create_object_data(policy_id=policy_id, category_id=object_category_id)
+
+ data = {
+ "id": object_id,
+ "category_id": object_category_id,
+ "data_id": data_id
+ }
+
+ req = client.post("/policies/{}/object_assignments".format(policy_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ object_assignment = utilities.get_json(req.data)
+ return req, object_assignment
+
+
+def add_object_assignment_without_cat_id(client):
+
+ data = {
+ "id": "object_id",
+ "category_id": "",
+ "data_id": "data_id"
+ }
+ req = client.post("/policies/{}/object_assignments".format("1111"), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ object_assignment = utilities.get_json(req.data)
+ return req, object_assignment
+
+
+def delete_object_assignment(client, policy_id, obj_id, cat_id, data_id):
+ req = client.delete("/policies/{}/object_assignments/{}/{}/{}".format(policy_id, obj_id, cat_id, data_id))
+ return req
+
+
+def test_get_object_assignment():
+ policy_id = builder.get_policy_id_with_object_assignment()
+ client = utilities.register_client()
+ req, object_assignment = get_object_assignment(client, policy_id)
+ assert req.status_code == 200
+ assert isinstance(object_assignment, dict)
+ assert "object_assignments" in object_assignment
+
+
+def test_add_object_assignment():
+ client = utilities.register_client()
+ req, object_assignment = add_object_assignment(client)
+ assert req.status_code == 200
+ assert "object_assignments" in object_assignment
+
+
+def test_add_object_assignment_without_cat_id():
+ client = utilities.register_client()
+ req, object_assignment = add_object_assignment_without_cat_id(client)
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "Key: 'category_id', [Empty String]"
+
+
+def test_delete_object_assignment():
+ client = utilities.register_client()
+ policy_id = builder.get_policy_id_with_object_assignment()
+ req, object_assignment = get_object_assignment(client, policy_id)
+ value = object_assignment["object_assignments"]
+ id = list(value.keys())[0]
+ success_req = delete_object_assignment(client, policy_id, value[id]['object_id'], value[id]['category_id'],value[id]['assignments'][0])
+ assert success_req.status_code == 200
+
+
+def test_delete_object_assignment_without_policy_id():
+ client = utilities.register_client()
+ success_req = delete_object_assignment(client, "", "id1", "111","data_id1")
+ assert success_req.status_code == 404
+
+
+# ---------------------------------------------------------------------------
+
+# action_categories_test
+
+
+def get_action_assignment(client, policy_id):
+ req = client.get("/policies/{}/action_assignments".format(policy_id))
+ action_assignment = utilities.get_json(req.data)
+ return req, action_assignment
+
+
+def add_action_assignment(client):
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy(
+ subject_category_name="subject_category1" + uuid4().hex,
+ object_category_name="object_category1" + uuid4().hex,
+ action_category_name="action_category1" + uuid4().hex,
+ meta_rule_name="meta_rule_1" + uuid4().hex)
+ action_id = builder.create_action(policy_id)
+ data_id = builder.create_action_data(policy_id=policy_id, category_id=action_category_id)
+
+ data = {
+ "id": action_id,
+ "category_id": action_category_id,
+ "data_id": data_id
+ }
+ req = client.post("/policies/{}/action_assignments".format(policy_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ action_assignment = utilities.get_json(req.data)
+ return req, action_assignment
+
+
+def add_action_assignment_without_cat_id(client):
+
+ data = {
+ "id": "action_id",
+ "category_id": "",
+ "data_id": "data_id"
+ }
+ req = client.post("/policies/{}/action_assignments".format("1111"), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ action_assignment = utilities.get_json(req.data)
+ return req, action_assignment
+
+
+def delete_action_assignment(client, policy_id, action_id, cat_id, data_id):
+ req = client.delete("/policies/{}/action_assignments/{}/{}/{}".format(policy_id, action_id, cat_id, data_id))
+ return req
+
+
+def test_get_action_assignment():
+ policy_id = builder.get_policy_id_with_action_assignment()
+ client = utilities.register_client()
+ req, action_assignment = get_action_assignment(client, policy_id)
+ assert req.status_code == 200
+ assert isinstance(action_assignment, dict)
+ assert "action_assignments" in action_assignment
+
+
+def test_add_action_assignment():
+ client = utilities.register_client()
+ req, action_assignment = add_action_assignment(client)
+ assert req.status_code == 200
+ assert "action_assignments" in action_assignment
+
+
+def test_add_action_assignment_without_cat_id():
+ client = utilities.register_client()
+ req, action_assignment = add_action_assignment_without_cat_id(client)
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "Key: 'category_id', [Empty String]"
+
+
+def test_delete_action_assignment():
+ client = utilities.register_client()
+ policy_id = builder.get_policy_id_with_action_assignment()
+ req, action_assignment = get_action_assignment(client, policy_id)
+ value = action_assignment["action_assignments"]
+ id = list(value.keys())[0]
+ success_req = delete_action_assignment(client, policy_id, value[id]['action_id'], value[id]['category_id'],value[id]['assignments'][0])
+ assert success_req.status_code == 200
+
+
+def test_delete_action_assignment_without_policy_id():
+ client = utilities.register_client()
+ success_req = delete_action_assignment(client, "", "id1", "111" ,"data_id1")
+ assert success_req.status_code == 404
+
+# ---------------------------------------------------------------------------
diff --git a/old/moon_manager/tests/unit_python/api/test_data.py b/old/moon_manager/tests/unit_python/api/test_data.py
new file mode 100644
index 00000000..433f69e6
--- /dev/null
+++ b/old/moon_manager/tests/unit_python/api/test_data.py
@@ -0,0 +1,239 @@
+# Copyright 2018 Open Platform for NFV Project, Inc. and its contributors
+# This software is distributed under the terms and conditions of the 'Apache-2.0'
+# license which can be found in the file 'LICENSE' in this package distribution
+# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
+
+import api.utilities as utilities
+import json
+from helpers import data_builder as builder
+from uuid import uuid4
+
+# subject_categories_test
+
+
+def get_subject_data(client, policy_id, category_id=None):
+ if category_id is None:
+ req = client.get("/policies/{}/subject_data".format(policy_id))
+ else:
+ req = client.get("/policies/{}/subject_data/{}".format(policy_id, category_id))
+ subject_data = utilities.get_json(req.data)
+ return req, subject_data
+
+
+def add_subject_data(client, name):
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy(
+ subject_category_name="subject_category1" + uuid4().hex,
+ object_category_name="object_category1" + uuid4().hex,
+ action_category_name="action_category1" + uuid4().hex,
+ meta_rule_name="meta_rule_1" + uuid4().hex)
+ data = {
+ "name": name,
+ "description": "description of {}".format(name)
+ }
+ req = client.post("/policies/{}/subject_data/{}".format(policy_id, subject_category_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ subject_data = utilities.get_json(req.data)
+ return req, subject_data
+
+
+def delete_subject_data(client, policy_id, category_id, data_id):
+ req = client.delete("/policies/{}/subject_data/{}/{}".format(policy_id,category_id,data_id))
+ return req
+
+
+def test_get_subject_data():
+ policy_id = utilities.get_policy_id()
+ client = utilities.register_client()
+ req, subject_data = get_subject_data(client, policy_id)
+ assert req.status_code == 200
+ assert isinstance(subject_data, dict)
+ assert "subject_data" in subject_data
+
+
+def test_add_subject_data():
+ client = utilities.register_client()
+ req, subject_data = add_subject_data(client, "testuser")
+ assert req.status_code == 200
+ assert isinstance(subject_data, dict)
+ value = subject_data["subject_data"]['data']
+ assert "subject_data" in subject_data
+ id = list(value.keys())[0]
+ assert value[id]['name'] == "testuser"
+ assert value[id]['description'] == "description of {}".format("testuser")
+
+
+def test_delete_subject_data():
+ client = utilities.register_client()
+ subject_category_id, object_category_id, action_category_id, meta_rule_id,policy_id = builder.create_new_policy()
+ data_id = builder.create_subject_data(policy_id,subject_category_id)
+ success_req = delete_subject_data(client, policy_id, subject_category_id, data_id )
+ assert success_req.status_code == 200
+
+
+def test_add_subject_data_with_forbidden_char_in_user():
+ client = utilities.register_client()
+ req, subject_data = add_subject_data(client, "<a>")
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]"
+
+
+def test_delete_subject_data_without_policy_id():
+ client = utilities.register_client()
+ success_req = delete_subject_data(client, "", "", "")
+ assert success_req.status_code == 404
+
+# ---------------------------------------------------------------------------
+# object_categories_test
+
+
+def get_object_data(client, policy_id, category_id=None):
+ if category_id is None:
+ req = client.get("/policies/{}/object_data".format(policy_id))
+ else:
+ req = client.get("/policies/{}/object_data/{}".format(policy_id, category_id))
+ object_data = utilities.get_json(req.data)
+ return req, object_data
+
+
+def add_object_data(client, name):
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy(
+ subject_category_name="subject_category1" + uuid4().hex,
+ object_category_name="object_category1" + uuid4().hex,
+ action_category_name="action_category1" + uuid4().hex,
+ meta_rule_name="meta_rule_1" + uuid4().hex)
+ data = {
+ "name": name,
+ "description": "description of {}".format(name)
+ }
+ req = client.post("/policies/{}/object_data/{}".format(policy_id, object_category_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ object_data = utilities.get_json(req.data)
+ return req, object_data
+
+
+def delete_object_data(client, policy_id, category_id, data_id):
+ req = client.delete("/policies/{}/object_data/{}/{}".format(policy_id, category_id, data_id))
+ return req
+
+
+def test_get_object_data():
+ policy_id = utilities.get_policy_id()
+ client = utilities.register_client()
+ req, object_data = get_object_data(client, policy_id)
+ assert req.status_code == 200
+ assert isinstance(object_data, dict)
+ assert "object_data" in object_data
+
+
+def test_add_object_data():
+ client = utilities.register_client()
+ req, object_data = add_object_data(client, "testuser")
+ assert req.status_code == 200
+ assert isinstance(object_data, dict)
+ value = object_data["object_data"]['data']
+ assert "object_data" in object_data
+ _id = list(value.keys())[0]
+ assert value[_id]['name'] == "testuser"
+ assert value[_id]['description'] == "description of {}".format("testuser")
+
+
+def test_delete_object_data():
+ client = utilities.register_client()
+
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy()
+ data_id = builder.create_object_data(policy_id, object_category_id)
+
+ success_req = delete_object_data(client, policy_id, data_id, object_category_id)
+ assert success_req.status_code == 200
+
+
+def test_add_object_data_with_forbidden_char_in_user():
+ client = utilities.register_client()
+ req, subject_data = add_object_data(client, "<a>")
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]"
+
+
+def test_delete_object_data_without_policy_id():
+ client = utilities.register_client()
+ success_req = delete_object_data(client, "", "", "")
+ assert success_req.status_code == 404
+
+# ---------------------------------------------------------------------------
+# action_categories_test
+
+
+def get_action_data(client, policy_id, category_id=None):
+ if category_id is None:
+ req = client.get("/policies/{}/action_data".format(policy_id))
+ else:
+ req = client.get("/policies/{}/action_data/{}".format(policy_id, category_id))
+ action_data = utilities.get_json(req.data)
+ return req, action_data
+
+
+def add_action_data(client, name):
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy(
+ subject_category_name="subject_category1" + uuid4().hex,
+ object_category_name="object_category1" + uuid4().hex,
+ action_category_name="action_category1" + uuid4().hex,
+ meta_rule_name="meta_rule_1" + uuid4().hex)
+ data = {
+ "name": name,
+ "description": "description of {}".format(name)
+ }
+ req = client.post("/policies/{}/action_data/{}".format(policy_id, action_category_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ action_data = utilities.get_json(req.data)
+ return req, action_data
+
+
+def delete_action_data(client, policy_id, categorgy_id, data_id):
+ req = client.delete("/policies/{}/action_data/{}/{}".format(policy_id, categorgy_id, data_id))
+ return req
+
+
+def test_get_action_data():
+ policy_id = utilities.get_policy_id()
+ client = utilities.register_client()
+ req, action_data = get_action_data(client, policy_id)
+ assert req.status_code == 200
+ assert isinstance(action_data, dict)
+ assert "action_data" in action_data
+
+
+def test_add_action_data():
+ client = utilities.register_client()
+ req, action_data = add_action_data(client, "testuser")
+ assert req.status_code == 200
+ assert isinstance(action_data, dict)
+ value = action_data["action_data"]['data']
+ assert "action_data" in action_data
+ id = list(value.keys())[0]
+ assert value[id]['name'] == "testuser"
+ assert value[id]['description'] == "description of {}".format("testuser")
+
+
+def test_delete_action_data():
+ client = utilities.register_client()
+
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy()
+ data_id = builder.create_action_data(policy_id, action_category_id)
+
+ success_req = delete_action_data(client, policy_id, data_id, action_category_id)
+
+ assert success_req.status_code == 200
+
+
+def test_add_action_data_with_forbidden_char_in_user():
+ client = utilities.register_client()
+ req, action_data = add_action_data(client, "<a>")
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]"
+
+
+def test_delete_action_data_without_policy_id():
+ client = utilities.register_client()
+ success_req = delete_action_data(client, "", "", "")
+ assert success_req.status_code == 404
+# ---------------------------------------------------------------------------
diff --git a/old/moon_manager/tests/unit_python/api/test_export.py b/old/moon_manager/tests/unit_python/api/test_export.py
new file mode 100644
index 00000000..ac8e8d17
--- /dev/null
+++ b/old/moon_manager/tests/unit_python/api/test_export.py
@@ -0,0 +1,282 @@
+# Copyright 2018 Open Platform for NFV Project, Inc. and its contributors
+# This software is distributed under the terms and conditions of the 'Apache-2.0'
+# license which can be found in the file 'LICENSE' in this package distribution
+# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
+
+import json
+import api.utilities as utilities
+import api.import_export_utilities as import_export_utilities
+
+
+MODEL_WITHOUT_META_RULES = {"models": [{"name": "test model", "description": "model description", "meta_rules": []}]}
+
+POLICIES = {"models": [{"name": "test model", "description": "", "meta_rules": []}],
+ "policies": [{"name": "test policy", "genre": "authz", "description": "policy description", "model": {"name" : "test model"}}]}
+
+SUBJECTS_OBJECTS_ACTIONS = {"models": [{"name": "test model", "description": "", "meta_rules": []}],
+ "policies": [{"name": "test policy", "genre": "authz", "description": "policy description", "model": {"name" : "test model"}}],
+ "subjects": [{"name": "testuser", "description": "description of the subject", "extra": {"field_extra_subject": "value extra subject"}, "policies": [{"name": "test policy"}]}],
+ "objects": [{"name": "test object", "description": "description of the object", "extra": {"field_extra_object": "value extra object"}, "policies": [{"name": "test policy"}]}],
+ "actions": [{"name": "test action", "description": "description of the action", "extra": {"field_extra_action": "value extra action"}, "policies": [{"name": "test policy"}]}]}
+
+
+SUBJECT_OBJECT_ACTION_CATEGORIES = {"subject_categories": [{"name": "test subject categories", "description": "subject category description"}],
+ "object_categories": [{"name": "test object categories", "description": "object category description"}],
+ "action_categories": [{"name": "test action categories", "description": "action category description"}]}
+
+SUBJECT_OBJECT_ACTION_DATA = {"models": [{"name": "test model", "description": "", "meta_rules": [{"name": "meta rule"}]}],
+ "policies": [{"name": "test policy", "genre": "authz", "description": "policy description", "model": {"name" : "test model"}}],
+ "subject_categories": [{"name": "test subject categories", "description": "subject category description"}],
+ "object_categories": [{"name": "test object categories", "description": "object category description"}],
+ "action_categories": [{"name": "test action categories", "description": "action category description"}],
+ "subject_data": [{"name": "test subject data", "description": "subject data description", "policies": [{"name": "test policy"}], "category": {"name": "test subject categories"}}],
+ "object_data": [{"name": "test object data", "description": "object data description", "policies": [{"name": "test policy"}], "category": {"name": "test object categories"}}],
+ "action_data": [{"name": "test action data", "description": "action data description", "policies": [{"name": "test policy"}], "category": {"name": "test action categories"}}],
+ "meta_rules": [{"name": "meta rule", "description": "valid meta rule", "subject_categories": [{"name": "test subject categories"}], "object_categories": [{"name": "test object categories"}], "action_categories": [{"name": "test action categories"}]}]}
+
+
+META_RULES = {"subject_categories": [{"name": "test subject categories", "description": "subject category description"}],
+ "object_categories": [{"name": "test object categories", "description": "object category description"}],
+ "action_categories": [{"name": "test action categories", "description": "object action description"}],
+ "meta_rules": [{"name": "meta rule", "description": "valid meta rule", "subject_categories": [{"name": "test subject categories"}], "object_categories": [{"name": "test object categories"}], "action_categories": [{"name": "test action categories"}]}]}
+
+
+ASSIGNMENTS = {"models": [{"name": "test model", "description": "", "meta_rules": [{"name": "meta rule"}]}],
+ "policies": [{"name": "test policy", "genre": "authz", "description": "policy description", "model": {"name" : "test model"}}],
+ "subject_categories": [{"name": "test subject categories", "description": "subject category description"}],
+ "object_categories": [{"name": "test object categories", "description": "object category description"}],
+ "action_categories": [{"name": "test action categories", "description": "action category description"}],
+ "subject_data": [{"name": "test subject data", "description": "subject data description", "policies": [{"name": "test policy"}], "category": {"name": "test subject categories"}}],
+ "object_data": [{"name": "test object data", "description": "object data description", "policies": [{"name": "test policy"}], "category": {"name": "test object categories"}}],
+ "action_data": [{"name": "test action data", "description": "action data description", "policies": [{"name": "test policy"}], "category": {"name": "test action categories"}}],
+ "meta_rules": [{"name": "meta rule", "description": "valid meta rule", "subject_categories": [{"name": "test subject categories"}], "object_categories": [{"name": "test object categories"}], "action_categories": [{"name": "test action categories"}]}],
+ "subjects": [{"name": "testuser", "description": "description of the subject", "extra": {"field_extra_subject": "value extra subject"}, "policies": [{"name": "test policy"}]}],
+ "objects": [{"name": "test object e0", "description": "description of the object", "extra": {"field_extra_object": "value extra object"}, "policies": [{"name": "test policy"}]}],
+ "actions": [{"name": "test action e0", "description": "description of the action", "extra": {"field_extra_action": "value extra action"}, "policies": [{"name": "test policy"}]}],
+ "subject_assignments": [{"subject": {"name": "testuser"}, "category": {"name": "test subject categories"}, "assignments": [{"name": "test subject data"}]}],
+ "object_assignments": [{"object": {"name": "test object e0"}, "category": {"name": "test object categories"}, "assignments": [{"name": "test object data"}]}],
+ "action_assignments": [{"action": {"name": "test action e0"}, "category": {"name": "test action categories"}, "assignments": [{"name": "test action data"}]}]}
+
+RULES = {"models": [{"name": "test model", "description": "", "meta_rules": [{"name": "meta rule"}]}],
+ "policies": [{"name": "test policy", "genre": "authz", "description": "policy description", "model": {"name" : "test model"}}],
+ "subject_categories": [{"name": "test subject categories", "description": "subject category description"}],
+ "object_categories": [{"name": "test object categories", "description": "object category description"}],
+ "action_categories": [{"name": "test action categories", "description": "action category description"}],
+ "subject_data": [{"name": "test subject data", "description": "subject data description", "policies": [{"name": "test policy"}], "category": {"name": "test subject categories"}}],
+ "object_data": [{"name": "test object data", "description": "object data description", "policies": [{"name": "test policy"}], "category": {"name": "test object categories"}}],
+ "action_data": [{"name": "test action data", "description": "action data description", "policies": [{"name": "test policy"}], "category": {"name": "test action categories"}}],
+ "meta_rules": [{"name": "meta rule", "description": "valid meta rule", "subject_categories": [{"name": "test subject categories"}], "object_categories": [{"name": "test object categories"}], "action_categories": [{"name": "test action categories"}]}],
+ "subjects": [{"name": "testuser", "description": "description of the subject", "extra": {"field_extra_subject": "value extra subject"}, "policies": [{"name": "test policy"}]}],
+ "objects": [{"name": "test object e1", "description": "description of the object", "extra": {"field_extra_object": "value extra object"}, "policies": [{"name": "test policy"}]}],
+ "actions": [{"name": "test action e1", "description": "description of the action", "extra": {"field_extra_action": "value extra action"}, "policies": [{"name": "test policy"}]}],
+ "subject_assignments": [{"subject": {"name": "testuser"}, "category": {"name": "test subject categories"}, "assignments": [{"name": "test subject data"}]}],
+ "object_assignments": [{"object": {"name": "test object e1"}, "category": {"name": "test object categories"}, "assignments": [{"name": "test object data"}]}],
+ "action_assignments": [{"action": {"name": "test action e1"}, "category": {"name": "test action categories"}, "assignments": [{"name": "test action data"}]}],
+ "rules": [{"meta_rule": {"name": "meta rule"}, "rule": {"subject_data": [{"name": "test subject data"}], "object_data": [{"name": "test object data"}], "action_data": [{"name": "test action data"}]}, "policy": {"name":"test policy"}, "instructions": {"decision": "grant"}, "enabled": True}]
+ }
+
+
+def test_export_models():
+ client = utilities.register_client()
+ import_export_utilities.clean_all(client)
+ req = client.post("/import", content_type='application/json', data=json.dumps(MODEL_WITHOUT_META_RULES))
+ data = utilities.get_json(req.data)
+ assert data == "Import ok !"
+
+ req = client.get("/export")
+ assert req.status_code == 200
+ data = utilities.get_json(req.data)
+
+ assert "content" in data
+ assert "models" in data["content"]
+ assert isinstance(data["content"]["models"], list)
+ assert len(data["content"]["models"]) == 1
+ model = data["content"]["models"][0]
+ assert model["name"] == "test model"
+ assert model["description"] == "model description"
+ assert isinstance(model["meta_rules"], list)
+ assert len(model["meta_rules"]) == 0
+
+
+def test_export_policies():
+ client = utilities.register_client()
+ import_export_utilities.clean_all(client)
+ req = client.post("/import", content_type='application/json', data=json.dumps(POLICIES))
+ data = utilities.get_json(req.data)
+ assert data == "Import ok !"
+
+ req = client.get("/export")
+ assert req.status_code == 200
+ data = utilities.get_json(req.data)
+
+ assert "content" in data
+ assert "policies" in data["content"]
+ assert isinstance(data["content"]["policies"], list)
+ assert len(data["content"]["policies"]) == 1
+ policy = data["content"]["policies"][0]
+ assert policy["name"] == "test policy"
+ assert policy["genre"] == "authz"
+ assert policy["description"] == "policy description"
+ assert "model" in policy
+ assert "name" in policy["model"]
+ model = policy["model"]
+ assert model["name"] == "test model"
+
+
+def test_export_subject_object_action():
+ client = utilities.register_client()
+ import_export_utilities.clean_all(client)
+ req = client.post("/import", content_type='application/json', data=json.dumps(SUBJECTS_OBJECTS_ACTIONS))
+ data = utilities.get_json(req.data)
+ assert data == "Import ok !"
+
+ req = client.get("/export")
+ assert req.status_code == 200
+ data = utilities.get_json(req.data)
+
+ assert "content" in data
+ type_elements = ["subject", "object", "action"]
+ for type_element in type_elements:
+ key = type_element + "s"
+ assert key in data["content"]
+ assert isinstance(data["content"][key], list)
+ assert len(data["content"][key]) == 1
+ element = data["content"][key][0]
+ if type_element == "subject":
+ assert element["name"] == "testuser"
+ else:
+ assert element["name"] == "test "+ type_element
+ assert element["description"] == "description of the " + type_element
+ assert "policies" in element
+ assert isinstance(element["policies"], list)
+ assert len(element["policies"]) == 1
+ assert isinstance(element["policies"][0], dict)
+ assert element["policies"][0]["name"] == "test policy"
+ assert isinstance(element["extra"], dict)
+ key_dict = "field_extra_" + type_element
+ value_dict = "value extra " + type_element
+ assert key_dict in element["extra"]
+ assert element["extra"][key_dict] == value_dict
+
+
+def test_export_subject_object_action_categories():
+ client = utilities.register_client()
+ import_export_utilities.clean_all(client)
+ req = client.post("/import", content_type='application/json', data=json.dumps(SUBJECT_OBJECT_ACTION_CATEGORIES))
+ data = utilities.get_json(req.data)
+ assert data == "Import ok !"
+
+ req = client.get("/export")
+ assert req.status_code == 200
+ data = utilities.get_json(req.data)
+ assert "content" in data
+ type_elements = ["subject", "object", "action"]
+ for type_element in type_elements:
+ key = type_element + "_categories"
+ assert key in data["content"]
+ assert isinstance(data["content"][key], list)
+ assert len(data["content"][key]) == 1
+ category = data["content"][key][0]
+ assert category["name"] == "test " + type_element + " categories"
+ assert category["description"] == type_element + " category description"
+
+
+def test_export_subject_object_action_data():
+ client = utilities.register_client()
+ import_export_utilities.clean_all(client)
+ req = client.post("/import", content_type='application/json', data=json.dumps(SUBJECT_OBJECT_ACTION_DATA))
+ data = utilities.get_json(req.data)
+ assert data == "Import ok !"
+
+ req = client.get("/export")
+ assert req.status_code == 200
+ data = utilities.get_json(req.data)
+ assert "content" in data
+ type_elements = ["subject", "object", "action"]
+ for type_element in type_elements:
+ key = type_element + "_data"
+ assert key in data["content"]
+ assert isinstance(data["content"][key], list)
+ assert len(data["content"][key]) == 1
+ data_elt = data["content"][key][0]
+ assert data_elt["name"] == "test " + type_element + " data"
+ assert data_elt["description"] == type_element + " data description"
+ assert isinstance(data_elt["policy"], dict)
+ assert data_elt["policy"]["name"] == "test policy"
+ assert isinstance(data_elt["category"], dict)
+ assert data_elt["category"]["name"] == "test " + type_element + " categories"
+
+
+def test_export_assignments():
+ client = utilities.register_client()
+ import_export_utilities.clean_all(client)
+ req = client.post("/import", content_type='application/json', data=json.dumps(ASSIGNMENTS))
+ data = utilities.get_json(req.data)
+ assert data == "Import ok !"
+
+ req = client.get("/export")
+ assert req.status_code == 200
+ data = utilities.get_json(req.data)
+ assert "content" in data
+ type_elements = ["subject", "object", "action"]
+ for type_element in type_elements:
+ key = type_element + "_assignments"
+ assert key in data["content"]
+ assert isinstance(data["content"][key], list)
+ assert len(data["content"][key]) == 1
+ assignment_elt = data["content"][key][0]
+ assert type_element in assignment_elt
+ assert isinstance(assignment_elt[type_element], dict)
+ if type_element == "subject":
+ assert assignment_elt[type_element]["name"] == "testuser"
+ else:
+ assert assignment_elt[type_element]["name"] == "test " + type_element + " e0"
+ assert "category" in assignment_elt
+ assert isinstance(assignment_elt["category"], dict)
+ assert assignment_elt["category"]["name"] == "test " + type_element + " categories"
+ assert "assignments" in assignment_elt
+ assert isinstance(assignment_elt["assignments"], list)
+ assert len(assignment_elt["assignments"]) == 1
+ assert assignment_elt["assignments"][0]["name"] == "test " + type_element + " data"
+
+ import_export_utilities.clean_all(client)
+
+
+def test_export_rules():
+ client = utilities.register_client()
+ import_export_utilities.clean_all(client)
+ req = client.post("/import", content_type='application/json', data=json.dumps(RULES))
+ data = utilities.get_json(req.data)
+ assert data == "Import ok !"
+
+ req = client.get("/export")
+ assert req.status_code == 200
+ data = utilities.get_json(req.data)
+ assert "content" in data
+ assert "rules" in data["content"]
+ assert isinstance(data["content"]["rules"], list)
+ assert len(data["content"]["rules"]) == 1
+ rule = data["content"]["rules"][0]
+ assert "instructions" in rule
+ assert "decision" in rule["instructions"]
+ assert rule["instructions"]["decision"] == "grant"
+ assert "enabled" in rule
+ assert rule["enabled"]
+ assert "meta_rule" in rule
+ assert rule["meta_rule"]["name"] == "meta rule"
+ assert "policy" in rule
+ assert rule["policy"]["name"] == "test policy"
+ assert "rule" in rule
+ rule = rule["rule"]
+ assert "subject_data" in rule
+ assert isinstance(rule["subject_data"], list)
+ assert len(rule["subject_data"]) == 1
+ assert rule["subject_data"][0]["name"] == "test subject data"
+ assert "object_data" in rule
+ assert isinstance(rule["object_data"], list)
+ assert len(rule["object_data"]) == 1
+ assert rule["object_data"][0]["name"] == "test object data"
+ assert "action_data" in rule
+ assert isinstance(rule["action_data"], list)
+ assert len(rule["action_data"]) == 1
+ assert rule["action_data"][0]["name"] == "test action data"
diff --git a/old/moon_manager/tests/unit_python/api/test_import.py b/old/moon_manager/tests/unit_python/api/test_import.py
new file mode 100644
index 00000000..af5f753a
--- /dev/null
+++ b/old/moon_manager/tests/unit_python/api/test_import.py
@@ -0,0 +1,510 @@
+# Copyright 2018 Open Platform for NFV Project, Inc. and its contributors
+# This software is distributed under the terms and conditions of the 'Apache-2.0'
+# license which can be found in the file 'LICENSE' in this package distribution
+# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
+
+import api.utilities as utilities
+import api.test_unit_models as test_models
+import api.test_policies as test_policies
+import api.test_meta_data as test_categories
+import api.test_data as test_data
+import api.test_meta_rules as test_meta_rules
+import api.test_assignement as test_assignments
+import api.test_rules as test_rules
+import api.import_export_utilities as import_export_utilities
+
+import json
+
+
+MODEL_WITHOUT_META_RULES = [
+ {"models": [{"name": "test model", "description": "", "meta_rules": []}]},
+ {"models": [{"name": "test model", "description": "new description", "meta_rules": [], "override": True}]},
+ {"models": [{"name": "test model", "description": "description not taken into account", "meta_rules": [], "override": False}]}
+ ]
+
+POLICIES = [
+ {"policies": [{"name": "test policy", "genre": "authz", "description": "description", "model": {}, "mandatory": False}]},
+ {"policies": [{"name": "test policy", "genre": "authz", "description": "new description not taken into account", "model": {"name" : "test model"}, "mandatory": True}]},
+ {"policies": [{"name": "test policy", "genre": "not authz ?", "description": "generates an exception", "model": {"name" : "test model"}, "override": True}]},
+ {"models": [{"name": "test model", "description": "", "meta_rules": []}], "policies": [{"name": "test policy", "genre": "not authz ?", "description": "changes taken into account", "model": {"name" : "test model"}, "override": True}]},
+]
+
+SUBJECTS = [{"subjects": [{"name": "testuser", "description": "description of the subject", "extra": {}, "policies": []}]},
+ {"policies": [{"name": "test policy", "genre": "authz", "description": "description", "model": {}, "mandatory": False}], "subjects": [{"name": "testuser", "description": "description of the subject", "extra": {}, "policies": []}]},
+ {"policies": [{"name": "test other policy", "genre": "authz", "description": "description", "model": {}, "mandatory": True}], "subjects": [{"name": "testuser", "description": "description of the subject", "extra": {}, "policies": []}]},
+ {"subjects": [{"name": "testuser", "description": "new description of the subject", "extra": {"email": "new-email@test.com"}, "policies": [{"name": "test other policy"}]}]},
+ {"policies": [{"name": "test policy", "genre": "authz", "description": "description", "model": {}, "mandatory": False}], "subjects": [{"name": "testuser", "description": "description of the subject", "extra": {}, "policies": [{"name": "test policy"}]}]}]
+
+
+OBJECTS = [
+ {"objects": [{"name": "test object", "description": "description of the object", "extra": {}, "policies": []}]},
+ {"policies": [{"name": "test policy", "genre": "authz", "description": "description", "model": {}, "mandatory": False}],
+ "objects": [{"name": "test object", "description": "description of the object", "extra": {}, "policies": []}]},
+ {"policies": [{"name": "test other policy", "genre": "authz", "description": "description", "model": {}, "mandatory": True}],
+ "objects": [{"name": "test object", "description": "description of the object", "extra": {}, "policies": []}]},
+ {"objects": [{"name": "test object", "description": "new description of the object",
+ "extra": {"test": "test extra"},
+ "policies": [{"name": "test other policy"}]}]},
+ {"policies": [{"name": "test policy", "genre": "authz", "description": "description", "model": {}, "mandatory": False}],
+ "objects": [{"name": "test object", "description": "description of the object", "extra": {}, "policies": [{"name": "test policy"}]}]},
+]
+
+
+ACTIONS = [{"actions": [{"name": "test action", "description": "description of the action", "extra": {}, "policies": []}]},
+ {"policies": [{"name": "test policy", "genre": "authz", "description": "description", "model": {}, "mandatory": False}], "actions": [{"name": "test action", "description": "description of the action", "extra": {}, "policies": []}]},
+ {"policies": [{"name": "test other policy", "genre": "authz", "description": "description", "model": {}, "mandatory": True}], "actions": [{"name": "test action", "description": "description of the action", "extra": {}, "policies": []}]},
+ {"actions": [{"name": "test action", "description": "new description of the action", "extra": {"test": "test extra"}, "policies": [{"name": "test other policy"}]}]},
+ {"policies": [{"name": "test policy", "genre": "authz", "description": "description", "model": {}, "mandatory": False}], "actions": [{"name": "test action", "description": "description of the action", "extra": {}, "policies": [{"name": "test policy"}]}]}]
+
+
+SUBJECT_CATEGORIES = [{"subject_categories": [{"name": "test subject categories", "description": "subject category description"}]},
+ {"subject_categories": [{"name": "test subject categories", "description": "new subject category description"}]}]
+
+
+OBJECT_CATEGORIES = [{"object_categories": [{"name": "test object categories", "description": "object category description"}]},
+ {"object_categories": [{"name": "test object categories", "description": "new object category description"}]}]
+
+
+ACTION_CATEGORIES = [{"action_categories": [{"name": "test action categories", "description": "action category description"}]},
+ {"action_categories": [{"name": "test action categories", "description": "new action category description"}]}]
+
+# meta_rules import is needed otherwise the search for data do not work !!!
+PRE_DATA = {"models": [{"name": "test model", "description": "", "meta_rules": [{"name": "good meta rule"}, {"name": "other good meta rule"}]}],
+ "policies": [{"name": "test other policy", "genre": "authz", "description": "description", "model": {"name": "test model"}, "mandatory": True}],
+ "subject_categories": [{"name": "test subject categories", "description": "subject category description"}, {"name": "other test subject categories", "description": "subject category description"}],
+ "object_categories": [{"name": "test object categories", "description": "object category description"}, {"name": "other test object categories", "description": "object category description"}],
+ "action_categories": [{"name": "test action categories", "description": "action category description"}, {"name": "other test action categories", "description": "action category description"}],
+ "meta_rules": [{"name": "good meta rule", "description": "valid meta rule", "subject_categories": [{"name": "test subject categories"}], "object_categories": [{"name": "test object categories"}], "action_categories": [{"name": "test action categories"}]},
+ {"name": "other good meta rule", "description": "valid meta rule", "subject_categories": [{"name": "other test subject categories"}], "object_categories": [{"name": "other test object categories"}], "action_categories": [{"name": "other test action categories"}]}]}
+
+SUBJECT_DATA = [{"subject_data": [{"name": "not valid subject data", "description": "", "policies": [{}], "category": {}}]},
+ {"subject_data": [{"name": "not valid subject data", "description": "", "policies": [{}], "category": {"name": "test subject categories"}}]},
+ {"policies": [{"name": "test policy", "genre": "authz", "description": "description", "model": {"name": "test model"}, "mandatory": True}], "subject_data": [{"name": "one valid subject data", "description": "description", "policies": [{}], "category": {"name": "test subject categories"}}]},
+ {"subject_data": [{"name": "valid subject data", "description": "description", "policies": [{"name": "test policy"}], "category": {"name": "test subject categories"}}]},
+ {"subject_data": [{"name": "valid subject data", "description": "new description", "policies": [{"name": "test other policy"}], "category": {"name": "test subject categories"}}]}]
+
+OBJECT_DATA = [{"object_data": [{"name": "not valid object data", "description": "", "policies": [{}], "category": {}}]},
+ {"object_data": [{"name": "not valid object data", "description": "", "policies": [{}], "category": {"name": "test object categories"}}]},
+ {"policies": [{"name": "test policy", "genre": "authz", "description": "description", "model": {"name": "test model"}, "mandatory": True}], "object_data": [{"name": "one valid object data", "description": "description", "policies": [{}], "category": {"name": "test object categories"}}]},
+ {"object_data": [{"name": "valid object data", "description": "description", "policies": [{"name": "test policy"}], "category": {"name": "test object categories"}}]},
+ {"object_data": [{"name": "valid object data", "description": "new description", "policies": [{"name": "test other policy"}], "category": {"name": "test object categories"}}]}]
+
+
+ACTION_DATA = [{"action_data": [{"name": "not valid action data", "description": "", "policies": [{}], "category": {}}]},
+ {"action_data": [{"name": "not valid action data", "description": "", "policies": [{}], "category": {"name": "test action categories"}}]},
+ {"policies": [{"name": "test policy", "genre": "authz", "description": "description", "model": {"name": "test model"}, "mandatory": True}], "action_data": [{"name": "one valid action data", "description": "description", "policies": [{}], "category": {"name": "test action categories"}}]},
+ {"action_data": [{"name": "valid action data", "description": "description", "policies": [{"name": "test policy"}], "category": {"name": "test action categories"}}]},
+ {"action_data": [{"name": "valid action data", "description": "new description", "policies": [{"name": "test other policy"}], "category": {"name": "test action categories"}}]}]
+
+
+PRE_META_RULES = {"subject_categories": [{"name": "test subject categories", "description": "subject category description"}],
+ "object_categories": [{"name": "test object categories", "description": "object category description"}],
+ "action_categories": [{"name": "test action categories", "description": "object action description"}]}
+
+META_RULES = [{"meta_rules" :[{"name": "bad meta rule", "description": "not valid meta rule", "subject_categories": [{"name": "not valid category"}], "object_categories": [{"name": "test object categories"}], "action_categories": [{"name": "test action categories"}]}]},
+ {"meta_rules": [{"name": "bad meta rule", "description": "not valid meta rule", "subject_categories": [{"name": "test subject categories"}], "object_categories": [{"name": "not valid category"}], "action_categories": [{"name": "test action categories"}]}]},
+ {"meta_rules": [{"name": "bad meta rule", "description": "not valid meta rule", "subject_categories": [{"name": "test subject categories"}], "object_categories": [{"name": "test object categories"}], "action_categories": [{"name": "not valid category"}]}]},
+ {"meta_rules": [{"name": "good meta rule", "description": "valid meta rule", "subject_categories": [{"name": "test subject categories"}], "object_categories": [{"name": "test object categories"}], "action_categories": [{"name": "test action categories"}]}]}]
+
+
+PRE_ASSIGNMENTS = {"models": [{"name": "test model", "description": "", "meta_rules": [{"name": "good meta rule"}]}],
+ "policies": [{"name": "test policy", "genre": "authz", "description": "description", "model": {"name" : "test model"}, "mandatory": True}],
+ "subject_categories": [{"name": "test subject categories", "description": "subject category description"}],
+ "object_categories": [{"name": "test object categories", "description": "object category description"}],
+ "action_categories": [{"name": "test action categories", "description": "object action description"}],
+ "subjects": [{"name": "testuser", "description": "description of the subject", "extra": {}, "policies": [{"name": "test policy"}]}],
+ "objects": [{"name": "test object", "description": "description of the object", "extra": {}, "policies": [{"name": "test policy"}]}],
+ "actions": [{"name": "test action", "description": "description of the action", "extra": {}, "policies": [{"name": "test policy"}]}],
+ "meta_rules": [{"name": "good meta rule", "description": "valid meta rule", "subject_categories": [{"name": "test subject categories"}], "object_categories": [{"name": "test object categories"}], "action_categories": [{"name": "test action categories"}]}],
+ "subject_data": [{"name": "subject data", "description": "test subject data", "policies": [{"name": "test policy"}], "category": {"name": "test subject categories"}}],
+ "object_data": [{"name": "object data", "description": "test object data", "policies": [{"name": "test policy"}], "category": {"name": "test object categories"}}],
+ "action_data": [{"name": "action data", "description": "test action data", "policies": [{"name": "test policy"}], "category": {"name": "test action categories"}}]}
+
+
+SUBJECT_ASSIGNMENTS = [{"subject_assignments": [{"subject": {"name": "unknonw"}, "category" : {"name": "test subject categories"}, "assignments": [{"name": "subject data"}]}]},
+ {"subject_assignments": [{"subject": {"name": "testuser"}, "category": {"name": "unknown"}, "assignments": [{"name": "subject data"}]}]},
+ {"subject_assignments": [{"subject": {"name": "testuser"}, "category" : {"name": "test subject categories"}, "assignments": [{"name": "unknwon"}]}]},
+ {"subject_assignments": [{"subject": {"name": "testuser"}, "category": {"name": "test subject categories"}, "assignments": [{"name": "subject data"}]}]}]
+
+OBJECT_ASSIGNMENTS = [{"object_assignments": [{"object": {"name": "unknown"}, "category" : {"name": "test object categories"}, "assignments": [{"name": "object data"}]}]},
+ {"object_assignments": [{"object": {"name": "test object"}, "category" : {"name": "unknown"}, "assignments": [{"name": "object data"}]}]},
+ {"object_assignments": [{"object": {"name": "test object"}, "category" : {"name": "test object categories"}, "assignments": [{"name": "unknown"}]}]},
+ {"object_assignments": [{"object": {"name": "test object"}, "category" : {"name": "test object categories"}, "assignments": [{"name": "object data"}]}]}]
+
+ACTION_ASSIGNMENTS = [{"action_assignments": [{"action": {"name": "unknown"}, "category" : {"name": "test action categories"}, "assignments": [{"name": "action data"}]}]},
+ {"action_assignments": [{"action": {"name": "test action"}, "category" : {"name": "unknown"}, "assignments": [{"name": "action data"}]}]},
+ {"action_assignments": [{"action": {"name": "test action"}, "category" : {"name": "test action categories"}, "assignments": [{"name": "unknown"}]}]},
+ {"action_assignments": [{"action": {"name": "test action"}, "category" : {"name": "test action categories"}, "assignments": [{"name": "action data"}]}]}]
+
+RULES = [{"rules": [{"meta_rule": {"name": "unknown meta rule"}, "policy": {"name": "test policy"}, "instructions": {"decision": "grant"}, "enabled": True, "rule": {"subject_data": [{"name": "subject data"}], "object_data": [{"name": "object data"}], "action_data": [{"name": "action data"}]}}]},
+ {"rules": [{"meta_rule": {"name": "good meta rule"}, "policy": {"name": "unknown policy"}, "instructions": {"decision": "grant"}, "enabled": True, "rule": {"subject_data": [{"name": "subject data"}], "object_data": [{"name": "object data"}], "action_data": [{"name": "action data"}]}}]},
+ {"rules": [{"meta_rule": {"name": "good meta rule"}, "policy": {"name": "test policy"}, "instructions": {"decision": "grant"}, "enabled": True, "rule": {"subject_data": [{"name": "unknown subject data"}], "object_data": [{"name": "object data"}], "action_data": [{"name": "action data"}]}}]},
+ {"rules": [{"meta_rule": {"name": "good meta rule"}, "policy": {"name": "test policy"}, "instructions": {"decision": "grant"}, "enabled": True, "rule": {"subject_data": [{"name": "subject data"}], "object_data": [{"name": "unknown object data"}], "action_data": [{"name": "action data"}]}}]},
+ {"rules": [{"meta_rule": {"name": "good meta rule"}, "policy": {"name": "test policy"}, "instructions": {"decision": "grant"}, "enabled": True, "rule": {"subject_data": [{"name": "subject data"}], "object_data": [{"name": "object data"}], "action_data": [{"name": "unknown action data"}]}}]},
+ {"rules": [{"meta_rule": {"name": "good meta rule"}, "policy": {"name": "test policy"}, "instructions": {"decision": "grant"}, "enabled": True, "rule": {"subject_data": [{"name": "subject data"}], "object_data": [{"name": "object data"}], "action_data": [{"name": "action data"}]}}]}]
+
+
+def test_import_models_without_new_meta_rules():
+ client = utilities.register_client()
+ import_export_utilities.clean_all(client)
+ counter = 0
+ for models_description in MODEL_WITHOUT_META_RULES:
+ req = client.post("/import", content_type='application/json', data=json.dumps(models_description))
+ data = utilities.get_json(req.data)
+ assert data == "Import ok !"
+ req, models = test_models.get_models(client)
+ models = models["models"]
+ assert len(list(models.keys())) == 1
+ values = list(models.values())
+ assert values[0]["name"] == "test model"
+ if counter == 0:
+ assert len(values[0]["description"]) == 0
+ if counter == 1 or counter == 2:
+ assert values[0]["description"] == "new description"
+ counter = counter + 1
+ import_export_utilities.clean_all(client)
+
+
+def test_import_policies():
+ client = utilities.register_client()
+ import_export_utilities.clean_all(client)
+ counter = -1
+ for policy_description in POLICIES:
+ counter = counter + 1
+ req = client.post("/import", content_type='application/json', data=json.dumps(policy_description))
+ try:
+ data = utilities.get_json(req.data)
+ assert data == "Import ok !"
+ except Exception:
+ assert counter == 2 # this is an expected failure
+ continue
+
+ req, policies = test_policies.get_policies(client)
+ policies = policies["policies"]
+ assert len(list(policies.keys())) == 1
+ values = list(policies.values())
+ assert values[0]["name"] == "test policy"
+ if counter < 3:
+ assert values[0]["genre"] == "authz"
+ assert values[0]["description"] == "description"
+ else:
+ assert values[0]["genre"] == "not authz ?"
+ assert values[0]["description"] == "changes taken into account"
+ assert len(values[0]["model_id"]) > 0
+ import_export_utilities.clean_all(client)
+
+
+def test_import_subject_object_action():
+ client = utilities.register_client()
+ type_elements = ["object", "action"]
+
+ for type_element in type_elements:
+ import_export_utilities.clean_all(client)
+ counter = -1
+ # set the getters and the comparison values
+ if type_element == "subject":
+ elements = SUBJECTS
+ clean_method = import_export_utilities.clean_subjects
+ name = "testuser"
+ key_extra = "email"
+ value_extra = "new-email@test.com"
+ elif type_element == "object":
+ elements = OBJECTS
+ clean_method = import_export_utilities.clean_objects
+ name = "test object"
+ key_extra = "test"
+ value_extra = "test extra"
+ else:
+ elements = ACTIONS
+ clean_method = import_export_utilities.clean_actions
+ name = "test action"
+ key_extra = "test"
+ value_extra = "test extra"
+
+ for element in elements:
+ counter = counter + 1
+ if counter == 2 or counter == 4:
+ clean_method(client)
+
+
+ if counter == 3:
+ req = client.patch("/{}s/{}".format(type_element,perimeter_id), content_type='application/json',
+ data=json.dumps(
+ element["{}s".format(type_element)][0]))
+ else :
+ req = client.post("/import", content_type='application/json',
+ data=json.dumps(element))
+ if counter < 2:
+ assert req.status_code == 500
+ continue
+
+ try:
+ data = utilities.get_json(req.data)
+ except Exception as e:
+ assert False
+ #assert counter < 2 #  this is an expected failure
+ #continue
+
+ if counter != 3:
+ assert data == "Import ok !"
+ get_elements = utilities.get_json(client.get("/"+type_element + "s").data)
+ get_elements = get_elements[type_element + "s"]
+
+ perimeter_id = list(get_elements.keys())[0]
+
+ assert len(list(get_elements.keys())) == 1
+ values = list(get_elements.values())
+ assert values[0]["name"] == name
+ if counter == 2 or counter == 4:
+ assert values[0]["description"] == "description of the " + type_element
+ #assert not values[0]["extra"]
+ if counter == 3:
+ assert values[0]["description"] == "new description of the " + type_element
+ assert values[0]["extra"][key_extra] == value_extra
+
+ # assert len(values[0]["policy_list"]) == 1
+ import_export_utilities.clean_all(client)
+
+
+def test_import_subject_object_action_categories():
+ client = utilities.register_client()
+ type_elements = ["subject", "object", "action"]
+
+ for type_element in type_elements:
+ import_export_utilities.clean_all(client)
+ counter = -1
+ # set the getters and the comparison values
+ if type_element == "subject":
+ elements = SUBJECT_CATEGORIES
+ get_method = test_categories.get_subject_categories
+ elif type_element == "object":
+ elements = OBJECT_CATEGORIES
+ get_method = test_categories.get_object_categories
+ else:
+ elements = ACTION_CATEGORIES
+ get_method = test_categories.get_action_categories
+
+ for element in elements:
+ req = client.post("/import", content_type='application/json', data=json.dumps(element))
+ counter = counter + 1
+ data = utilities.get_json(req.data)
+ assert data == "Import ok !"
+ req, get_elements = get_method(client)
+ get_elements = get_elements[type_element + "_categories"]
+ assert len(list(get_elements.keys())) == 1
+ values = list(get_elements.values())
+ assert values[0]["name"] == "test " + type_element + " categories"
+ assert values[0]["description"] == type_element + " category description"
+
+
+def test_import_meta_rules():
+ client = utilities.register_client()
+ import_export_utilities.clean_all(client)
+ # import some categories
+ req = client.post("/import", content_type='application/json', data=json.dumps(PRE_META_RULES))
+ data = utilities.get_json(req.data)
+ assert data == "Import ok !"
+
+ counter = -1
+ for meta_rule in META_RULES:
+ counter = counter + 1
+ req = client.post("/import", content_type='application/json', data=json.dumps(meta_rule))
+ if counter != 3:
+ assert req.status_code == 500
+ continue
+ else:
+ data = utilities.get_json(req.data)
+ assert data == "Import ok !"
+ assert req.status_code == 200
+
+ req, meta_rules = test_meta_rules.get_meta_rules(client)
+ meta_rules = meta_rules["meta_rules"]
+ key = list(meta_rules.keys())[0]
+ assert isinstance(meta_rules,dict)
+ assert meta_rules[key]["name"] == "good meta rule"
+ assert meta_rules[key]["description"] == "valid meta rule"
+ assert len(meta_rules[key]["subject_categories"]) == 1
+ assert len(meta_rules[key]["object_categories"]) == 1
+ assert len(meta_rules[key]["action_categories"]) == 1
+
+ subject_category_key = meta_rules[key]["subject_categories"][0]
+ object_category_key = meta_rules[key]["object_categories"][0]
+ action_category_key = meta_rules[key]["action_categories"][0]
+
+ req, sub_cat = test_categories.get_subject_categories(client)
+ sub_cat = sub_cat["subject_categories"]
+ assert sub_cat[subject_category_key]["name"] == "test subject categories"
+
+ req, ob_cat = test_categories.get_object_categories(client)
+ ob_cat = ob_cat["object_categories"]
+ assert ob_cat[object_category_key]["name"] == "test object categories"
+
+ req, ac_cat = test_categories.get_action_categories(client)
+ ac_cat = ac_cat["action_categories"]
+ assert ac_cat[action_category_key]["name"] == "test action categories"
+
+ import_export_utilities.clean_all(client)
+
+
+def test_import_subject_object_action_assignments():
+ client = utilities.register_client()
+ import_export_utilities.clean_all(client)
+
+ req = client.post("/import", content_type='application/json', data=json.dumps(PRE_ASSIGNMENTS))
+ data = utilities.get_json(req.data)
+ assert data == "Import ok !"
+
+ type_elements = ["subject", "object", "action"]
+
+ for type_element in type_elements:
+ counter = -1
+ if type_element == "subject":
+ datas = SUBJECT_ASSIGNMENTS
+ get_method = test_assignments.get_subject_assignment
+ elif type_element == "object":
+ datas = OBJECT_ASSIGNMENTS
+ get_method = test_assignments.get_object_assignment
+ else:
+ datas = ACTION_ASSIGNMENTS
+ get_method = test_assignments.get_action_assignment
+
+ for assignments in datas:
+ counter = counter + 1
+ req = client.post("/import", content_type='application/json', data=json.dumps(assignments))
+ if counter != 3:
+ assert req.status_code == 500
+ continue
+ else:
+ assert data == "Import ok !"
+ assert req.status_code == 200
+ req, policies = test_policies.get_policies(client)
+ for policy_key in policies["policies"]:
+ req, get_assignments = get_method(client, policy_key)
+ get_assignments = get_assignments[type_element+"_assignments"]
+ assert len(get_assignments) == 1
+
+
+def test_import_rules():
+ client = utilities.register_client()
+ import_export_utilities.clean_all(client)
+ req = client.post("/import", content_type='application/json', data=json.dumps(PRE_ASSIGNMENTS))
+ data = utilities.get_json(req.data)
+ assert data == "Import ok !"
+
+ counter = -1
+ for rule in RULES:
+ counter = counter + 1
+ req = client.post("/import", content_type='application/json', data=json.dumps(rule))
+
+ if counter < 5:
+ assert req.status_code == 500
+ continue
+
+ assert req.status_code == 200
+
+ req, rules = test_rules.test_get_rules()
+ rules = rules["rules"]
+ rules = rules["rules"]
+ assert len(rules) == 1
+ rules = rules[0]
+ assert rules["enabled"]
+ assert rules["instructions"]["decision"] == "grant"
+
+ req, meta_rules = test_meta_rules.get_meta_rules(client)
+ assert meta_rules["meta_rules"][list(meta_rules["meta_rules"].keys())[0]]["name"] == "good meta rule"
+
+
+def test_import_subject_object_action_data():
+ client = utilities.register_client()
+ type_elements = ["subject", "object", "action"]
+
+ for type_element in type_elements:
+ import_export_utilities.clean_all(client)
+ req = client.post("/import", content_type='application/json', data=json.dumps(PRE_DATA))
+ counter = -1
+ # set the getters and the comparison values
+ if type_element == "subject":
+ elements = SUBJECT_DATA
+ get_method = test_data.get_subject_data
+ get_categories = test_categories.get_subject_categories
+ elif type_element == "object":
+ elements = OBJECT_DATA
+ get_method = test_data.get_object_data
+ get_categories = test_categories.get_object_categories
+ else:
+ elements = ACTION_DATA
+ get_method = test_data.get_action_data
+ get_categories = test_categories.get_action_categories
+
+ for element in elements:
+ req = client.post("/import", content_type='application/json', data=json.dumps(element))
+ counter = counter + 1
+ if counter == 0 or counter == 1:
+ assert req.status_code == 500
+ continue
+ assert req.status_code == 200
+ data = utilities.get_json(req.data)
+ assert data == "Import ok !"
+
+ req, policies = test_policies.get_policies(client)
+ policies = policies["policies"]
+ req, categories = get_categories(client)
+ categories = categories[type_element + "_categories"]
+ case_tested = False
+ for policy_key in policies.keys():
+ policy = policies[policy_key]
+ for category_key in categories:
+ req, get_elements = get_method(client, policy_id=policy_key, category_id=category_key)
+ if len(get_elements[type_element+"_data"]) == 0:
+ continue
+
+ # do this because the backend gives an element with empty data if the policy_key,
+ # category_key couple does not have any data...
+ get_elements = get_elements[type_element+"_data"]
+ if len(get_elements[0]["data"]) == 0:
+ continue
+
+ if policy["name"] == "test policy":
+ assert len(get_elements) == 1
+ el = get_elements[0]
+ assert isinstance(el["data"], dict)
+ if counter == 2:
+ assert len(el["data"].keys()) == 1
+ el = el["data"][list(el["data"].keys())[0]]
+ if "value" in el:
+ el = el["value"]
+ assert el["name"] == "one valid " + type_element + " data"
+ if counter == 3:
+ assert len(el["data"].keys()) == 2
+ el1 = el["data"][list(el["data"].keys())[0]]
+ el2 = el["data"][list(el["data"].keys())[1]]
+ if "value" in el1:
+ el1 = el1["value"]
+ el2 = el2["value"]
+ assert (el1["name"] == "one valid " + type_element + " data" and el2["name"] == "valid " + type_element + " data") or (el2["name"] == "one valid " + type_element + " data" and el1["name"] == "valid " + type_element + " data")
+ assert el1["description"] == "description"
+ assert el2["description"] == "description"
+
+ case_tested = True
+
+ if policy["name"] == "test other policy":
+ if counter == 4:
+ assert len(get_elements) == 1
+ el = get_elements[0]
+ assert isinstance(el["data"], dict)
+ assert len(el["data"].keys()) == 1
+ el = el["data"][list(el["data"].keys())[0]]
+ if "value" in el:
+ el = el["value"]
+ assert el["name"] == "valid " + type_element + " data"
+ assert el["description"] == "new description"
+ case_tested = True
+
+ assert case_tested is True
+
+
+def test_clean():
+ client = utilities.register_client()
+ import_export_utilities.clean_all(client)
+ #restore the database as previously
+ utilities.get_policy_id()
diff --git a/old/moon_manager/tests/unit_python/api/test_meta_data.py b/old/moon_manager/tests/unit_python/api/test_meta_data.py
new file mode 100644
index 00000000..e6cb0833
--- /dev/null
+++ b/old/moon_manager/tests/unit_python/api/test_meta_data.py
@@ -0,0 +1,305 @@
+import json
+import api.utilities as utilities
+from helpers import data_builder
+from uuid import uuid4
+
+
+# subject_categories_test
+
+
+def get_subject_categories(client):
+ req = client.get("/subject_categories")
+ subject_categories = utilities.get_json(req.data)
+ return req, subject_categories
+
+
+def add_subject_categories(client, name):
+ data = {
+ "name": name,
+ "description": "description of {}".format(name)
+ }
+ req = client.post("/subject_categories", data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ subject_categories = utilities.get_json(req.data)
+ return req, subject_categories
+
+
+def delete_subject_categories(client, name):
+ request, subject_categories = get_subject_categories(client)
+ for key, value in subject_categories['subject_categories'].items():
+ if value['name'] == name:
+ return client.delete("/subject_categories/{}".format(key))
+
+
+def delete_subject_categories_without_id(client):
+ req = client.delete("/subject_categories/{}".format(""))
+ return req
+
+
+def test_get_subject_categories():
+ client = utilities.register_client()
+ req, subject_categories = get_subject_categories(client)
+ assert req.status_code == 200
+ assert isinstance(subject_categories, dict)
+ assert "subject_categories" in subject_categories
+
+
+def test_add_subject_categories():
+ client = utilities.register_client()
+ req, subject_categories = add_subject_categories(client, "testuser")
+ assert req.status_code == 200
+ assert isinstance(subject_categories, dict)
+ value = list(subject_categories["subject_categories"].values())[0]
+ assert "subject_categories" in subject_categories
+ assert value['name'] == "testuser"
+ assert value['description'] == "description of {}".format("testuser")
+
+
+def test_add_subject_categories_with_existed_name():
+ client = utilities.register_client()
+ name = uuid4().hex
+ req, subject_categories = add_subject_categories(client, name)
+ assert req.status_code == 200
+ req, subject_categories = add_subject_categories(client, name)
+ assert req.status_code == 409
+ assert json.loads(req.data)["message"] == '409: Subject Category Existing'
+
+
+def test_add_subject_categories_name_contain_space():
+ client = utilities.register_client()
+ req, subject_categories = add_subject_categories(client, " ")
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Category Name Invalid'
+
+
+def test_add_subject_categories_with_empty_name():
+ client = utilities.register_client()
+ req, subject_categories = add_subject_categories(client, "<a>")
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]"
+
+
+def test_add_subject_categories_with_name_contain_space():
+ client = utilities.register_client()
+ req, subject_categories = add_subject_categories(client, "test<z>user")
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]"
+
+
+def test_delete_subject_categories():
+ client = utilities.register_client()
+ req = delete_subject_categories(client, "testuser")
+ assert req.status_code == 200
+
+
+def test_delete_subject_categories_without_id():
+ client = utilities.register_client()
+ req = delete_subject_categories_without_id(client)
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "400: Subject Category Unknown"
+
+
+# ---------------------------------------------------------------------------
+# object_categories_test
+
+def get_object_categories(client):
+ req = client.get("/object_categories")
+ object_categories = utilities.get_json(req.data)
+ return req, object_categories
+
+
+def add_object_categories(client, name):
+ data = {
+ "name": name,
+ "description": "description of {}".format(name)
+ }
+ req = client.post("/object_categories", data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ object_categories = utilities.get_json(req.data)
+ return req, object_categories
+
+
+def delete_object_categories(client, name):
+ request, object_categories = get_object_categories(client)
+ for key, value in object_categories['object_categories'].items():
+ if value['name'] == name:
+ return client.delete("/object_categories/{}".format(key))
+
+
+def delete_object_categories_without_id(client):
+ req = client.delete("/object_categories/{}".format(""))
+ return req
+
+
+def test_get_object_categories():
+ client = utilities.register_client()
+ req, object_categories = get_object_categories(client)
+ assert req.status_code == 200
+ assert isinstance(object_categories, dict)
+ assert "object_categories" in object_categories
+
+
+def test_add_object_categories():
+ client = utilities.register_client()
+ req, object_categories = add_object_categories(client, "testuser")
+ assert req.status_code == 200
+ assert isinstance(object_categories, dict)
+ value = list(object_categories["object_categories"].values())[0]
+ assert "object_categories" in object_categories
+ assert value['name'] == "testuser"
+ assert value['description'] == "description of {}".format("testuser")
+
+
+def test_add_object_categories_with_existed_name():
+ client = utilities.register_client()
+ name = uuid4().hex
+ req, object_categories = add_object_categories(client, name)
+ assert req.status_code == 200
+ req, object_categories = add_object_categories(client, name)
+ assert req.status_code == 409
+ assert json.loads(req.data)["message"] == '409: Object Category Existing'
+
+
+def test_add_object_categories_name_contain_space():
+ client = utilities.register_client()
+ req, subject_categories = add_object_categories(client, " ")
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Category Name Invalid'
+
+
+def test_add_object_categories_with_empty_name():
+ client = utilities.register_client()
+ req, object_categories = add_object_categories(client, "<a>")
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]"
+
+
+def test_add_object_categories_with_name_contain_space():
+ client = utilities.register_client()
+ req, object_categories = add_object_categories(client, "test<a>user")
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]"
+
+
+def test_delete_object_categories():
+ client = utilities.register_client()
+ req = delete_object_categories(client, "testuser")
+ assert req.status_code == 200
+
+
+def test_delete_object_categories_without_id():
+ client = utilities.register_client()
+ req = delete_object_categories_without_id(client)
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "400: Object Category Unknown"
+
+
+# ---------------------------------------------------------------------------
+# action_categories_test
+
+def get_action_categories(client):
+ req = client.get("/action_categories")
+ action_categories = utilities.get_json(req.data)
+ return req, action_categories
+
+
+def add_action_categories(client, name):
+ data = {
+ "name": name,
+ "description": "description of {}".format(name)
+ }
+ req = client.post("/action_categories", data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ action_categories = utilities.get_json(req.data)
+ return req, action_categories
+
+
+def delete_action_categories(client, name):
+ request, action_categories = get_action_categories(client)
+ for key, value in action_categories['action_categories'].items():
+ if value['name'] == name:
+ return client.delete("/action_categories/{}".format(key))
+
+
+def delete_action_categories_without_id(client):
+ req = client.delete("/action_categories/{}".format(""))
+ return req
+
+
+def test_get_action_categories():
+ client = utilities.register_client()
+ req, action_categories = get_action_categories(client)
+ assert req.status_code == 200
+ assert isinstance(action_categories, dict)
+ assert "action_categories" in action_categories
+
+
+def test_add_action_categories():
+ client = utilities.register_client()
+ req, action_categories = add_action_categories(client, "testuser")
+ assert req.status_code == 200
+ assert isinstance(action_categories, dict)
+ value = list(action_categories["action_categories"].values())[0]
+ assert "action_categories" in action_categories
+ assert value['name'] == "testuser"
+ assert value['description'] == "description of {}".format("testuser")
+
+
+def test_add_action_categories_with_existed_name():
+ client = utilities.register_client()
+ name = uuid4().hex
+ req, action_categories = add_action_categories(client, name)
+ assert req.status_code == 200
+ req, action_categories = add_action_categories(client, name)
+ assert req.status_code == 409
+ assert json.loads(req.data)["message"] == '409: Action Category Existing'
+
+
+def test_add_action_categories_name_contain_space():
+ client = utilities.register_client()
+ req, subject_categories = add_action_categories(client, " ")
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Category Name Invalid'
+
+
+def test_add_action_categories_with_empty_name():
+ client = utilities.register_client()
+ req, action_categories = add_action_categories(client, "<a>")
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]"
+
+
+def test_add_action_categories_with_name_contain_space():
+ client = utilities.register_client()
+ req, action_categories = add_action_categories(client, "test<a>user")
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]"
+
+
+def test_delete_action_categories():
+ client = utilities.register_client()
+ req = delete_action_categories(client, "testuser")
+ assert req.status_code == 200
+
+
+def test_delete_action_categories_without_id():
+ client = utilities.register_client()
+ req = delete_action_categories_without_id(client)
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "400: Action Category Unknown"
+
+
+def test_delete_data_categories_connected_to_meta_rule():
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = data_builder.create_new_meta_rule()
+ client = utilities.register_client()
+ req = client.delete("/subject_categories/{}".format(subject_category_id))
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Subject Category With Meta Rule Error'
+
+ req = client.delete("/object_categories/{}".format(object_category_id))
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Object Category With Meta Rule Error'
+
+ req = client.delete("/action_categories/{}".format(action_category_id))
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Action Category With Meta Rule Error'
diff --git a/old/moon_manager/tests/unit_python/api/test_meta_rules.py b/old/moon_manager/tests/unit_python/api/test_meta_rules.py
new file mode 100644
index 00000000..634f19da
--- /dev/null
+++ b/old/moon_manager/tests/unit_python/api/test_meta_rules.py
@@ -0,0 +1,415 @@
+import json
+import api.utilities as utilities
+from helpers import category_helper
+from helpers import data_builder
+from uuid import uuid4
+
+
+def get_meta_rules(client):
+ req = client.get("/meta_rules")
+ meta_rules = utilities.get_json(req.data)
+ return req, meta_rules
+
+
+def add_meta_rules(client, name, data=None):
+ if not data:
+ subject_category = category_helper.add_subject_category(
+ value={"name": "subject category name" + uuid4().hex, "description": "description 1"})
+ subject_category_id = list(subject_category.keys())[0]
+ object_category = category_helper.add_object_category(
+ value={"name": "object category name" + uuid4().hex, "description": "description 1"})
+ object_category_id = list(object_category.keys())[0]
+ action_category = category_helper.add_action_category(
+ value={"name": "action category name" + uuid4().hex, "description": "description 1"})
+ action_category_id = list(action_category.keys())[0]
+
+ data = {
+ "name": name,
+ "subject_categories": [subject_category_id],
+ "object_categories": [object_category_id],
+ "action_categories": [action_category_id]
+ }
+ req = client.post("/meta_rules", data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ meta_rules = utilities.get_json(req.data)
+ return req, meta_rules
+
+
+def add_meta_rules_without_category_ids(client, name):
+ data = {
+ "name": name + uuid4().hex,
+ "subject_categories": [],
+ "object_categories": [],
+ "action_categories": []
+ }
+ req = client.post("/meta_rules", data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ meta_rules = utilities.get_json(req.data)
+ return req, meta_rules
+
+
+def update_meta_rules(client, name, metaRuleId, data=None):
+ if not data:
+ subject_category = category_helper.add_subject_category(
+ value={"name": "subject category name update" + uuid4().hex,
+ "description": "description 1"})
+ subject_category_id = list(subject_category.keys())[0]
+ object_category = category_helper.add_object_category(
+ value={"name": "object category name update" + uuid4().hex,
+ "description": "description 1"})
+ object_category_id = list(object_category.keys())[0]
+ action_category = category_helper.add_action_category(
+ value={"name": "action category name update" + uuid4().hex,
+ "description": "description 1"})
+ action_category_id = list(action_category.keys())[0]
+ data = {
+ "name": name,
+ "subject_categories": [subject_category_id],
+ "object_categories": [object_category_id],
+ "action_categories": [action_category_id]
+ }
+
+ req = client.patch("/meta_rules/{}".format(metaRuleId), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ meta_rules = utilities.get_json(req.data)
+ return req, meta_rules
+
+
+def update_meta_rules_with_categories(client, name, data=None, meta_rule_id=None):
+ if not meta_rule_id:
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = data_builder.create_new_meta_rule()
+ data = {
+ "name": name,
+ "subject_categories": [subject_category_id],
+ "object_categories": [object_category_id],
+ "action_categories": [action_category_id]
+ }
+
+ req = client.patch("/meta_rules/{}".format(meta_rule_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ meta_rules = utilities.get_json(req.data)
+ return req, meta_rules
+
+
+def delete_meta_rules(client, name):
+ request, meta_rules = get_meta_rules(client)
+ for key, value in meta_rules['meta_rules'].items():
+ if value['name'] == name:
+ return client.delete("/meta_rules/{}".format(key))
+
+
+def delete_meta_rules_without_id(client):
+ req = client.delete("/meta_rules/{}".format(""))
+ return req
+
+
+def test_get_meta_rules():
+ client = utilities.register_client()
+ req, meta_rules = get_meta_rules(client)
+ assert req.status_code == 200
+ assert isinstance(meta_rules, dict)
+ assert "meta_rules" in meta_rules
+
+
+def test_add_meta_rules():
+ client = utilities.register_client()
+ meta_rule_name = uuid4().hex
+ req, meta_rules = add_meta_rules(client, meta_rule_name)
+ assert req.status_code == 200
+ assert isinstance(meta_rules, dict)
+ value = list(meta_rules["meta_rules"].values())[0]
+ assert "meta_rules" in meta_rules
+ assert value['name'] == meta_rule_name
+
+
+def test_add_two_meta_rules_with_same_categories_combination():
+ client = utilities.register_client()
+ meta_rule_name = uuid4().hex
+ req, meta_rules = add_meta_rules(client, meta_rule_name)
+ assert req.status_code == 200
+ for meta_rule_id in meta_rules['meta_rules']:
+ if meta_rules['meta_rules'][meta_rule_id]['name'] == meta_rule_name:
+ data = meta_rules['meta_rules'][meta_rule_id]
+
+ data['name'] = uuid4().hex
+ req, meta_rules = add_meta_rules(client, name=data['name'], data=data)
+ assert req.status_code == 409
+ assert json.loads(req.data)["message"] == '409: Meta Rule Existing'
+
+
+def test_add_three_meta_rules_with_different_combination_but_similar_items():
+ client = utilities.register_client()
+ meta_rule_name1 = uuid4().hex
+ req, meta_rules = add_meta_rules(client, meta_rule_name1)
+ assert req.status_code == 200
+ for meta_rule_id in meta_rules['meta_rules']:
+ if meta_rules['meta_rules'][meta_rule_id]['name'] == meta_rule_name1:
+ data = meta_rules['meta_rules'][meta_rule_id]
+ break
+
+ meta_rule_name2 = uuid4().hex
+
+ req, meta_rules = add_meta_rules(client, meta_rule_name2)
+
+ for meta_rule_id in meta_rules['meta_rules']:
+ if meta_rules['meta_rules'][meta_rule_id]['name'] == meta_rule_name2:
+ data['subject_categories'] += meta_rules['meta_rules'][meta_rule_id][
+ 'subject_categories']
+ data['object_categories'] += meta_rules['meta_rules'][meta_rule_id]['object_categories']
+ data['action_categories'] += meta_rules['meta_rules'][meta_rule_id]['action_categories']
+ break
+
+ data['name'] = uuid4().hex
+
+ req, meta_rules = add_meta_rules(client, name=data['name'], data=data)
+ assert req.status_code == 200
+
+
+def test_add_two_meta_rules_with_different_combination_but_similar_items():
+ client = utilities.register_client()
+ meta_rule_name1 = uuid4().hex
+ meta_rule_name2 = uuid4().hex
+
+ subject_category = category_helper.add_subject_category(
+ value={"name": "subject category name" + uuid4().hex, "description": "description 1"})
+ subject_category_id1 = list(subject_category.keys())[0]
+
+ object_category = category_helper.add_object_category(
+ value={"name": "object category name" + uuid4().hex, "description": "description 1"})
+ object_category_id1 = list(object_category.keys())[0]
+
+ action_category = category_helper.add_action_category(
+ value={"name": "action category name" + uuid4().hex, "description": "description 1"})
+ action_category_id1 = list(action_category.keys())[0]
+
+ subject_category = category_helper.add_subject_category(
+ value={"name": "subject category name" + uuid4().hex, "description": "description 1"})
+ subject_category_id2 = list(subject_category.keys())[0]
+
+ object_category = category_helper.add_object_category(
+ value={"name": "object category name" + uuid4().hex, "description": "description 1"})
+ object_category_id2 = list(object_category.keys())[0]
+
+ action_category = category_helper.add_action_category(
+ value={"name": "action category name" + uuid4().hex, "description": "description 1"})
+ action_category_id2 = list(action_category.keys())[0]
+
+ data = {
+ "name": meta_rule_name1,
+ "subject_categories": [subject_category_id1, subject_category_id2],
+ "object_categories": [object_category_id1, object_category_id2],
+ "action_categories": [action_category_id1, action_category_id2]
+ }
+ req, meta_rules = add_meta_rules(client, meta_rule_name1, data=data)
+ assert req.status_code == 200
+ data = {
+ "name": meta_rule_name2,
+ "subject_categories": [subject_category_id2],
+ "object_categories": [object_category_id1],
+ "action_categories": [action_category_id2]
+ }
+
+ req, meta_rules = add_meta_rules(client, meta_rule_name1, data=data)
+ assert req.status_code == 200
+
+
+def test_add_meta_rule_with_existing_name_error():
+ client = utilities.register_client()
+ name = uuid4().hex
+ req, meta_rules = add_meta_rules(client, name)
+ assert req.status_code == 200
+ req, meta_rules = add_meta_rules(client, name)
+ assert req.status_code == 409
+ assert json.loads(req.data)["message"] == '409: Meta Rule Existing'
+
+
+def test_add_meta_rules_with_forbidden_char_in_name():
+ client = utilities.register_client()
+ req, meta_rules = add_meta_rules(client, "<a>")
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]"
+
+
+def test_add_meta_rules_with_blank_name():
+ client = utilities.register_client()
+ req, meta_rules = add_meta_rules(client, "")
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Meta Rule Error'
+
+
+def test_add_meta_rules_without_subject_categories():
+ client = utilities.register_client()
+ name_meta_rule = uuid4().hex
+ req, meta_rules = add_meta_rules_without_category_ids(client, name_meta_rule)
+ assert req.status_code == 200
+
+
+def test_delete_meta_rules():
+ client = utilities.register_client()
+ name_meta_rule = uuid4().hex
+ req, meta_rules = add_meta_rules_without_category_ids(client, name_meta_rule)
+ meta_rule_id = next(iter(meta_rules['meta_rules']))
+ req = delete_meta_rules(client, meta_rules['meta_rules'][meta_rule_id]['name'])
+ assert req.status_code == 200
+
+
+def test_delete_meta_rules_without_id():
+ client = utilities.register_client()
+ req = delete_meta_rules_without_id(client)
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "400: Meta Rule Unknown"
+
+
+def test_update_meta_rules():
+ client = utilities.register_client()
+ req = add_meta_rules(client, "testuser")
+ meta_rule_id = list(req[1]['meta_rules'])[0]
+ req_update = update_meta_rules(client, "testuser", meta_rule_id)
+ assert req_update[0].status_code == 200
+ delete_meta_rules(client, "testuser")
+ get_meta_rules(client)
+
+
+def test_update_meta_rule_with_combination_existed():
+ client = utilities.register_client()
+ meta_rule_name1 = uuid4().hex
+ req, meta_rules = add_meta_rules(client, meta_rule_name1)
+ meta_rule_id1 = next(iter(meta_rules['meta_rules']))
+ data1 = meta_rules['meta_rules'][meta_rule_id1]
+
+ meta_rule_name2 = uuid4().hex
+ req, meta_rules = add_meta_rules(client, meta_rule_name2)
+ meta_rule_id2 = next(iter(meta_rules['meta_rules']))
+ data2 = meta_rules['meta_rules'][meta_rule_id2]
+ data1['name'] = data2['name']
+ req_update = update_meta_rules(client, name=meta_rule_name2, metaRuleId=meta_rule_id2,
+ data=data1)
+ assert req_update[0].status_code == 409
+ assert req_update[1]['message']== '409: Meta Rule Existing'
+
+
+def test_update_meta_rule_with_different_combination_but_same_data():
+ client = utilities.register_client()
+ meta_rule_name1 = uuid4().hex
+ subject_category = category_helper.add_subject_category(
+ value={"name": "subject category name" + uuid4().hex, "description": "description 1"})
+ subject_category_id1 = list(subject_category.keys())[0]
+ object_category = category_helper.add_object_category(
+ value={"name": "object category name" + uuid4().hex, "description": "description 1"})
+ object_category_id1 = list(object_category.keys())[0]
+ action_category = category_helper.add_action_category(
+ value={"name": "action category name" + uuid4().hex, "description": "description 1"})
+ action_category_id1 = list(action_category.keys())[0]
+ subject_category = category_helper.add_subject_category(
+ value={"name": "subject category name" + uuid4().hex, "description": "description 1"})
+ subject_category_id2 = list(subject_category.keys())[0]
+ object_category = category_helper.add_object_category(
+ value={"name": "object category name" + uuid4().hex, "description": "description 1"})
+ object_category_id2 = list(object_category.keys())[0]
+ action_category = category_helper.add_action_category(
+ value={"name": "action category name" + uuid4().hex, "description": "description 1"})
+ action_category_id2 = list(action_category.keys())[0]
+
+ data = {
+ "name": meta_rule_name1,
+ "subject_categories": [subject_category_id1, subject_category_id2],
+ "object_categories": [object_category_id1, object_category_id2],
+ "action_categories": [action_category_id1, action_category_id2]
+ }
+ req, meta_rules = add_meta_rules(client, meta_rule_name1, data=data)
+ assert req.status_code == 200
+
+ meta_rule_name2 = uuid4().hex
+ req, meta_rules = add_meta_rules(client, meta_rule_name2)
+ meta_rule_id2 = next(iter(meta_rules['meta_rules']))
+ data2 = {
+ "name": meta_rule_name2,
+ "subject_categories": [subject_category_id1, subject_category_id2],
+ "object_categories": [object_category_id1],
+ "action_categories": [action_category_id1,action_category_id2]
+ }
+
+ req_update = update_meta_rules(client, name=meta_rule_name2, metaRuleId=meta_rule_id2,
+ data=data2)
+ assert req_update[0].status_code == 200
+
+
+def test_update_meta_rules_without_id():
+ client = utilities.register_client()
+ req_update = update_meta_rules(client, "testuser", "")
+ assert req_update[0].status_code == 400
+ assert json.loads(req_update[0].data)["message"] == "400: Meta Rule Unknown"
+
+
+def test_update_meta_rules_without_name():
+ client = utilities.register_client()
+ req_update = update_meta_rules(client, "<br/>", "1234567")
+ assert req_update[0].status_code == 400
+ assert json.loads(req_update[0].data)[
+ "message"] == "Key: 'name', [Forbidden characters in string]"
+
+
+def test_update_meta_rules_without_categories():
+ client = utilities.register_client()
+ req_update = update_meta_rules_with_categories(client, "testuser")
+ assert req_update[0].status_code == 200
+
+
+def test_update_meta_rules_with_empty_categories():
+ client = utilities.register_client()
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = data_builder.create_new_meta_rule()
+ data = {
+ "name": "testuser",
+ "subject_categories": [""],
+ "object_categories": [""],
+ "action_categories": [""]
+ }
+ req_update = update_meta_rules_with_categories(client, "testuser", data=data,
+ meta_rule_id=meta_rule_id)
+ assert req_update[0].status_code == 400
+ assert req_update[1]['message'] == '400: Subject Category Unknown'
+
+
+def test_update_meta_rules_with_empty_action_category():
+ client = utilities.register_client()
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = data_builder.create_new_meta_rule()
+ data = {
+ "name": "testuser",
+ "subject_categories": [subject_category_id],
+ "object_categories": [object_category_id],
+ "action_categories": [""]
+ }
+ req_update = update_meta_rules_with_categories(client, "testuser", data=data,
+ meta_rule_id=meta_rule_id)
+ assert req_update[0].status_code == 400
+ assert req_update[1]['message'] == '400: Action Category Unknown'
+
+
+def test_update_meta_rules_with_empty_object_category():
+ client = utilities.register_client()
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = data_builder.create_new_meta_rule()
+ data = {
+ "name": "testuser",
+ "subject_categories": [subject_category_id],
+ "object_categories": [""],
+ "action_categories": [action_category_id]
+ }
+ req_update = update_meta_rules_with_categories(client, "testuser", data=data,
+ meta_rule_id=meta_rule_id)
+ assert req_update[0].status_code == 400
+ assert req_update[1]['message'] == '400: Object Category Unknown'
+
+
+def test_update_meta_rules_with_categories_and_one_empty():
+ client = utilities.register_client()
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = data_builder.create_new_meta_rule()
+ data = {
+ "name": "testuser",
+ "subject_categories": [subject_category_id, ""],
+ "object_categories": [object_category_id, ""],
+ "action_categories": [action_category_id, ""]
+ }
+ req_update = update_meta_rules_with_categories(client, "testuser", data=data,
+ meta_rule_id=meta_rule_id)
+ assert req_update[0].status_code == 400
+ assert req_update[1]['message'] == '400: Subject Category Unknown'
diff --git a/old/moon_manager/tests/unit_python/api/test_pdp.py b/old/moon_manager/tests/unit_python/api/test_pdp.py
new file mode 100644
index 00000000..53a87b21
--- /dev/null
+++ b/old/moon_manager/tests/unit_python/api/test_pdp.py
@@ -0,0 +1,164 @@
+import json
+import api.utilities as utilities
+from helpers import data_builder as builder
+from uuid import uuid4
+
+
+def get_pdp(client):
+ req = client.get("/pdp")
+ pdp = utilities.get_json(req.data)
+ return req, pdp
+
+
+def add_pdp(client, data):
+ req = client.post("/pdp", data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ pdp = utilities.get_json(req.data)
+ return req, pdp
+
+
+def update_pdp(client, data, pdp_id):
+ req = client.patch("/pdp/{}".format(pdp_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ pdp = utilities.get_json(req.data)
+ return req, pdp
+
+
+def delete_pdp(client, key):
+ req = client.delete("/pdp/{}".format(key))
+ return req
+
+
+def delete_pdp_without_id(client):
+ req = client.delete("/pdp/{}".format(""))
+ return req
+
+
+def test_get_pdp():
+ client = utilities.register_client()
+ req, pdp = get_pdp(client)
+ assert req.status_code == 200
+ assert isinstance(pdp, dict)
+ assert "pdps" in pdp
+
+
+def test_add_pdp():
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy(
+ subject_category_name="subject_category1" + uuid4().hex,
+ object_category_name="object_category1" + uuid4().hex,
+ action_category_name="action_category1" + uuid4().hex,
+ meta_rule_name="meta_rule_1" + uuid4().hex,
+ model_name="model1" + uuid4().hex)
+ data = {
+ "name": "testuser",
+ "security_pipeline": [policy_id],
+ "keystone_project_id": "keystone_project_id",
+ "description": "description of testuser"
+ }
+ client = utilities.register_client()
+ req, pdp = add_pdp(client, data)
+ assert req.status_code == 200
+ assert isinstance(pdp, dict)
+ value = list(pdp["pdps"].values())[0]
+ assert "pdps" in pdp
+ assert value['name'] == "testuser"
+ assert value["description"] == "description of {}".format("testuser")
+ assert value["keystone_project_id"] == "keystone_project_id"
+
+
+def test_delete_pdp():
+ client = utilities.register_client()
+ request, pdp = get_pdp(client)
+ success_req = None
+ for key, value in pdp['pdps'].items():
+ if value['name'] == "testuser":
+ success_req = delete_pdp(client, key)
+ break
+ assert success_req
+ assert success_req.status_code == 200
+
+
+def test_add_pdp_with_forbidden_char_in_user():
+ data = {
+ "name": "<a>",
+ "security_pipeline": ["policy_id_1", "policy_id_2"],
+ "keystone_project_id": "keystone_project_id",
+ "description": "description of testuser"
+ }
+ client = utilities.register_client()
+ req, models = add_pdp(client, data)
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]"
+
+
+def test_add_pdp_with_forbidden_char_in_keystone():
+ data = {
+ "name": "testuser",
+ "security_pipeline": ["policy_id_1", "policy_id_2"],
+ "keystone_project_id": "<a>",
+ "description": "description of testuser"
+ }
+ client = utilities.register_client()
+ req, meta_rules = add_pdp(client, data)
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "Key: 'keystone_project_id', [Forbidden characters in string]"
+
+
+def test_update_pdp():
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy(
+ subject_category_name="subject_category1"+uuid4().hex,
+ object_category_name="object_category1"+uuid4().hex,
+ action_category_name="action_category1"+uuid4().hex,
+ meta_rule_name="meta_rule_1"+uuid4().hex,
+ model_name="model1"+uuid4().hex)
+ data_add = {
+ "name": "testuser",
+ "security_pipeline": [policy_id],
+ "keystone_project_id": "keystone_project_id",
+ "description": "description of testuser"
+ }
+
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id_update = builder.create_new_policy(
+ subject_category_name="subject_category1" + uuid4().hex,
+ object_category_name="object_category1" + uuid4().hex,
+ action_category_name="action_category1" + uuid4().hex,
+ meta_rule_name="meta_rule_1" + uuid4().hex,
+ model_name="model1" + uuid4().hex)
+ data_update = {
+ "name": "testuser",
+ "security_pipeline": [policy_id_update],
+ "keystone_project_id": "keystone_project_id_update",
+ "description": "description of testuser"
+ }
+ client = utilities.register_client()
+ req = add_pdp(client, data_add)
+ pdp_id = list(req[1]['pdps'])[0]
+ req_update = update_pdp(client, data_update, pdp_id)
+ assert req_update[0].status_code == 200
+ value = list(req_update[1]["pdps"].values())[0]
+ assert value["keystone_project_id"] == "keystone_project_id_update"
+ request, pdp = get_pdp(client)
+ for key, value in pdp['pdps'].items():
+ if value['name'] == "testuser":
+ delete_pdp(client, key)
+ break
+
+
+def test_update_pdp_without_id():
+ client = utilities.register_client()
+ req_update = update_pdp(client, "testuser", "")
+ assert req_update[0].status_code == 400
+ assert json.loads(req_update[0].data)["message"] == 'Invalid Key :name not found'
+
+
+def test_update_pdp_without_user():
+ data = {
+ "name": "",
+ "security_pipeline": ["policy_id_1", "policy_id_2"],
+ "keystone_project_id": "keystone_project_id",
+ "description": "description of testuser"
+ }
+ client = utilities.register_client()
+ req_update = update_pdp(client, data, "<a>")
+ assert req_update[0].status_code == 400
+ assert json.loads(req_update[0].data)["message"] == "Forbidden characters in string"
diff --git a/old/moon_manager/tests/unit_python/api/test_perimeter.py b/old/moon_manager/tests/unit_python/api/test_perimeter.py
new file mode 100644
index 00000000..ff7b09d7
--- /dev/null
+++ b/old/moon_manager/tests/unit_python/api/test_perimeter.py
@@ -0,0 +1,1028 @@
+# import moon_manager
+# import moon_manager.api
+import json
+import api.utilities as utilities
+from helpers import data_builder as builder
+import helpers.policy_helper as policy_helper
+from uuid import uuid4
+
+
+def get_subjects(client):
+ req = client.get("/subjects")
+ subjects = utilities.get_json(req.data)
+ return req, subjects
+
+
+def add_subjects(client, policy_id, name, perimeter_id=None, data=None):
+ if not data:
+ name = name + uuid4().hex
+ data = {
+ "name": name,
+ "description": "description of {}".format(name),
+ "password": "password for {}".format(name),
+ "email": "{}@moon".format(name)
+ }
+ if not perimeter_id:
+ req = client.post("/policies/{}/subjects".format(policy_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ else:
+ req = client.post("/policies/{}/subjects/{}".format(policy_id, perimeter_id),
+ data=json.dumps(
+ data),
+ headers={'Content-Type': 'application/json'})
+ subjects = utilities.get_json(req.data)
+ return req, subjects
+
+
+def delete_subjects_without_perimeter_id(client):
+ req = client.delete("/subjects/{}".format(""))
+ return req
+
+
+def test_perimeter_get_subject():
+ client = utilities.register_client()
+ req, subjects = get_subjects(client)
+ assert req.status_code == 200
+ assert isinstance(subjects, dict)
+ assert "subjects" in subjects
+
+
+def test_perimeter_add_subject():
+ client = utilities.register_client()
+ policies = policy_helper.add_policies()
+ policy_id = list(policies.keys())[0]
+
+ req, subjects = add_subjects(client, policy_id, "testuser")
+ value = list(subjects["subjects"].values())[0]
+ assert req.status_code == 200
+ assert value["name"]
+ assert value["email"]
+
+
+def test_perimeter_add_same_subject_perimeter_id_with_new_policy_id():
+ client = utilities.register_client()
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ name = "testuser"
+ perimeter_id = uuid4().hex
+ data = {
+ "name": name + uuid4().hex,
+ "description": "description of {}".format(name),
+ "password": "password for {}".format(name),
+ "email": "{}@moon".format(name)
+ }
+ add_subjects(client, policy_id1, data['name'], perimeter_id=perimeter_id, data=data)
+ policies2 = policy_helper.add_policies()
+ policy_id2 = list(policies2.keys())[0]
+ req, subjects = add_subjects(client, policy_id2, data['name'],
+ perimeter_id=perimeter_id, data=data)
+ value = list(subjects["subjects"].values())[0]
+ assert req.status_code == 200
+ assert value["name"]
+ assert value["email"]
+ assert len(value['policy_list']) == 2
+ assert policy_id1 in value['policy_list']
+ assert policy_id2 in value['policy_list']
+
+
+def test_perimeter_add_same_subject_perimeter_id_with_different_name():
+ client = utilities.register_client()
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ perimeter_id = uuid4().hex
+ add_subjects(client, policy_id1, "testuser", perimeter_id=perimeter_id)
+ policies2 = policy_helper.add_policies()
+ policy_id2 = list(policies2.keys())[0]
+ req, subjects = add_subjects(client, policy_id2, "testuser", perimeter_id=perimeter_id)
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Perimeter content is invalid.'
+
+
+def test_perimeter_add_same_subject_name_with_new_policy_id():
+ client = utilities.register_client()
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ perimeter_id = uuid4().hex
+ name = "testuser" + uuid4().hex
+ data = {
+ "name": name,
+ "description": "description of {}".format(name),
+ "password": "password for {}".format(name),
+ "email": "{}@moon".format(name)
+ }
+ req, subjects = add_subjects(client, policy_id1, None, perimeter_id=perimeter_id,
+ data=data)
+ policies2 = policy_helper.add_policies()
+ policy_id2 = list(policies2.keys())[0]
+ value = list(subjects["subjects"].values())[0]
+ data = {
+ "name": value['name'],
+ "description": "description of {}".format(value['name']),
+ "password": "password for {}".format(value['name']),
+ "email": "{}@moon".format(value['name'])
+ }
+ req, subjects = add_subjects(client, policy_id2, None, data=data)
+ value = list(subjects["subjects"].values())[0]
+ assert req.status_code == 200
+ assert value["name"]
+ assert value["email"]
+ assert len(value['policy_list']) == 2
+ assert policy_id1 in value['policy_list']
+ assert policy_id2 in value['policy_list']
+
+
+def test_perimeter_add_same_subject_name_with_same_policy_id():
+ client = utilities.register_client()
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ perimeter_id = uuid4().hex
+ name = "testuser" + uuid4().hex
+ data = {
+ "name": name,
+ "description": "description of {}".format(name),
+ "password": "password for {}".format(name),
+ "email": "{}@moon".format(name)
+ }
+ req, subjects = add_subjects(client, policy_id1, None, perimeter_id=perimeter_id,
+ data=data)
+ value = list(subjects["subjects"].values())[0]
+ data = {
+ "name": value['name'],
+ "description": "description of {}".format(value['name']),
+ "password": "password for {}".format(value['name']),
+ "email": "{}@moon".format(value['name'])
+ }
+ req, subjects = add_subjects(client, policy_id1, None, data=data)
+ assert req.status_code == 409
+ assert json.loads(req.data)["message"] == '409: Policy Already Exists'
+
+
+def test_perimeter_add_same_subject_perimeter_id_with_existed_policy_id_in_list():
+ client = utilities.register_client()
+ policies = policy_helper.add_policies()
+ policy_id = list(policies.keys())[0]
+ name = "testuser" + uuid4().hex
+ data = {
+ "name": name,
+ "description": "description of {}".format(name),
+ "password": "password for {}".format(name),
+ "email": "{}@moon".format(name)
+ }
+ req, subjects = add_subjects(client, policy_id, name, data=data)
+ perimeter_id = list(subjects["subjects"].values())[0]['id']
+ req, subjects = add_subjects(client, policy_id, name, perimeter_id=perimeter_id, data=data)
+ assert req.status_code == 409
+ assert json.loads(req.data)["message"] == '409: Policy Already Exists'
+
+
+def test_perimeter_add_subject_invalid_policy_id():
+ client = utilities.register_client()
+ policies = policy_helper.add_policies()
+ policy_id = list(policies.keys())[0]
+ name = "testuser"
+ data = {
+ "name": name + uuid4().hex,
+ "description": "description of {}".format(name),
+ "password": "password for {}".format(name),
+ "email": "{}@moon".format(name)
+ }
+ req, subjects = add_subjects(client, policy_id + "0", "testuser", data)
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Policy Unknown'
+
+
+def test_perimeter_add_subject_policy_id_none():
+ client = utilities.register_client()
+ name = "testuser"
+ data = {
+ "name": name + uuid4().hex,
+ "description": "description of {}".format(name),
+ "password": "password for {}".format(name),
+ "email": "{}@moon".format(name)
+ }
+ req, subjects = add_subjects(client, None, "testuser", data)
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Policy Unknown'
+
+
+def test_perimeter_add_subject_with_forbidden_char_in_name():
+ client = utilities.register_client()
+ data = {
+ "name": "<a>",
+ "description": "description of {}".format(""),
+ "password": "password for {}".format(""),
+ "email": "{}@moon".format("")
+ }
+ req = client.post("/policies/{}/subjects".format("111"), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]"
+
+
+def test_perimeter_update_subject_name():
+ client = utilities.register_client()
+ policies = policy_helper.add_policies()
+ policy_id = list(policies.keys())[0]
+ req, subjects = add_subjects(client, policy_id, "testuser")
+ value1 = list(subjects["subjects"].values())[0]
+ perimeter_id = value1['id']
+ data = {
+ 'name': value1['name'] + "update"
+ }
+ req = client.patch("/subjects/{}".format(perimeter_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ subjects = utilities.get_json(req.data)
+ value2 = list(subjects["subjects"].values())[0]
+ assert req.status_code == 200
+ assert value1['name'] + 'update' == value2['name']
+ assert value1['id'] == value2['id']
+ assert value1['description'] == value2['description']
+
+
+def test_perimeter_update_subject_description():
+ client = utilities.register_client()
+ policies = policy_helper.add_policies()
+ policy_id = list(policies.keys())[0]
+ req, subjects = add_subjects(client, policy_id, "testuser")
+ value1 = list(subjects["subjects"].values())[0]
+ perimeter_id = value1['id']
+ data = {
+ 'description': value1['description'] + "update",
+ }
+ req = client.patch("/subjects/{}".format(perimeter_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ subjects = utilities.get_json(req.data)
+ value2 = list(subjects["subjects"].values())[0]
+ assert req.status_code == 200
+ assert value1['name'] == value2['name']
+ assert value1['id'] == value2['id']
+ assert value1['description'] + 'update' == value2['description']
+
+
+def test_perimeter_update_subject_description_and_name():
+ client = utilities.register_client()
+ policies = policy_helper.add_policies()
+ policy_id = list(policies.keys())[0]
+
+ req, subjects = add_subjects(client, policy_id, "testuser")
+ value1 = list(subjects["subjects"].values())[0]
+ perimeter_id = value1['id']
+ data = {
+ 'description': value1['description'] + "update",
+ 'name': value1['name'] + "update"
+ }
+ req = client.patch("/subjects/{}".format(perimeter_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ subjects = utilities.get_json(req.data)
+ value2 = list(subjects["subjects"].values())[0]
+ assert req.status_code == 200
+ assert value1['name'] + 'update' == value2['name']
+ assert value1['id'] == value2['id']
+ assert value1['description'] + 'update' == value2['description']
+
+
+def test_perimeter_update_subject_wrong_id():
+ client = utilities.register_client()
+ name = 'testuser' + uuid4().hex
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ data = {
+ "name": name,
+ "description": "description of {}".format('testuser'),
+ }
+ req, subjects = add_subjects(client, policy_id=policy_id1, name='testuser', data=data)
+ value1 = list(subjects["subjects"].values())[0]
+ perimeter_id = value1['id']
+ data = {
+ 'name': value1['name'] + "update",
+ 'description': value1['description'] + "update"
+ }
+ req = client.patch("/subjects/{}".format(perimeter_id + "wrong"), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Perimeter content is invalid.'
+
+
+def test_perimeter_update_subject_name_with_existed_one():
+ client = utilities.register_client()
+ name1 = 'testuser' + uuid4().hex
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ perimeter_id1 = uuid4().hex
+ req, subjects = add_subjects(client, policy_id=policy_id1, name=name1,
+ perimeter_id=perimeter_id1)
+ value1 = list(subjects["subjects"].values())[0]
+ perimeter_id2 = uuid4().hex
+ name2 = 'testuser' + uuid4().hex
+ req, subjects = add_subjects(client, policy_id=policy_id1, name=name2,
+ perimeter_id=perimeter_id2)
+ data = {
+ 'name': value1['name'],
+ }
+ req = client.patch("/subjects/{}".format(perimeter_id2), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ assert req.status_code == 409
+
+
+def test_perimeter_delete_subject():
+ client = utilities.register_client()
+ policies = policy_helper.add_policies()
+ policy_id = list(policies.keys())[0]
+ req, subjects = add_subjects(client, policy_id, "testuser")
+ subject_id = list(subjects["subjects"].values())[0]["id"]
+ req = client.delete("/policies/{}/subjects/{}".format(policy_id, subject_id))
+ assert req.status_code == 200
+
+
+def test_perimeter_delete_subjects_without_perimeter_id():
+ client = utilities.register_client()
+ req = delete_subjects_without_perimeter_id(client)
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "400: Subject Unknown"
+
+
+def get_objects(client):
+ req = client.get("/objects")
+ objects = utilities.get_json(req.data)
+ return req, objects
+
+
+def add_objects(client, name, policyId=None, data=None, perimeter_id=None):
+ if not policyId:
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policyId = builder.create_new_policy(
+ subject_category_name="subject_category1" + uuid4().hex,
+ object_category_name="object_category1" + uuid4().hex,
+ action_category_name="action_category1" + uuid4().hex,
+ meta_rule_name="meta_rule_1" + uuid4().hex,
+ model_name="model1" + uuid4().hex)
+ if not data:
+ data = {
+ "name": name + uuid4().hex,
+ "description": "description of {}".format(name),
+ }
+ if not perimeter_id:
+ req = client.post("/policies/{}/objects/".format(policyId), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ else:
+ req = client.post("/policies/{}/objects/{}".format(policyId, perimeter_id),
+ data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ objects = utilities.get_json(req.data)
+ return req, objects
+
+
+def delete_objects_without_perimeter_id(client):
+ req = client.delete("/objects/{}".format(""))
+ return req
+
+
+def test_perimeter_get_object():
+ client = utilities.register_client()
+ req, objects = get_objects(client)
+ assert req.status_code == 200
+ assert isinstance(objects, dict)
+ assert "objects" in objects
+
+
+def test_perimeter_add_object():
+ client = utilities.register_client()
+ req, objects = add_objects(client, "testuser")
+ value = list(objects["objects"].values())[0]
+ assert req.status_code == 200
+ assert value['name']
+
+
+def test_perimeter_add_object_with_wrong_policy_id():
+ client = utilities.register_client()
+ req, objects = add_objects(client, "testuser", policyId='wrong')
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Policy Unknown'
+
+
+def test_perimeter_add_object_with_policy_id_none():
+ client = utilities.register_client()
+ data = {
+ "name": "testuser" + uuid4().hex,
+ "description": "description of {}".format("testuser"),
+ }
+ req = client.post("/policies/{}/objects/".format(None), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Policy Unknown'
+
+
+def test_perimeter_add_same_object_name_with_new_policy_id():
+ client = utilities.register_client()
+ req, objects = add_objects(client, "testuser")
+ value1 = list(objects["objects"].values())[0]
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ data = {
+ "name": value1['name'],
+ "description": "description of {}".format('testuser'),
+ }
+ req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data)
+ value2 = list(objects["objects"].values())[0]
+ assert req.status_code == 200
+ assert value1['id'] == value2['id']
+ assert value1['name'] == value2['name']
+
+
+def test_perimeter_add_same_object_perimeter_id_with_new_policy_id():
+ client = utilities.register_client()
+ req, objects = add_objects(client, "testuser")
+ value1 = list(objects["objects"].values())[0]
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ data = {
+ "name": value1['name'],
+ "description": "description of {}".format('testuser'),
+ }
+ req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data,
+ perimeter_id=value1['id'])
+ value2 = list(objects["objects"].values())[0]
+ assert req.status_code == 200
+ assert value1['id'] == value2['id']
+ assert value1['name'] == value2['name']
+
+
+def test_perimeter_add_same_object_perimeter_id_with_different_name():
+ client = utilities.register_client()
+ req, objects = add_objects(client, "testuser")
+ value1 = list(objects["objects"].values())[0]
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ data = {
+ "name": value1['name'] + 'different',
+ "description": "description of {}".format('testuser'),
+ }
+ req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data,
+ perimeter_id=value1['id'])
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Perimeter content is invalid.'
+
+
+def test_perimeter_add_same_object_name_with_same_policy_id():
+ client = utilities.register_client()
+ name = 'testuser' + uuid4().hex
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ data = {
+ "name": name,
+ "description": "description of {}".format('testuser'),
+ }
+ req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data)
+ value = list(objects["objects"].values())[0]
+ assert req.status_code == 200
+ req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data)
+ assert req.status_code == 409
+ assert json.loads(req.data)["message"] == '409: Policy Already Exists'
+
+
+def test_perimeter_add_same_object_perimeter_id_with_existed_policy_id_in_list():
+ client = utilities.register_client()
+ name = 'testuser' + uuid4().hex
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ data = {
+ "name": name,
+ "description": "description of {}".format('testuser'),
+ }
+ req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data)
+ value = list(objects["objects"].values())[0]
+ req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data,
+ perimeter_id=value['id'])
+ assert req.status_code == 409
+ assert json.loads(req.data)["message"] == '409: Policy Already Exists'
+
+
+def test_perimeter_update_object_name():
+ client = utilities.register_client()
+ name = 'testuser' + uuid4().hex
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ data = {
+ "name": name,
+ "description": "description of {}".format('testuser'),
+ }
+ req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data)
+
+ value1 = list(objects["objects"].values())[0]
+ perimeter_id = value1['id']
+ data = {
+ 'name': value1['name'] + "update"
+ }
+ req = client.patch("/objects/{}".format(perimeter_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+
+ objects = utilities.get_json(req.data)
+ value2 = list(objects["objects"].values())[0]
+ assert req.status_code == 200
+ assert value1['name'] + 'update' == value2['name']
+ assert value1['id'] == value2['id']
+ assert value1['description'] == value2['description']
+
+
+def test_perimeter_update_object_description():
+ client = utilities.register_client()
+ name = 'testuser' + uuid4().hex
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ data = {
+ "name": name,
+ "description": "description of {}".format('testuser'),
+ }
+ req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data)
+
+ value1 = list(objects["objects"].values())[0]
+ perimeter_id = value1['id']
+ data = {
+ 'description': value1['description'] + "update"
+ }
+ req = client.patch("/objects/{}".format(perimeter_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+
+ objects = utilities.get_json(req.data)
+ value2 = list(objects["objects"].values())[0]
+ assert req.status_code == 200
+ assert value1['name'] == value2['name']
+ assert value1['id'] == value2['id']
+ assert value1['description'] + 'update' == value2['description']
+
+
+def test_perimeter_update_object_description_and_name():
+ client = utilities.register_client()
+ name = 'testuser' + uuid4().hex
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ data = {
+ "name": name,
+ "description": "description of {}".format('testuser'),
+ }
+ req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data)
+
+ value1 = list(objects["objects"].values())[0]
+ perimeter_id = value1['id']
+ data = {
+ 'name': value1['name'] + "update",
+ 'description': value1['description'] + "update"
+ }
+ req = client.patch("/objects/{}".format(perimeter_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+
+ objects = utilities.get_json(req.data)
+ value2 = list(objects["objects"].values())[0]
+ assert req.status_code == 200
+ assert value1['name'] + 'update' == value2['name']
+ assert value1['id'] == value2['id']
+ assert value1['description'] + 'update' == value2['description']
+
+
+def test_perimeter_update_object_wrong_id():
+ client = utilities.register_client()
+ name = 'testuser' + uuid4().hex
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ data = {
+ "name": name,
+ "description": "description of {}".format('testuser'),
+ }
+ req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data)
+
+ value1 = list(objects["objects"].values())[0]
+ perimeter_id = value1['id']
+ data = {
+ 'name': value1['name'] + "update",
+ 'description': value1['description'] + "update"
+ }
+ req = client.patch("/objects/{}".format(perimeter_id + "wrong"), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ assert req.status_code == 400
+
+
+def test_perimeter_update_object_name_with_existed_one():
+ client = utilities.register_client()
+ name = 'testuser' + uuid4().hex
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ data1 = {
+ "name": name,
+ "description": "description of {}".format('testuser'),
+ }
+ req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data1)
+ value1 = list(objects["objects"].values())[0]
+
+ name = 'testuser' + uuid4().hex
+
+ data2 = {
+ "name": name,
+ "description": "description of {}".format('testuser'),
+ }
+ req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data2)
+
+ value2 = list(objects["objects"].values())[0]
+ perimeter_id2 = value2['id']
+
+ data3 = {
+ 'name': value1['name']
+ }
+ req = client.patch("/objects/{}".format(perimeter_id2), data=json.dumps(data3),
+ headers={'Content-Type': 'application/json'})
+ assert req.status_code == 409
+ assert json.loads(req.data)["message"] == '409: Object Existing'
+
+
+def test_perimeter_add_object_without_name():
+ client = utilities.register_client()
+ data = {
+ "name": "<br/>",
+ "description": "description of {}".format(""),
+ }
+ req = client.post("/policies/{}/objects/".format("111"), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]"
+
+
+def test_perimeter_add_object_with_name_contain_spaces():
+ client = utilities.register_client()
+ data = {
+ "name": "test<a>user",
+ "description": "description of {}".format("test user"),
+ }
+ req = client.post("/policies/{}/objects/".format("111"), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]"
+
+
+def test_perimeter_delete_object():
+ client = utilities.register_client()
+ policies = policy_helper.add_policies()
+ policy_id = list(policies.keys())[0]
+ object_id = builder.create_object(policy_id)
+ req = client.delete("/policies/{}/objects/{}".format(policy_id, object_id))
+ assert req.status_code == 200
+
+
+def test_perimeter_delete_objects_without_perimeter_id():
+ client = utilities.register_client()
+ req = delete_objects_without_perimeter_id(client)
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "400: Object Unknown"
+
+
+def get_actions(client):
+ req = client.get("/actions")
+ actions = utilities.get_json(req.data)
+ return req, actions
+
+
+def add_actions(client, name, policy_id=None, data=None, perimeter_id=None):
+ if not policy_id:
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy(
+ subject_category_name="subject_category1" + uuid4().hex,
+ object_category_name="object_category1" + uuid4().hex,
+ action_category_name="action_category1" + uuid4().hex,
+ meta_rule_name="meta_rule_1" + uuid4().hex,
+ model_name="model1" + uuid4().hex)
+
+ if not data:
+ data = {
+ "name": name + uuid4().hex,
+ "description": "description of {}".format(name),
+ }
+ if not perimeter_id:
+ req = client.post("/policies/{}/actions/".format(policy_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ else:
+ req = client.post("/policies/{}/actions/{}".format(policy_id, perimeter_id),
+ data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+
+ actions = utilities.get_json(req.data)
+ return req, actions
+
+
+def delete_actions_without_perimeter_id(client):
+ req = client.delete("/actions/{}".format(""))
+ return req
+
+
+def test_perimeter_get_actions():
+ client = utilities.register_client()
+ req, actions = get_actions(client)
+ assert req.status_code == 200
+ assert isinstance(actions, dict)
+ assert "actions" in actions
+
+
+def test_perimeter_add_actions():
+ client = utilities.register_client()
+ req, actions = add_actions(client, "testuser")
+ value = list(actions["actions"].values())[0]
+ assert req.status_code == 200
+ assert value['name']
+
+
+def test_perimeter_add_action_with_wrong_policy_id():
+ client = utilities.register_client()
+ req, actions = add_actions(client, "testuser", policy_id="wrong")
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Policy Unknown'
+
+
+def test_perimeter_add_action_with_policy_id_none():
+ client = utilities.register_client()
+ data = {
+ "name": "testuser" + uuid4().hex,
+ "description": "description of {}".format("testuser"),
+ }
+ req = client.post("/policies/{}/actions/".format(None), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Policy Unknown'
+
+
+def test_perimeter_add_same_action_name_with_new_policy_id():
+ client = utilities.register_client()
+ req, action = add_actions(client, "testuser")
+ value1 = list(action["actions"].values())[0]
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ data = {
+ "name": value1['name'],
+ "description": "description of {}".format('testuser'),
+ }
+ req, action = add_actions(client, 'testuser', policy_id=policy_id1, data=data)
+ value2 = list(action["actions"].values())[0]
+ assert req.status_code == 200
+ assert value1['id'] == value2['id']
+ assert value1['name'] == value2['name']
+
+
+def test_perimeter_add_same_action_perimeter_id_with_new_policy_id():
+ client = utilities.register_client()
+ req, action = add_actions(client, "testuser")
+ value1 = list(action["actions"].values())[0]
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ data = {
+ "name": value1['name'],
+ "description": "description of {}".format('testuser'),
+ }
+ req, action = add_actions(client, 'testuser', policy_id=policy_id1, data=data,
+ perimeter_id=value1['id'])
+ value2 = list(action["actions"].values())[0]
+ assert req.status_code == 200
+ assert value1['id'] == value2['id']
+ assert value1['name'] == value2['name']
+
+
+def test_perimeter_add_same_action_perimeter_id_with_different_name():
+ client = utilities.register_client()
+ req, action = add_actions(client, "testuser")
+ value1 = list(action["actions"].values())[0]
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ data = {
+ "name": value1['name'] + 'different',
+ "description": "description of {}".format('testuser'),
+ }
+ req, action = add_actions(client, 'testuser', policy_id=policy_id1, data=data,
+ perimeter_id=value1['id'])
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Perimeter content is invalid.'
+
+
+def test_perimeter_add_same_action_name_with_same_policy_id():
+ client = utilities.register_client()
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ req, action = add_actions(client, "testuser", policy_id=policy_id1)
+ value1 = list(action["actions"].values())[0]
+ data = {
+ "name": value1['name'],
+ "description": "description of {}".format('testuser'),
+ }
+ req, action = add_actions(client, 'testuser', policy_id=policy_id1, data=data)
+ assert req.status_code == 409
+ assert json.loads(req.data)["message"] == '409: Policy Already Exists'
+
+
+def test_perimeter_add_same_action_perimeter_id_with_existed_policy_id_in_list():
+ client = utilities.register_client()
+ policies1 = policy_helper.add_policies()
+ policy_id1 = list(policies1.keys())[0]
+ req, action = add_actions(client, "testuser", policy_id=policy_id1)
+ value1 = list(action["actions"].values())[0]
+ data = {
+ "name": value1['name'],
+ "description": "description of {}".format('testuser'),
+ }
+ req, action = add_actions(client, 'testuser', policy_id=policy_id1, data=data,
+ perimeter_id=value1['id'])
+ assert req.status_code == 409
+ assert json.loads(req.data)["message"] == '409: Policy Already Exists'
+
+
+def test_perimeter_add_actions_without_name():
+ client = utilities.register_client()
+ data = {
+ "name": "<a>",
+ "description": "description of {}".format(""),
+ }
+ req = client.post("/policies/{}/actions".format("111"), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]"
+
+
+def test_perimeter_add_actions_with_name_contain_spaces():
+ client = utilities.register_client()
+ data = {
+ "name": "test<a>user",
+ "description": "description of {}".format("test user"),
+ }
+ req = client.post("/policies/{}/actions".format("111"), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]"
+
+
+def test_add_subjects_without_policy_id():
+ client = utilities.register_client()
+ data = {
+ "name": "testuser",
+ "description": "description of {}".format("test user"),
+ }
+ req = client.post("/policies/{}/subjects".format("111"), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "400: Policy Unknown"
+
+
+def test_add_objects_without_policy_id():
+ client = utilities.register_client()
+ data = {
+ "name": "testuser",
+ "description": "description of {}".format("test user"),
+ }
+ req = client.post("/policies/{}/objects".format("111"), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "400: Policy Unknown"
+
+
+def test_add_action_without_policy_id():
+ client = utilities.register_client()
+ data = {
+ "name": "testuser",
+ "description": "description of {}".format("test user"),
+ }
+ req = client.post("/policies/{}/actions".format("111"), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "400: Policy Unknown"
+
+
+def test_perimeter_update_action_name():
+ client = utilities.register_client()
+ req, actions = add_actions(client, "testuser")
+ value1 = list(actions["actions"].values())[0]
+ perimeter_id = value1['id']
+ data = {
+ 'name': value1['name'] + "update"
+ }
+ req = client.patch("/actions/{}".format(perimeter_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ subjects = utilities.get_json(req.data)
+ value2 = list(subjects["actions"].values())[0]
+ assert req.status_code == 200
+ assert value1['name'] + 'update' == value2['name']
+ assert value1['id'] == value2['id']
+ assert value1['description'] == value2['description']
+
+
+def test_perimeter_update_actions_description():
+ client = utilities.register_client()
+ req, actions = add_actions(client, "testuser")
+ value1 = list(actions["actions"].values())[0]
+ perimeter_id = value1['id']
+ data = {
+ 'description': value1['description'] + "update"
+ }
+ req = client.patch("/actions/{}".format(perimeter_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ subjects = utilities.get_json(req.data)
+ value2 = list(subjects["actions"].values())[0]
+ assert req.status_code == 200
+ assert value1['name'] == value2['name']
+ assert value1['id'] == value2['id']
+ assert value1['description'] + 'update' == value2['description']
+
+
+def test_perimeter_update_actions_description_and_name():
+ client = utilities.register_client()
+ req, actions = add_actions(client, "testuser")
+ value1 = list(actions["actions"].values())[0]
+ perimeter_id = value1['id']
+ data = {
+ 'name': value1['name'] + "update",
+ 'description': value1['description'] + "update"
+ }
+ req = client.patch("/actions/{}".format(perimeter_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ subjects = utilities.get_json(req.data)
+ value2 = list(subjects["actions"].values())[0]
+ assert req.status_code == 200
+ assert value1['name'] + 'update' == value2['name']
+ assert value1['id'] == value2['id']
+ assert value1['description'] + 'update' == value2['description']
+
+
+def test_perimeter_update_action_wrong_id():
+ client = utilities.register_client()
+ req, actions = add_actions(client, "testuser")
+ value1 = list(actions["actions"].values())[0]
+ perimeter_id = value1['id']
+ data = {
+ 'name': value1['name'] + "update",
+ 'description': value1['description'] + "update"
+ }
+ req = client.patch("/actions/{}".format(perimeter_id + "wrong"), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Perimeter content is invalid.'
+
+
+def test_perimeter_update_action_name_with_existed_one():
+ client = utilities.register_client()
+ req, actions = add_actions(client, "testuser")
+ value1 = list(actions["actions"].values())[0]
+ req, actions = add_actions(client, "testuser")
+ value2 = list(actions["actions"].values())[0]
+ perimeter_id2 = value2['id']
+ data = {
+ 'name': value1['name'],
+ }
+ req = client.patch("/actions/{}".format(perimeter_id2), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ assert req.status_code == 409
+ assert json.loads(req.data)["message"] == '409: Action Existing'
+
+
+def test_perimeter_delete_actions():
+ client = utilities.register_client()
+
+ policies = policy_helper.add_policies()
+ policy_id = list(policies.keys())[0]
+ action_id = builder.create_action(policy_id)
+ req = client.delete("/policies/{}/actions/{}".format(policy_id, action_id))
+ assert req.status_code == 200
+
+
+def test_delete_subject_without_policy():
+ client = utilities.register_client()
+
+ policies = policy_helper.add_policies()
+ policy_id = list(policies.keys())[0]
+
+ action_id = builder.create_action(policy_id)
+
+ req = client.delete("/subjects/{}".format(action_id))
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "400: Policy Unknown"
+
+
+def test_delete_objects_without_policy():
+ client = utilities.register_client()
+
+ policies = policy_helper.add_policies()
+ policy_id = list(policies.keys())[0]
+
+ action_id = builder.create_action(policy_id)
+
+ req = client.delete("/objects/{}".format(action_id))
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "400: Policy Unknown"
+
+
+def test_delete_actions_without_policy():
+ client = utilities.register_client()
+
+ policies = policy_helper.add_policies()
+ policy_id = list(policies.keys())[0]
+
+ action_id = builder.create_action(policy_id)
+
+ req = client.delete("/actions/{}".format(action_id))
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "400: Policy Unknown"
+
+
+def test_perimeter_delete_actions_without_perimeter_id():
+ client = utilities.register_client()
+ req = delete_actions_without_perimeter_id(client)
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "400: Action Unknown"
diff --git a/old/moon_manager/tests/unit_python/api/test_policies.py b/old/moon_manager/tests/unit_python/api/test_policies.py
new file mode 100644
index 00000000..76161d53
--- /dev/null
+++ b/old/moon_manager/tests/unit_python/api/test_policies.py
@@ -0,0 +1,342 @@
+# Copyright 2018 Open Platform for NFV Project, Inc. and its contributors
+# This software is distributed under the terms and conditions of the 'Apache-2.0'
+# license which can be found in the file 'LICENSE' in this package distribution
+# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
+
+import json
+from uuid import uuid4
+import api.utilities as utilities
+from helpers import model_helper
+from helpers import policy_helper
+from helpers import data_builder
+
+
+def get_policies(client):
+ req = client.get("/policies")
+ policies = utilities.get_json(req.data)
+ return req, policies
+
+
+def add_policies(client, name):
+ req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex)
+ model_id = list(req.keys())[0]
+ data = {
+ "name": name,
+ "description": "description of {}".format(name),
+ "model_id": model_id,
+ "genre": "genre"
+ }
+ req = client.post("/policies", data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ policies = utilities.get_json(req.data)
+ return req, policies
+
+
+def delete_policies_without_id(client):
+ req = client.delete("/policies/{}".format(""))
+ return req
+
+
+def test_get_policies():
+ client = utilities.register_client()
+ req, policies = get_policies(client)
+ assert req.status_code == 200
+ assert isinstance(policies, dict)
+ assert "policies" in policies
+
+
+def test_add_policies():
+ policy_name = "testuser" + uuid4().hex
+ client = utilities.register_client()
+ req, policies = add_policies(client, policy_name)
+ assert req.status_code == 200
+ assert isinstance(policies, dict)
+ value = list(policies["policies"].values())[0]
+ assert "policies" in policies
+ assert value['name'] == policy_name
+ assert value["description"] == "description of {}".format(policy_name)
+
+
+def test_add_policies_without_model():
+ policy_name = "testuser" + uuid4().hex
+ client = utilities.register_client()
+ data = {
+ "name": policy_name,
+ "description": "description of {}".format(policy_name),
+ "model_id": "",
+ "genre": "genre"
+ }
+ req = client.post("/policies/", data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+
+ assert req.status_code == 200
+
+
+def test_add_policies_with_same_name():
+ name = uuid4().hex
+ policy_name = name
+ client = utilities.register_client()
+ req, policies = add_policies(client, policy_name)
+ assert req.status_code == 200
+ assert isinstance(policies, dict)
+ value = list(policies["policies"].values())[0]
+ assert "policies" in policies
+ assert value['name'] == policy_name
+ assert value["description"] == "description of {}".format(policy_name)
+ client = utilities.register_client()
+ req, policies = add_policies(client, policy_name)
+ assert req.status_code == 409
+ assert json.loads(req.data)["message"] == '409: Policy Already Exists'
+
+
+def test_add_policy_with_empty_name():
+ policy_name = ""
+ client = utilities.register_client()
+ req, policies = add_policies(client, policy_name)
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Policy Content Error'
+
+
+def test_update_policies_with_model():
+ policy_name = "testuser" + uuid4().hex
+ client = utilities.register_client()
+ data = {
+ "name": policy_name,
+ "description": "description of {}".format(policy_name),
+ "model_id": "",
+ "genre": "genre"
+ }
+ req = client.post("/policies/", data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ policy_id = next(iter(utilities.get_json(req.data)['policies']))
+ req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex)
+ model_id = list(req.keys())[0]
+ data = {
+ "name": policy_name + "-2",
+ "description": "description of {}".format(policy_name),
+ "model_id": model_id,
+ "genre": "genre"
+ }
+ req = client.patch("/policies/{}".format(policy_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ assert req.status_code == 200
+ assert json.loads(req.data)['policies'][policy_id]['name'] == policy_name + '-2'
+
+
+def test_update_policies_name_success():
+ policy_name = "testuser" + uuid4().hex
+ client = utilities.register_client()
+ req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex)
+ model_id = list(req.keys())[0]
+ data = {
+ "name": policy_name,
+ "description": "description of {}".format(policy_name),
+ "model_id": model_id,
+ "genre": "genre"
+ }
+ req = client.post("/policies/", data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ policy_id = next(iter(utilities.get_json(req.data)['policies']))
+
+ data = {
+ "name": policy_name + "-2",
+ "description": "description of {}".format(policy_name),
+ "model_id": model_id,
+ "genre": "genre"
+ }
+ req = client.patch("/policies/{}".format(policy_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ assert req.status_code == 200
+ assert json.loads(req.data)['policies'][policy_id]['name'] == policy_name + '-2'
+
+
+def test_update_policies_model_unused():
+ policy_name = uuid4().hex
+ client = utilities.register_client()
+ req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex)
+ model_id = list(req.keys())[0]
+ data = {
+ "name": policy_name,
+ "description": "description of {}".format(policy_name),
+ "model_id": model_id,
+ "genre": "genre"
+ }
+ req = client.post("/policies/", data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ policy_id = next(iter(utilities.get_json(req.data)['policies']))
+ req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex)
+ model_id = list(req.keys())[0]
+ data = {
+ "name": policy_name,
+ "description": "description of {}".format(policy_name),
+ "model_id": model_id,
+ "genre": "genre"
+ }
+ req = client.patch("/policies/{}".format(policy_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ assert req.status_code == 200
+
+
+def test_update_policy_name_with_existed_one():
+ policy_name1 = "testuser" + uuid4().hex
+ client = utilities.register_client()
+ req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex)
+ model_id = list(req.keys())[0]
+ data = {
+ "name": policy_name1,
+ "description": "description of {}".format(policy_name1),
+ "model_id": model_id,
+ "genre": "genre"
+ }
+ req = client.post("/policies/", data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ policy_id1 = next(iter(utilities.get_json(req.data)['policies']))
+
+ policy_name2 = "testuser" + uuid4().hex
+ client = utilities.register_client()
+ req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex)
+ model_id = list(req.keys())[0]
+ data = {
+ "name": policy_name2,
+ "description": "description of {}".format(policy_name2),
+ "model_id": model_id,
+ "genre": "genre"
+ }
+ req = client.post("/policies/", data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ policy_id2 = next(iter(utilities.get_json(req.data)['policies']))
+
+ data = {
+ "name": policy_name1,
+ "description": "description of {}".format(policy_name1),
+ "model_id": model_id,
+ "genre": "genre"
+ }
+ req = client.patch("/policies/{}".format(policy_id2), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ assert req.status_code == 409
+ assert json.loads(req.data)["message"] == '409: Policy Already Exists'
+
+
+def test_update_policies_with_empty_name():
+ policy_name = "testuser" + uuid4().hex
+ client = utilities.register_client()
+ req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex)
+ model_id = list(req.keys())[0]
+ data = {
+ "name": policy_name,
+ "description": "description of {}".format(policy_name),
+ "model_id": model_id,
+ "genre": "genre"
+ }
+ req = client.post("/policies/", data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ policy_id = next(iter(utilities.get_json(req.data)['policies']))
+
+ data = {
+ "name": "",
+ "description": "description of {}".format(policy_name),
+ "model_id": model_id,
+ "genre": "genre"
+ }
+ req = client.patch("/policies/{}".format(policy_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Policy Content Error'
+
+
+def test_update_policies_with_blank_model():
+ policy_name = "testuser" + uuid4().hex
+ client = utilities.register_client()
+ req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex)
+ model_id = list(req.keys())[0]
+ data = {
+ "name": policy_name,
+ "description": "description of {}".format(policy_name),
+ "model_id": model_id,
+ "genre": "genre"
+ }
+ req = client.post("/policies/", data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ policy_id = next(iter(utilities.get_json(req.data)['policies']))
+
+ data = {
+ "name": policy_name,
+ "description": "description of {}".format(policy_name),
+ "model_id": "",
+ "genre": "genre"
+ }
+
+ req = client.patch("/policies/{}".format(policy_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ assert req.status_code == 200
+
+
+def test_update_policies_connected_to_rules_with_blank_model():
+ client = utilities.register_client()
+ req, rules, policy_id = data_builder.add_rules(client)
+ req = client.get("/policies")
+ data = utilities.get_json(req.data)
+ for policy_obj_id in data['policies']:
+ if policy_obj_id == policy_id:
+ policy = data['policies'][policy_obj_id]
+ policy['model_id'] = ''
+ req = client.patch("/policies/{}".format(policy_id), data=json.dumps(policy),
+ headers={'Content-Type': 'application/json'})
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Policy update error'
+
+
+def test_delete_policies():
+ client = utilities.register_client()
+
+ policy = policy_helper.add_policies()
+ policy_id = list(policy.keys())[0]
+
+ req = client.delete("/policies/{}".format(policy_id))
+ assert req.status_code == 200
+
+
+def test_delete_policy_with_dependencies_rule():
+ client = utilities.register_client()
+ req, rules, policy_id = data_builder.add_rules(client)
+ req = client.delete("/policies/{}".format(policy_id))
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Policy With Rule Error'
+
+
+def test_delete_policy_with_dependencies_subject_data():
+ client = utilities.register_client()
+ req, rules, policy_id = data_builder.add_rules(client)
+ req = client.delete("/policies/{}/rules/{}".format(policy_id, next(iter(rules['rules']))))
+ assert req.status_code == 200
+ req = client.delete("/policies/{}".format(policy_id))
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Policy With Data Error'
+
+
+def test_delete_policy_with_dependencies_perimeter():
+ client = utilities.register_client()
+ policy = policy_helper.add_policies()
+ policy_id = next(iter(policy))
+
+ data = {
+ "name": 'testuser'+uuid4().hex,
+ "description": "description of {}".format(uuid4().hex),
+ "password": "password for {}".format(uuid4().hex),
+ "email": "{}@moon".format(uuid4().hex)
+ }
+ req = client.post("/policies/{}/subjects".format(policy_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+
+ assert req.status_code == 200
+ req = client.delete("/policies/{}".format(policy_id))
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Policy With Perimeter Error'
+
+
+def test_delete_policies_without_id():
+ client = utilities.register_client()
+ req = delete_policies_without_id(client)
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Policy Unknown'
diff --git a/old/moon_manager/tests/unit_python/api/test_rules.py b/old/moon_manager/tests/unit_python/api/test_rules.py
new file mode 100644
index 00000000..a3c21839
--- /dev/null
+++ b/old/moon_manager/tests/unit_python/api/test_rules.py
@@ -0,0 +1,129 @@
+import api.utilities as utilities
+import json
+from helpers import data_builder as builder
+from uuid import uuid4
+from helpers import policy_helper
+
+
+def get_rules(client, policy_id):
+ req = client.get("/policies/{}/rules".format(policy_id))
+ rules = utilities.get_json(req.data)
+ return req, rules
+
+
+def add_rules_without_policy_id(client):
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = builder.create_new_meta_rule()
+ data = {
+ "meta_rule_id": meta_rule_id,
+ "rule": [subject_category_id, object_category_id, action_category_id],
+ "instructions": (
+ {"decision": "grant"},
+ ),
+ "enabled": True
+ }
+ req = client.post("/policies/{}/rules".format(None), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ rules = utilities.get_json(req.data)
+ return req, rules
+
+
+def add_rules_without_meta_rule_id(client, policy_id):
+ data = {
+ "meta_rule_id": "",
+ "rule": ["subject_data_id2", "object_data_id2", "action_data_id2"],
+ "instructions": (
+ {"decision": "grant"},
+ ),
+ "enabled": True
+ }
+ req = client.post("/policies/{}/rules".format(policy_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ rules = utilities.get_json(req.data)
+ return req, rules
+
+
+def add_rules_without_rule(client, policy_id):
+ data = {
+ "meta_rule_id": "meta_rule_id1",
+ "instructions": (
+ {"decision": "grant"},
+ ),
+ "enabled": True
+ }
+ req = client.post("/policies/{}/rules".format(policy_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ rules = utilities.get_json(req.data)
+ return req, rules
+
+
+def delete_rules(client, policy_id, meta_rule_id):
+ req = client.delete("/policies/{}/rules/{}".format(policy_id, meta_rule_id))
+ return req
+
+
+def test_get_rules():
+ policy_id = utilities.get_policy_id()
+ client = utilities.register_client()
+ req, rules = get_rules(client, policy_id)
+ assert req.status_code == 200
+ assert isinstance(rules, dict)
+ assert "rules" in rules
+ return req, rules
+
+
+def test_add_rules():
+ client = utilities.register_client()
+ req, rules, policy = builder.add_rules(client, )
+ assert req.status_code == 200
+
+
+def test_add_rules_without_policy_id():
+ client = utilities.register_client()
+ req, rules = add_rules_without_policy_id(client)
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "400: Policy Unknown"
+
+#
+# def test_add_rules_without_meta_rule_id():
+# policy_id = utilities.get_policy_id()
+# client = utilities.register_client()
+# req, rules = add_rules_without_meta_rule_id(client, policy_id)
+# assert req.status_code == 400
+# assert json.loads(req.data)["message"] == "Key: 'meta_rule_id', [Empty String]"
+
+
+def test_add_rules_without_rule():
+ policy_id = utilities.get_policy_id()
+ client = utilities.register_client()
+ req, rules = add_rules_without_rule(client, policy_id)
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == 'Invalid Key :rule not found'
+
+
+def test_delete_rules_with_invalid_parameters():
+ client = utilities.register_client()
+ req = delete_rules(client, "", "")
+ assert req.status_code == 404
+ # assert json.loads(req.data)["message"] == 'Invalid Key :rule not found'
+
+
+def test_delete_rules_without_policy_id():
+ client = utilities.register_client()
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy()
+ sub_data_id = builder.create_subject_data(policy_id, subject_category_id)
+ obj_data_id = builder.create_object_data(policy_id, object_category_id)
+ act_data_id = builder.create_action_data(policy_id, action_category_id)
+ data = {
+ "meta_rule_id": meta_rule_id,
+ "rule": [sub_data_id, obj_data_id, act_data_id],
+ "instructions": (
+ {"decision": "grant"},
+ ),
+ "enabled": True
+ }
+ client.post("/policies/{}/rules".format(policy_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ req, added_rules = get_rules(client, policy_id)
+ id = list(added_rules["rules"]["rules"])[0]["id"]
+ rules = delete_rules(client, None, id)
+ assert rules.status_code == 200
diff --git a/old/moon_manager/tests/unit_python/api/test_unit_models.py b/old/moon_manager/tests/unit_python/api/test_unit_models.py
new file mode 100644
index 00000000..6e93ed28
--- /dev/null
+++ b/old/moon_manager/tests/unit_python/api/test_unit_models.py
@@ -0,0 +1,352 @@
+# Copyright 2018 Open Platform for NFV Project, Inc. and its contributors
+# This software is distributed under the terms and conditions of the 'Apache-2.0'
+# license which can be found in the file 'LICENSE' in this package distribution
+# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
+
+import json
+import api.utilities as utilities
+from helpers import data_builder as builder
+from helpers import policy_helper
+from helpers import model_helper
+from uuid import uuid4
+
+
+def get_models(client):
+ req = client.get("/models")
+ models = utilities.get_json(req.data)
+ return req, models
+
+
+def add_models(client, name, data=None):
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = builder.create_new_meta_rule()
+
+ if not data:
+ data = {
+ "name": name,
+ "description": "description of {}".format(name),
+ "meta_rules": [meta_rule_id]
+ }
+ req = client.post("/models", data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ models = utilities.get_json(req.data)
+ return req, models
+
+
+def update_model(client, name, model_id):
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = builder.create_new_meta_rule()
+
+ data = {
+ "name": name,
+ "description": "description of {}".format(name),
+ "meta_rules": [meta_rule_id]
+ }
+ req = client.patch("/models/{}".format(model_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ models = utilities.get_json(req.data)
+ return req, models
+
+
+def add_model_without_meta_rules_ids(client, name):
+ data = {
+ "name": name,
+ "description": "description of {}".format(name),
+ "meta_rules": []
+ }
+ req = client.post("/models", data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ models = utilities.get_json(req.data)
+ return req, models
+
+
+def add_model_with_empty_meta_rule_id(client, name):
+ data = {
+ "name": name,
+ "description": "description of {}".format(name),
+ "meta_rules": [""]
+ }
+ req = client.post("/models", data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ models = utilities.get_json(req.data)
+ return req, models
+
+
+def update_model_without_meta_rules_ids(client, model_id):
+ name = "model_id" + uuid4().hex
+ data = {
+ "name": name,
+ "description": "description of {}".format(name),
+ "meta_rules": []
+ }
+ req = client.patch("/models/{}".format(model_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ models = utilities.get_json(req.data)
+ return req, models
+
+
+def delete_models(client, name):
+ request, models = get_models(client)
+ for key, value in models['models'].items():
+ if value['name'] == name:
+ req = client.delete("/models/{}".format(key))
+ break
+ return req
+
+
+def delete_models_without_id(client):
+ req = client.delete("/models/{}".format(""))
+ return req
+
+
+def test_delete_model_assigned_to_policy():
+ policy_name = "testuser" + uuid4().hex
+ client = utilities.register_client()
+ req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex)
+ model_id = list(req.keys())[0]
+ data = {
+ "name": policy_name,
+ "description": "description of {}".format(policy_name),
+ "model_id": model_id,
+ "genre": "genre"
+ }
+ req = client.post("/policies", data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ req = client.delete("/models/{}".format(model_id))
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Model With Policy Error'
+
+
+def clean_models():
+ client = utilities.register_client()
+ req, models = get_models(client)
+ for key, value in models['models'].items():
+ print(key)
+ print(value)
+ client.delete("/models/{}".format(key))
+
+
+def test_get_models():
+ client = utilities.register_client()
+ req, models = get_models(client)
+ assert req.status_code == 200
+ assert isinstance(models, dict)
+ assert "models" in models
+
+
+def test_add_models():
+ clean_models()
+ client = utilities.register_client()
+ req, models = add_models(client, "testuser")
+ assert req.status_code == 200
+ assert isinstance(models, dict)
+ model_id = list(models["models"])[0]
+ assert "models" in models
+ assert models['models'][model_id]['name'] == "testuser"
+ assert models['models'][model_id]["description"] == "description of {}".format("testuser")
+
+
+def test_delete_models():
+ client = utilities.register_client()
+ req = delete_models(client, "testuser")
+ assert req.status_code == 200
+
+
+def test_update_models_with_assigned_policy():
+ client = utilities.register_client()
+
+ model = model_helper.add_model(model_id="mls_model_id" + uuid4().hex)
+ model_id = list(model.keys())[0]
+ value = {
+ "name": "test_policy" + uuid4().hex,
+ "model_id": model_id,
+ "description": "test",
+ }
+ policy = policy_helper.add_policies(value=value)
+ data = {
+ "name": "model_" + uuid4().hex,
+ "description": "description of model_2",
+ "meta_rules": []
+ }
+ req = client.patch("/models/{}".format(model_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "400: Model With Policy Error"
+
+
+def test_update_models_with_no_assigned_policy():
+ client = utilities.register_client()
+
+ model = model_helper.add_model(model_id="mls_model_id" + uuid4().hex)
+ model_id = list(model.keys())[0]
+
+ data = {
+ "name": "model_" + uuid4().hex,
+ "description": "description of model_2",
+ "meta_rules": []
+ }
+ req = client.patch("/models/{}".format(model_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+
+ assert req.status_code == 200
+
+
+def test_add_models_with_meta_rule_key():
+ client = utilities.register_client()
+
+ model = model_helper.add_model(model_id="mls_model_id" + uuid4().hex)
+ model_id = list(model.keys())[0]
+
+ data = {
+ "name": "model_" + uuid4().hex,
+ "description": "description of model_2",
+
+ }
+ req = client.patch("/models/{}".format(model_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "Invalid Key :meta_rules not found"
+
+
+def test_delete_models_without_id():
+ client = utilities.register_client()
+ req = delete_models_without_id(client)
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "400: Model Unknown"
+
+
+def test_add_model_with_empty_name():
+ clean_models()
+ client = utilities.register_client()
+ req, models = add_models(client, "<br/>")
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]"
+
+
+def test_add_model_with_name_contain_space():
+ clean_models()
+ client = utilities.register_client()
+ req, models = add_models(client, "test<br>user")
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]"
+
+
+def test_add_model_with_name_space():
+ clean_models()
+ client = utilities.register_client()
+ req, models = add_models(client, " ")
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Model Unknown'
+
+
+def test_add_model_with_empty_meta_rule_id():
+ clean_models()
+ client = utilities.register_client()
+ req, meta_rules = add_model_with_empty_meta_rule_id(client, "testuser")
+ assert req.status_code == 400
+ assert json.loads(req.data)["message"] == '400: Meta Rule Unknown'
+
+
+def test_add_model_with_existed_name():
+ clean_models()
+ client = utilities.register_client()
+ name = uuid4().hex
+ req, models = add_models(client, name)
+ assert req.status_code == 200
+ req, models = add_models(client, name)
+ assert req.status_code == 409
+ assert json.loads(req.data)["message"] == '409: Model Error'
+
+
+def test_add_model_with_existed_meta_rules_list():
+ clean_models()
+ client = utilities.register_client()
+ name = uuid4().hex
+
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = builder.create_new_meta_rule()
+ data = {
+ "name": name,
+ "description": "description of {}".format(name),
+ "meta_rules": [meta_rule_id]
+ }
+ name = uuid4().hex
+ req, models = add_models(client=client, name=name, data=data)
+ assert req.status_code == 200
+
+ data = {
+ "name": name,
+ "description": "description of {}".format(name),
+ "meta_rules": [meta_rule_id]
+ }
+ req, models = add_models(client=client, name=name, data=data)
+ assert req.status_code == 409
+ assert json.loads(req.data)["message"] == '409: Model Error'
+
+
+def test_add_model_without_meta_rules():
+ clean_models()
+ client = utilities.register_client()
+ req, meta_rules = add_model_without_meta_rules_ids(client, "testuser")
+ assert req.status_code == 200
+ # assert json.loads(req.data)["message"] == "Key: 'meta_rules', [Empty Container]"
+
+
+def test_update_model():
+ clean_models()
+ client = utilities.register_client()
+ req = add_models(client, "testuser")
+ model_id = list(req[1]['models'])[0]
+ req_update = update_model(client, "testuser", model_id)
+ assert req_update[0].status_code == 200
+ model_id = list(req_update[1]["models"])[0]
+ assert req_update[1]["models"][model_id]["meta_rules"][0] is not None
+ delete_models(client, "testuser")
+
+
+def test_update_model_name_with_space():
+ clean_models()
+ client = utilities.register_client()
+ req = add_models(client, "testuser")
+ model_id = list(req[1]['models'])[0]
+ req_update = update_model(client, " ", model_id)
+ assert req_update[0].status_code == 400
+ assert req_update[1]["message"] == '400: Model Unknown'
+
+
+def test_update_model_with_empty_name():
+ clean_models()
+ client = utilities.register_client()
+ req = add_models(client, "testuser")
+ model_id = list(req[1]['models'])[0]
+ req_update = update_model(client, "", model_id)
+ assert req_update[0].status_code == 400
+ assert req_update[1]['message'] == '400: Model Unknown'
+
+
+def test_update_meta_rules_without_id():
+ clean_models()
+ client = utilities.register_client()
+ req_update = update_model(client, "testuser", "")
+ assert req_update[0].status_code == 400
+ assert json.loads(req_update[0].data)["message"] == "400: Model Unknown"
+
+
+def test_update_meta_rules_without_name():
+ client = utilities.register_client()
+ req_update = update_model(client, "<a></a>", "1234567")
+ assert req_update[0].status_code == 400
+ assert json.loads(req_update[0].data)[
+ "message"] == "Key: 'name', [Forbidden characters in string]"
+
+
+def test_update_meta_rules_without_meta_rules():
+ value = {
+ "name": "mls_model_id" + uuid4().hex,
+ "description": "test",
+ "meta_rules": []
+ }
+ model = model_helper.add_model(value=value)
+ model_id = list(model.keys())[0]
+ client = utilities.register_client()
+ req_update = update_model_without_meta_rules_ids(client, model_id)
+ assert req_update[0].status_code == 200
diff --git a/old/moon_manager/tests/unit_python/api/utilities.py b/old/moon_manager/tests/unit_python/api/utilities.py
new file mode 100644
index 00000000..2e51fec8
--- /dev/null
+++ b/old/moon_manager/tests/unit_python/api/utilities.py
@@ -0,0 +1,26 @@
+import json
+from uuid import uuid4
+
+def get_json(data):
+ return json.loads(data.decode("utf-8"))
+
+
+def register_client():
+ import moon_manager.server
+ server = moon_manager.server.create_server()
+ client = server.app.test_client()
+ return client
+
+
+def get_policy_id():
+ from helpers import policy_helper
+ value = {
+ "name": "test_policy"+uuid4().hex,
+ "model_id": "",
+ "genre": "authz",
+ "description": "test",
+ }
+ policy_helper.add_policies(value=value)
+ req = policy_helper.get_policies()
+ policy_id = list(req.keys())[0]
+ return policy_id
diff --git a/old/moon_manager/tests/unit_python/conftest.py b/old/moon_manager/tests/unit_python/conftest.py
new file mode 100644
index 00000000..90a27e54
--- /dev/null
+++ b/old/moon_manager/tests/unit_python/conftest.py
@@ -0,0 +1,254 @@
+import base64
+import json
+import logging
+import pytest
+import requests_mock
+
+CONF = {
+ "openstack": {
+ "keystone": {
+ "url": "http://keystone:5000/v3",
+ "user": "admin",
+ "check_token": False,
+ "password": "p4ssw0rd",
+ "domain": "default",
+ "certificate": False,
+ "project": "admin"
+ }
+ },
+ "components": {
+ "wrapper": {
+ "bind": "0.0.0.0",
+ "port": 8080,
+ "container": "wukongsun/moon_wrapper:v4.3",
+ "timeout": 5,
+ "hostname": "wrapper"
+ },
+ "manager": {
+ "bind": "0.0.0.0",
+ "port": 8082,
+ "container": "wukongsun/moon_manager:v4.3",
+ "hostname": "manager"
+ },
+ "port_start": 31001,
+ "orchestrator": {
+ "bind": "0.0.0.0",
+ "port": 8083,
+ "container": "wukongsun/moon_orchestrator:v4.3",
+ "hostname": "orchestrator"
+ },
+ "pipeline": {
+ "interface": {
+ "bind": "0.0.0.0",
+ "port": 8080,
+ "container": "wukongsun/moon_interface:v4.3",
+ "hostname": "interface"
+ },
+ "authz": {
+ "bind": "0.0.0.0",
+ "port": 8081,
+ "container": "wukongsun/moon_authz:v4.3",
+ "hostname": "authz"
+ },
+ }
+ },
+ "logging": {
+ "handlers": {
+ "file": {
+ "filename": "/tmp/moon.log",
+ "class": "logging.handlers.RotatingFileHandler",
+ "level": "DEBUG",
+ "formatter": "custom",
+ "backupCount": 3,
+ "maxBytes": 1048576
+ },
+ "console": {
+ "class": "logging.StreamHandler",
+ "formatter": "brief",
+ "level": "INFO",
+ "stream": "ext://sys.stdout"
+ }
+ },
+ "formatters": {
+ "brief": {
+ "format": "%(levelname)s %(name)s %(message)-30s"
+ },
+ "custom": {
+ "format": "%(asctime)-15s %(levelname)s %(name)s %(message)s"
+ }
+ },
+ "root": {
+ "handlers": [
+ "console"
+ ],
+ "level": "ERROR"
+ },
+ "version": 1,
+ "loggers": {
+ "moon": {
+ "handlers": [
+ "console",
+ "file"
+ ],
+ "propagate": False,
+ "level": "DEBUG"
+ }
+ }
+ },
+ "slave": {
+ "name": None,
+ "master": {
+ "url": None,
+ "login": None,
+ "password": None
+ }
+ },
+ "docker": {
+ "url": "tcp://172.88.88.1:2376",
+ "network": "moon"
+ },
+ "database": {
+ "url": "sqlite:///database.db",
+ # "url": "mysql+pymysql://moon:p4sswOrd1@db/moon",
+ "driver": "sql"
+ },
+ "messenger": {
+ "url": "rabbit://moon:p4sswOrd1@messenger:5672/moon"
+ },
+}
+
+COMPONENTS = (
+ "logging",
+ "openstack/keystone",
+ "database",
+ "slave",
+ "components/manager",
+ "components/orchestrator"
+)
+
+PODS = {
+ "pods": {
+ "721760dd-de5f-11e7-8001-3863bbb766f3": [
+ {
+ "pdp_id": "b3d3e18abf3340e8b635fd49e6634ccd",
+ "port": 8080,
+ "genre": "interface",
+ "name": "interface-paltry",
+ "keystone_project_id": "a64beb1cc224474fb4badd43173e7101",
+ "namespace": "moon",
+ "container": "wukongsun/moon_interface:v4.3"
+ },
+ {
+ "pdp_id": "b3d3e18abf3340e8b635fd49e6634ccd",
+ "meta_rule_id": "f8f49a779ceb47b3ac810f01ef71b4e0",
+ "port": 8081,
+ "genre": "authz",
+ "name": "authz-economic",
+ "policy_id": "f8f49a779ceb47b3ac810f01ef71b4e0",
+ "keystone_project_id": "a64beb1cc224474fb4badd43173e7101",
+ "namespace": "moon",
+ "container": "wukongsun/moon_authz:v4.3"
+ }
+ ]
+ }
+}
+
+SLAVES = {
+ "slaves": [
+ {
+ "context":
+ {
+ "cluster": "kubernetes",
+ "user": "kubernetes-admin"
+ },
+ "name": "kubernetes-admin@kubernetes",
+ "configured": True,
+ "wrapper_name": "mywrapper",
+ "ip": "NC",
+ "port": 31002,
+ "internal_port": 8080
+ }
+ ]
+}
+
+
+def get_b64_conf(component=None):
+ if component in CONF:
+ return base64.b64encode(
+ json.dumps(
+ CONF[component]).encode('utf-8') + b"\n").decode('utf-8')
+ elif "/" in component:
+ key1, _, key2 = component.partition("/")
+ return base64.b64encode(
+ json.dumps(
+ CONF[key1][key2]).encode('utf-8') + b"\n").decode('utf-8')
+ else:
+ return base64.b64encode(
+ json.dumps(CONF).encode('utf-8') + b"\n").decode('utf-8')
+
+
+@pytest.fixture(autouse=True)
+def no_requests(monkeypatch):
+ """ Modify the response from Requests module
+ """
+ with requests_mock.Mocker(real_http=True) as m:
+ for component in COMPONENTS:
+ m.register_uri(
+ 'GET', 'http://consul:8500/v1/kv/{}'.format(component),
+ json=[{'Key': component, 'Value': get_b64_conf(component)}]
+ )
+ m.register_uri(
+ 'POST', 'http://keystone:5000/v3/auth/tokens',
+ headers={'X-Subject-Token': "111111111"}
+ )
+ m.register_uri(
+ 'DELETE', 'http://keystone:5000/v3/auth/tokens',
+ headers={'X-Subject-Token': "111111111"}
+ )
+
+ def match_request_text(request):
+ # request.url may be None, or '' prevents a TypeError.
+ return 'http://keystone:5000/v3/users?name=testuser' in request.url
+
+ m.register_uri(
+ requests_mock.ANY, '/v3/users',
+ additional_matcher=match_request_text,
+ json={"users": {}}
+ )
+ m.register_uri(
+ 'POST', 'http://keystone:5000/v3/users/',
+ json={"users": [{"id": "1111111111111"}]}
+ )
+ m.register_uri(
+ 'POST', 'http://orchestrator:8083/pods',
+ json=PODS,
+ headers={"content-type": "application/json"}
+ )
+ m.register_uri(
+ 'GET', 'http://orchestrator:8083/pods',
+ json=PODS
+ )
+ m.register_uri(
+ 'GET', 'http://localhost/slaves',
+ json=SLAVES
+ )
+ m.register_uri(
+ 'DELETE', 'http://orchestrator:8083/pods/{}'.format(list([PODS['pods'].keys()])[0]),
+ headers={"content-type": "application/json"}
+ )
+
+ print("Start populating the DB.")
+ from python_moondb.db_manager import init_engine, main
+ engine = init_engine()
+ print("engine={}".format(engine))
+ main("upgrade", logging.getLogger("db_manager"), engine)
+ print("End populating the DB.")
+ yield m
+
+# @pytest.fixture(autouse=True, scope="session")
+# def manage_database():
+# from moon_db.db_manager import init_engine, run
+# engine = init_engine()
+# run("upgrade", logging.getLogger("db_manager"), engine)
+# yield
+# print("Will close the DB")
diff --git a/old/moon_manager/tests/unit_python/helpers/__init__.py b/old/moon_manager/tests/unit_python/helpers/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/old/moon_manager/tests/unit_python/helpers/__init__.py
diff --git a/old/moon_manager/tests/unit_python/helpers/assignment_helper.py b/old/moon_manager/tests/unit_python/helpers/assignment_helper.py
new file mode 100644
index 00000000..22a56e38
--- /dev/null
+++ b/old/moon_manager/tests/unit_python/helpers/assignment_helper.py
@@ -0,0 +1,49 @@
+# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
+# This software is distributed under the terms and conditions of the 'Apache-2.0'
+# license which can be found in the file 'LICENSE' in this package distribution
+# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
+
+def get_action_assignments(policy_id, action_id=None, category_id=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.get_action_assignments("", policy_id, action_id, category_id)
+
+
+def add_action_assignment(policy_id, action_id, category_id, data_id):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.add_action_assignment("", policy_id, action_id, category_id, data_id)
+
+
+def delete_action_assignment(policy_id, action_id, category_id, data_id):
+ from python_moondb.core import PolicyManager
+ PolicyManager.delete_action_assignment("", policy_id, action_id, category_id, data_id)
+
+
+def get_object_assignments(policy_id, object_id=None, category_id=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.get_object_assignments("", policy_id, object_id, category_id)
+
+
+def add_object_assignment(policy_id, object_id, category_id, data_id):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.add_object_assignment("", policy_id, object_id, category_id, data_id)
+
+
+def delete_object_assignment(policy_id, object_id, category_id, data_id):
+ from python_moondb.core import PolicyManager
+ PolicyManager.delete_object_assignment("", policy_id, object_id, category_id, data_id)
+
+
+def get_subject_assignments(policy_id, subject_id=None, category_id=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.get_subject_assignments("", policy_id, subject_id, category_id)
+
+
+def add_subject_assignment(policy_id, subject_id, category_id, data_id):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.add_subject_assignment("", policy_id, subject_id, category_id, data_id)
+
+
+def delete_subject_assignment(policy_id, subject_id, category_id, data_id):
+ from python_moondb.core import PolicyManager
+ PolicyManager.delete_subject_assignment("", policy_id, subject_id, category_id, data_id)
+
diff --git a/old/moon_manager/tests/unit_python/helpers/category_helper.py b/old/moon_manager/tests/unit_python/helpers/category_helper.py
new file mode 100644
index 00000000..6c419ca8
--- /dev/null
+++ b/old/moon_manager/tests/unit_python/helpers/category_helper.py
@@ -0,0 +1,40 @@
+# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
+# This software is distributed under the terms and conditions of the 'Apache-2.0'
+# license which can be found in the file 'LICENSE' in this package distribution
+# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
+
+
+def add_subject_category(cat_id=None, value=None):
+ from python_moondb.core import ModelManager
+ category = ModelManager.add_subject_category(user_id=None, category_id=cat_id, value=value)
+ return category
+
+
+def get_subject_category(cat_id=None):
+ from python_moondb.core import ModelManager
+ category = ModelManager.get_subject_categories(user_id=None, category_id=cat_id)
+ return category
+
+
+def add_object_category(cat_id=None, value=None):
+ from python_moondb.core import ModelManager
+ category = ModelManager.add_object_category(user_id=None, category_id=cat_id, value=value)
+ return category
+
+
+def get_object_category(cat_id=None):
+ from python_moondb.core import ModelManager
+ category = ModelManager.get_object_categories(user_id=None, category_id=cat_id)
+ return category
+
+
+def add_action_category(cat_id=None, value=None):
+ from python_moondb.core import ModelManager
+ category = ModelManager.add_action_category(user_id=None, category_id=cat_id, value=value)
+ return category
+
+
+def get_action_category(cat_id=None):
+ from python_moondb.core import ModelManager
+ category = ModelManager.get_action_categories(user_id=None, category_id=cat_id)
+ return category
diff --git a/old/moon_manager/tests/unit_python/helpers/data_builder.py b/old/moon_manager/tests/unit_python/helpers/data_builder.py
new file mode 100644
index 00000000..91808cbe
--- /dev/null
+++ b/old/moon_manager/tests/unit_python/helpers/data_builder.py
@@ -0,0 +1,260 @@
+# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
+# This software is distributed under the terms and conditions of the 'Apache-2.0'
+# license which can be found in the file 'LICENSE' in this package distribution
+# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
+
+from .category_helper import *
+from .policy_helper import *
+from .data_helper import *
+from helpers import model_helper
+from .meta_rule_helper import *
+import api.utilities as utilities
+import json
+from uuid import uuid4
+
+
+def create_subject_category(name):
+ subject_category = add_subject_category(
+ value={"name": name + uuid4().hex, "description": "description 1"})
+ return list(subject_category.keys())[0]
+
+
+def create_object_category(name):
+ object_category = add_object_category(
+ value={"name": name + uuid4().hex, "description": "description 1"})
+ return list(object_category.keys())[0]
+
+
+def create_action_category(name):
+ action_category = add_action_category(
+ value={"name": name + uuid4().hex, "description": "description 1"})
+ return list(action_category.keys())[0]
+
+
+def create_model(meta_rule_id, model_name="test_model"):
+ value = {
+ "name": model_name + uuid4().hex,
+ "description": "test",
+ "meta_rules": [meta_rule_id]
+
+ }
+ return value
+
+
+def create_policy(model_id, policy_name="policy_1"):
+ value = {
+ "name": policy_name,
+ "model_id": model_id,
+ "genre": "authz",
+ "description": "test",
+ }
+ return value
+
+
+def create_pdp(policies_ids):
+ value = {
+ "name": "test_pdp",
+ "security_pipeline": policies_ids,
+ "keystone_project_id": "keystone_project_id1",
+ "description": "...",
+ }
+ return value
+
+
+def create_new_policy(subject_category_name=None, object_category_name=None,
+ action_category_name=None, model_name=None, policy_name=None,
+ meta_rule_name=None):
+ if not subject_category_name:
+ subject_category_name = "subjectCategory_" + uuid4().hex
+ if not object_category_name:
+ object_category_name = "objectCategory_" + uuid4().hex
+ if not action_category_name:
+ action_category_name = "actionCategory_" + uuid4().hex
+
+ if not meta_rule_name:
+ meta_rule_name = "meta_rule_" + uuid4().hex
+
+ if not model_name:
+ model_name = "model_name_" + uuid4().hex
+ if not policy_name:
+ policy_name = "policy_name_" + uuid4().hex
+
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = create_new_meta_rule(
+ subject_category_name=subject_category_name + uuid4().hex,
+ object_category_name=object_category_name + uuid4().hex,
+ action_category_name=action_category_name + uuid4().hex,
+ meta_rule_name=meta_rule_name + uuid4().hex
+ )
+
+ model = model_helper.add_model(value=create_model(meta_rule_id, model_name + uuid4().hex))
+ model_id = list(model.keys())[0]
+ value = create_policy(model_id, policy_name + uuid4().hex)
+ policy = add_policies(value=value)
+ assert policy
+ policy_id = list(policy.keys())[0]
+ return subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id
+
+
+def create_new_meta_rule(subject_category_name=None, object_category_name=None,
+ action_category_name=None, meta_rule_name=None):
+ if not subject_category_name:
+ subject_category_name = "subjectCategory_" + uuid4().hex
+ if not object_category_name:
+ object_category_name = "objectCategory_" + uuid4().hex
+ if not action_category_name:
+ action_category_name = "actionCategory_" + uuid4().hex
+
+ if not meta_rule_name:
+ meta_rule_name = "meta_rule_" + uuid4().hex
+
+ subject_category_id = create_subject_category(subject_category_name)
+ object_category_id = create_object_category(object_category_name)
+ action_category_id = create_action_category(action_category_name)
+ value = {"name": meta_rule_name,
+ "description": "name of the meta rule algorithm",
+ "subject_categories": [subject_category_id],
+ "object_categories": [object_category_id],
+ "action_categories": [action_category_id]
+ }
+ meta_rule = add_meta_rule(value=value)
+ return subject_category_id, object_category_id, action_category_id, list(meta_rule.keys())[0]
+
+
+def create_subject(policy_id):
+ value = {
+ "name": "testuser" + uuid4().hex,
+ "description": "test",
+ }
+ subject = add_subject(policy_id=policy_id, value=value)
+ return list(subject.keys())[0]
+
+
+def create_object(policy_id):
+ value = {
+ "name": "testobject" + uuid4().hex,
+ "description": "test",
+ }
+ object = add_object(policy_id=policy_id, value=value)
+ return list(object.keys())[0]
+
+
+def create_action(policy_id):
+ value = {
+ "name": "testaction" + uuid4().hex,
+ "description": "test",
+ }
+ action = add_action(policy_id=policy_id, value=value)
+ return list(action.keys())[0]
+
+
+def create_subject_data(policy_id, category_id):
+ value = {
+ "name": "subject-security-level",
+ "description": {"low": "", "medium": "", "high": ""},
+ }
+ subject_data = add_subject_data(policy_id=policy_id, category_id=category_id, value=value).get(
+ 'data')
+ assert subject_data
+ return list(subject_data.keys())[0]
+
+
+def create_object_data(policy_id, category_id):
+ value = {
+ "name": "object-security-level",
+ "description": {"low": "", "medium": "", "high": ""},
+ }
+ object_data = add_object_data(policy_id=policy_id, category_id=category_id, value=value).get(
+ 'data')
+ return list(object_data.keys())[0]
+
+
+def create_action_data(policy_id, category_id):
+ value = {
+ "name": "action-type",
+ "description": {"vm-action": "", "storage-action": "", },
+ }
+ action_data = add_action_data(policy_id=policy_id, category_id=category_id, value=value).get(
+ 'data')
+ return list(action_data.keys())[0]
+
+
+def get_policy_id_with_subject_assignment():
+ client = utilities.register_client()
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = create_new_policy(
+ subject_category_name="subject_category1" + uuid4().hex,
+ object_category_name="object_category1" + uuid4().hex,
+ action_category_name="action_category1" + uuid4().hex,
+ meta_rule_name="meta_rule_1" + uuid4().hex)
+ subject_id = create_subject(policy_id)
+ data_id = create_subject_data(policy_id=policy_id, category_id=subject_category_id)
+
+ data = {
+ "id": subject_id,
+ "category_id": subject_category_id,
+ "data_id": data_id
+ }
+ client.post("/policies/{}/subject_assignments".format(policy_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ return policy_id
+
+
+def get_policy_id_with_object_assignment():
+ client = utilities.register_client()
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = create_new_policy(
+ subject_category_name="subject_category1" + uuid4().hex,
+ object_category_name="object_category1" + uuid4().hex,
+ action_category_name="action_category1" + uuid4().hex,
+ meta_rule_name="meta_rule_1" + uuid4().hex)
+ object_id = create_object(policy_id)
+ data_id = create_object_data(policy_id=policy_id, category_id=object_category_id)
+
+ data = {
+ "id": object_id,
+ "category_id": object_category_id,
+ "data_id": data_id
+ }
+
+ client.post("/policies/{}/object_assignments".format(policy_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ return policy_id
+
+
+def get_policy_id_with_action_assignment():
+ client = utilities.register_client()
+ subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = create_new_policy(
+ subject_category_name="subject_category1" + uuid4().hex,
+ object_category_name="object_category1" + uuid4().hex,
+ action_category_name="action_category1" + uuid4().hex,
+ meta_rule_name="meta_rule_1" + uuid4().hex)
+ action_id = create_action(policy_id)
+ data_id = create_action_data(policy_id=policy_id, category_id=action_category_id)
+
+ data = {
+ "id": action_id,
+ "category_id": action_category_id,
+ "data_id": data_id
+ }
+ client.post("/policies/{}/action_assignments".format(policy_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ return policy_id
+
+
+def add_rules(client):
+ sub_id, obj_id, act_id, meta_rule_id, policy_id = create_new_policy("sub_cat" + uuid4().hex,
+ "obj_cat" + uuid4().hex,
+ "act_cat" + uuid4().hex)
+ sub_data_id = create_subject_data(policy_id, sub_id)
+ obj_data_id = create_object_data(policy_id, obj_id)
+ act_data_id = create_action_data(policy_id, act_id)
+ data = {
+ "meta_rule_id": meta_rule_id,
+ "rule": [sub_data_id, obj_data_id, act_data_id],
+ "instructions": (
+ {"decision": "grant"},
+ ),
+ "enabled": True
+ }
+ req = client.post("/policies/{}/rules".format(policy_id), data=json.dumps(data),
+ headers={'Content-Type': 'application/json'})
+ rules = utilities.get_json(req.data)
+ return req, rules, policy_id
diff --git a/old/moon_manager/tests/unit_python/helpers/data_helper.py b/old/moon_manager/tests/unit_python/helpers/data_helper.py
new file mode 100644
index 00000000..e1c05640
--- /dev/null
+++ b/old/moon_manager/tests/unit_python/helpers/data_helper.py
@@ -0,0 +1,99 @@
+# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
+# This software is distributed under the terms and conditions of the 'Apache-2.0'
+# license which can be found in the file 'LICENSE' in this package distribution
+# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
+
+
+def get_action_data(policy_id, data_id=None, category_id=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.get_action_data("", policy_id, data_id, category_id)
+
+
+def add_action_data(policy_id, data_id=None, category_id=None, value=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.add_action_data("", policy_id, data_id, category_id, value)
+
+
+def delete_action_data(policy_id, data_id):
+ from python_moondb.core import PolicyManager
+ PolicyManager.delete_action_data("", policy_id=policy_id, data_id=data_id)
+
+
+def get_object_data(policy_id, data_id=None, category_id=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.get_object_data("", policy_id, data_id, category_id)
+
+
+def add_object_data(policy_id, data_id=None, category_id=None, value=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.add_object_data("", policy_id, data_id, category_id, value)
+
+
+def delete_object_data(policy_id, data_id):
+ from python_moondb.core import PolicyManager
+ PolicyManager.delete_object_data("", policy_id=policy_id, data_id=data_id)
+
+
+def get_subject_data(policy_id, data_id=None, category_id=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.get_subject_data("", policy_id, data_id, category_id)
+
+
+def add_subject_data(policy_id, data_id=None, category_id=None, value=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.set_subject_data("", policy_id, data_id, category_id, value)
+
+
+def delete_subject_data(policy_id, data_id):
+ from python_moondb.core import PolicyManager
+ PolicyManager.delete_subject_data("", policy_id=policy_id, data_id=data_id)
+
+
+def get_actions(policy_id, perimeter_id=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.get_actions("", policy_id, perimeter_id)
+
+
+def add_action(policy_id, perimeter_id=None, value=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.add_action("", policy_id, perimeter_id, value)
+
+
+def delete_action(policy_id, perimeter_id):
+ from python_moondb.core import PolicyManager
+ PolicyManager.delete_action("", policy_id, perimeter_id)
+
+
+def get_objects(policy_id, perimeter_id=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.get_objects("", policy_id, perimeter_id)
+
+
+def add_object(policy_id, perimeter_id=None, value=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.add_object("", policy_id, perimeter_id, value)
+
+
+def delete_object(policy_id, perimeter_id):
+ from python_moondb.core import PolicyManager
+ PolicyManager.delete_object("", policy_id, perimeter_id)
+
+
+def get_subjects(policy_id, perimeter_id=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.get_subjects("", policy_id, perimeter_id)
+
+
+def add_subject(policy_id, perimeter_id=None, value=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.add_subject("", policy_id, perimeter_id, value)
+
+
+def delete_subject(policy_id, perimeter_id):
+ from python_moondb.core import PolicyManager
+ PolicyManager.delete_subject("", policy_id, perimeter_id)
+
+
+def get_available_metadata(policy_id):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.get_available_metadata("", policy_id)
diff --git a/old/moon_manager/tests/unit_python/helpers/meta_rule_helper.py b/old/moon_manager/tests/unit_python/helpers/meta_rule_helper.py
new file mode 100644
index 00000000..e882706b
--- /dev/null
+++ b/old/moon_manager/tests/unit_python/helpers/meta_rule_helper.py
@@ -0,0 +1,49 @@
+# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
+# This software is distributed under the terms and conditions of the 'Apache-2.0'
+# license which can be found in the file 'LICENSE' in this package distribution
+# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
+
+from helpers import data_builder as builder
+from uuid import uuid4
+
+
+def set_meta_rule(meta_rule_id, value=None):
+ from python_moondb.core import ModelManager
+ if not value:
+ action_category_id = builder.create_action_category("action_category_id1"+uuid4().hex)
+ subject_category_id = builder.create_subject_category("subject_category_id1"+uuid4().hex)
+ object_category_id = builder.create_object_category("object_category_id1"+uuid4().hex)
+ value = {
+ "name": "MLS_meta_rule",
+ "description": "test",
+ "subject_categories": [subject_category_id],
+ "object_categories": [object_category_id],
+ "action_categories": [action_category_id]
+ }
+ return ModelManager.set_meta_rule(user_id=None, meta_rule_id=meta_rule_id, value=value)
+
+
+def add_meta_rule(meta_rule_id=None, value=None):
+ from python_moondb.core import ModelManager
+ if not value:
+ action_category_id = builder.create_action_category("action_category_id1"+uuid4().hex)
+ subject_category_id = builder.create_subject_category("subject_category_id1"+uuid4().hex)
+ object_category_id = builder.create_object_category("object_category_id1"+uuid4().hex)
+ value = {
+ "name": "MLS_meta_rule"+uuid4().hex,
+ "description": "test",
+ "subject_categories": [subject_category_id],
+ "object_categories": [object_category_id],
+ "action_categories": [action_category_id]
+ }
+ return ModelManager.add_meta_rule(user_id=None, meta_rule_id=meta_rule_id, value=value)
+
+
+def get_meta_rules(meta_rule_id=None):
+ from python_moondb.core import ModelManager
+ return ModelManager.get_meta_rules(user_id=None, meta_rule_id=meta_rule_id)
+
+
+def delete_meta_rules(meta_rule_id=None):
+ from python_moondb.core import ModelManager
+ ModelManager.delete_meta_rule(user_id=None, meta_rule_id=meta_rule_id)
diff --git a/old/moon_manager/tests/unit_python/helpers/model_helper.py b/old/moon_manager/tests/unit_python/helpers/model_helper.py
new file mode 100644
index 00000000..73808e03
--- /dev/null
+++ b/old/moon_manager/tests/unit_python/helpers/model_helper.py
@@ -0,0 +1,48 @@
+# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
+# This software is distributed under the terms and conditions of the 'Apache-2.0'
+# license which can be found in the file 'LICENSE' in this package distribution
+# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
+
+from helpers import data_builder as builder
+from uuid import uuid4
+
+
+def get_models(model_id=None):
+ from python_moondb.core import ModelManager
+ return ModelManager.get_models(user_id=None, model_id=model_id)
+
+
+def add_model(model_id=None, value=None):
+ from python_moondb.core import ModelManager
+ if not value:
+ subject_category_id, object_category_id, action_category_id, meta_rule_id = builder.create_new_meta_rule()
+ name = "MLS"+uuid4().hex if model_id is None else "MLS " + model_id
+ value = {
+ "name": name,
+ "description": "test",
+ "meta_rules": [meta_rule_id]
+ }
+ return ModelManager.add_model(user_id=None, model_id=model_id, value=value)
+
+
+def delete_models(uuid=None, name=None):
+ from python_moondb.core import ModelManager
+ if not uuid:
+ for model_id, model_value in get_models():
+ if name == model_value['name']:
+ uuid = model_id
+ break
+ ModelManager.delete_model(user_id=None, model_id=uuid)
+
+
+def delete_all_models():
+ from python_moondb.core import ModelManager
+ models_values = get_models()
+ print(models_values)
+ for model_id, model_value in models_values.items():
+ ModelManager.delete_model(user_id=None, model_id=model_id)
+
+
+def update_model(model_id=None, value=None):
+ from python_moondb.core import ModelManager
+ return ModelManager.update_model(user_id=None, model_id=model_id, value=value)
diff --git a/old/moon_manager/tests/unit_python/helpers/pdp_helper.py b/old/moon_manager/tests/unit_python/helpers/pdp_helper.py
new file mode 100644
index 00000000..3d169b06
--- /dev/null
+++ b/old/moon_manager/tests/unit_python/helpers/pdp_helper.py
@@ -0,0 +1,23 @@
+# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
+# This software is distributed under the terms and conditions of the 'Apache-2.0'
+# license which can be found in the file 'LICENSE' in this package distribution
+# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
+
+def update_pdp(pdp_id, value):
+ from python_moondb.core import PDPManager
+ return PDPManager.update_pdp("", pdp_id, value)
+
+
+def delete_pdp(pdp_id):
+ from python_moondb.core import PDPManager
+ PDPManager.delete_pdp("", pdp_id)
+
+
+def add_pdp(pdp_id=None, value=None):
+ from python_moondb.core import PDPManager
+ return PDPManager.add_pdp("", pdp_id, value)
+
+
+def get_pdp(pdp_id=None):
+ from python_moondb.core import PDPManager
+ return PDPManager.get_pdp("", pdp_id)
diff --git a/old/moon_manager/tests/unit_python/helpers/policy_helper.py b/old/moon_manager/tests/unit_python/helpers/policy_helper.py
new file mode 100644
index 00000000..eddd0b8d
--- /dev/null
+++ b/old/moon_manager/tests/unit_python/helpers/policy_helper.py
@@ -0,0 +1,63 @@
+# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
+# This software is distributed under the terms and conditions of the 'Apache-2.0'
+# license which can be found in the file 'LICENSE' in this package distribution
+# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
+
+from uuid import uuid4
+
+def get_policies():
+ from python_moondb.core import PolicyManager
+ return PolicyManager.get_policies("admin")
+
+
+def add_policies(policy_id=None, value=None):
+ from python_moondb.core import PolicyManager
+ if not value:
+ value = {
+ "name": "test_policy"+ uuid4().hex,
+ "model_id": "",
+ "genre": "authz",
+ "description": "test",
+ }
+ return PolicyManager.add_policy("admin", policy_id=policy_id, value=value)
+
+
+def delete_policies(uuid=None, name=None):
+ from python_moondb.core import PolicyManager
+ if not uuid:
+ for policy_id, policy_value in get_policies():
+ if name == policy_value['name']:
+ uuid = policy_id
+ break
+ PolicyManager.delete_policy("admin", uuid)
+
+
+def update_policy(policy_id, value):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.update_policy("admin", policy_id, value)
+
+
+def get_policy_from_meta_rules(meta_rule_id):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.get_policy_from_meta_rules("admin", meta_rule_id)
+
+
+def get_rules(policy_id=None, meta_rule_id=None, rule_id=None):
+ from python_moondb.core import PolicyManager
+ return PolicyManager.get_rules("", policy_id, meta_rule_id, rule_id)
+
+
+def add_rule(policy_id=None, meta_rule_id=None, value=None):
+ from python_moondb.core import PolicyManager
+ if not value:
+ value = {
+ "rule": ("high", "medium", "vm-action"),
+ "instructions": ({"decision": "grant"}),
+ "enabled": "",
+ }
+ return PolicyManager.add_rule("", policy_id, meta_rule_id, value)
+
+
+def delete_rule(policy_id=None, rule_id=None):
+ from python_moondb.core import PolicyManager
+ PolicyManager.delete_rule("", policy_id, rule_id)
diff --git a/old/moon_manager/tests/unit_python/requirements.txt b/old/moon_manager/tests/unit_python/requirements.txt
new file mode 100644
index 00000000..d6f190e4
--- /dev/null
+++ b/old/moon_manager/tests/unit_python/requirements.txt
@@ -0,0 +1,5 @@
+flask
+flask_cors
+flask_restful
+python_moondb==1.2.20
+python_moonutilities==1.4.20