diff options
Diffstat (limited to 'moon_manager/tests')
32 files changed, 3268 insertions, 276 deletions
diff --git a/moon_manager/tests/functional_pod/json/mls.json b/moon_manager/tests/functional_pod/json/mls.json new file mode 100644 index 00000000..01ef6deb --- /dev/null +++ b/moon_manager/tests/functional_pod/json/mls.json @@ -0,0 +1,89 @@ +{ + "pdps": [{"name" : "pdp_mls", "keystone_project_id" : "", "description": "", "policies": [{"name": "MLS policy example"}]}], + + "policies":[{ "name": "MLS policy example", "genre": "authz", "description": "", "model": {"name": "MLS"} , "mandatory" :false , "override":true}], + + "models":[{"name":"MLS", "description":"","meta_rules": [{"name" : "mls"}], "override":true}], + + + + + + "subjects": [{ "name":"adminuser", "description": "", "extra": {}, "policies": [{ "name": "MLS policy example"}]} , + { "name": "user1", "description": "", "extra": {}, "policies": [{ "name": "MLS policy example"}] }, + { "name": "user2", "description": "", "extra": {}, "policies": [{ "name": "MLS policy example"}] }], + + "subject_categories": [{ "name":"subject-security-level", "description": "" }], + + "subject_data": [{ "name":"low", "description": "", "policies": [{"name" :"MLS policy example"}], "category": {"name": "subject-security-level"}}, + { "name":"medium", "description": "", "policies": [{"name" :"MLS policy example"}], "category": {"name": "subject-security-level"}}, + { "name":"high", "description": "", "policies": [{"name" :"MLS policy example"}], "category": {"name": "subject-security-level"}}], + + "subject_assignments":[{ "subject" : {"name": "adminuser"}, "category" : {"name": "subject-security-level"}, "assignments": [{"name" : "high"}]}, + { "subject" : {"name": "user1"}, "category" : {"name": "subject-security-level"}, "assignments": [{"name" : "medium"}] }], + + + + + + + "objects": [{ "name":"vm0", "description": "", "extra": {}, "policies": [{"name": "MLS policy example"}]} , + {"name": "vm1", "description": "", "extra": {}, "policies": [{"name": "MLS policy example"}]} ], + + "object_categories": [{"name":"object-security-level", "description": ""}], + + "object_data": [{ "name":"low", "description": "", "policies": [{"name" :"MLS policy example"}], "category": {"name": "object-security-level"}}, + { "name":"medium", "description": "", "policies": [{"name" :"MLS policy example"}], "category": {"name": "object-security-level"}}, + { "name":"high", "description": "", "policies": [{"name" :"MLS policy example"}], "category": {"name": "object-security-level"}}], + + "object_assignments":[{ "object" : {"name": "vm0"}, "category" : {"name": "object-security-level"}, "assignments": [{"name" : "medium"}]}, + { "object" : {"name": "vm1"}, "category" : {"name": "object-security-level"}, "assignments": [{"name" : "low"}]}], + + + + + + + "actions": [{ "name": "start", "description": "", "extra": {}, "policies": [{"name": "MLS policy example"}]} , + { "name": "stop", "description": "", "extra": {}, "policies": [{"name": "MLS policy example"}]}], + + "action_categories": [{"name":"action-type", "description": ""}], + + "action_data": [{"name":"vm-action", "description": "", "policies": [{"name": "MLS policy example"}], "category": {"name": "action-type"}}, + {"name":"storage-action", "description": "", "policies": [{"name" :"MLS policy example"}], "category": {"name": "action-type"}}], + + "action_assignments":[{ "action" : {"name": "start"}, "category" : {"name": "action-type"}, "assignments": [{"name" : "vm-action"}]}, + { "action" : {"name": "stop"}, "category" : {"name": "action-type"}, "assignments": [{"name" : "vm-action"}]}], + + + + + + + "meta_rules":[{"name":"mls", "description": "", + "subject_categories": [{"name": "subject-security-level"}], + "object_categories": [{"name": "object-security-level"}], + "action_categories": [{"name": "action-type"}] + }], + + "rules": [{ + "meta_rule": {"name" : "mls"}, + "rule": {"subject_data" : [{"name":"high"}], "object_data": [{"name": "medium"}], "action_data": [{"name": "vm-action"}]}, + "policy": {"name" :"MLS policy example"}, + "instructions" : {"decision" : "grant"} + }, { + "meta_rule": {"name" : "mls"}, + "rule": {"subject_data" : [{"name":"high"}], "object_data": [{"name": "low"}], "action_data": [{"name": "vm-action"}]}, + "policy": {"name" :"MLS policy example"}, + "instructions" : {"decision" : "grant"} + }, { + "meta_rule": {"name" : "mls"}, + "rule": {"subject_data" : [{"name":"medium"}], "object_data": [{"name": "low"}], "action_data": [{"name": "vm-action"}]}, + "policy": {"name" :"MLS policy example"}, + "instructions" : {"decision" : "grant"} + }] + + + + +}
\ No newline at end of file diff --git a/moon_manager/tests/functional_pod/json/rbac.json b/moon_manager/tests/functional_pod/json/rbac.json new file mode 100644 index 00000000..a75f291b --- /dev/null +++ b/moon_manager/tests/functional_pod/json/rbac.json @@ -0,0 +1,85 @@ +{ + "pdps": [{"name" : "pdp_rbac", "keystone_project_id" : "", "description": "", "policies": [{"name": "RBAC policy example"}]}], + + "policies":[{ "name": "RBAC policy example", "genre": "authz", "description": "", "model": {"name": "RBAC"} , "mandatory" :true , "override":true}], + + "models":[{"name":"RBAC", "description":"","meta_rules": [{"name" : "rbac"}], "override":true}], + + + + + + "subjects": [{ "name":"adminuser", "description": "", "extra": {}, "policies": [{ "name": "RBAC policy example"}]} , + { "name": "user1", "description": "", "extra": {}, "policies": [{ "name": "RBAC policy example"}] }, + { "name": "public", "description": "", "extra": {}, "policies": [] }], + + "subject_categories": [{ "name":"role", "description": "" }], + + "subject_data": [{ "name":"admin", "description": "", "policies": [{"name" :"RBAC policy example"}], "category": {"name": "role"}}, + { "name":"employee", "description": "", "policies": [{"name" :"RBAC policy example"}], "category": {"name": "role"}}, + { "name":"*", "description": "", "policies": [{"name" :"RBAC policy example"}], "category": {"name": "role"}}], + + "subject_assignments":[{ "subject" : {"name": "adminuser"}, "category" : {"name": "role"}, "assignments": [{"name" : "admin"}, {"name" : "employee"}, {"name" : "*"}]}, + { "subject" : {"name": "user1"}, "category" : {"name": "role"}, "assignments": [{"name" : "employee"}, {"name" : "*"}] }], + + + + + + + "objects": [{ "name":"vm0", "description": "", "extra": {}, "policies": [{"name": "RBAC policy example"}]} , + {"name": "vm1", "description": "", "extra": {}, "policies": [{"name": "RBAC policy example"}]} ], + + "object_categories": [{"name":"id", "description": ""}], + + "object_data": [{ "name":"vm0", "description": "", "policies": [{"name" :"RBAC policy example"}], "category": {"name": "id"}}, + { "name":"vm1", "description": "", "policies": [{"name" :"RBAC policy example"}], "category": {"name": "id"}}, + { "name":"*", "description": "", "policies": [{"name" :"RBAC policy example"}], "category": {"name": "id"}}], + + "object_assignments":[{ "object" : {"name": "vm0"}, "category" : {"name": "id"}, "assignments": [{"name" : "vm0"}, {"name" : "*"}]}, + { "object" : {"name": "vm1"}, "category" : {"name": "id"}, "assignments": [{"name" : "vm1"}, {"name" : "*"}]}], + + + + + + + "actions": [{ "name": "start", "description": "", "extra": {}, "policies": [{"name": "RBAC policy example"}]} , + { "name": "stop", "description": "", "extra": {}, "policies": [{"name": "RBAC policy example"}]}], + + "action_categories": [{"name":"action-type", "description": ""}], + + "action_data": [{"name":"vm-action", "description": "", "policies": [{"name": "RBAC policy example"}], "category": {"name": "action-type"}}, + {"name":"*", "description": "", "policies": [{"name" :"RBAC policy example"}], "category": {"name": "action-type"}}], + + "action_assignments":[{ "action" : {"name": "start"}, "category" : {"name": "action-type"}, "assignments": [{"name" : "vm-action"}, {"name" : "*"}]}, + { "action" : {"name": "stop"}, "category" : {"name": "action-type"}, "assignments": [{"name" : "vm-action"}, {"name" : "*"}]}], + + + + + + + "meta_rules":[{"name":"rbac", "description": "", + "subject_categories": [{"name": "role"}], + "object_categories": [{"name": "id"}], + "action_categories": [{"name": "action-type"}] + }], + + "rules": [{ + "meta_rule": {"name" : "rbac"}, + "rule": {"subject_data" : [{"name":"admin"}], "object_data": [{"name": "vm0"}], "action_data": [{"name": "vm-action"}]}, + "policy": {"name" :"RBAC policy example"}, + "instructions" : {"decision" : "grant"}, + "enabled": true + }, { + "meta_rule": {"name" : "rbac"}, + "rule": {"subject_data" : [{"name":"employee"}], "object_data": [{"name": "vm1"}], "action_data": [{"name": "vm-action"}]}, + "policy": {"name" :"RBAC policy example"}, + "instructions" : {"decision" : "grant"} + }] + + + + +}
\ No newline at end of file diff --git a/moon_manager/tests/functional_pod/run_functional_tests.sh b/moon_manager/tests/functional_pod/run_functional_tests.sh index 7a95a491..960e9480 100644 --- a/moon_manager/tests/functional_pod/run_functional_tests.sh +++ b/moon_manager/tests/functional_pod/run_functional_tests.sh @@ -1,4 +1,11 @@ #!/usr/bin/env bash +if [ -d /data/dist ]; +then + pip install /data/dist/*.tar.gz --upgrade + pip install /data/dist/*.whl --upgrade +fi + + cd /data/tests/functional_pod pytest . diff --git a/moon_manager/tests/functional_pod/test_manager.py b/moon_manager/tests/functional_pod/test_manager.py index aab5fba4..454d861b 100644 --- a/moon_manager/tests/functional_pod/test_manager.py +++ b/moon_manager/tests/functional_pod/test_manager.py @@ -1,6 +1,45 @@ import json import requests +def test_import_rbac(context): + files = {'file': open('/data/tests/functional_pod/json/rbac.json', 'r')} + req = requests.post("http://{}:{}/import".format( + context.get("hostname"), + context.get("port")) + , files=files) + print(req) + result = req.json() + print(result) + req.raise_for_status() + +def test_import_mls(context): + files = {'file': open('/data/tests/functional_pod/json/mls.json', 'r')} + req = requests.post("http://{}:{}/import".format( + context.get("hostname"), + context.get("port")) + , files=files) + req.raise_for_status() + + +def test_export_rbac(context): + test_import_rbac(context) + req = requests.get("http://{}:{}/export".format( + context.get("hostname"), + context.get("port")), + data={"filename":"/data/tests/functional_pod/json/rbac_export.json"} + ) + req.raise_for_status() + + +def test_export_mls(context): + test_import_mls(context) + req = requests.get("http://{}:{}/export".format( + context.get("hostname"), + context.get("port")), + data={"filename":"/data/tests/functional_pod/json/mls_export.json"} + ) + req.raise_for_status() + def get_json(data): return json.loads(data.decode("utf-8")) diff --git a/moon_manager/tests/functional_pod/test_models.py b/moon_manager/tests/functional_pod/test_models.py index dcda9f32..8b4ceef5 100644 --- a/moon_manager/tests/functional_pod/test_models.py +++ b/moon_manager/tests/functional_pod/test_models.py @@ -32,9 +32,10 @@ def delete_models(context, name): request = None for key, value in models['models'].items(): if value['name'] == name: - request = requests.delete("http://{}:{}/models/{}".format(key, + request = requests.delete("http://{}:{}/models/{}".format( context.get("hostname"), - context.get("port")), + context.get("port"), + key), timeout=3) break return request diff --git a/moon_manager/tests/unit_python/api/import_export_utilities.py b/moon_manager/tests/unit_python/api/import_export_utilities.py new file mode 100644 index 00000000..12cb208e --- /dev/null +++ b/moon_manager/tests/unit_python/api/import_export_utilities.py @@ -0,0 +1,196 @@ +# Copyright 2018 Open Platform for NFV Project, Inc. and its contributors +# This software is distributed under the terms and conditions of the 'Apache-2.0' +# license which can be found in the file 'LICENSE' in this package distribution +# or at 'http://www.apache.org/licenses/LICENSE-2.0'. + +import api.test_unit_models as test_models +import api.test_policies as test_policies +import api.test_perimeter as test_perimeter +import api.test_meta_data as test_categories +import api.test_data as test_data +import api.test_meta_rules as test_meta_rules +import api.test_assignemnt as test_assignments +import api.test_rules as test_rules +import logging + +logger = logging.getLogger("moon.manager.test.api." + __name__) + + +def clean_models(client): + req, models = test_models.get_models(client) + for key in models["models"]: + client.delete("/models/{}".format(key)) + + +def clean_policies(client): + req, policies = test_policies.get_policies(client) + for key in policies["policies"]: + req = client.delete("/policies/{}".format(key)) + assert req.status_code == 200 + + +def clean_subjects(client): + subjects = test_perimeter.get_subjects(client) + logger.info("subjects {}".format(subjects)) + for key in subjects[1]["subjects"]: + subject = subjects[1]["subjects"][key] + policy_keys = subject["policy_list"] + logger.info("subjects policy_keys {}".format(policy_keys)) + for policy_key in policy_keys: + client.delete("/policies/{}/subjects/{}".format(policy_key, key)) + client.delete("/subjects/{}".format(key)) + + +def clean_objects(client): + objects = test_perimeter.get_objects(client) + logger.info("objects {}".format(objects)) + for key in objects[1]["objects"]: + object_ = objects[1]["objects"][key] + policy_keys = object_["policy_list"] + logger.info("objects policy_keys {}".format(policy_keys)) + for policy_key in policy_keys: + client.delete("/policies/{}/objects/{}".format(policy_key, key)) + client.delete("/objects/{}".format(key)) + + +def clean_actions(client): + actions = test_perimeter.get_actions(client) + logger.info("actions {}".format(actions)) + for key in actions[1]["actions"]: + action = actions[1]["actions"][key] + policy_keys = action["policy_list"] + logger.info("action policy_keys {}".format(policy_keys)) + for policy_key in policy_keys: + client.delete("/policies/{}/actions/{}".format(policy_key, key)) + client.delete("/actions/{}".format(key)) + + +def clean_subject_categories(client): + req, categories = test_categories.get_subject_categories(client) + logger.info(categories) + for key in categories["subject_categories"]: + client.delete("/subject_categories/{}".format(key)) + + +def clean_object_categories(client): + req, categories = test_categories.get_object_categories(client) + logger.info(categories) + for key in categories["object_categories"]: + client.delete("/object_categories/{}".format(key)) + + +def clean_action_categories(client): + req, categories = test_categories.get_action_categories(client) + logger.info(categories) + for key in categories["action_categories"]: + client.delete("/action_categories/{}".format(key)) + + +def clean_subject_data(client): + req, policies = test_policies.get_policies(client) + logger.info("clean_subject_data on {}".format(policies)) + for policy_key in policies["policies"]: + req, data = test_data.get_subject_data(client, policy_id=policy_key) + logger.info("============= data {}".format(data)) + for key in data["subject_data"]: + logger.info("============= Deleting {}/{}".format(policy_key, key)) + client.delete("/policies/{}/subject_data/{}".format(policy_key, key)) + + +def clean_object_data(client): + req, policies = test_policies.get_policies(client) + for policy_key in policies["policies"]: + req, data = test_data.get_object_data(client, policy_id=policy_key) + for key in data["object_data"]: + client.delete("/policies/{}/object_data/{}".format(policy_key, key)) + + +def clean_action_data(client): + req, policies = test_policies.get_policies(client) + for policy_key in policies["policies"]: + req, data = test_data.get_action_data(client, policy_id=policy_key) + for key in data["action_data"]: + client.delete("/policies/{}/action_data/{}".format(policy_key, key)) + + +def clean_meta_rule(client): + req, meta_rules = test_meta_rules.get_meta_rules(client) + meta_rules = meta_rules["meta_rules"] + for meta_rule_key in meta_rules: + logger.info("clean_meta_rule.meta_rule_key={}".format(meta_rule_key)) + logger.info("clean_meta_rule.meta_rule={}".format(meta_rules[meta_rule_key])) + client.delete("/meta_rules/{}".format(meta_rule_key)) + + +def clean_subject_assignments(client): + req, policies = test_policies.get_policies(client) + for policy_key in policies["policies"]: + req, assignments = test_assignments.get_subject_assignment(client, policy_key) + for key in assignments["subject_assignments"]: + subject_key = assignments["subject_assignments"][key]["subject_id"] + cat_key = assignments["subject_assignments"][key]["category_id"] + data_keys = assignments["subject_assignments"][key]["assignments"] + for data_key in data_keys: + client.delete("/policies/{}/subject_assignments/{}/{}/{}".format(policy_key, subject_key, + cat_key, data_key)) + + +def clean_object_assignments(client): + req, policies = test_policies.get_policies(client) + for policy_key in policies["policies"]: + req, assignments = test_assignments.get_object_assignment(client, policy_key) + for key in assignments["object_assignments"]: + object_key = assignments["object_assignments"][key]["object_id"] + cat_key = assignments["object_assignments"][key]["category_id"] + data_keys = assignments["object_assignments"][key]["assignments"] + for data_key in data_keys: + client.delete("/policies/{}/object_assignments/{}/{}/{}".format(policy_key, object_key, + cat_key, data_key)) + + +def clean_action_assignments(client): + req, policies = test_policies.get_policies(client) + for policy_key in policies["policies"]: + req, assignments = test_assignments.get_action_assignment(client, policy_key) + for key in assignments["action_assignments"]: + action_key = assignments["action_assignments"][key]["action_id"] + cat_key = assignments["action_assignments"][key]["category_id"] + data_keys = assignments["action_assignments"][key]["assignments"] + for data_key in data_keys: + client.delete("/policies/{}/action_assignments/{}/{}/{}".format(policy_key, action_key, + cat_key, data_key)) + + +def clean_rules(client): + req, policies = test_policies.get_policies(client) + for policy_key in policies["policies"]: + req, rules = test_rules.get_rules(client, policy_key) + rules = rules["rules"] + rules = rules["rules"] + for rule_key in rules: + client.delete("/policies/{}/rules/{}".format(policy_key, rule_key)) + + +def clean_all(client): + clean_rules(client) + + clean_subject_assignments(client) + clean_object_assignments(client) + clean_action_assignments(client) + + clean_meta_rule(client) + + clean_subject_data(client) + clean_object_data(client) + clean_action_data(client) + + clean_actions(client) + clean_objects(client) + clean_subjects(client) + + clean_subject_categories(client) + clean_object_categories(client) + clean_action_categories(client) + + clean_policies(client) + clean_models(client) diff --git a/moon_manager/tests/unit_python/api/meta_data_test.py b/moon_manager/tests/unit_python/api/meta_data_test.py index 8fb39ae1..8609f0b5 100644 --- a/moon_manager/tests/unit_python/api/meta_data_test.py +++ b/moon_manager/tests/unit_python/api/meta_data_test.py @@ -54,6 +54,20 @@ def test_add_subject_categories(): assert value['description'] == "description of {}".format("testuser") +def test_add_subject_categories_with_empty_user(): + client = utilities.register_client() + req, subject_categories = add_subject_categories(client, "") + assert req.status_code == 500 + assert json.loads(req.data)["message"] == "Empty String" + + +def test_add_subject_categories_with_user_contain_space(): + client = utilities.register_client() + req, subject_categories = add_subject_categories(client, "test user") + assert req.status_code == 500 + assert json.loads(req.data)["message"] == "String contains space" + + def test_delete_subject_categories(): client = utilities.register_client() req = delete_subject_categories(client, "testuser") @@ -119,6 +133,20 @@ def test_add_object_categories(): assert value['description'] == "description of {}".format("testuser") +def test_add_object_categories_with_empty_user(): + client = utilities.register_client() + req, object_categories = add_object_categories(client, "") + assert req.status_code == 500 + assert json.loads(req.data)["message"] == "Empty String" + + +def test_add_object_categories_with_user_contain_space(): + client = utilities.register_client() + req, object_categories = add_object_categories(client, "test user") + assert req.status_code == 500 + assert json.loads(req.data)["message"] == "String contains space" + + def test_delete_object_categories(): client = utilities.register_client() req = delete_object_categories(client, "testuser") @@ -184,6 +212,20 @@ def test_add_action_categories(): assert value['description'] == "description of {}".format("testuser") +def test_add_action_categories_with_empty_user(): + client = utilities.register_client() + req, action_categories = add_action_categories(client, "") + assert req.status_code == 500 + assert json.loads(req.data)["message"] == "Empty String" + + +def test_add_action_categories_with_user_contain_space(): + client = utilities.register_client() + req, action_categories = add_action_categories(client, "test user") + assert req.status_code == 500 + assert json.loads(req.data)["message"] == "String contains space" + + def test_delete_action_categories(): client = utilities.register_client() req = delete_action_categories(client, "testuser") @@ -193,4 +235,4 @@ def test_delete_action_categories(): def test_delete_action_categories_without_id(): client = utilities.register_client() req = delete_action_categories_without_id(client) - assert req.status_code == 500
\ No newline at end of file + assert req.status_code == 500 diff --git a/moon_manager/tests/unit_python/api/meta_rules_test.py b/moon_manager/tests/unit_python/api/meta_rules_test.py index b5b1ecf8..a87c16f3 100644 --- a/moon_manager/tests/unit_python/api/meta_rules_test.py +++ b/moon_manager/tests/unit_python/api/meta_rules_test.py @@ -22,6 +22,46 @@ def add_meta_rules(client, name): return req, meta_rules +def add_meta_rules_without_subject_category_ids(client, name): + data = { + "name": name, + "subject_categories": [], + "object_categories": ["object_category_id1"], + "action_categories": ["action_category_id1"] + } + req = client.post("/meta_rules", data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + meta_rules = utilities.get_json(req.data) + return req, meta_rules + + +def update_meta_rules(client, name, metaRuleId): + data = { + "name": name, + "subject_categories": ["subject_category_id1_update", + "subject_category_id2_update"], + "object_categories": ["object_category_id1_update"], + "action_categories": ["action_category_id1_update"] + } + req = client.patch("/meta_rules/{}".format(metaRuleId), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + meta_rules = utilities.get_json(req.data) + return req, meta_rules + + +def update_meta_rules_without_subject_category_ids(client, name): + data = { + "name": name, + "subject_categories": [], + "object_categories": ["object_category_id1"], + "action_categories": ["action_category_id1"] + } + req = client.post("/meta_rules", data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + meta_rules = utilities.get_json(req.data) + return req, meta_rules + + def delete_meta_rules(client, name): request, meta_rules = get_meta_rules(client) for key, value in meta_rules['meta_rules'].items(): @@ -57,6 +97,27 @@ def test_add_meta_rules(): assert value["action_categories"][0] == "action_category_id1" +def test_add_meta_rules_with_empty_user(): + client = utilities.register_client() + req, meta_rules = add_meta_rules(client, "") + assert req.status_code == 500 + assert json.loads(req.data)["message"] == "Empty String" + + +def test_add_meta_rules_with_user_contain_space(): + client = utilities.register_client() + req, meta_rules = add_meta_rules(client, "test user") + assert req.status_code == 500 + assert json.loads(req.data)["message"] == "String contains space" + + +def test_add_meta_rules_without_subject_categories(): + client = utilities.register_client() + req, meta_rules = add_meta_rules_without_subject_category_ids(client, "testuser") + assert req.status_code == 500 + assert json.loads(req.data)["message"] == 'Empty Container' + + def test_delete_meta_rules(): client = utilities.register_client() req = delete_meta_rules(client, "testuser") @@ -67,3 +128,35 @@ def test_delete_meta_rules_without_id(): client = utilities.register_client() req = delete_meta_rules_without_id(client) assert req.status_code == 500 + + +def test_update_meta_rules(): + client = utilities.register_client() + req = add_meta_rules(client, "testuser") + meta_rule_id = list(req[1]['meta_rules'])[0] + req_update = update_meta_rules(client, "testuser", meta_rule_id) + assert req_update[0].status_code == 200 + value = list(req_update[1]["meta_rules"].values())[0] + assert value["subject_categories"][0] == "subject_category_id1_update" + delete_meta_rules(client, "testuser") + get_meta_rules(client) + + +def test_update_meta_rules_without_id(): + client = utilities.register_client() + req_update = update_meta_rules(client, "testuser", "") + assert req_update[0].status_code == 500 + + +def test_update_meta_rules_without_user(): + client = utilities.register_client() + req_update = update_meta_rules(client, "", "") + assert req_update[0].status_code == 500 + assert json.loads(req_update[0].data)["message"] == "Empty String" + + +def test_update_meta_rules_without_subject_categories(): + client = utilities.register_client() + req_update = update_meta_rules_without_subject_category_ids(client, "testuser") + assert req_update[0].status_code == 500 + assert json.loads(req_update[0].data)["message"] == "Empty Container" diff --git a/moon_manager/tests/unit_python/api/test_assignemnt.py b/moon_manager/tests/unit_python/api/test_assignemnt.py index 08688e04..22c727af 100644 --- a/moon_manager/tests/unit_python/api/test_assignemnt.py +++ b/moon_manager/tests/unit_python/api/test_assignemnt.py @@ -1,5 +1,7 @@ import api.utilities as utilities import json +from helpers import data_builder as builder +from uuid import uuid4 # subject_categories_test @@ -11,52 +13,84 @@ def get_subject_assignment(client, policy_id): return req, subject_assignment -def add_subject_assignment(client, policy_id, category_id): +def add_subject_assignment(client): + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex) + subject_id = builder.create_subject(policy_id) + data_id = builder.create_subject_data(policy_id=policy_id, category_id=subject_category_id) + + data = { + "id": subject_id, + "category_id": subject_category_id, + "data_id": data_id + } + req = client.post("/policies/{}/subject_assignments".format(policy_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + subject_assignment = utilities.get_json(req.data) + return req, subject_assignment + + +def add_subject_assignment_without_cat_id(client): + data = { - "id": "id1", - "category_id": category_id, - "data_id": "data_id1" + "id": "subject_id", + "category_id": "", + "data_id": "data_id" } - req = client.post("/policies/{}/subject_assignments/{}".format(policy_id, category_id), data=json.dumps(data), + req = client.post("/policies/{}/subject_assignments".format("1111"), data=json.dumps(data), headers={'Content-Type': 'application/json'}) subject_assignment = utilities.get_json(req.data) return req, subject_assignment -def delete_subject_assignment(client, policy_id): - req = client.delete("/policies/{}/subject_assignments".format(policy_id)) +def delete_subject_assignment(client, policy_id, sub_id, cat_id,data_id): + req = client.delete("/policies/{}/subject_assignments/{}/{}/{}".format(policy_id, sub_id, cat_id,data_id)) return req -def test_get_subject_assignment(): - policy_id = utilities.get_policy_id() +def test_add_subject_assignment(): client = utilities.register_client() - req, subject_assignment = get_subject_assignment(client, policy_id) + req, subject_assignment = add_subject_assignment(client) assert req.status_code == 200 assert isinstance(subject_assignment, dict) assert "subject_assignments" in subject_assignment -def test_add_subject_assignment(): - policy_id = utilities.get_policy_id() +def test_add_subject_assignment_without_cat_id(): client = utilities.register_client() - req, subject_assignment = add_subject_assignment(client, policy_id, "111") + req, subject_assignment = add_subject_assignment_without_cat_id(client) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'category_id', [Empty String]" + + +def test_get_subject_assignment(): + client = utilities.register_client() + policy_id = builder.get_policy_id_with_subject_assignment() + req, subject_assignment = get_subject_assignment(client, policy_id) assert req.status_code == 200 assert isinstance(subject_assignment, dict) - value = subject_assignment["subject_assignments"] assert "subject_assignments" in subject_assignment - id = list(value.keys())[0] - assert value[id]['policy_id'] == policy_id - assert value[id]['category_id'] == "111" - assert value[id]['subject_id'] == "id1" def test_delete_subject_assignment(): client = utilities.register_client() - policy_id = utilities.get_policy_id() - success_req = delete_subject_assignment(client, policy_id) + policy_id = builder.get_policy_id_with_subject_assignment() + req, subject_assignment = get_subject_assignment(client, policy_id) + value = subject_assignment["subject_assignments"] + id = list(value.keys())[0] + success_req = delete_subject_assignment(client, policy_id, value[id]['subject_id'], value[id]['category_id'],value[id]['assignments'][0]) assert success_req.status_code == 200 + +def test_delete_subject_assignment_without_policy_id(): + client = utilities.register_client() + success_req = delete_subject_assignment(client, "", "id1", "111" ,"data_id1") + assert success_req.status_code == 404 + + # --------------------------------------------------------------------------- # object_categories_test @@ -68,25 +102,47 @@ def get_object_assignment(client, policy_id): return req, object_assignment -def add_object_assignment(client, policy_id, category_id): +def add_object_assignment(client): + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex) + object_id = builder.create_object(policy_id) + data_id = builder.create_object_data(policy_id=policy_id, category_id=object_category_id) + + data = { + "id": object_id, + "category_id": object_category_id, + "data_id": data_id + } + + req = client.post("/policies/{}/object_assignments".format(policy_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + object_assignment = utilities.get_json(req.data) + return req, object_assignment + + +def add_object_assignment_without_cat_id(client): + data = { - "id": "id1", - "category_id": category_id, - "data_id": "data_id1" + "id": "object_id", + "category_id": "", + "data_id": "data_id" } - req = client.post("/policies/{}/object_assignments/{}".format(policy_id, category_id), data=json.dumps(data), + req = client.post("/policies/{}/object_assignments".format("1111"), data=json.dumps(data), headers={'Content-Type': 'application/json'}) object_assignment = utilities.get_json(req.data) return req, object_assignment -def delete_object_assignment(client, policy_id): - req = client.delete("/policies/{}/object_assignments".format(policy_id)) +def delete_object_assignment(client, policy_id, obj_id, cat_id, data_id): + req = client.delete("/policies/{}/object_assignments/{}/{}/{}".format(policy_id, obj_id, cat_id, data_id)) return req def test_get_object_assignment(): - policy_id = utilities.get_policy_id() + policy_id = builder.get_policy_id_with_object_assignment() client = utilities.register_client() req, object_assignment = get_object_assignment(client, policy_id) assert req.status_code == 200 @@ -95,25 +151,35 @@ def test_get_object_assignment(): def test_add_object_assignment(): - policy_id = utilities.get_policy_id() client = utilities.register_client() - req, object_assignment = add_object_assignment(client, policy_id, "111") + req, object_assignment = add_object_assignment(client) assert req.status_code == 200 - assert isinstance(object_assignment, dict) - value = object_assignment["object_assignments"] assert "object_assignments" in object_assignment - id = list(value.keys())[0] - assert value[id]['policy_id'] == policy_id - assert value[id]['category_id'] == "111" - assert value[id]['object_id'] == "id1" + + +def test_add_object_assignment_without_cat_id(): + client = utilities.register_client() + req, object_assignment = add_object_assignment_without_cat_id(client) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'category_id', [Empty String]" def test_delete_object_assignment(): client = utilities.register_client() - policy_id = utilities.get_policy_id() - success_req = delete_object_assignment(client, policy_id) + policy_id = builder.get_policy_id_with_object_assignment() + req, object_assignment = get_object_assignment(client, policy_id) + value = object_assignment["object_assignments"] + id = list(value.keys())[0] + success_req = delete_object_assignment(client, policy_id, value[id]['object_id'], value[id]['category_id'],value[id]['assignments'][0]) assert success_req.status_code == 200 + +def test_delete_object_assignment_without_policy_id(): + client = utilities.register_client() + success_req = delete_object_assignment(client, "", "id1", "111","data_id1") + assert success_req.status_code == 404 + + # --------------------------------------------------------------------------- # action_categories_test @@ -125,25 +191,46 @@ def get_action_assignment(client, policy_id): return req, action_assignment -def add_action_assignment(client, policy_id, category_id): +def add_action_assignment(client): + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex) + action_id = builder.create_action(policy_id) + data_id = builder.create_action_data(policy_id=policy_id, category_id=action_category_id) + + data = { + "id": action_id, + "category_id": action_category_id, + "data_id": data_id + } + req = client.post("/policies/{}/action_assignments".format(policy_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + action_assignment = utilities.get_json(req.data) + return req, action_assignment + + +def add_action_assignment_without_cat_id(client): + data = { - "id": "id1", - "category_id": category_id, - "data_id": "data_id1" + "id": "action_id", + "category_id": "", + "data_id": "data_id" } - req = client.post("/policies/{}/action_assignments/{}".format(policy_id, category_id), data=json.dumps(data), + req = client.post("/policies/{}/action_assignments".format("1111"), data=json.dumps(data), headers={'Content-Type': 'application/json'}) action_assignment = utilities.get_json(req.data) return req, action_assignment -def delete_action_assignment(client, policy_id): - req = client.delete("/policies/{}/action_assignments".format(policy_id)) +def delete_action_assignment(client, policy_id, action_id, cat_id, data_id): + req = client.delete("/policies/{}/action_assignments/{}/{}/{}".format(policy_id, action_id, cat_id, data_id)) return req def test_get_action_assignment(): - policy_id = utilities.get_policy_id() + policy_id = builder.get_policy_id_with_action_assignment() client = utilities.register_client() req, action_assignment = get_action_assignment(client, policy_id) assert req.status_code == 200 @@ -152,23 +239,32 @@ def test_get_action_assignment(): def test_add_action_assignment(): - policy_id = utilities.get_policy_id() client = utilities.register_client() - req, action_assignment = add_action_assignment(client, policy_id, "111") + req, action_assignment = add_action_assignment(client) assert req.status_code == 200 - assert isinstance(action_assignment, dict) - value = action_assignment["action_assignments"] assert "action_assignments" in action_assignment - id = list(value.keys())[0] - assert value[id]['policy_id'] == policy_id - assert value[id]['category_id'] == "111" - assert value[id]['action_id'] == "id1" + + +def test_add_action_assignment_without_cat_id(): + client = utilities.register_client() + req, action_assignment = add_action_assignment_without_cat_id(client) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'category_id', [Empty String]" def test_delete_action_assignment(): client = utilities.register_client() - policy_id = utilities.get_policy_id() - success_req = delete_action_assignment(client, policy_id) + policy_id = builder.get_policy_id_with_action_assignment() + req, action_assignment = get_action_assignment(client, policy_id) + value = action_assignment["action_assignments"] + id = list(value.keys())[0] + success_req = delete_action_assignment(client, policy_id, value[id]['action_id'], value[id]['category_id'],value[id]['assignments'][0]) assert success_req.status_code == 200 -# ---------------------------------------------------------------------------
\ No newline at end of file + +def test_delete_action_assignment_without_policy_id(): + client = utilities.register_client() + success_req = delete_action_assignment(client, "", "id1", "111" ,"data_id1") + assert success_req.status_code == 404 + +# --------------------------------------------------------------------------- diff --git a/moon_manager/tests/unit_python/api/test_data.py b/moon_manager/tests/unit_python/api/test_data.py index 87a80c69..ff0856af 100644 --- a/moon_manager/tests/unit_python/api/test_data.py +++ b/moon_manager/tests/unit_python/api/test_data.py @@ -1,22 +1,36 @@ +# Copyright 2018 Open Platform for NFV Project, Inc. and its contributors +# This software is distributed under the terms and conditions of the 'Apache-2.0' +# license which can be found in the file 'LICENSE' in this package distribution +# or at 'http://www.apache.org/licenses/LICENSE-2.0'. + import api.utilities as utilities import json - +from helpers import data_builder as builder +from uuid import uuid4 # subject_categories_test -def get_subject_data(client, policy_id): - req = client.get("/policies/{}/subject_data".format(policy_id)) +def get_subject_data(client, policy_id, category_id=None): + if category_id is None: + req = client.get("/policies/{}/subject_data".format(policy_id)) + else: + req = client.get("/policies/{}/subject_data/{}".format(policy_id, category_id)) subject_data = utilities.get_json(req.data) return req, subject_data -def add_subject_data(client, name, policy_id, category_id): +def add_subject_data(client, name): + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex) data = { "name": name, "description": "description of {}".format(name) } - req = client.post("/policies/{}/subject_data/{}".format(policy_id, category_id), data=json.dumps(data), + req = client.post("/policies/{}/subject_data/{}".format(policy_id, subject_category_id), data=json.dumps(data), headers={'Content-Type': 'application/json'}) subject_data = utilities.get_json(req.data) return req, subject_data @@ -37,9 +51,8 @@ def test_get_subject_data(): def test_add_subject_data(): - policy_id = utilities.get_policy_id() client = utilities.register_client() - req, subject_data = add_subject_data(client, "testuser", policy_id, "111") + req, subject_data = add_subject_data(client, "testuser") assert req.status_code == 200 assert isinstance(subject_data, dict) value = subject_data["subject_data"]['data'] @@ -51,27 +64,55 @@ def test_add_subject_data(): def test_delete_subject_data(): client = utilities.register_client() - policy_id = utilities.get_policy_id() + subject_category_id, object_category_id, action_category_id, meta_rule_id,policy_id = builder.create_new_policy() success_req = delete_subject_data(client, policy_id) assert success_req.status_code == 200 + +def test_add_subject_data_with_empty_user(): + client = utilities.register_client() + req, subject_data = add_subject_data(client, "") + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'name', [Empty String]" + + +def test_add_subject_data_with_user_contain_space(): + client = utilities.register_client() + req, subject_data = add_subject_data(client, "test user") + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'name', [String contains space]" + + +def test_delete_subject_data_without_policy_id(): + client = utilities.register_client() + success_req = delete_subject_data(client, "") + assert success_req.status_code == 404 + # --------------------------------------------------------------------------- # object_categories_test -def get_object_data(client, policy_id): - req = client.get("/policies/{}/object_data".format(policy_id)) +def get_object_data(client, policy_id, category_id=None): + if category_id is None: + req = client.get("/policies/{}/object_data".format(policy_id)) + else: + req = client.get("/policies/{}/object_data/{}".format(policy_id, category_id)) object_data = utilities.get_json(req.data) return req, object_data -def add_object_data(client, name, policy_id, category_id): +def add_object_data(client, name): + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex) data = { "name": name, "description": "description of {}".format(name) } - req = client.post("/policies/{}/object_data/{}".format(policy_id, category_id), data=json.dumps(data), + req = client.post("/policies/{}/object_data/{}".format(policy_id, object_category_id), data=json.dumps(data), headers={'Content-Type': 'application/json'}) object_data = utilities.get_json(req.data) return req, object_data @@ -92,16 +133,19 @@ def test_get_object_data(): def test_add_object_data(): - policy_id = utilities.get_policy_id() client = utilities.register_client() - req, object_data = add_object_data(client, "testuser", policy_id, "111") + req, object_data = add_object_data(client, "testuser") assert req.status_code == 200 assert isinstance(object_data, dict) value = object_data["object_data"]['data'] assert "object_data" in object_data id = list(value.keys())[0] - assert value[id]['value']['name'] == "testuser" - assert value[id]['value']['description'] == "description of {}".format("testuser") + print("-----------------------") + print(id) + print(value[id]) + print("-----------------------") + assert value[id]['name'] == "testuser" + assert value[id]['description'] == "description of {}".format("testuser") def test_delete_object_data(): @@ -110,23 +154,50 @@ def test_delete_object_data(): success_req = delete_object_data(client, policy_id) assert success_req.status_code == 200 + +def test_add_object_data_with_empty_user(): + client = utilities.register_client() + req, subject_data = add_object_data(client, "") + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'name', [Empty String]" + + +def test_add_object_data_with_user_contain_space(): + client = utilities.register_client() + req, object_data = add_object_data(client, "test user") + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'name', [String contains space]" + + +def test_delete_object_data_without_policy_id(): + client = utilities.register_client() + success_req = delete_object_data(client, "") + assert success_req.status_code == 404 # --------------------------------------------------------------------------- # action_categories_test -def get_action_data(client, policy_id): - req = client.get("/policies/{}/action_data".format(policy_id)) +def get_action_data(client, policy_id, category_id=None): + if category_id is None: + req = client.get("/policies/{}/action_data".format(policy_id)) + else: + req = client.get("/policies/{}/action_data/{}".format(policy_id, category_id)) action_data = utilities.get_json(req.data) return req, action_data -def add_action_data(client, name, policy_id, category_id): +def add_action_data(client, name): + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex) data = { "name": name, "description": "description of {}".format(name) } - req = client.post("/policies/{}/action_data/{}".format(policy_id, category_id), data=json.dumps(data), + req = client.post("/policies/{}/action_data/{}".format(policy_id, action_category_id), data=json.dumps(data), headers={'Content-Type': 'application/json'}) action_data = utilities.get_json(req.data) return req, action_data @@ -147,16 +218,15 @@ def test_get_action_data(): def test_add_action_data(): - policy_id = utilities.get_policy_id() client = utilities.register_client() - req, action_data = add_action_data(client, "testuser", policy_id, "111") + req, action_data = add_action_data(client, "testuser") assert req.status_code == 200 assert isinstance(action_data, dict) value = action_data["action_data"]['data'] assert "action_data" in action_data id = list(value.keys())[0] - assert value[id]['value']['name'] == "testuser" - assert value[id]['value']['description'] == "description of {}".format("testuser") + assert value[id]['name'] == "testuser" + assert value[id]['description'] == "description of {}".format("testuser") def test_delete_action_data(): @@ -165,4 +235,23 @@ def test_delete_action_data(): success_req = delete_action_data(client, policy_id) assert success_req.status_code == 200 -# ---------------------------------------------------------------------------
\ No newline at end of file + +def test_add_action_data_with_empty_user(): + client = utilities.register_client() + req, action_data = add_action_data(client, "") + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'name', [Empty String]" + + +def test_add_action_data_with_user_contain_space(): + client = utilities.register_client() + req, action_data = add_action_data(client, "test user") + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'name', [String contains space]" + + +def test_delete_action_data_without_policy_id(): + client = utilities.register_client() + success_req = delete_action_data(client, "") + assert success_req.status_code == 404 +# --------------------------------------------------------------------------- diff --git a/moon_manager/tests/unit_python/api/test_export.py b/moon_manager/tests/unit_python/api/test_export.py new file mode 100644 index 00000000..ac8e8d17 --- /dev/null +++ b/moon_manager/tests/unit_python/api/test_export.py @@ -0,0 +1,282 @@ +# Copyright 2018 Open Platform for NFV Project, Inc. and its contributors +# This software is distributed under the terms and conditions of the 'Apache-2.0' +# license which can be found in the file 'LICENSE' in this package distribution +# or at 'http://www.apache.org/licenses/LICENSE-2.0'. + +import json +import api.utilities as utilities +import api.import_export_utilities as import_export_utilities + + +MODEL_WITHOUT_META_RULES = {"models": [{"name": "test model", "description": "model description", "meta_rules": []}]} + +POLICIES = {"models": [{"name": "test model", "description": "", "meta_rules": []}], + "policies": [{"name": "test policy", "genre": "authz", "description": "policy description", "model": {"name" : "test model"}}]} + +SUBJECTS_OBJECTS_ACTIONS = {"models": [{"name": "test model", "description": "", "meta_rules": []}], + "policies": [{"name": "test policy", "genre": "authz", "description": "policy description", "model": {"name" : "test model"}}], + "subjects": [{"name": "testuser", "description": "description of the subject", "extra": {"field_extra_subject": "value extra subject"}, "policies": [{"name": "test policy"}]}], + "objects": [{"name": "test object", "description": "description of the object", "extra": {"field_extra_object": "value extra object"}, "policies": [{"name": "test policy"}]}], + "actions": [{"name": "test action", "description": "description of the action", "extra": {"field_extra_action": "value extra action"}, "policies": [{"name": "test policy"}]}]} + + +SUBJECT_OBJECT_ACTION_CATEGORIES = {"subject_categories": [{"name": "test subject categories", "description": "subject category description"}], + "object_categories": [{"name": "test object categories", "description": "object category description"}], + "action_categories": [{"name": "test action categories", "description": "action category description"}]} + +SUBJECT_OBJECT_ACTION_DATA = {"models": [{"name": "test model", "description": "", "meta_rules": [{"name": "meta rule"}]}], + "policies": [{"name": "test policy", "genre": "authz", "description": "policy description", "model": {"name" : "test model"}}], + "subject_categories": [{"name": "test subject categories", "description": "subject category description"}], + "object_categories": [{"name": "test object categories", "description": "object category description"}], + "action_categories": [{"name": "test action categories", "description": "action category description"}], + "subject_data": [{"name": "test subject data", "description": "subject data description", "policies": [{"name": "test policy"}], "category": {"name": "test subject categories"}}], + "object_data": [{"name": "test object data", "description": "object data description", "policies": [{"name": "test policy"}], "category": {"name": "test object categories"}}], + "action_data": [{"name": "test action data", "description": "action data description", "policies": [{"name": "test policy"}], "category": {"name": "test action categories"}}], + "meta_rules": [{"name": "meta rule", "description": "valid meta rule", "subject_categories": [{"name": "test subject categories"}], "object_categories": [{"name": "test object categories"}], "action_categories": [{"name": "test action categories"}]}]} + + +META_RULES = {"subject_categories": [{"name": "test subject categories", "description": "subject category description"}], + "object_categories": [{"name": "test object categories", "description": "object category description"}], + "action_categories": [{"name": "test action categories", "description": "object action description"}], + "meta_rules": [{"name": "meta rule", "description": "valid meta rule", "subject_categories": [{"name": "test subject categories"}], "object_categories": [{"name": "test object categories"}], "action_categories": [{"name": "test action categories"}]}]} + + +ASSIGNMENTS = {"models": [{"name": "test model", "description": "", "meta_rules": [{"name": "meta rule"}]}], + "policies": [{"name": "test policy", "genre": "authz", "description": "policy description", "model": {"name" : "test model"}}], + "subject_categories": [{"name": "test subject categories", "description": "subject category description"}], + "object_categories": [{"name": "test object categories", "description": "object category description"}], + "action_categories": [{"name": "test action categories", "description": "action category description"}], + "subject_data": [{"name": "test subject data", "description": "subject data description", "policies": [{"name": "test policy"}], "category": {"name": "test subject categories"}}], + "object_data": [{"name": "test object data", "description": "object data description", "policies": [{"name": "test policy"}], "category": {"name": "test object categories"}}], + "action_data": [{"name": "test action data", "description": "action data description", "policies": [{"name": "test policy"}], "category": {"name": "test action categories"}}], + "meta_rules": [{"name": "meta rule", "description": "valid meta rule", "subject_categories": [{"name": "test subject categories"}], "object_categories": [{"name": "test object categories"}], "action_categories": [{"name": "test action categories"}]}], + "subjects": [{"name": "testuser", "description": "description of the subject", "extra": {"field_extra_subject": "value extra subject"}, "policies": [{"name": "test policy"}]}], + "objects": [{"name": "test object e0", "description": "description of the object", "extra": {"field_extra_object": "value extra object"}, "policies": [{"name": "test policy"}]}], + "actions": [{"name": "test action e0", "description": "description of the action", "extra": {"field_extra_action": "value extra action"}, "policies": [{"name": "test policy"}]}], + "subject_assignments": [{"subject": {"name": "testuser"}, "category": {"name": "test subject categories"}, "assignments": [{"name": "test subject data"}]}], + "object_assignments": [{"object": {"name": "test object e0"}, "category": {"name": "test object categories"}, "assignments": [{"name": "test object data"}]}], + "action_assignments": [{"action": {"name": "test action e0"}, "category": {"name": "test action categories"}, "assignments": [{"name": "test action data"}]}]} + +RULES = {"models": [{"name": "test model", "description": "", "meta_rules": [{"name": "meta rule"}]}], + "policies": [{"name": "test policy", "genre": "authz", "description": "policy description", "model": {"name" : "test model"}}], + "subject_categories": [{"name": "test subject categories", "description": "subject category description"}], + "object_categories": [{"name": "test object categories", "description": "object category description"}], + "action_categories": [{"name": "test action categories", "description": "action category description"}], + "subject_data": [{"name": "test subject data", "description": "subject data description", "policies": [{"name": "test policy"}], "category": {"name": "test subject categories"}}], + "object_data": [{"name": "test object data", "description": "object data description", "policies": [{"name": "test policy"}], "category": {"name": "test object categories"}}], + "action_data": [{"name": "test action data", "description": "action data description", "policies": [{"name": "test policy"}], "category": {"name": "test action categories"}}], + "meta_rules": [{"name": "meta rule", "description": "valid meta rule", "subject_categories": [{"name": "test subject categories"}], "object_categories": [{"name": "test object categories"}], "action_categories": [{"name": "test action categories"}]}], + "subjects": [{"name": "testuser", "description": "description of the subject", "extra": {"field_extra_subject": "value extra subject"}, "policies": [{"name": "test policy"}]}], + "objects": [{"name": "test object e1", "description": "description of the object", "extra": {"field_extra_object": "value extra object"}, "policies": [{"name": "test policy"}]}], + "actions": [{"name": "test action e1", "description": "description of the action", "extra": {"field_extra_action": "value extra action"}, "policies": [{"name": "test policy"}]}], + "subject_assignments": [{"subject": {"name": "testuser"}, "category": {"name": "test subject categories"}, "assignments": [{"name": "test subject data"}]}], + "object_assignments": [{"object": {"name": "test object e1"}, "category": {"name": "test object categories"}, "assignments": [{"name": "test object data"}]}], + "action_assignments": [{"action": {"name": "test action e1"}, "category": {"name": "test action categories"}, "assignments": [{"name": "test action data"}]}], + "rules": [{"meta_rule": {"name": "meta rule"}, "rule": {"subject_data": [{"name": "test subject data"}], "object_data": [{"name": "test object data"}], "action_data": [{"name": "test action data"}]}, "policy": {"name":"test policy"}, "instructions": {"decision": "grant"}, "enabled": True}] + } + + +def test_export_models(): + client = utilities.register_client() + import_export_utilities.clean_all(client) + req = client.post("/import", content_type='application/json', data=json.dumps(MODEL_WITHOUT_META_RULES)) + data = utilities.get_json(req.data) + assert data == "Import ok !" + + req = client.get("/export") + assert req.status_code == 200 + data = utilities.get_json(req.data) + + assert "content" in data + assert "models" in data["content"] + assert isinstance(data["content"]["models"], list) + assert len(data["content"]["models"]) == 1 + model = data["content"]["models"][0] + assert model["name"] == "test model" + assert model["description"] == "model description" + assert isinstance(model["meta_rules"], list) + assert len(model["meta_rules"]) == 0 + + +def test_export_policies(): + client = utilities.register_client() + import_export_utilities.clean_all(client) + req = client.post("/import", content_type='application/json', data=json.dumps(POLICIES)) + data = utilities.get_json(req.data) + assert data == "Import ok !" + + req = client.get("/export") + assert req.status_code == 200 + data = utilities.get_json(req.data) + + assert "content" in data + assert "policies" in data["content"] + assert isinstance(data["content"]["policies"], list) + assert len(data["content"]["policies"]) == 1 + policy = data["content"]["policies"][0] + assert policy["name"] == "test policy" + assert policy["genre"] == "authz" + assert policy["description"] == "policy description" + assert "model" in policy + assert "name" in policy["model"] + model = policy["model"] + assert model["name"] == "test model" + + +def test_export_subject_object_action(): + client = utilities.register_client() + import_export_utilities.clean_all(client) + req = client.post("/import", content_type='application/json', data=json.dumps(SUBJECTS_OBJECTS_ACTIONS)) + data = utilities.get_json(req.data) + assert data == "Import ok !" + + req = client.get("/export") + assert req.status_code == 200 + data = utilities.get_json(req.data) + + assert "content" in data + type_elements = ["subject", "object", "action"] + for type_element in type_elements: + key = type_element + "s" + assert key in data["content"] + assert isinstance(data["content"][key], list) + assert len(data["content"][key]) == 1 + element = data["content"][key][0] + if type_element == "subject": + assert element["name"] == "testuser" + else: + assert element["name"] == "test "+ type_element + assert element["description"] == "description of the " + type_element + assert "policies" in element + assert isinstance(element["policies"], list) + assert len(element["policies"]) == 1 + assert isinstance(element["policies"][0], dict) + assert element["policies"][0]["name"] == "test policy" + assert isinstance(element["extra"], dict) + key_dict = "field_extra_" + type_element + value_dict = "value extra " + type_element + assert key_dict in element["extra"] + assert element["extra"][key_dict] == value_dict + + +def test_export_subject_object_action_categories(): + client = utilities.register_client() + import_export_utilities.clean_all(client) + req = client.post("/import", content_type='application/json', data=json.dumps(SUBJECT_OBJECT_ACTION_CATEGORIES)) + data = utilities.get_json(req.data) + assert data == "Import ok !" + + req = client.get("/export") + assert req.status_code == 200 + data = utilities.get_json(req.data) + assert "content" in data + type_elements = ["subject", "object", "action"] + for type_element in type_elements: + key = type_element + "_categories" + assert key in data["content"] + assert isinstance(data["content"][key], list) + assert len(data["content"][key]) == 1 + category = data["content"][key][0] + assert category["name"] == "test " + type_element + " categories" + assert category["description"] == type_element + " category description" + + +def test_export_subject_object_action_data(): + client = utilities.register_client() + import_export_utilities.clean_all(client) + req = client.post("/import", content_type='application/json', data=json.dumps(SUBJECT_OBJECT_ACTION_DATA)) + data = utilities.get_json(req.data) + assert data == "Import ok !" + + req = client.get("/export") + assert req.status_code == 200 + data = utilities.get_json(req.data) + assert "content" in data + type_elements = ["subject", "object", "action"] + for type_element in type_elements: + key = type_element + "_data" + assert key in data["content"] + assert isinstance(data["content"][key], list) + assert len(data["content"][key]) == 1 + data_elt = data["content"][key][0] + assert data_elt["name"] == "test " + type_element + " data" + assert data_elt["description"] == type_element + " data description" + assert isinstance(data_elt["policy"], dict) + assert data_elt["policy"]["name"] == "test policy" + assert isinstance(data_elt["category"], dict) + assert data_elt["category"]["name"] == "test " + type_element + " categories" + + +def test_export_assignments(): + client = utilities.register_client() + import_export_utilities.clean_all(client) + req = client.post("/import", content_type='application/json', data=json.dumps(ASSIGNMENTS)) + data = utilities.get_json(req.data) + assert data == "Import ok !" + + req = client.get("/export") + assert req.status_code == 200 + data = utilities.get_json(req.data) + assert "content" in data + type_elements = ["subject", "object", "action"] + for type_element in type_elements: + key = type_element + "_assignments" + assert key in data["content"] + assert isinstance(data["content"][key], list) + assert len(data["content"][key]) == 1 + assignment_elt = data["content"][key][0] + assert type_element in assignment_elt + assert isinstance(assignment_elt[type_element], dict) + if type_element == "subject": + assert assignment_elt[type_element]["name"] == "testuser" + else: + assert assignment_elt[type_element]["name"] == "test " + type_element + " e0" + assert "category" in assignment_elt + assert isinstance(assignment_elt["category"], dict) + assert assignment_elt["category"]["name"] == "test " + type_element + " categories" + assert "assignments" in assignment_elt + assert isinstance(assignment_elt["assignments"], list) + assert len(assignment_elt["assignments"]) == 1 + assert assignment_elt["assignments"][0]["name"] == "test " + type_element + " data" + + import_export_utilities.clean_all(client) + + +def test_export_rules(): + client = utilities.register_client() + import_export_utilities.clean_all(client) + req = client.post("/import", content_type='application/json', data=json.dumps(RULES)) + data = utilities.get_json(req.data) + assert data == "Import ok !" + + req = client.get("/export") + assert req.status_code == 200 + data = utilities.get_json(req.data) + assert "content" in data + assert "rules" in data["content"] + assert isinstance(data["content"]["rules"], list) + assert len(data["content"]["rules"]) == 1 + rule = data["content"]["rules"][0] + assert "instructions" in rule + assert "decision" in rule["instructions"] + assert rule["instructions"]["decision"] == "grant" + assert "enabled" in rule + assert rule["enabled"] + assert "meta_rule" in rule + assert rule["meta_rule"]["name"] == "meta rule" + assert "policy" in rule + assert rule["policy"]["name"] == "test policy" + assert "rule" in rule + rule = rule["rule"] + assert "subject_data" in rule + assert isinstance(rule["subject_data"], list) + assert len(rule["subject_data"]) == 1 + assert rule["subject_data"][0]["name"] == "test subject data" + assert "object_data" in rule + assert isinstance(rule["object_data"], list) + assert len(rule["object_data"]) == 1 + assert rule["object_data"][0]["name"] == "test object data" + assert "action_data" in rule + assert isinstance(rule["action_data"], list) + assert len(rule["action_data"]) == 1 + assert rule["action_data"][0]["name"] == "test action data" diff --git a/moon_manager/tests/unit_python/api/test_import.py b/moon_manager/tests/unit_python/api/test_import.py new file mode 100644 index 00000000..f1ab8251 --- /dev/null +++ b/moon_manager/tests/unit_python/api/test_import.py @@ -0,0 +1,498 @@ +# Copyright 2018 Open Platform for NFV Project, Inc. and its contributors +# This software is distributed under the terms and conditions of the 'Apache-2.0' +# license which can be found in the file 'LICENSE' in this package distribution +# or at 'http://www.apache.org/licenses/LICENSE-2.0'. + +import api.utilities as utilities +import api.test_unit_models as test_models +import api.test_policies as test_policies +import api.test_meta_data as test_categories +import api.test_data as test_data +import api.test_meta_rules as test_meta_rules +import api.test_assignemnt as test_assignments +import api.test_rules as test_rules +import api.import_export_utilities as import_export_utilities + +import json + + +MODEL_WITHOUT_META_RULES = [ + {"models": [{"name": "test model", "description": "", "meta_rules": []}]}, + {"models": [{"name": "test model", "description": "new description", "meta_rules": [], "override": True}]}, + {"models": [{"name": "test model", "description": "description not taken into account", "meta_rules": [], "override": False}]} + ] + +POLICIES = [ + {"policies": [{"name": "test policy", "genre": "authz", "description": "description", "model": {}, "mandatory": False}]}, + {"policies": [{"name": "test policy", "genre": "authz", "description": "new description not taken into account", "model": {"name" : "test model"}, "mandatory": True}]}, + {"policies": [{"name": "test policy", "genre": "not authz ?", "description": "generates an exception", "model": {"name" : "test model"}, "override": True}]}, + {"models": [{"name": "test model", "description": "", "meta_rules": []}], "policies": [{"name": "test policy", "genre": "not authz ?", "description": "changes taken into account", "model": {"name" : "test model"}, "override": True}]}, +] + +SUBJECTS = [{"subjects": [{"name": "testuser", "description": "description of the subject", "extra": {}, "policies": []}]}, + {"policies": [{"name": "test policy", "genre": "authz", "description": "description", "model": {}, "mandatory": False}], "subjects": [{"name": "testuser", "description": "description of the subject", "extra": {}, "policies": []}]}, + {"policies": [{"name": "test other policy", "genre": "authz", "description": "description", "model": {}, "mandatory": True}], "subjects": [{"name": "testuser", "description": "description of the subject", "extra": {}, "policies": []}]}, + {"subjects": [{"name": "testuser", "description": "new description of the subject", "extra": {"email": "new-email@test.com"}, "policies": [{"name": "test other policy"}]}]}, + {"policies": [{"name": "test policy", "genre": "authz", "description": "description", "model": {}, "mandatory": False}], "subjects": [{"name": "testuser", "description": "description of the subject", "extra": {}, "policies": [{"name": "test policy"}]}]}] + + +OBJECTS = [ + {"objects": [{"name": "test object", "description": "description of the object", "extra": {}, "policies": []}]}, + {"policies": [{"name": "test policy", "genre": "authz", "description": "description", "model": {}, "mandatory": False}], + "objects": [{"name": "test object", "description": "description of the object", "extra": {}, "policies": []}]}, + {"policies": [{"name": "test other policy", "genre": "authz", "description": "description", "model": {}, "mandatory": True}], + "objects": [{"name": "test object", "description": "description of the object", "extra": {}, "policies": []}]}, + {"objects": [{"name": "test object", "description": "new description of the object", "extra": {"test": "test extra"}, + "policies": [{"name": "test other policy"}]}]}, + {"policies": [{"name": "test policy", "genre": "authz", "description": "description", "model": {}, "mandatory": False}], + "objects": [{"name": "test object", "description": "description of the object", "extra": {}, "policies": [{"name": "test policy"}]}]}, +] + + +ACTIONS = [{"actions": [{"name": "test action", "description": "description of the action", "extra": {}, "policies": []}]}, + {"policies": [{"name": "test policy", "genre": "authz", "description": "description", "model": {}, "mandatory": False}], "actions": [{"name": "test action", "description": "description of the action", "extra": {}, "policies": []}]}, + {"policies": [{"name": "test other policy", "genre": "authz", "description": "description", "model": {}, "mandatory": True}], "actions": [{"name": "test action", "description": "description of the action", "extra": {}, "policies": []}]}, + {"actions": [{"name": "test action", "description": "new description of the action", "extra": {"test": "test extra"}, "policies": [{"name": "test other policy"}]}]}, + {"policies": [{"name": "test policy", "genre": "authz", "description": "description", "model": {}, "mandatory": False}], "actions": [{"name": "test action", "description": "description of the action", "extra": {}, "policies": [{"name": "test policy"}]}]}] + + +SUBJECT_CATEGORIES = [{"subject_categories": [{"name": "test subject categories", "description": "subject category description"}]}, + {"subject_categories": [{"name": "test subject categories", "description": "new subject category description"}]}] + + +OBJECT_CATEGORIES = [{"object_categories": [{"name": "test object categories", "description": "object category description"}]}, + {"object_categories": [{"name": "test object categories", "description": "new object category description"}]}] + + +ACTION_CATEGORIES = [{"action_categories": [{"name": "test action categories", "description": "action category description"}]}, + {"action_categories": [{"name": "test action categories", "description": "new action category description"}]}] + +# meta_rules import is needed otherwise the search for data do not work !!! +PRE_DATA = {"models": [{"name": "test model", "description": "", "meta_rules": [{"name": "good meta rule"}, {"name": "other good meta rule"}]}], + "policies": [{"name": "test other policy", "genre": "authz", "description": "description", "model": {"name": "test model"}, "mandatory": True}], + "subject_categories": [{"name": "test subject categories", "description": "subject category description"}, {"name": "other test subject categories", "description": "subject category description"}], + "object_categories": [{"name": "test object categories", "description": "object category description"}, {"name": "other test object categories", "description": "object category description"}], + "action_categories": [{"name": "test action categories", "description": "action category description"}, {"name": "other test action categories", "description": "action category description"}], + "meta_rules": [{"name": "good meta rule", "description": "valid meta rule", "subject_categories": [{"name": "test subject categories"}], "object_categories": [{"name": "test object categories"}], "action_categories": [{"name": "test action categories"}]}, + {"name": "other good meta rule", "description": "valid meta rule", "subject_categories": [{"name": "other test subject categories"}], "object_categories": [{"name": "other test object categories"}], "action_categories": [{"name": "other test action categories"}]}]} + +SUBJECT_DATA = [{"subject_data": [{"name": "not valid subject data", "description": "", "policies": [{}], "category": {}}]}, + {"subject_data": [{"name": "not valid subject data", "description": "", "policies": [{}], "category": {"name": "test subject categories"}}]}, + {"policies": [{"name": "test policy", "genre": "authz", "description": "description", "model": {"name": "test model"}, "mandatory": True}], "subject_data": [{"name": "one valid subject data", "description": "description", "policies": [{}], "category": {"name": "test subject categories"}}]}, + {"subject_data": [{"name": "valid subject data", "description": "description", "policies": [{"name": "test policy"}], "category": {"name": "test subject categories"}}]}, + {"subject_data": [{"name": "valid subject data", "description": "new description", "policies": [{"name": "test other policy"}], "category": {"name": "test subject categories"}}]}] + +OBJECT_DATA = [{"object_data": [{"name": "not valid object data", "description": "", "policies": [{}], "category": {}}]}, + {"object_data": [{"name": "not valid object data", "description": "", "policies": [{}], "category": {"name": "test object categories"}}]}, + {"policies": [{"name": "test policy", "genre": "authz", "description": "description", "model": {"name": "test model"}, "mandatory": True}], "object_data": [{"name": "one valid object data", "description": "description", "policies": [{}], "category": {"name": "test object categories"}}]}, + {"object_data": [{"name": "valid object data", "description": "description", "policies": [{"name": "test policy"}], "category": {"name": "test object categories"}}]}, + {"object_data": [{"name": "valid object data", "description": "new description", "policies": [{"name": "test other policy"}], "category": {"name": "test object categories"}}]}] + + +ACTION_DATA = [{"action_data": [{"name": "not valid action data", "description": "", "policies": [{}], "category": {}}]}, + {"action_data": [{"name": "not valid action data", "description": "", "policies": [{}], "category": {"name": "test action categories"}}]}, + {"policies": [{"name": "test policy", "genre": "authz", "description": "description", "model": {"name": "test model"}, "mandatory": True}], "action_data": [{"name": "one valid action data", "description": "description", "policies": [{}], "category": {"name": "test action categories"}}]}, + {"action_data": [{"name": "valid action data", "description": "description", "policies": [{"name": "test policy"}], "category": {"name": "test action categories"}}]}, + {"action_data": [{"name": "valid action data", "description": "new description", "policies": [{"name": "test other policy"}], "category": {"name": "test action categories"}}]}] + + +PRE_META_RULES = {"subject_categories": [{"name": "test subject categories", "description": "subject category description"}], + "object_categories": [{"name": "test object categories", "description": "object category description"}], + "action_categories": [{"name": "test action categories", "description": "object action description"}]} + +META_RULES = [{"meta_rules" :[{"name": "bad meta rule", "description": "not valid meta rule", "subject_categories": [{"name": "not valid category"}], "object_categories": [{"name": "test object categories"}], "action_categories": [{"name": "test action categories"}]}]}, + {"meta_rules": [{"name": "bad meta rule", "description": "not valid meta rule", "subject_categories": [{"name": "test subject categories"}], "object_categories": [{"name": "not valid category"}], "action_categories": [{"name": "test action categories"}]}]}, + {"meta_rules": [{"name": "bad meta rule", "description": "not valid meta rule", "subject_categories": [{"name": "test subject categories"}], "object_categories": [{"name": "test object categories"}], "action_categories": [{"name": "not valid category"}]}]}, + {"meta_rules": [{"name": "good meta rule", "description": "valid meta rule", "subject_categories": [{"name": "test subject categories"}], "object_categories": [{"name": "test object categories"}], "action_categories": [{"name": "test action categories"}]}]}] + + +PRE_ASSIGNMENTS = {"models": [{"name": "test model", "description": "", "meta_rules": [{"name": "good meta rule"}]}], + "policies": [{"name": "test policy", "genre": "authz", "description": "description", "model": {"name" : "test model"}, "mandatory": True}], + "subject_categories": [{"name": "test subject categories", "description": "subject category description"}], + "object_categories": [{"name": "test object categories", "description": "object category description"}], + "action_categories": [{"name": "test action categories", "description": "object action description"}], + "subjects": [{"name": "testuser", "description": "description of the subject", "extra": {}, "policies": [{"name": "test policy"}]}], + "objects": [{"name": "test object", "description": "description of the object", "extra": {}, "policies": [{"name": "test policy"}]}], + "actions": [{"name": "test action", "description": "description of the action", "extra": {}, "policies": [{"name": "test policy"}]}], + "meta_rules": [{"name": "good meta rule", "description": "valid meta rule", "subject_categories": [{"name": "test subject categories"}], "object_categories": [{"name": "test object categories"}], "action_categories": [{"name": "test action categories"}]}], + "subject_data": [{"name": "subject data", "description": "test subject data", "policies": [{"name": "test policy"}], "category": {"name": "test subject categories"}}], + "object_data": [{"name": "object data", "description": "test object data", "policies": [{"name": "test policy"}], "category": {"name": "test object categories"}}], + "action_data": [{"name": "action data", "description": "test action data", "policies": [{"name": "test policy"}], "category": {"name": "test action categories"}}]} + + +SUBJECT_ASSIGNMENTS = [{"subject_assignments": [{"subject": {"name": "unknonw"}, "category" : {"name": "test subject categories"}, "assignments": [{"name": "subject data"}]}]}, + {"subject_assignments": [{"subject": {"name": "testuser"}, "category": {"name": "unknown"}, "assignments": [{"name": "subject data"}]}]}, + {"subject_assignments": [{"subject": {"name": "testuser"}, "category" : {"name": "test subject categories"}, "assignments": [{"name": "unknwon"}]}]}, + {"subject_assignments": [{"subject": {"name": "testuser"}, "category": {"name": "test subject categories"}, "assignments": [{"name": "subject data"}]}]}] + +OBJECT_ASSIGNMENTS = [{"object_assignments": [{"object": {"name": "unknown"}, "category" : {"name": "test object categories"}, "assignments": [{"name": "object data"}]}]}, + {"object_assignments": [{"object": {"name": "test object"}, "category" : {"name": "unknown"}, "assignments": [{"name": "object data"}]}]}, + {"object_assignments": [{"object": {"name": "test object"}, "category" : {"name": "test object categories"}, "assignments": [{"name": "unknown"}]}]}, + {"object_assignments": [{"object": {"name": "test object"}, "category" : {"name": "test object categories"}, "assignments": [{"name": "object data"}]}]}] + +ACTION_ASSIGNMENTS = [{"action_assignments": [{"action": {"name": "unknown"}, "category" : {"name": "test action categories"}, "assignments": [{"name": "action data"}]}]}, + {"action_assignments": [{"action": {"name": "test action"}, "category" : {"name": "unknown"}, "assignments": [{"name": "action data"}]}]}, + {"action_assignments": [{"action": {"name": "test action"}, "category" : {"name": "test action categories"}, "assignments": [{"name": "unknown"}]}]}, + {"action_assignments": [{"action": {"name": "test action"}, "category" : {"name": "test action categories"}, "assignments": [{"name": "action data"}]}]}] + +RULES = [{"rules": [{"meta_rule": {"name": "unknown meta rule"}, "policy": {"name": "test policy"}, "instructions": {"decision": "grant"}, "enabled": True, "rule": {"subject_data": [{"name": "subject data"}], "object_data": [{"name": "object data"}], "action_data": [{"name": "action data"}]}}]}, + {"rules": [{"meta_rule": {"name": "good meta rule"}, "policy": {"name": "unknown policy"}, "instructions": {"decision": "grant"}, "enabled": True, "rule": {"subject_data": [{"name": "subject data"}], "object_data": [{"name": "object data"}], "action_data": [{"name": "action data"}]}}]}, + {"rules": [{"meta_rule": {"name": "good meta rule"}, "policy": {"name": "test policy"}, "instructions": {"decision": "grant"}, "enabled": True, "rule": {"subject_data": [{"name": "unknown subject data"}], "object_data": [{"name": "object data"}], "action_data": [{"name": "action data"}]}}]}, + {"rules": [{"meta_rule": {"name": "good meta rule"}, "policy": {"name": "test policy"}, "instructions": {"decision": "grant"}, "enabled": True, "rule": {"subject_data": [{"name": "subject data"}], "object_data": [{"name": "unknown object data"}], "action_data": [{"name": "action data"}]}}]}, + {"rules": [{"meta_rule": {"name": "good meta rule"}, "policy": {"name": "test policy"}, "instructions": {"decision": "grant"}, "enabled": True, "rule": {"subject_data": [{"name": "subject data"}], "object_data": [{"name": "object data"}], "action_data": [{"name": "unknown action data"}]}}]}, + {"rules": [{"meta_rule": {"name": "good meta rule"}, "policy": {"name": "test policy"}, "instructions": {"decision": "grant"}, "enabled": True, "rule": {"subject_data": [{"name": "subject data"}], "object_data": [{"name": "object data"}], "action_data": [{"name": "action data"}]}}]}] + + +def test_import_models_without_new_meta_rules(): + client = utilities.register_client() + import_export_utilities.clean_all(client) + counter = 0 + for models_description in MODEL_WITHOUT_META_RULES: + req = client.post("/import", content_type='application/json', data=json.dumps(models_description)) + data = utilities.get_json(req.data) + assert data == "Import ok !" + req, models = test_models.get_models(client) + models = models["models"] + assert len(list(models.keys())) == 1 + values = list(models.values()) + assert values[0]["name"] == "test model" + if counter == 0: + assert len(values[0]["description"]) == 0 + if counter == 1 or counter == 2: + assert values[0]["description"] == "new description" + counter = counter + 1 + import_export_utilities.clean_all(client) + + +def test_import_policies(): + client = utilities.register_client() + import_export_utilities.clean_all(client) + counter = -1 + for policy_description in POLICIES: + counter = counter + 1 + req = client.post("/import", content_type='application/json', data=json.dumps(policy_description)) + try: + data = utilities.get_json(req.data) + assert data == "Import ok !" + except Exception: + assert counter == 2 # this is an expected failure + continue + + req, policies = test_policies.get_policies(client) + policies = policies["policies"] + assert len(list(policies.keys())) == 1 + values = list(policies.values()) + assert values[0]["name"] == "test policy" + if counter < 3: + assert values[0]["genre"] == "authz" + assert values[0]["description"] == "description" + else: + assert values[0]["genre"] == "not authz ?" + assert values[0]["description"] == "changes taken into account" + assert len(values[0]["model_id"]) > 0 + import_export_utilities.clean_all(client) + + +def test_import_subject_object_action(): + client = utilities.register_client() + type_elements = ["object", "action"] + + for type_element in type_elements: + import_export_utilities.clean_all(client) + counter = -1 + # set the getters and the comparison values + if type_element == "subject": + elements = SUBJECTS + clean_method = import_export_utilities.clean_subjects + name = "testuser" + key_extra = "email" + value_extra = "new-email@test.com" + elif type_element == "object": + elements = OBJECTS + clean_method = import_export_utilities.clean_objects + name = "test object" + key_extra = "test" + value_extra = "test extra" + else: + elements = ACTIONS + clean_method = import_export_utilities.clean_actions + name = "test action" + key_extra = "test" + value_extra = "test extra" + + for element in elements: + counter = counter + 1 + if counter == 2 or counter == 4: + clean_method(client) + + req = client.post("/import", content_type='application/json', data=json.dumps(element)) + if counter < 2: + assert req.status_code == 500 + continue + + try: + data = utilities.get_json(req.data) + except Exception as e: + assert False + #assert counter < 2 # Â this is an expected failure + #continue + + assert data == "Import ok !" + get_elements = utilities.get_json(client.get("/"+type_element + "s").data) + get_elements = get_elements[type_element + "s"] + + assert len(list(get_elements.keys())) == 1 + values = list(get_elements.values()) + assert values[0]["name"] == name + if counter == 2 or counter == 4: + assert values[0]["description"] == "description of the " + type_element + #assert not values[0]["extra"] + if counter == 3: + assert values[0]["description"] == "new description of the " + type_element + assert values[0]["extra"][key_extra] == value_extra + + #Â assert len(values[0]["policy_list"]) == 1 + import_export_utilities.clean_all(client) + + +def test_import_subject_object_action_categories(): + client = utilities.register_client() + type_elements = ["subject", "object", "action"] + + for type_element in type_elements: + import_export_utilities.clean_all(client) + counter = -1 + # set the getters and the comparison values + if type_element == "subject": + elements = SUBJECT_CATEGORIES + get_method = test_categories.get_subject_categories + elif type_element == "object": + elements = OBJECT_CATEGORIES + get_method = test_categories.get_object_categories + else: + elements = ACTION_CATEGORIES + get_method = test_categories.get_action_categories + + for element in elements: + req = client.post("/import", content_type='application/json', data=json.dumps(element)) + counter = counter + 1 + data = utilities.get_json(req.data) + assert data == "Import ok !" + req, get_elements = get_method(client) + get_elements = get_elements[type_element + "_categories"] + assert len(list(get_elements.keys())) == 1 + values = list(get_elements.values()) + assert values[0]["name"] == "test " + type_element + " categories" + assert values[0]["description"] == type_element + " category description" + + +def test_import_meta_rules(): + client = utilities.register_client() + import_export_utilities.clean_all(client) + # import some categories + req = client.post("/import", content_type='application/json', data=json.dumps(PRE_META_RULES)) + data = utilities.get_json(req.data) + assert data == "Import ok !" + + counter = -1 + for meta_rule in META_RULES: + counter = counter + 1 + req = client.post("/import", content_type='application/json', data=json.dumps(meta_rule)) + if counter != 3: + assert req.status_code == 500 + continue + else: + data = utilities.get_json(req.data) + assert data == "Import ok !" + assert req.status_code == 200 + + req, meta_rules = test_meta_rules.get_meta_rules(client) + meta_rules = meta_rules["meta_rules"] + key = list(meta_rules.keys())[0] + assert isinstance(meta_rules,dict) + assert meta_rules[key]["name"] == "good meta rule" + assert meta_rules[key]["description"] == "valid meta rule" + assert len(meta_rules[key]["subject_categories"]) == 1 + assert len(meta_rules[key]["object_categories"]) == 1 + assert len(meta_rules[key]["action_categories"]) == 1 + + subject_category_key = meta_rules[key]["subject_categories"][0] + object_category_key = meta_rules[key]["object_categories"][0] + action_category_key = meta_rules[key]["action_categories"][0] + + req, sub_cat = test_categories.get_subject_categories(client) + sub_cat = sub_cat["subject_categories"] + assert sub_cat[subject_category_key]["name"] == "test subject categories" + + req, ob_cat = test_categories.get_object_categories(client) + ob_cat = ob_cat["object_categories"] + assert ob_cat[object_category_key]["name"] == "test object categories" + + req, ac_cat = test_categories.get_action_categories(client) + ac_cat = ac_cat["action_categories"] + assert ac_cat[action_category_key]["name"] == "test action categories" + + import_export_utilities.clean_all(client) + + +def test_import_subject_object_action_assignments(): + client = utilities.register_client() + import_export_utilities.clean_all(client) + req = client.post("/import", content_type='application/json', data=json.dumps(PRE_ASSIGNMENTS)) + data = utilities.get_json(req.data) + assert data == "Import ok !" + + type_elements = ["subject", "object", "action"] + + for type_element in type_elements: + counter = -1 + if type_element == "subject": + datas = SUBJECT_ASSIGNMENTS + get_method = test_assignments.get_subject_assignment + elif type_element == "object": + datas = OBJECT_ASSIGNMENTS + get_method = test_assignments.get_object_assignment + else: + datas = ACTION_ASSIGNMENTS + get_method = test_assignments.get_action_assignment + + for assignments in datas: + counter = counter + 1 + req = client.post("/import", content_type='application/json', data=json.dumps(assignments)) + if counter != 3: + assert req.status_code == 500 + continue + else: + assert data == "Import ok !" + assert req.status_code == 200 + req, policies = test_policies.get_policies(client) + for policy_key in policies["policies"]: + req, get_assignments = get_method(client, policy_key) + get_assignments = get_assignments[type_element+"_assignments"] + assert len(get_assignments) == 1 + + +def test_import_rules(): + client = utilities.register_client() + import_export_utilities.clean_all(client) + req = client.post("/import", content_type='application/json', data=json.dumps(PRE_ASSIGNMENTS)) + data = utilities.get_json(req.data) + assert data == "Import ok !" + + counter = -1 + for rule in RULES: + counter = counter + 1 + req = client.post("/import", content_type='application/json', data=json.dumps(rule)) + + if counter < 5: + assert req.status_code == 500 + continue + + assert req.status_code == 200 + + req, rules = test_rules.test_get_rules() + rules = rules["rules"] + rules = rules["rules"] + assert len(rules) == 1 + rules = rules[0] + assert rules["enabled"] + assert rules["instructions"]["decision"] == "grant" + + req, meta_rules = test_meta_rules.get_meta_rules(client) + assert meta_rules["meta_rules"][list(meta_rules["meta_rules"].keys())[0]]["name"] == "good meta rule" + + +def test_import_subject_object_action_data(): + client = utilities.register_client() + type_elements = ["subject", "object", "action"] + + for type_element in type_elements: + import_export_utilities.clean_all(client) + req = client.post("/import", content_type='application/json', data=json.dumps(PRE_DATA)) + counter = -1 + # set the getters and the comparison values + if type_element == "subject": + elements = SUBJECT_DATA + get_method = test_data.get_subject_data + get_categories = test_categories.get_subject_categories + elif type_element == "object": + elements = OBJECT_DATA + get_method = test_data.get_object_data + get_categories = test_categories.get_object_categories + else: + elements = ACTION_DATA + get_method = test_data.get_action_data + get_categories = test_categories.get_action_categories + + for element in elements: + req = client.post("/import", content_type='application/json', data=json.dumps(element)) + counter = counter + 1 + if counter == 0 or counter == 1: + assert req.status_code == 500 + continue + assert req.status_code == 200 + data = utilities.get_json(req.data) + assert data == "Import ok !" + + req, policies = test_policies.get_policies(client) + policies = policies["policies"] + req, categories = get_categories(client) + categories = categories[type_element + "_categories"] + case_tested = False + for policy_key in policies.keys(): + policy = policies[policy_key] + for category_key in categories: + req, get_elements = get_method(client, policy_id=policy_key, category_id=category_key) + if len(get_elements[type_element+"_data"]) == 0: + continue + + # do this because the backend gives an element with empty data if the policy_key, + # category_key couple does not have any data... + get_elements = get_elements[type_element+"_data"] + if len(get_elements[0]["data"]) == 0: + continue + + if policy["name"] == "test policy": + assert len(get_elements) == 1 + el = get_elements[0] + assert isinstance(el["data"], dict) + if counter == 2: + assert len(el["data"].keys()) == 1 + el = el["data"][list(el["data"].keys())[0]] + if "value" in el: + el = el["value"] + assert el["name"] == "one valid " + type_element + " data" + if counter == 3: + assert len(el["data"].keys()) == 2 + el1 = el["data"][list(el["data"].keys())[0]] + el2 = el["data"][list(el["data"].keys())[1]] + if "value" in el1: + el1 = el1["value"] + el2 = el2["value"] + assert (el1["name"] == "one valid " + type_element + " data" and el2["name"] == "valid " + type_element + " data") or (el2["name"] == "one valid " + type_element + " data" and el1["name"] == "valid " + type_element + " data") + assert el1["description"] == "description" + assert el2["description"] == "description" + + case_tested = True + + if policy["name"] == "test other policy": + if counter == 4: + assert len(get_elements) == 1 + el = get_elements[0] + assert isinstance(el["data"], dict) + assert len(el["data"].keys()) == 1 + el = el["data"][list(el["data"].keys())[0]] + if "value" in el: + el = el["value"] + assert el["name"] == "valid " + type_element + " data" + assert el["description"] == "new description" + case_tested = True + + assert case_tested is True + + +def test_clean(): + client = utilities.register_client() + import_export_utilities.clean_all(client) + #restore the database as previously + utilities.get_policy_id() diff --git a/moon_manager/tests/unit_python/api/test_meta_data.py b/moon_manager/tests/unit_python/api/test_meta_data.py new file mode 100644 index 00000000..4cb86913 --- /dev/null +++ b/moon_manager/tests/unit_python/api/test_meta_data.py @@ -0,0 +1,235 @@ +import json +import api.utilities as utilities + +#subject_categories_test + + +def get_subject_categories(client): + req = client.get("/subject_categories") + subject_categories = utilities.get_json(req.data) + return req, subject_categories + + +def add_subject_categories(client, name): + data = { + "name": name, + "description": "description of {}".format(name) + } + req = client.post("/subject_categories", data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + subject_categories = utilities.get_json(req.data) + return req, subject_categories + + +def delete_subject_categories(client, name): + request, subject_categories = get_subject_categories(client) + for key, value in subject_categories['subject_categories'].items(): + if value['name'] == name: + return client.delete("/subject_categories/{}".format(key)) + + +def delete_subject_categories_without_id(client): + req = client.delete("/subject_categories/{}".format("")) + return req + + +def test_get_subject_categories(): + client = utilities.register_client() + req, subject_categories = get_subject_categories(client) + assert req.status_code == 200 + assert isinstance(subject_categories, dict) + assert "subject_categories" in subject_categories + + +def test_add_subject_categories(): + client = utilities.register_client() + req, subject_categories = add_subject_categories(client, "testuser") + assert req.status_code == 200 + assert isinstance(subject_categories, dict) + value = list(subject_categories["subject_categories"].values())[0] + assert "subject_categories" in subject_categories + assert value['name'] == "testuser" + assert value['description'] == "description of {}".format("testuser") + + +def test_add_subject_categories_with_empty_user(): + client = utilities.register_client() + req, subject_categories = add_subject_categories(client, "") + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'name', [Empty String]" + + +def test_add_subject_categories_with_user_contain_space(): + client = utilities.register_client() + req, subject_categories = add_subject_categories(client, "test user") + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'name', [String contains space]" + + +def test_delete_subject_categories(): + client = utilities.register_client() + req = delete_subject_categories(client, "testuser") + assert req.status_code == 200 + + +def test_delete_subject_categories_without_id(): + client = utilities.register_client() + req = delete_subject_categories_without_id(client) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "400: Subject Category Unknown" + + +#--------------------------------------------------------------------------- +#object_categories_test + +def get_object_categories(client): + req = client.get("/object_categories") + object_categories = utilities.get_json(req.data) + return req, object_categories + + +def add_object_categories(client, name): + data = { + "name": name, + "description": "description of {}".format(name) + } + req = client.post("/object_categories", data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + object_categories = utilities.get_json(req.data) + return req, object_categories + + +def delete_object_categories(client, name): + request, object_categories = get_object_categories(client) + for key, value in object_categories['object_categories'].items(): + if value['name'] == name: + return client.delete("/object_categories/{}".format(key)) + + +def delete_object_categories_without_id(client): + req = client.delete("/object_categories/{}".format("")) + return req + + +def test_get_object_categories(): + client = utilities.register_client() + req, object_categories = get_object_categories(client) + assert req.status_code == 200 + assert isinstance(object_categories, dict) + assert "object_categories" in object_categories + + +def test_add_object_categories(): + client = utilities.register_client() + req, object_categories = add_object_categories(client, "testuser") + assert req.status_code == 200 + assert isinstance(object_categories, dict) + value = list(object_categories["object_categories"].values())[0] + assert "object_categories" in object_categories + assert value['name'] == "testuser" + assert value['description'] == "description of {}".format("testuser") + + +def test_add_object_categories_with_empty_user(): + client = utilities.register_client() + req, object_categories = add_object_categories(client, "") + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'name', [Empty String]" + + +def test_add_object_categories_with_user_contain_space(): + client = utilities.register_client() + req, object_categories = add_object_categories(client, "test user") + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'name', [String contains space]" + + +def test_delete_object_categories(): + client = utilities.register_client() + req = delete_object_categories(client, "testuser") + assert req.status_code == 200 + + +def test_delete_object_categories_without_id(): + client = utilities.register_client() + req = delete_object_categories_without_id(client) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "400: Object Category Unknown" + + +#--------------------------------------------------------------------------- +#action_categories_test + +def get_action_categories(client): + req = client.get("/action_categories") + action_categories = utilities.get_json(req.data) + return req, action_categories + + +def add_action_categories(client, name): + data = { + "name": name, + "description": "description of {}".format(name) + } + req = client.post("/action_categories", data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + action_categories = utilities.get_json(req.data) + return req, action_categories + + +def delete_action_categories(client, name): + request, action_categories = get_action_categories(client) + for key, value in action_categories['action_categories'].items(): + if value['name'] == name: + return client.delete("/action_categories/{}".format(key)) + + +def delete_action_categories_without_id(client): + req = client.delete("/action_categories/{}".format("")) + return req + + +def test_get_action_categories(): + client = utilities.register_client() + req, action_categories = get_action_categories(client) + assert req.status_code == 200 + assert isinstance(action_categories, dict) + assert "action_categories" in action_categories + + +def test_add_action_categories(): + client = utilities.register_client() + req, action_categories = add_action_categories(client, "testuser") + assert req.status_code == 200 + assert isinstance(action_categories, dict) + value = list(action_categories["action_categories"].values())[0] + assert "action_categories" in action_categories + assert value['name'] == "testuser" + assert value['description'] == "description of {}".format("testuser") + + +def test_add_action_categories_with_empty_user(): + client = utilities.register_client() + req, action_categories = add_action_categories(client, "") + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'name', [Empty String]" + + +def test_add_action_categories_with_user_contain_space(): + client = utilities.register_client() + req, action_categories = add_action_categories(client, "test user") + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'name', [String contains space]" + + +def test_delete_action_categories(): + client = utilities.register_client() + req = delete_action_categories(client, "testuser") + assert req.status_code == 200 + + +def test_delete_action_categories_without_id(): + client = utilities.register_client() + req = delete_action_categories_without_id(client) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "400: Action Category Unknown" diff --git a/moon_manager/tests/unit_python/api/test_meta_rules.py b/moon_manager/tests/unit_python/api/test_meta_rules.py new file mode 100644 index 00000000..80d648b4 --- /dev/null +++ b/moon_manager/tests/unit_python/api/test_meta_rules.py @@ -0,0 +1,175 @@ +import json +import api.utilities as utilities +from helpers import category_helper +from uuid import uuid4 + + +def get_meta_rules(client): + req = client.get("/meta_rules") + meta_rules = utilities.get_json(req.data) + return req, meta_rules + + +def add_meta_rules(client, name): + subject_category = category_helper.add_subject_category(value={"name": "subject category name"+uuid4().hex, "description": "description 1"}) + subject_category_id = list(subject_category.keys())[0] + object_category = category_helper.add_object_category(value={"name": "object category name"+ uuid4().hex, "description": "description 1"}) + object_category_id = list(object_category.keys())[0] + action_category = category_helper.add_action_category(value={"name": "action category name"+uuid4().hex, "description": "description 1"}) + action_category_id = list(action_category.keys())[0] + + data = { + "name": name, + "subject_categories": [subject_category_id], + "object_categories": [object_category_id], + "action_categories": [action_category_id] + } + req = client.post("/meta_rules", data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + meta_rules = utilities.get_json(req.data) + return req, meta_rules + + +def add_meta_rules_without_subject_category_ids(client, name): + data = { + "name": name, + "subject_categories": [], + "object_categories": ["object_category_id1"], + "action_categories": ["action_category_id1"] + } + req = client.post("/meta_rules", data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + meta_rules = utilities.get_json(req.data) + return req, meta_rules + + +def update_meta_rules(client, name, metaRuleId): + subject_category = category_helper.add_subject_category( + value={"name": "subject category name update" + uuid4().hex, "description": "description 1"}) + subject_category_id = list(subject_category.keys())[0] + object_category = category_helper.add_object_category( + value={"name": "object category name update" + uuid4().hex, "description": "description 1"}) + object_category_id = list(object_category.keys())[0] + action_category = category_helper.add_action_category( + value={"name": "action category name update" + uuid4().hex, "description": "description 1"}) + action_category_id = list(action_category.keys())[0] + data = { + "name": name, + "subject_categories": [subject_category_id], + "object_categories": [object_category_id], + "action_categories": [action_category_id] + } + req = client.patch("/meta_rules/{}".format(metaRuleId), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + meta_rules = utilities.get_json(req.data) + return req, meta_rules + + +def update_meta_rules_without_subject_category_ids(client, name): + data = { + "name": name, + "subject_categories": [], + "object_categories": ["object_category_id1"], + "action_categories": ["action_category_id1"] + } + req = client.post("/meta_rules", data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + meta_rules = utilities.get_json(req.data) + return req, meta_rules + + +def delete_meta_rules(client, name): + request, meta_rules = get_meta_rules(client) + for key, value in meta_rules['meta_rules'].items(): + if value['name'] == name: + req = client.delete("/meta_rules/{}".format(key)) + break + return req + + +def delete_meta_rules_without_id(client): + req = client.delete("/meta_rules/{}".format("")) + return req + + +def test_get_meta_rules(): + client = utilities.register_client() + req, meta_rules = get_meta_rules(client) + assert req.status_code == 200 + assert isinstance(meta_rules, dict) + assert "meta_rules" in meta_rules + + +def test_add_meta_rules(): + client = utilities.register_client() + req, meta_rules = add_meta_rules(client, "testuser") + assert req.status_code == 200 + assert isinstance(meta_rules, dict) + value = list(meta_rules["meta_rules"].values())[0] + assert "meta_rules" in meta_rules + assert value['name'] == "testuser" + + +def test_add_meta_rules_with_empty_user(): + client = utilities.register_client() + req, meta_rules = add_meta_rules(client, "") + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'name', [Empty String]" + + +def test_add_meta_rules_with_user_contain_space(): + client = utilities.register_client() + req, meta_rules = add_meta_rules(client, "test user") + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'name', [String contains space]" + + +def test_add_meta_rules_without_subject_categories(): + client = utilities.register_client() + req, meta_rules = add_meta_rules_without_subject_category_ids(client, "testuser") + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'subject_categories', [Empty Container]" + + +def test_delete_meta_rules(): + client = utilities.register_client() + req = delete_meta_rules(client, "testuser") + assert req.status_code == 200 + + +def test_delete_meta_rules_without_id(): + client = utilities.register_client() + req = delete_meta_rules_without_id(client) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "400: Meta Rule Unknown" + + +def test_update_meta_rules(): + client = utilities.register_client() + req = add_meta_rules(client, "testuser") + meta_rule_id = list(req[1]['meta_rules'])[0] + req_update = update_meta_rules(client, "testuser", meta_rule_id) + assert req_update[0].status_code == 200 + delete_meta_rules(client, "testuser") + get_meta_rules(client) + + +def test_update_meta_rules_without_id(): + client = utilities.register_client() + req_update = update_meta_rules(client, "testuser", "") + assert req_update[0].status_code == 400 + assert json.loads(req_update[0].data)["message"] == "400: Meta Rule Unknown" + + +def test_update_meta_rules_without_user(): + client = utilities.register_client() + req_update = update_meta_rules(client, "", "") + assert req_update[0].status_code == 400 + assert json.loads(req_update[0].data)["message"] == "Key: 'name', [Empty String]" + + +def test_update_meta_rules_without_subject_categories(): + client = utilities.register_client() + req_update = update_meta_rules_without_subject_category_ids(client, "testuser") + assert req_update[0].status_code == 400 + assert json.loads(req_update[0].data)["message"] == "Key: 'subject_categories', [Empty Container]" diff --git a/moon_manager/tests/unit_python/api/test_models.py b/moon_manager/tests/unit_python/api/test_models.py deleted file mode 100644 index 3c205d1d..00000000 --- a/moon_manager/tests/unit_python/api/test_models.py +++ /dev/null @@ -1,67 +0,0 @@ -import json -import api.utilities as utilities - - -def get_models(client): - req = client.get("/models") - models = utilities.get_json(req.data) - return req, models - - -def add_models(client, name): - data = { - "name": name, - "description": "description of {}".format(name), - "meta_rules": ["meta_rule_id1", "meta_rule_id2"] - } - req = client.post("/models", data=json.dumps(data), - headers={'Content-Type': 'application/json'}) - models = utilities.get_json(req.data) - return req, models - - -def delete_models(client, name): - request, models = get_models(client) - for key, value in models['models'].items(): - if value['name'] == name: - req = client.delete("/models/{}".format(key)) - break - return req - - -def delete_models_without_id(client): - req = client.delete("/models/{}".format("")) - return req - - -def test_get_models(): - client = utilities.register_client() - req, models= get_models(client) - assert req.status_code == 200 - assert isinstance(models, dict) - assert "models" in models - - -def test_add_models(): - client = utilities.register_client() - req, models = add_models(client, "testuser") - assert req.status_code == 200 - assert isinstance(models, dict) - value = list(models["models"].values())[0] - assert "models" in models - assert value['name'] == "testuser" - assert value["description"] == "description of {}".format("testuser") - assert value["meta_rules"][0] == "meta_rule_id1" - - -def test_delete_models(): - client = utilities.register_client() - req = delete_models(client, "testuser") - assert req.status_code == 200 - - -def test_delete_models_without_id(): - client = utilities.register_client() - req = delete_models_without_id(client) - assert req.status_code == 500 - diff --git a/moon_manager/tests/unit_python/api/test_pdp.py b/moon_manager/tests/unit_python/api/test_pdp.py index a2d0cb5a..1ac9b84f 100644 --- a/moon_manager/tests/unit_python/api/test_pdp.py +++ b/moon_manager/tests/unit_python/api/test_pdp.py @@ -1,6 +1,7 @@ import json import api.utilities as utilities -import pytest +from helpers import data_builder as builder +from uuid import uuid4 def get_pdp(client): @@ -16,6 +17,13 @@ def add_pdp(client, data): return req, pdp +def update_pdp(client, data, pdp_id): + req = client.patch("/pdp/{}".format(pdp_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + pdp = utilities.get_json(req.data) + return req, pdp + + def delete_pdp(client, key): req = client.delete("/pdp/{}".format(key)) return req @@ -35,9 +43,15 @@ def test_get_pdp(): def test_add_pdp(): + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex, + model_name="model1" + uuid4().hex) data = { "name": "testuser", - "security_pipeline": ["policy_id_1", "policy_id_2"], + "security_pipeline": [policy_id], "keystone_project_id": "keystone_project_id", "description": "description of testuser" } @@ -60,3 +74,128 @@ def test_delete_pdp(): success_req = delete_pdp(client, key) break assert success_req.status_code == 200 + + +def test_add_pdp_with_empty_user(): + data = { + "name": "", + "security_pipeline": ["policy_id_1", "policy_id_2"], + "keystone_project_id": "keystone_project_id", + "description": "description of testuser" + } + client = utilities.register_client() + req, models = add_pdp(client, data) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'name', [Empty String]" + + +def test_add_pdp_with_user_contain_space(): + data = { + "name": "test user", + "security_pipeline": ["policy_id_1", "policy_id_2"], + "keystone_project_id": "keystone_project_id", + "description": "description of testuser" + } + client = utilities.register_client() + req, models = add_pdp(client, data) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'name', [String contains space]" + + +def test_add_pdp_without_security_pipeline(): + data = { + "name": "testuser", + "security_pipeline": [], + "keystone_project_id": "keystone_project_id", + "description": "description of testuser" + } + client = utilities.register_client() + req, meta_rules = add_pdp(client, data) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'security_pipeline', [Empty Container]" + + +def test_add_pdp_without_keystone(): + data = { + "name": "testuser", + "security_pipeline": ["policy_id_1", "policy_id_2"], + "keystone_project_id": "", + "description": "description of testuser" + } + client = utilities.register_client() + req, meta_rules = add_pdp(client, data) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'keystone_project_id', [Empty String]" + + +def test_update_pdp(): + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy( + subject_category_name="subject_category1"+uuid4().hex, + object_category_name="object_category1"+uuid4().hex, + action_category_name="action_category1"+uuid4().hex, + meta_rule_name="meta_rule_1"+uuid4().hex, + model_name="model1"+uuid4().hex) + data_add = { + "name": "testuser", + "security_pipeline": [policy_id], + "keystone_project_id": "keystone_project_id", + "description": "description of testuser" + } + + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id_update = builder.create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex, + model_name="model1" + uuid4().hex) + data_update = { + "name": "testuser", + "security_pipeline": [policy_id_update], + "keystone_project_id": "keystone_project_id_update", + "description": "description of testuser" + } + client = utilities.register_client() + req = add_pdp(client, data_add) + pdp_id = list(req[1]['pdps'])[0] + req_update = update_pdp(client, data_update, pdp_id) + assert req_update[0].status_code == 200 + value = list(req_update[1]["pdps"].values())[0] + assert value["keystone_project_id"] == "keystone_project_id_update" + request, pdp = get_pdp(client) + for key, value in pdp['pdps'].items(): + if value['name'] == "testuser": + delete_pdp(client, key) + break + + +def test_update_pdp_without_id(): + client = utilities.register_client() + req_update = update_pdp(client, "testuser", "") + assert req_update[0].status_code == 400 + assert json.loads(req_update[0].data)["message"] == 'Invalid Key :name not found' + + +def test_update_pdp_without_user(): + data = { + "name": "", + "security_pipeline": ["policy_id_1", "policy_id_2"], + "keystone_project_id": "keystone_project_id", + "description": "description of testuser" + } + client = utilities.register_client() + req_update = update_pdp(client, data, "") + assert req_update[0].status_code == 400 + assert json.loads(req_update[0].data)["message"] == "Key: 'name', [Empty String]" + + +def test_update_pdp_without_security_pipeline(): + data = { + "name": "testuser", + "security_pipeline": [], + "keystone_project_id": "keystone_project_id", + "description": "description of testuser" + } + client = utilities.register_client() + req_update = update_pdp(client, data, "") + assert req_update[0].status_code == 400 + assert json.loads(req_update[0].data)["message"] == "Key: 'security_pipeline', [Empty Container]"
\ No newline at end of file diff --git a/moon_manager/tests/unit_python/api/test_perimeter.py b/moon_manager/tests/unit_python/api/test_perimeter.py index db09780f..322d90c6 100644 --- a/moon_manager/tests/unit_python/api/test_perimeter.py +++ b/moon_manager/tests/unit_python/api/test_perimeter.py @@ -2,156 +2,282 @@ # import moon_manager.api import json import api.utilities as utilities +from helpers import data_builder as builder +from uuid import uuid4 def get_subjects(client): req = client.get("/subjects") - assert req.status_code == 200 subjects = utilities.get_json(req.data) - assert isinstance(subjects, dict) - assert "subjects" in subjects - return subjects + return req, subjects def add_subjects(client, name): + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex, + model_name="model1" + uuid4().hex) data = { - "name": name, + "name": name + uuid4().hex, "description": "description of {}".format(name), "password": "password for {}".format(name), "email": "{}@moon".format(name) } - req = client.post("/subjects", data=json.dumps(data), + req = client.post("/policies/{}/subjects".format(policy_id), data=json.dumps(data), headers={'Content-Type': 'application/json'}) - assert req.status_code == 200 subjects = utilities.get_json(req.data) + return req, subjects + + +def delete_subject(client): + subjects = get_subjects(client) + value = subjects[1]['subjects'] + id = list(value.keys())[0] + policy_id = builder.get_policy_id_with_subject_assignment() + return client.delete("/policies/{}/subjects/{}".format(policy_id, id)) + + +def delete_subjects_without_perimeter_id(client): + req = client.delete("/subjects/{}".format("")) + return req + + +def test_perimeter_get_subject(): + client = utilities.register_client() + req, subjects = get_subjects(client) + assert req.status_code == 200 assert isinstance(subjects, dict) - key = list(subjects["subjects"].keys())[0] + assert "subjects" in subjects + + +def test_perimeter_add_subject(): + client = utilities.register_client() + req, subjects = add_subjects(client, "testuser") value = list(subjects["subjects"].values())[0] + assert req.status_code == 200 assert "subjects" in subjects - assert key == "1111111111111" - assert value['id'] == "1111111111111" - assert value['name'] == name - assert value["description"] == "description of {}".format(name) - assert value["email"] == "{}@moon".format(name) - return subjects + assert value["name"] is not None + assert value["email"] is not None -def add_subjects_without_name(client, name): +def test_perimeter_add_subject_without_name(): + client = utilities.register_client() data = { - "name": name, - "description": "description of {}".format(name), - "password": "password for {}".format(name), - "email": "{}@moon".format(name) + "name": "", + "description": "description of {}".format(""), + "password": "password for {}".format(""), + "email": "{}@moon".format("") } - req = client.post("/subjects", data=json.dumps(data), + req = client.post("/policies/{}/subjects".format("111"), data=json.dumps(data), headers={'Content-Type': 'application/json'}) - assert req.status_code == 500 + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'name', [Empty String]" -def delete_subject(client, name): - subjects = get_subjects(client) - for key, value in subjects['subjects'].items(): - if value['name'] == name: - req = client.delete("/subjects/{}".format(key)) - assert req.status_code == 200 - break - subjects = get_subjects(client) - assert name not in [x['name'] for x in subjects["subjects"].values()] +def test_perimeter_add_subject_with_name_contain_spaces(): + client = utilities.register_client() + data = { + "name": "test user", + "description": "description of {}".format("test user"), + "password": "password for {}".format("test user"), + "email": "{}@moon".format("test user") + } + req = client.post("/policies/{}/subjects".format("111"), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'name', [String contains space]" + + +def test_perimeter_delete_subject(): + client = utilities.register_client() + req = delete_subject(client) + assert req.status_code == 200 -def test_subject(): +def test_perimeter_delete_subjects_without_perimeter_id(): client = utilities.register_client() - get_subjects(client) - add_subjects(client, "testuser") - add_subjects_without_name(client, "") - delete_subject(client, "testuser") + req = delete_subjects_without_perimeter_id(client) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "400: Subject Unknown" def get_objects(client): req = client.get("/objects") - assert req.status_code == 200 objects = utilities.get_json(req.data) - assert isinstance(objects, dict) - assert "objects" in objects - return objects + return req, objects def add_objects(client, name): + subject_category_id, object_category_id, action_category_id, meta_rule_id, policyId = builder.create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex, + model_name="model1" + uuid4().hex) data = { - "name": name, + "name": name + uuid4().hex, "description": "description of {}".format(name), } - req = client.post("/objects", data=json.dumps(data), + req = client.post("/policies/{}/objects/".format(policyId), data=json.dumps(data), headers={'Content-Type': 'application/json'}) - assert req.status_code == 200 objects = utilities.get_json(req.data) + return req, objects + + +def delete_object(client): + objects = get_objects(client) + value = objects[1]['objects'] + id = list(value.keys())[0] + policy_id = builder.get_policy_id_with_object_assignment() + return client.delete("/policies/{}/objects/{}".format(policy_id, id)) + + +def delete_objects_without_perimeter_id(client): + req = client.delete("/objects/{}".format("")) + return req + + +def test_perimeter_get_object(): + client = utilities.register_client() + req, objects = get_objects(client) + assert req.status_code == 200 assert isinstance(objects, dict) - key = list(objects["objects"].keys())[0] + assert "objects" in objects + + +def test_perimeter_add_object(): + client = utilities.register_client() + req, objects = add_objects(client, "testuser") value = list(objects["objects"].values())[0] + assert req.status_code == 200 assert "objects" in objects - assert value['name'] == name - assert value["description"] == "description of {}".format(name) - return objects + assert value['name'] is not None -def delete_objects(client, name): - objects = get_objects(client) - for key, value in objects['objects'].items(): - if value['name'] == name: - req = client.delete("/objects/{}".format(key)) - assert req.status_code == 200 - break - objects = get_objects(client) - assert name not in [x['name'] for x in objects["objects"].values()] +def test_perimeter_add_object_without_name(): + client = utilities.register_client() + data = { + "name": "", + "description": "description of {}".format(""), + } + req = client.post("/policies/{}/objects/".format("111"), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'name', [Empty String]" -def test_objects(): +def test_perimeter_add_object_with_name_contain_spaces(): client = utilities.register_client() - get_objects(client) - add_objects(client, "testuser") - delete_objects(client, "testuser") + data = { + "name": "test user", + "description": "description of {}".format("test user"), + } + req = client.post("/policies/{}/objects/".format("111"), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'name', [String contains space]" + + +def test_perimeter_delete_object(): + client = utilities.register_client() + req = delete_object(client) + assert req.status_code == 200 + + +def test_perimeter_delete_objects_without_perimeter_id(): + client = utilities.register_client() + req = delete_objects_without_perimeter_id(client) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "400: Object Unknown" def get_actions(client): req = client.get("/actions") - assert req.status_code == 200 actions = utilities.get_json(req.data) - assert isinstance(actions, dict) - assert "actions" in actions - return actions + return req, actions def add_actions(client, name): + subject_category_id, object_category_id, action_category_id, meta_rule_id, policyId = builder.create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex, + model_name="model1" + uuid4().hex) data = { - "name": name, + "name": name + uuid4().hex, "description": "description of {}".format(name), } - req = client.post("/actions", data=json.dumps(data), + req = client.post("/policies/{}/actions".format(policyId), data=json.dumps(data), headers={'Content-Type': 'application/json'}) - assert req.status_code == 200 actions = utilities.get_json(req.data) + return req, actions + + +def delete_actions(client): + actions = get_actions(client) + value = actions[1]['actions'] + id = list(value.keys())[0] + policy_id = builder.get_policy_id_with_action_assignment() + return client.delete("/policies/{}/actions/{}".format(policy_id, id)) + + +def delete_actions_without_perimeter_id(client): + req = client.delete("/actions/{}".format("")) + return req + + +def test_perimeter_get_actions(): + client = utilities.register_client() + req, actions = get_actions(client) + assert req.status_code == 200 assert isinstance(actions, dict) - key = list(actions["actions"].keys())[0] + assert "actions" in actions + + +def test_perimeter_add_actions(): + client = utilities.register_client() + req, actions = add_actions(client, "testuser") value = list(actions["actions"].values())[0] + assert req.status_code == 200 assert "actions" in actions - assert value['name'] == name - assert value["description"] == "description of {}".format(name) - return actions + assert value['name'] is not None -def delete_actions(client, name): - actions = get_actions(client) - for key, value in actions['actions'].items(): - if value['name'] == name: - req = client.delete("/actions/{}".format(key)) - assert req.status_code == 200 - break - actions = get_actions(client) - assert name not in [x['name'] for x in actions["actions"].values()] +def test_perimeter_add_actions_without_name(): + client = utilities.register_client() + data = { + "name": "", + "description": "description of {}".format(""), + } + req = client.post("/policies/{}/actions".format("111"), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'name', [Empty String]" + + +def test_perimeter_add_actions_with_name_contain_spaces(): + client = utilities.register_client() + data = { + "name": "test user", + "description": "description of {}".format("test user"), + } + req = client.post("/policies/{}/actions".format("111"), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'name', [String contains space]" + + +def test_perimeter_delete_actions(): + client = utilities.register_client() + req = delete_actions(client) + assert req.status_code == 200 -def test_actions(): +def test_perimeter_delete_actions_without_perimeter_id(): client = utilities.register_client() - get_actions(client) - add_actions(client, "testuser") - delete_actions(client, "testuser") + req = delete_actions_without_perimeter_id(client) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "400: Action Unknown" diff --git a/moon_manager/tests/unit_python/api/test_policies.py b/moon_manager/tests/unit_python/api/test_policies.py index 4d4e387e..cd50f4c7 100644 --- a/moon_manager/tests/unit_python/api/test_policies.py +++ b/moon_manager/tests/unit_python/api/test_policies.py @@ -1,5 +1,12 @@ +# Copyright 2018 Open Platform for NFV Project, Inc. and its contributors +# This software is distributed under the terms and conditions of the 'Apache-2.0' +# license which can be found in the file 'LICENSE' in this package distribution +# or at 'http://www.apache.org/licenses/LICENSE-2.0'. + import json +from uuid import uuid4 import api.utilities as utilities +from helpers import model_helper def get_policies(client): @@ -9,10 +16,12 @@ def get_policies(client): def add_policies(client, name): + req = model_helper.add_model(model_id="mls_model_id"+uuid4().hex) + model_id = list(req.keys())[0] data = { "name": name, "description": "description of {}".format(name), - "model_id": "modelId", + "model_id": model_id, "genre": "genre" } req = client.post("/policies", data=json.dumps(data), @@ -24,9 +33,8 @@ def add_policies(client, name): def delete_policies(client, name): request, policies = get_policies(client) for key, value in policies['policies'].items(): - if value['name'] == name: - req = client.delete("/policies/{}".format(key)) - break + req = client.delete("/policies/{}".format(key)) + break return req @@ -44,16 +52,15 @@ def test_get_policies(): def test_add_policies(): + policy_name = "testuser" + uuid4().hex client = utilities.register_client() - req, policies = add_policies(client, "testuser") + req, policies = add_policies(client, policy_name) assert req.status_code == 200 assert isinstance(policies, dict) value = list(policies["policies"].values())[0] assert "policies" in policies - assert value['name'] == "testuser" - assert value["description"] == "description of {}".format("testuser") - assert value["model_id"] == "modelId" - assert value["genre"] == "genre" + assert value['name'] == policy_name + assert value["description"] == "description of {}".format(policy_name) def test_delete_policies(): @@ -65,5 +72,6 @@ def test_delete_policies(): def test_delete_policies_without_id(): client = utilities.register_client() req = delete_policies_without_id(client) - assert req.status_code == 500 + assert req.status_code == 400 + assert json.loads(req.data)["message"] == '400: Policy Unknown' diff --git a/moon_manager/tests/unit_python/api/test_rules.py b/moon_manager/tests/unit_python/api/test_rules.py index 86a3d390..af1501e4 100644 --- a/moon_manager/tests/unit_python/api/test_rules.py +++ b/moon_manager/tests/unit_python/api/test_rules.py @@ -1,5 +1,8 @@ import api.utilities as utilities import json +from helpers import data_builder as builder +from uuid import uuid4 +from helpers import policy_helper def get_rules(client, policy_id): @@ -8,9 +11,45 @@ def get_rules(client, policy_id): return req, rules -def add_rules(client, policy_id): +def add_rules(client): + sub_id, obj_id, act_id, meta_rule_id, policy_id = builder.create_new_policy("sub_cat" + uuid4().hex, + "obj_cat" + uuid4().hex, + "act_cat" + uuid4().hex) + sub_data_id = builder.create_subject_data(policy_id, sub_id) + obj_data_id = builder.create_object_data(policy_id, obj_id) + act_data_id = builder.create_action_data(policy_id, act_id) data = { - "meta_rule_id": "meta_rule_id1", + "meta_rule_id": meta_rule_id, + "rule": [sub_data_id, obj_data_id, act_data_id], + "instructions": ( + {"decision": "grant"}, + ), + "enabled": True + } + req = client.post("/policies/{}/rules".format(policy_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + rules = utilities.get_json(req.data) + return req, rules + + +def add_rules_without_policy_id(client): + data = { + "meta_rule_id": "meta_rule_id", + "rule": ["sub_data_id", "obj_data_id", "act_data_id"], + "instructions": ( + {"decision": "grant"}, + ), + "enabled": True + } + req = client.post("/policies/{}/rules".format(None), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + rules = utilities.get_json(req.data) + return req, rules + + +def add_rules_without_meta_rule_id(client, policy_id): + data = { + "meta_rule_id": "", "rule": ["subject_data_id2", "object_data_id2", "action_data_id2"], "instructions": ( {"decision": "grant"}, @@ -23,6 +62,20 @@ def add_rules(client, policy_id): return req, rules +def add_rules_without_rule(client, policy_id): + data = { + "meta_rule_id": "meta_rule_id1", + "instructions": ( + {"decision": "grant"}, + ), + "enabled": True + } + req = client.post("/policies/{}/rules".format(policy_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + rules = utilities.get_json(req.data) + return req, rules + + def delete_rules(client, policy_id, meta_rule_id): req = client.delete("/policies/{}/rules/{}".format(policy_id, meta_rule_id)) return req @@ -35,24 +88,61 @@ def test_get_rules(): assert req.status_code == 200 assert isinstance(rules, dict) assert "rules" in rules + return req, rules def test_add_rules(): - policy_id = utilities.get_policy_id() client = utilities.register_client() - req, rules = add_rules(client, policy_id) + req, rules = add_rules(client, ) assert req.status_code == 200 - assert isinstance(rules, dict) - value = rules["rules"] - assert "rules" in rules - id = list(value.keys())[0] - assert value[id]["meta_rule_id"] == "meta_rule_id1" -def test_delete_rules(): +def test_add_rules_without_policy_id(): + client = utilities.register_client() + req, rules = add_rules_without_policy_id(client) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "400: Policy Unknown" + + +def test_add_rules_without_meta_rule_id(): + policy_id = utilities.get_policy_id() client = utilities.register_client() + req, rules = add_rules_without_meta_rule_id(client, policy_id) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'meta_rule_id', [Empty String]" + + +def test_add_rules_without_rule(): policy_id = utilities.get_policy_id() + client = utilities.register_client() + req, rules = add_rules_without_rule(client, policy_id) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == 'Invalid Key :rule not found' + + +def test_delete_rules_with_invalid_parameters(): + client = utilities.register_client() + rules = delete_rules(client, "", "") + assert rules.status_code == 404 + + +def test_delete_rules_without_policy_id(): + client = utilities.register_client() + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy() + sub_data_id = builder.create_subject_data(policy_id, subject_category_id) + obj_data_id = builder.create_object_data(policy_id, object_category_id) + act_data_id = builder.create_action_data(policy_id, action_category_id) + data = { + "meta_rule_id": meta_rule_id, + "rule": [sub_data_id, obj_data_id, act_data_id], + "instructions": ( + {"decision": "grant"}, + ), + "enabled": True + } + client.post("/policies/{}/rules".format(policy_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) req, added_rules = get_rules(client, policy_id) - id = added_rules["rules"]['rules'][0]['id'] - rules = delete_rules(client, policy_id, id) + id = list(added_rules["rules"]["rules"])[0]["id"] + rules = delete_rules(client, None, id) assert rules.status_code == 200 diff --git a/moon_manager/tests/unit_python/api/test_unit_models.py b/moon_manager/tests/unit_python/api/test_unit_models.py new file mode 100644 index 00000000..d754b976 --- /dev/null +++ b/moon_manager/tests/unit_python/api/test_unit_models.py @@ -0,0 +1,186 @@ +# Copyright 2018 Open Platform for NFV Project, Inc. and its contributors +# This software is distributed under the terms and conditions of the 'Apache-2.0' +# license which can be found in the file 'LICENSE' in this package distribution +# or at 'http://www.apache.org/licenses/LICENSE-2.0'. + +import json +import api.utilities as utilities +from helpers import data_builder as builder +from uuid import uuid4 + + +def get_models(client): + req = client.get("/models") + models = utilities.get_json(req.data) + return req, models + + +def add_models(client, name): + subject_category_id, object_category_id, action_category_id, meta_rule_id = builder.create_new_meta_rule( + subject_category_name="subject_category"+uuid4().hex, + object_category_name="object_category"+uuid4().hex, action_category_name="action_category"+uuid4().hex, + meta_rule_name="meta_rule" + uuid4().hex) + data = { + "name": name, + "description": "description of {}".format(name), + "meta_rules": [meta_rule_id] + } + req = client.post("/models", data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + models = utilities.get_json(req.data) + return req, models + + +def update_model(client, name, model_id): + subject_category_id, object_category_id, action_category_id, meta_rule_id = builder.create_new_meta_rule( + subject_category_name="subject_category" + uuid4().hex, + object_category_name="object_category" + uuid4().hex, action_category_name="action_category" + uuid4().hex, + meta_rule_name="meta_rule" + uuid4().hex) + + data = { + "name": name, + "description": "description of {}".format(name), + "meta_rules": [meta_rule_id] + } + req = client.patch("/models/{}".format(model_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + models = utilities.get_json(req.data) + return req, models + + +def add_model_without_meta_rules_ids(client, name): + data = { + "name": name, + "description": "description of {}".format(name), + "meta_rules": [] + } + req = client.post("/models", data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + models = utilities.get_json(req.data) + return req, models + + +def update_model_without_meta_rules_ids(client, name): + data = { + "name": name, + "description": "description of {}".format(name), + "meta_rules": [] + } + req = client.patch("/models", data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + models = utilities.get_json(req.data) + return req, models + + +def delete_models(client, name): + request, models = get_models(client) + for key, value in models['models'].items(): + if value['name'] == name: + req = client.delete("/models/{}".format(key)) + break + return req + + +def delete_models_without_id(client): + req = client.delete("/models/{}".format("")) + return req + + +def clean_models(): + client = utilities.register_client() + req, models = get_models(client) + for key, value in models['models'].items(): + print(key) + print(value) + client.delete("/models/{}".format(key)) + + +def test_get_models(): + client = utilities.register_client() + req, models = get_models(client) + assert req.status_code == 200 + assert isinstance(models, dict) + assert "models" in models + + +def test_add_models(): + clean_models() + client = utilities.register_client() + req, models = add_models(client, "testuser") + assert req.status_code == 200 + assert isinstance(models, dict) + model_id = list(models["models"])[0] + assert "models" in models + assert models['models'][model_id]['name'] == "testuser" + assert models['models'][model_id]["description"] == "description of {}".format("testuser") + + +def test_delete_models(): + client = utilities.register_client() + req = delete_models(client, "testuser") + assert req.status_code == 200 + + +def test_delete_models_without_id(): + client = utilities.register_client() + req = delete_models_without_id(client) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "400: Model Unknown" + + +def test_add_model_with_empty_user(): + clean_models() + client = utilities.register_client() + req, models = add_models(client, "") + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'name', [Empty String]" + + +def test_add_model_with_user_contain_space(): + clean_models() + client = utilities.register_client() + req, models = add_models(client, "test user") + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'name', [String contains space]" + + +def test_add_model_without_meta_rules(): + clean_models() + client = utilities.register_client() + req, meta_rules = add_model_without_meta_rules_ids(client, "testuser") + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'meta_rules', [Empty Container]" + + +def test_update_model(): + clean_models() + client = utilities.register_client() + req = add_models(client, "testuser") + model_id = list(req[1]['models'])[0] + req_update = update_model(client, "testuser", model_id) + assert req_update[0].status_code == 200 + model_id = list(req_update[1]["models"])[0] + assert req_update[1]["models"][model_id]["meta_rules"][0] is not None + delete_models(client, "testuser") + + +def test_update_meta_rules_without_id(): + clean_models() + client = utilities.register_client() + req_update = update_model(client, "testuser", "") + assert req_update[0].status_code == 400 + assert json.loads(req_update[0].data)["message"] == "400: Model Unknown" + + +def test_update_meta_rules_without_user(): + client = utilities.register_client() + req_update = update_model(client, "", "") + assert req_update[0].status_code == 400 + assert json.loads(req_update[0].data)["message"] == "Key: 'name', [Empty String]" + + +def test_update_meta_rules_without_meta_rules(): + client = utilities.register_client() + req_update = update_model_without_meta_rules_ids(client, "testuser") + assert req_update[0].status_code == 400 + assert json.loads(req_update[0].data)["message"] == "Key: 'meta_rules', [Empty Container]" diff --git a/moon_manager/tests/unit_python/api/utilities.py b/moon_manager/tests/unit_python/api/utilities.py index 66ca30c5..2e51fec8 100644 --- a/moon_manager/tests/unit_python/api/utilities.py +++ b/moon_manager/tests/unit_python/api/utilities.py @@ -1,5 +1,5 @@ import json - +from uuid import uuid4 def get_json(data): return json.loads(data.decode("utf-8")) @@ -13,14 +13,14 @@ def register_client(): def get_policy_id(): - import api.test_policies as policies - client = register_client() - policy_id = '' - req, policy = policies.get_policies(client) - for id in policy['policies']: - if id: - policy_id = id - break - if not policy_id: - policies.add_policies(client, "testuser") + from helpers import policy_helper + value = { + "name": "test_policy"+uuid4().hex, + "model_id": "", + "genre": "authz", + "description": "test", + } + policy_helper.add_policies(value=value) + req = policy_helper.get_policies() + policy_id = list(req.keys())[0] return policy_id diff --git a/moon_manager/tests/unit_python/conftest.py b/moon_manager/tests/unit_python/conftest.py index 902a41a2..d9899231 100644 --- a/moon_manager/tests/unit_python/conftest.py +++ b/moon_manager/tests/unit_python/conftest.py @@ -187,12 +187,14 @@ def no_requests(monkeypatch): 'DELETE', 'http://keystone:5000/v3/auth/tokens', headers={'X-Subject-Token': "111111111"} ) + + def match_request_text(request): + # request.url may be None, or '' prevents a TypeError. + return 'http://keystone:5000/v3/users?name=testuser' in request.url + m.register_uri( - 'POST', 'http://keystone:5000/v3/users?name=testuser&domain_id=default', - json={"users": {}} - ) - m.register_uri( - 'GET', 'http://keystone:5000/v3/users?name=testuser&domain_id=default', + requests_mock.ANY, '/v3/users', + additional_matcher=match_request_text, json={"users": {}} ) m.register_uri( diff --git a/moon_manager/tests/unit_python/api/__init__.py b/moon_manager/tests/unit_python/helpers/__init__.py index e69de29b..e69de29b 100644 --- a/moon_manager/tests/unit_python/api/__init__.py +++ b/moon_manager/tests/unit_python/helpers/__init__.py diff --git a/moon_manager/tests/unit_python/helpers/assignment_helper.py b/moon_manager/tests/unit_python/helpers/assignment_helper.py new file mode 100644 index 00000000..22a56e38 --- /dev/null +++ b/moon_manager/tests/unit_python/helpers/assignment_helper.py @@ -0,0 +1,49 @@ +# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors +# This software is distributed under the terms and conditions of the 'Apache-2.0' +# license which can be found in the file 'LICENSE' in this package distribution +# or at 'http://www.apache.org/licenses/LICENSE-2.0'. + +def get_action_assignments(policy_id, action_id=None, category_id=None): + from python_moondb.core import PolicyManager + return PolicyManager.get_action_assignments("", policy_id, action_id, category_id) + + +def add_action_assignment(policy_id, action_id, category_id, data_id): + from python_moondb.core import PolicyManager + return PolicyManager.add_action_assignment("", policy_id, action_id, category_id, data_id) + + +def delete_action_assignment(policy_id, action_id, category_id, data_id): + from python_moondb.core import PolicyManager + PolicyManager.delete_action_assignment("", policy_id, action_id, category_id, data_id) + + +def get_object_assignments(policy_id, object_id=None, category_id=None): + from python_moondb.core import PolicyManager + return PolicyManager.get_object_assignments("", policy_id, object_id, category_id) + + +def add_object_assignment(policy_id, object_id, category_id, data_id): + from python_moondb.core import PolicyManager + return PolicyManager.add_object_assignment("", policy_id, object_id, category_id, data_id) + + +def delete_object_assignment(policy_id, object_id, category_id, data_id): + from python_moondb.core import PolicyManager + PolicyManager.delete_object_assignment("", policy_id, object_id, category_id, data_id) + + +def get_subject_assignments(policy_id, subject_id=None, category_id=None): + from python_moondb.core import PolicyManager + return PolicyManager.get_subject_assignments("", policy_id, subject_id, category_id) + + +def add_subject_assignment(policy_id, subject_id, category_id, data_id): + from python_moondb.core import PolicyManager + return PolicyManager.add_subject_assignment("", policy_id, subject_id, category_id, data_id) + + +def delete_subject_assignment(policy_id, subject_id, category_id, data_id): + from python_moondb.core import PolicyManager + PolicyManager.delete_subject_assignment("", policy_id, subject_id, category_id, data_id) + diff --git a/moon_manager/tests/unit_python/helpers/category_helper.py b/moon_manager/tests/unit_python/helpers/category_helper.py new file mode 100644 index 00000000..6c419ca8 --- /dev/null +++ b/moon_manager/tests/unit_python/helpers/category_helper.py @@ -0,0 +1,40 @@ +# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors +# This software is distributed under the terms and conditions of the 'Apache-2.0' +# license which can be found in the file 'LICENSE' in this package distribution +# or at 'http://www.apache.org/licenses/LICENSE-2.0'. + + +def add_subject_category(cat_id=None, value=None): + from python_moondb.core import ModelManager + category = ModelManager.add_subject_category(user_id=None, category_id=cat_id, value=value) + return category + + +def get_subject_category(cat_id=None): + from python_moondb.core import ModelManager + category = ModelManager.get_subject_categories(user_id=None, category_id=cat_id) + return category + + +def add_object_category(cat_id=None, value=None): + from python_moondb.core import ModelManager + category = ModelManager.add_object_category(user_id=None, category_id=cat_id, value=value) + return category + + +def get_object_category(cat_id=None): + from python_moondb.core import ModelManager + category = ModelManager.get_object_categories(user_id=None, category_id=cat_id) + return category + + +def add_action_category(cat_id=None, value=None): + from python_moondb.core import ModelManager + category = ModelManager.add_action_category(user_id=None, category_id=cat_id, value=value) + return category + + +def get_action_category(cat_id=None): + from python_moondb.core import ModelManager + category = ModelManager.get_action_categories(user_id=None, category_id=cat_id) + return category diff --git a/moon_manager/tests/unit_python/helpers/data_builder.py b/moon_manager/tests/unit_python/helpers/data_builder.py new file mode 100644 index 00000000..2a7c5979 --- /dev/null +++ b/moon_manager/tests/unit_python/helpers/data_builder.py @@ -0,0 +1,209 @@ +# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors +# This software is distributed under the terms and conditions of the 'Apache-2.0' +# license which can be found in the file 'LICENSE' in this package distribution +# or at 'http://www.apache.org/licenses/LICENSE-2.0'. + +from .category_helper import * +from .policy_helper import * +from .data_helper import * +from helpers import model_helper +from .meta_rule_helper import * +import api.utilities as utilities +import json + + +def create_subject_category(name): + subject_category = add_subject_category( + value={"name": name + uuid4().hex, "description": "description 1"}) + return list(subject_category.keys())[0] + + +def create_object_category(name): + object_category = add_object_category( + value={"name": name + uuid4().hex, "description": "description 1"}) + return list(object_category.keys())[0] + + +def create_action_category(name): + action_category = add_action_category( + value={"name": name + uuid4().hex, "description": "description 1"}) + return list(action_category.keys())[0] + + +def create_model(meta_rule_id, model_name="test_model"): + value = { + "name": model_name + uuid4().hex, + "description": "test", + "meta_rules": [meta_rule_id] + + } + return value + + +def create_policy(model_id, policy_name="policy_1"): + value = { + "name": policy_name, + "model_id": model_id, + "genre": "authz", + "description": "test", + } + return value + + +def create_pdp(policies_ids): + value = { + "name": "test_pdp", + "security_pipeline": policies_ids, + "keystone_project_id": "keystone_project_id1", + "description": "...", + } + return value + + +def create_new_policy(subject_category_name="subjectCategory", object_category_name="objectCategory", + action_category_name="actionCategory", + model_name="test_model" + uuid4().hex, policy_name="policy_1" + uuid4().hex, + meta_rule_name="meta_rule1" + uuid4().hex): + subject_category_id, object_category_id, action_category_id, meta_rule_id = create_new_meta_rule( + subject_category_name=subject_category_name + uuid4().hex, + object_category_name=object_category_name + uuid4().hex, + action_category_name=action_category_name + uuid4().hex, meta_rule_name=meta_rule_name + uuid4().hex) + model = model_helper.add_model(value=create_model(meta_rule_id, model_name)) + model_id = list(model.keys())[0] + value = create_policy(model_id, policy_name) + policy = add_policies(value=value) + assert policy + policy_id = list(policy.keys())[0] + return subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id + + +def create_new_meta_rule(subject_category_name="subjectCategory", object_category_name="objectCategory", + action_category_name="actionCategory", + meta_rule_name="meta_rule1" + uuid4().hex): + subject_category_id = create_subject_category(subject_category_name) + object_category_id = create_object_category(object_category_name) + action_category_id = create_action_category(action_category_name) + value = {"name": meta_rule_name, + "algorithm": "name of the meta rule algorithm", + "subject_categories": [subject_category_id], + "object_categories": [object_category_id], + "action_categories": [action_category_id] + } + meta_rule = add_meta_rule(value=value) + return subject_category_id, object_category_id, action_category_id, list(meta_rule.keys())[0] + + +def create_subject(policy_id): + value = { + "name": "testuser" + uuid4().hex, + "description": "test", + } + subject = add_subject(policy_id=policy_id, value=value) + return list(subject.keys())[0] + + +def create_object(policy_id): + value = { + "name": "testobject" + uuid4().hex, + "description": "test", + } + object = add_object(policy_id=policy_id, value=value) + return list(object.keys())[0] + + +def create_action(policy_id): + value = { + "name": "testaction" + uuid4().hex, + "description": "test", + } + action = add_action(policy_id=policy_id, value=value) + return list(action.keys())[0] + + +def create_subject_data(policy_id, category_id): + value = { + "name": "subject-security-level", + "description": {"low": "", "medium": "", "high": ""}, + } + subject_data = add_subject_data(policy_id=policy_id, category_id=category_id, value=value).get('data') + assert subject_data + return list(subject_data.keys())[0] + + +def create_object_data(policy_id, category_id): + value = { + "name": "object-security-level", + "description": {"low": "", "medium": "", "high": ""}, + } + object_data = add_object_data(policy_id=policy_id, category_id=category_id, value=value).get('data') + return list(object_data.keys())[0] + + +def create_action_data(policy_id, category_id): + value = { + "name": "action-type", + "description": {"vm-action": "", "storage-action": "", }, + } + action_data = add_action_data(policy_id=policy_id, category_id=category_id, value=value).get('data') + return list(action_data.keys())[0] + + +def get_policy_id_with_subject_assignment(): + client = utilities.register_client() + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex) + subject_id = create_subject(policy_id) + data_id = create_subject_data(policy_id=policy_id, category_id=subject_category_id) + + data = { + "id": subject_id, + "category_id": subject_category_id, + "data_id": data_id + } + client.post("/policies/{}/subject_assignments".format(policy_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + return policy_id + + +def get_policy_id_with_object_assignment(): + client = utilities.register_client() + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex) + object_id = create_object(policy_id) + data_id = create_object_data(policy_id=policy_id, category_id=object_category_id) + + data = { + "id": object_id, + "category_id": object_category_id, + "data_id": data_id + } + + client.post("/policies/{}/object_assignments".format(policy_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + return policy_id + + +def get_policy_id_with_action_assignment(): + client = utilities.register_client() + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex) + action_id = create_action(policy_id) + data_id = create_action_data(policy_id=policy_id, category_id=action_category_id) + + data = { + "id": action_id, + "category_id": action_category_id, + "data_id": data_id + } + client.post("/policies/{}/action_assignments".format(policy_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + return policy_id diff --git a/moon_manager/tests/unit_python/helpers/data_helper.py b/moon_manager/tests/unit_python/helpers/data_helper.py new file mode 100644 index 00000000..da6b9376 --- /dev/null +++ b/moon_manager/tests/unit_python/helpers/data_helper.py @@ -0,0 +1,99 @@ +# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors +# This software is distributed under the terms and conditions of the 'Apache-2.0' +# license which can be found in the file 'LICENSE' in this package distribution +# or at 'http://www.apache.org/licenses/LICENSE-2.0'. + + +def get_action_data(policy_id, data_id=None, category_id=None): + from python_moondb.core import PolicyManager + return PolicyManager.get_action_data("", policy_id, data_id, category_id) + + +def add_action_data(policy_id, data_id=None, category_id=None, value=None): + from python_moondb.core import PolicyManager + return PolicyManager.add_action_data("", policy_id, data_id, category_id, value) + + +def delete_action_data(policy_id, data_id): + from python_moondb.core import PolicyManager + PolicyManager.delete_action_data("", policy_id, data_id) + + +def get_object_data(policy_id, data_id=None, category_id=None): + from python_moondb.core import PolicyManager + return PolicyManager.get_object_data("", policy_id, data_id, category_id) + + +def add_object_data(policy_id, data_id=None, category_id=None, value=None): + from python_moondb.core import PolicyManager + return PolicyManager.add_object_data("", policy_id, data_id, category_id, value) + + +def delete_object_data(policy_id, data_id): + from python_moondb.core import PolicyManager + PolicyManager.delete_object_data("", policy_id, data_id) + + +def get_subject_data(policy_id, data_id=None, category_id=None): + from python_moondb.core import PolicyManager + return PolicyManager.get_subject_data("", policy_id, data_id, category_id) + + +def add_subject_data(policy_id, data_id=None, category_id=None, value=None): + from python_moondb.core import PolicyManager + return PolicyManager.set_subject_data("", policy_id, data_id, category_id, value) + + +def delete_subject_data(policy_id, data_id): + from python_moondb.core import PolicyManager + PolicyManager.delete_subject_data("", policy_id, data_id) + + +def get_actions(policy_id, perimeter_id=None): + from python_moondb.core import PolicyManager + return PolicyManager.get_actions("", policy_id, perimeter_id) + + +def add_action(policy_id, perimeter_id=None, value=None): + from python_moondb.core import PolicyManager + return PolicyManager.add_action("", policy_id, perimeter_id, value) + + +def delete_action(policy_id, perimeter_id): + from python_moondb.core import PolicyManager + PolicyManager.delete_action("", policy_id, perimeter_id) + + +def get_objects(policy_id, perimeter_id=None): + from python_moondb.core import PolicyManager + return PolicyManager.get_objects("", policy_id, perimeter_id) + + +def add_object(policy_id, perimeter_id=None, value=None): + from python_moondb.core import PolicyManager + return PolicyManager.add_object("", policy_id, perimeter_id, value) + + +def delete_object(policy_id, perimeter_id): + from python_moondb.core import PolicyManager + PolicyManager.delete_object("", policy_id, perimeter_id) + + +def get_subjects(policy_id, perimeter_id=None): + from python_moondb.core import PolicyManager + return PolicyManager.get_subjects("", policy_id, perimeter_id) + + +def add_subject(policy_id, perimeter_id=None, value=None): + from python_moondb.core import PolicyManager + return PolicyManager.add_subject("", policy_id, perimeter_id, value) + + +def delete_subject(policy_id, perimeter_id): + from python_moondb.core import PolicyManager + PolicyManager.delete_subject("", policy_id, perimeter_id) + + +def get_available_metadata(policy_id): + from python_moondb.core import PolicyManager + return PolicyManager.get_available_metadata("", policy_id) diff --git a/moon_manager/tests/unit_python/helpers/meta_rule_helper.py b/moon_manager/tests/unit_python/helpers/meta_rule_helper.py new file mode 100644 index 00000000..e882706b --- /dev/null +++ b/moon_manager/tests/unit_python/helpers/meta_rule_helper.py @@ -0,0 +1,49 @@ +# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors +# This software is distributed under the terms and conditions of the 'Apache-2.0' +# license which can be found in the file 'LICENSE' in this package distribution +# or at 'http://www.apache.org/licenses/LICENSE-2.0'. + +from helpers import data_builder as builder +from uuid import uuid4 + + +def set_meta_rule(meta_rule_id, value=None): + from python_moondb.core import ModelManager + if not value: + action_category_id = builder.create_action_category("action_category_id1"+uuid4().hex) + subject_category_id = builder.create_subject_category("subject_category_id1"+uuid4().hex) + object_category_id = builder.create_object_category("object_category_id1"+uuid4().hex) + value = { + "name": "MLS_meta_rule", + "description": "test", + "subject_categories": [subject_category_id], + "object_categories": [object_category_id], + "action_categories": [action_category_id] + } + return ModelManager.set_meta_rule(user_id=None, meta_rule_id=meta_rule_id, value=value) + + +def add_meta_rule(meta_rule_id=None, value=None): + from python_moondb.core import ModelManager + if not value: + action_category_id = builder.create_action_category("action_category_id1"+uuid4().hex) + subject_category_id = builder.create_subject_category("subject_category_id1"+uuid4().hex) + object_category_id = builder.create_object_category("object_category_id1"+uuid4().hex) + value = { + "name": "MLS_meta_rule"+uuid4().hex, + "description": "test", + "subject_categories": [subject_category_id], + "object_categories": [object_category_id], + "action_categories": [action_category_id] + } + return ModelManager.add_meta_rule(user_id=None, meta_rule_id=meta_rule_id, value=value) + + +def get_meta_rules(meta_rule_id=None): + from python_moondb.core import ModelManager + return ModelManager.get_meta_rules(user_id=None, meta_rule_id=meta_rule_id) + + +def delete_meta_rules(meta_rule_id=None): + from python_moondb.core import ModelManager + ModelManager.delete_meta_rule(user_id=None, meta_rule_id=meta_rule_id) diff --git a/moon_manager/tests/unit_python/helpers/model_helper.py b/moon_manager/tests/unit_python/helpers/model_helper.py new file mode 100644 index 00000000..d2ffb85b --- /dev/null +++ b/moon_manager/tests/unit_python/helpers/model_helper.py @@ -0,0 +1,51 @@ +# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors +# This software is distributed under the terms and conditions of the 'Apache-2.0' +# license which can be found in the file 'LICENSE' in this package distribution +# or at 'http://www.apache.org/licenses/LICENSE-2.0'. + +from helpers import data_builder as builder +from uuid import uuid4 + + +def get_models(model_id=None): + from python_moondb.core import ModelManager + return ModelManager.get_models(user_id=None, model_id=model_id) + + +def add_model(model_id=None, value=None): + from python_moondb.core import ModelManager + if not value: + subject_category_id, object_category_id, action_category_id, meta_rule_id = builder.create_new_meta_rule( + subject_category_name="subject_category1"+uuid4().hex, + object_category_name="object_category1"+uuid4().hex, + action_category_name="action_category1"+uuid4().hex) + name = "MLS" if model_id is None else "MLS " + model_id + value = { + "name": name, + "description": "test", + "meta_rules": [meta_rule_id] + } + return ModelManager.add_model(user_id=None, model_id=model_id, value=value) + + +def delete_models(uuid=None, name=None): + from python_moondb.core import ModelManager + if not uuid: + for model_id, model_value in get_models(): + if name == model_value['name']: + uuid = model_id + break + ModelManager.delete_model(user_id=None, model_id=uuid) + + +def delete_all_models(): + from python_moondb.core import ModelManager + models_values = get_models() + print(models_values) + for model_id, model_value in models_values.items(): + ModelManager.delete_model(user_id=None, model_id=model_id) + + +def update_model(model_id=None, value=None): + from python_moondb.core import ModelManager + return ModelManager.update_model(user_id=None, model_id=model_id, value=value) diff --git a/moon_manager/tests/unit_python/helpers/pdp_helper.py b/moon_manager/tests/unit_python/helpers/pdp_helper.py new file mode 100644 index 00000000..3d169b06 --- /dev/null +++ b/moon_manager/tests/unit_python/helpers/pdp_helper.py @@ -0,0 +1,23 @@ +# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors +# This software is distributed under the terms and conditions of the 'Apache-2.0' +# license which can be found in the file 'LICENSE' in this package distribution +# or at 'http://www.apache.org/licenses/LICENSE-2.0'. + +def update_pdp(pdp_id, value): + from python_moondb.core import PDPManager + return PDPManager.update_pdp("", pdp_id, value) + + +def delete_pdp(pdp_id): + from python_moondb.core import PDPManager + PDPManager.delete_pdp("", pdp_id) + + +def add_pdp(pdp_id=None, value=None): + from python_moondb.core import PDPManager + return PDPManager.add_pdp("", pdp_id, value) + + +def get_pdp(pdp_id=None): + from python_moondb.core import PDPManager + return PDPManager.get_pdp("", pdp_id) diff --git a/moon_manager/tests/unit_python/helpers/policy_helper.py b/moon_manager/tests/unit_python/helpers/policy_helper.py new file mode 100644 index 00000000..c932ee3a --- /dev/null +++ b/moon_manager/tests/unit_python/helpers/policy_helper.py @@ -0,0 +1,61 @@ +# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors +# This software is distributed under the terms and conditions of the 'Apache-2.0' +# license which can be found in the file 'LICENSE' in this package distribution +# or at 'http://www.apache.org/licenses/LICENSE-2.0'. + +def get_policies(): + from python_moondb.core import PolicyManager + return PolicyManager.get_policies("admin") + + +def add_policies(policy_id=None, value=None): + from python_moondb.core import PolicyManager + if not value: + value = { + "name": "test_policy", + "model_id": "", + "genre": "authz", + "description": "test", + } + return PolicyManager.add_policy("admin", policy_id=policy_id, value=value) + + +def delete_policies(uuid=None, name=None): + from python_moondb.core import PolicyManager + if not uuid: + for policy_id, policy_value in get_policies(): + if name == policy_value['name']: + uuid = policy_id + break + PolicyManager.delete_policy("admin", uuid) + + +def update_policy(policy_id, value): + from python_moondb.core import PolicyManager + return PolicyManager.update_policy("admin", policy_id, value) + + +def get_policy_from_meta_rules(meta_rule_id): + from python_moondb.core import PolicyManager + return PolicyManager.get_policy_from_meta_rules("admin", meta_rule_id) + + +def get_rules(policy_id=None, meta_rule_id=None, rule_id=None): + from python_moondb.core import PolicyManager + return PolicyManager.get_rules("", policy_id, meta_rule_id, rule_id) + + +def add_rule(policy_id=None, meta_rule_id=None, value=None): + from python_moondb.core import PolicyManager + if not value: + value = { + "rule": ("high", "medium", "vm-action"), + "instructions": ({"decision": "grant"}), + "enabled": "", + } + return PolicyManager.add_rule("", policy_id, meta_rule_id, value) + + +def delete_rule(policy_id=None, rule_id=None): + from python_moondb.core import PolicyManager + PolicyManager.delete_rule("", policy_id, rule_id) diff --git a/moon_manager/tests/unit_python/requirements.txt b/moon_manager/tests/unit_python/requirements.txt index 21975ce3..6c6e5bb8 100644 --- a/moon_manager/tests/unit_python/requirements.txt +++ b/moon_manager/tests/unit_python/requirements.txt @@ -2,4 +2,4 @@ flask flask_cors flask_restful python_moondb -python_moonutilities
\ No newline at end of file +python_moonutilities |