diff options
Diffstat (limited to 'old/moon_manager/tests')
33 files changed, 6226 insertions, 0 deletions
diff --git a/old/moon_manager/tests/functional_pod/conftest.py b/old/moon_manager/tests/functional_pod/conftest.py new file mode 100644 index 00000000..b5811755 --- /dev/null +++ b/old/moon_manager/tests/functional_pod/conftest.py @@ -0,0 +1,12 @@ +import pytest + +print("ANALYSING CONFTEST") + + +@pytest.fixture +def context(): + print("CREATING CONTEXT") + yield { + "hostname": "manager", + "port": 8082, + } diff --git a/old/moon_manager/tests/functional_pod/json/mls.json b/old/moon_manager/tests/functional_pod/json/mls.json new file mode 100644 index 00000000..01ef6deb --- /dev/null +++ b/old/moon_manager/tests/functional_pod/json/mls.json @@ -0,0 +1,89 @@ +{ + "pdps": [{"name" : "pdp_mls", "keystone_project_id" : "", "description": "", "policies": [{"name": "MLS policy example"}]}], + + "policies":[{ "name": "MLS policy example", "genre": "authz", "description": "", "model": {"name": "MLS"} , "mandatory" :false , "override":true}], + + "models":[{"name":"MLS", "description":"","meta_rules": [{"name" : "mls"}], "override":true}], + + + + + + "subjects": [{ "name":"adminuser", "description": "", "extra": {}, "policies": [{ "name": "MLS policy example"}]} , + { "name": "user1", "description": "", "extra": {}, "policies": [{ "name": "MLS policy example"}] }, + { "name": "user2", "description": "", "extra": {}, "policies": [{ "name": "MLS policy example"}] }], + + "subject_categories": [{ "name":"subject-security-level", "description": "" }], + + "subject_data": [{ "name":"low", "description": "", "policies": [{"name" :"MLS policy example"}], "category": {"name": "subject-security-level"}}, + { "name":"medium", "description": "", "policies": [{"name" :"MLS policy example"}], "category": {"name": "subject-security-level"}}, + { "name":"high", "description": "", "policies": [{"name" :"MLS policy example"}], "category": {"name": "subject-security-level"}}], + + "subject_assignments":[{ "subject" : {"name": "adminuser"}, "category" : {"name": "subject-security-level"}, "assignments": [{"name" : "high"}]}, + { "subject" : {"name": "user1"}, "category" : {"name": "subject-security-level"}, "assignments": [{"name" : "medium"}] }], + + + + + + + "objects": [{ "name":"vm0", "description": "", "extra": {}, "policies": [{"name": "MLS policy example"}]} , + {"name": "vm1", "description": "", "extra": {}, "policies": [{"name": "MLS policy example"}]} ], + + "object_categories": [{"name":"object-security-level", "description": ""}], + + "object_data": [{ "name":"low", "description": "", "policies": [{"name" :"MLS policy example"}], "category": {"name": "object-security-level"}}, + { "name":"medium", "description": "", "policies": [{"name" :"MLS policy example"}], "category": {"name": "object-security-level"}}, + { "name":"high", "description": "", "policies": [{"name" :"MLS policy example"}], "category": {"name": "object-security-level"}}], + + "object_assignments":[{ "object" : {"name": "vm0"}, "category" : {"name": "object-security-level"}, "assignments": [{"name" : "medium"}]}, + { "object" : {"name": "vm1"}, "category" : {"name": "object-security-level"}, "assignments": [{"name" : "low"}]}], + + + + + + + "actions": [{ "name": "start", "description": "", "extra": {}, "policies": [{"name": "MLS policy example"}]} , + { "name": "stop", "description": "", "extra": {}, "policies": [{"name": "MLS policy example"}]}], + + "action_categories": [{"name":"action-type", "description": ""}], + + "action_data": [{"name":"vm-action", "description": "", "policies": [{"name": "MLS policy example"}], "category": {"name": "action-type"}}, + {"name":"storage-action", "description": "", "policies": [{"name" :"MLS policy example"}], "category": {"name": "action-type"}}], + + "action_assignments":[{ "action" : {"name": "start"}, "category" : {"name": "action-type"}, "assignments": [{"name" : "vm-action"}]}, + { "action" : {"name": "stop"}, "category" : {"name": "action-type"}, "assignments": [{"name" : "vm-action"}]}], + + + + + + + "meta_rules":[{"name":"mls", "description": "", + "subject_categories": [{"name": "subject-security-level"}], + "object_categories": [{"name": "object-security-level"}], + "action_categories": [{"name": "action-type"}] + }], + + "rules": [{ + "meta_rule": {"name" : "mls"}, + "rule": {"subject_data" : [{"name":"high"}], "object_data": [{"name": "medium"}], "action_data": [{"name": "vm-action"}]}, + "policy": {"name" :"MLS policy example"}, + "instructions" : {"decision" : "grant"} + }, { + "meta_rule": {"name" : "mls"}, + "rule": {"subject_data" : [{"name":"high"}], "object_data": [{"name": "low"}], "action_data": [{"name": "vm-action"}]}, + "policy": {"name" :"MLS policy example"}, + "instructions" : {"decision" : "grant"} + }, { + "meta_rule": {"name" : "mls"}, + "rule": {"subject_data" : [{"name":"medium"}], "object_data": [{"name": "low"}], "action_data": [{"name": "vm-action"}]}, + "policy": {"name" :"MLS policy example"}, + "instructions" : {"decision" : "grant"} + }] + + + + +}
\ No newline at end of file diff --git a/old/moon_manager/tests/functional_pod/json/rbac.json b/old/moon_manager/tests/functional_pod/json/rbac.json new file mode 100644 index 00000000..a75f291b --- /dev/null +++ b/old/moon_manager/tests/functional_pod/json/rbac.json @@ -0,0 +1,85 @@ +{ + "pdps": [{"name" : "pdp_rbac", "keystone_project_id" : "", "description": "", "policies": [{"name": "RBAC policy example"}]}], + + "policies":[{ "name": "RBAC policy example", "genre": "authz", "description": "", "model": {"name": "RBAC"} , "mandatory" :true , "override":true}], + + "models":[{"name":"RBAC", "description":"","meta_rules": [{"name" : "rbac"}], "override":true}], + + + + + + "subjects": [{ "name":"adminuser", "description": "", "extra": {}, "policies": [{ "name": "RBAC policy example"}]} , + { "name": "user1", "description": "", "extra": {}, "policies": [{ "name": "RBAC policy example"}] }, + { "name": "public", "description": "", "extra": {}, "policies": [] }], + + "subject_categories": [{ "name":"role", "description": "" }], + + "subject_data": [{ "name":"admin", "description": "", "policies": [{"name" :"RBAC policy example"}], "category": {"name": "role"}}, + { "name":"employee", "description": "", "policies": [{"name" :"RBAC policy example"}], "category": {"name": "role"}}, + { "name":"*", "description": "", "policies": [{"name" :"RBAC policy example"}], "category": {"name": "role"}}], + + "subject_assignments":[{ "subject" : {"name": "adminuser"}, "category" : {"name": "role"}, "assignments": [{"name" : "admin"}, {"name" : "employee"}, {"name" : "*"}]}, + { "subject" : {"name": "user1"}, "category" : {"name": "role"}, "assignments": [{"name" : "employee"}, {"name" : "*"}] }], + + + + + + + "objects": [{ "name":"vm0", "description": "", "extra": {}, "policies": [{"name": "RBAC policy example"}]} , + {"name": "vm1", "description": "", "extra": {}, "policies": [{"name": "RBAC policy example"}]} ], + + "object_categories": [{"name":"id", "description": ""}], + + "object_data": [{ "name":"vm0", "description": "", "policies": [{"name" :"RBAC policy example"}], "category": {"name": "id"}}, + { "name":"vm1", "description": "", "policies": [{"name" :"RBAC policy example"}], "category": {"name": "id"}}, + { "name":"*", "description": "", "policies": [{"name" :"RBAC policy example"}], "category": {"name": "id"}}], + + "object_assignments":[{ "object" : {"name": "vm0"}, "category" : {"name": "id"}, "assignments": [{"name" : "vm0"}, {"name" : "*"}]}, + { "object" : {"name": "vm1"}, "category" : {"name": "id"}, "assignments": [{"name" : "vm1"}, {"name" : "*"}]}], + + + + + + + "actions": [{ "name": "start", "description": "", "extra": {}, "policies": [{"name": "RBAC policy example"}]} , + { "name": "stop", "description": "", "extra": {}, "policies": [{"name": "RBAC policy example"}]}], + + "action_categories": [{"name":"action-type", "description": ""}], + + "action_data": [{"name":"vm-action", "description": "", "policies": [{"name": "RBAC policy example"}], "category": {"name": "action-type"}}, + {"name":"*", "description": "", "policies": [{"name" :"RBAC policy example"}], "category": {"name": "action-type"}}], + + "action_assignments":[{ "action" : {"name": "start"}, "category" : {"name": "action-type"}, "assignments": [{"name" : "vm-action"}, {"name" : "*"}]}, + { "action" : {"name": "stop"}, "category" : {"name": "action-type"}, "assignments": [{"name" : "vm-action"}, {"name" : "*"}]}], + + + + + + + "meta_rules":[{"name":"rbac", "description": "", + "subject_categories": [{"name": "role"}], + "object_categories": [{"name": "id"}], + "action_categories": [{"name": "action-type"}] + }], + + "rules": [{ + "meta_rule": {"name" : "rbac"}, + "rule": {"subject_data" : [{"name":"admin"}], "object_data": [{"name": "vm0"}], "action_data": [{"name": "vm-action"}]}, + "policy": {"name" :"RBAC policy example"}, + "instructions" : {"decision" : "grant"}, + "enabled": true + }, { + "meta_rule": {"name" : "rbac"}, + "rule": {"subject_data" : [{"name":"employee"}], "object_data": [{"name": "vm1"}], "action_data": [{"name": "vm-action"}]}, + "policy": {"name" :"RBAC policy example"}, + "instructions" : {"decision" : "grant"} + }] + + + + +}
\ No newline at end of file diff --git a/old/moon_manager/tests/functional_pod/run_functional_tests.sh b/old/moon_manager/tests/functional_pod/run_functional_tests.sh new file mode 100644 index 00000000..960e9480 --- /dev/null +++ b/old/moon_manager/tests/functional_pod/run_functional_tests.sh @@ -0,0 +1,11 @@ +#!/usr/bin/env bash + +if [ -d /data/dist ]; +then + pip install /data/dist/*.tar.gz --upgrade + pip install /data/dist/*.whl --upgrade +fi + + +cd /data/tests/functional_pod +pytest . diff --git a/old/moon_manager/tests/functional_pod/test_manager.py b/old/moon_manager/tests/functional_pod/test_manager.py new file mode 100644 index 00000000..454d861b --- /dev/null +++ b/old/moon_manager/tests/functional_pod/test_manager.py @@ -0,0 +1,116 @@ +import json +import requests + +def test_import_rbac(context): + files = {'file': open('/data/tests/functional_pod/json/rbac.json', 'r')} + req = requests.post("http://{}:{}/import".format( + context.get("hostname"), + context.get("port")) + , files=files) + print(req) + result = req.json() + print(result) + req.raise_for_status() + +def test_import_mls(context): + files = {'file': open('/data/tests/functional_pod/json/mls.json', 'r')} + req = requests.post("http://{}:{}/import".format( + context.get("hostname"), + context.get("port")) + , files=files) + req.raise_for_status() + + +def test_export_rbac(context): + test_import_rbac(context) + req = requests.get("http://{}:{}/export".format( + context.get("hostname"), + context.get("port")), + data={"filename":"/data/tests/functional_pod/json/rbac_export.json"} + ) + req.raise_for_status() + + +def test_export_mls(context): + test_import_mls(context) + req = requests.get("http://{}:{}/export".format( + context.get("hostname"), + context.get("port")), + data={"filename":"/data/tests/functional_pod/json/mls_export.json"} + ) + req.raise_for_status() + + +def get_json(data): + return json.loads(data.decode("utf-8")) + + +def get_pdp(context): + req = requests.get("http://{}:{}/pdp".format( + context.get("hostname"), + context.get("port")), + timeout=3) + pdp = req.json() + return req, pdp + + +def add_pdp(context, data): + req = requests.post("http://{}:{}/pdp".format( + context.get("hostname"), + context.get("port")), + data=json.dumps(data), + headers={'Content-Type': 'application/json'}, + timeout=3) + pdp = req.json() + return req, pdp + + +def delete_pdp(context, key): + req = requests.delete("http://{}:{}/pdp/{}".format( + context.get("hostname"), + context.get("port"), key), + timeout=3) + return req + + +def delete_pdp_without_id(context): + req = requests.delete("http://{}:{}/pdp/{}".format( + context.get("hostname"), + context.get("port"), ""), + timeout=3) + return req + + +def test_get_pdp(context): + req, pdp = get_pdp(context) + assert req.status_code == 200 + assert isinstance(pdp, dict) + assert "pdps" in pdp + + +def test_add_pdp(context): + data = { + "name": "testuser", + "security_pipeline": ["policy_id_1", "policy_id_2"], + "keystone_project_id": "keystone_project_id", + "description": "description of testuser" + } + req, pdp = add_pdp(context, data) + assert req.status_code == 200 + assert isinstance(pdp, dict) + value = list(pdp["pdps"].values())[0] + assert "pdps" in pdp + assert value['name'] == "testuser" + assert value["description"] == "description of {}".format("testuser") + assert value["keystone_project_id"] == "keystone_project_id" + + +def test_delete_pdp(context): + request, pdp = get_pdp(context) + success_req = None + for key, value in pdp['pdps'].items(): + if value['name'] == "testuser": + success_req = delete_pdp(context, key) + break + assert success_req + assert success_req.status_code == 200 diff --git a/old/moon_manager/tests/functional_pod/test_models.py b/old/moon_manager/tests/functional_pod/test_models.py new file mode 100644 index 00000000..8b4ceef5 --- /dev/null +++ b/old/moon_manager/tests/functional_pod/test_models.py @@ -0,0 +1,79 @@ +import json +import requests + + +def get_models(context): + req = requests.get("http://{}:{}/models".format( + context.get("hostname"), + context.get("port")), + timeout=3) + models = req.json() + return req, models + + +def add_models(context, name): + data = { + "name": name, + "description": "description of {}".format(name), + "meta_rules": ["meta_rule_id1", "meta_rule_id2"] + } + req = requests.post("http://{}:{}/models".format( + context.get("hostname"), + context.get("port")), + data=json.dumps(data), + headers={'Content-Type': 'application/json'}, + timeout=3) + models = req.json() + return req, models + + +def delete_models(context, name): + _, models = get_models(context) + request = None + for key, value in models['models'].items(): + if value['name'] == name: + request = requests.delete("http://{}:{}/models/{}".format( + context.get("hostname"), + context.get("port"), + key), + timeout=3) + break + return request + + +def delete_models_without_id(context): + req = requests.delete("http://{}:{}/models/{}".format( + context.get("hostname"), + context.get("port"), + ""), + timeout=3) + return req + + +def test_get_models(context): + req, models = get_models(context) + assert req.status_code == 200 + assert isinstance(models, dict) + assert "models" in models + + +def test_add_models(context): + req, models = add_models(context, "testuser") + assert req.status_code == 200 + assert isinstance(models, dict) + value = list(models["models"].values())[0] + assert "models" in models + assert value['name'] == "testuser" + assert value["description"] == "description of {}".format("testuser") + assert value["meta_rules"][0] == "meta_rule_id1" + + +def test_delete_models(context): + req = delete_models(context, "testuser") + assert req.status_code == 200 + + +def test_delete_models_without_id(context): + req = delete_models_without_id(context) + assert req.status_code == 500 + diff --git a/old/moon_manager/tests/unit_python/api/import_export_utilities.py b/old/moon_manager/tests/unit_python/api/import_export_utilities.py new file mode 100644 index 00000000..2ee2627d --- /dev/null +++ b/old/moon_manager/tests/unit_python/api/import_export_utilities.py @@ -0,0 +1,202 @@ +# Copyright 2018 Open Platform for NFV Project, Inc. and its contributors +# This software is distributed under the terms and conditions of the 'Apache-2.0' +# license which can be found in the file 'LICENSE' in this package distribution +# or at 'http://www.apache.org/licenses/LICENSE-2.0'. + +import api.test_unit_models as test_models +import api.test_policies as test_policies +import api.test_perimeter as test_perimeter +import api.test_meta_data as test_categories +import api.test_data as test_data +import api.test_meta_rules as test_meta_rules +import api.test_assignement as test_assignments +import api.test_rules as test_rules +import logging + +logger = logging.getLogger("moon.manager.test.api." + __name__) + + +def clean_models(client): + req, models = test_models.get_models(client) + for key in models["models"]: + client.delete("/models/{}".format(key)) + + +def clean_policies(client): + req, policies = test_policies.get_policies(client) + for key in policies["policies"]: + req = client.delete("/policies/{}".format(key)) + assert req.status_code == 200 + + +def clean_subjects(client): + subjects = test_perimeter.get_subjects(client) + logger.info("subjects {}".format(subjects)) + for key in subjects[1]["subjects"]: + subject = subjects[1]["subjects"][key] + policy_keys = subject["policy_list"] + logger.info("subjects policy_keys {}".format(policy_keys)) + for policy_key in policy_keys: + client.delete("/policies/{}/subjects/{}".format(policy_key, key)) + + +def clean_objects(client): + objects = test_perimeter.get_objects(client) + logger.info("objects {}".format(objects)) + for key in objects[1]["objects"]: + object_ = objects[1]["objects"][key] + policy_keys = object_["policy_list"] + logger.info("objects policy_keys {}".format(policy_keys)) + for policy_key in policy_keys: + client.delete("/policies/{}/objects/{}".format(policy_key, key)) + + +def clean_actions(client): + actions = test_perimeter.get_actions(client) + actions = test_perimeter.get_actions(client) + logger.info("actions {}".format(actions)) + for key in actions[1]["actions"]: + action = actions[1]["actions"][key] + policy_keys = action["policy_list"] + logger.info("action policy_keys {}".format(policy_keys)) + for policy_key in policy_keys: + client.delete("/policies/{}/actions/{}".format(policy_key, key)) + + +def clean_subject_categories(client): + req, categories = test_categories.get_subject_categories(client) + logger.info(categories) + for key in categories["subject_categories"]: + client.delete("/subject_categories/{}".format(key)) + + +def clean_object_categories(client): + req, categories = test_categories.get_object_categories(client) + logger.info(categories) + for key in categories["object_categories"]: + client.delete("/object_categories/{}".format(key)) + + +def clean_action_categories(client): + req, categories = test_categories.get_action_categories(client) + logger.info(categories) + for key in categories["action_categories"]: + client.delete("/action_categories/{}".format(key)) + + +def clean_subject_data(client): + req, policies = test_policies.get_policies(client) + logger.info("clean_subject_data on {}".format(policies)) + for policy_key in policies["policies"]: + req, data = test_data.get_subject_data(client, policy_id=policy_key) + logger.info("============= data {}".format(data)) + for data_item in data["subject_data"]: + if data_item["data"]: + for data_id in data_item["data"]: + logger.info("============= Deleting {}/{}".format(policy_key, data_id)) + client.delete("/policies/{}/subject_data/{}/{}".format(policy_key, data_item['category_id'], data_id)) + + +def clean_object_data(client): + req, policies = test_policies.get_policies(client) + for policy_key in policies["policies"]: + req, data = test_data.get_object_data(client, policy_id=policy_key) + for data_item in data["object_data"]: + if data_item["data"]: + for data_id in data_item["data"]: + logger.info("============= object_data {}/{}".format(policy_key, data_id)) + client.delete("/policies/{}/object_data/{}/{}".format(policy_key, data_item['category_id'], data_id)) + + +def clean_action_data(client): + req, policies = test_policies.get_policies(client) + for policy_key in policies["policies"]: + req, data = test_data.get_action_data(client, policy_id=policy_key) + for data_item in data["action_data"]: + if data_item["data"]: + for data_id in data_item["data"]: + logger.info("============= action_data {}/{}".format(policy_key, data_id)) + client.delete("/policies/{}/action_data/{}/{}".format(policy_key, data_item['category_id'], data_id)) + + +def clean_meta_rule(client): + req, meta_rules = test_meta_rules.get_meta_rules(client) + meta_rules = meta_rules["meta_rules"] + for meta_rule_key in meta_rules: + logger.info("clean_meta_rule.meta_rule_key={}".format(meta_rule_key)) + logger.info("clean_meta_rule.meta_rule={}".format(meta_rules[meta_rule_key])) + client.delete("/meta_rules/{}".format(meta_rule_key)) + + +def clean_subject_assignments(client): + req, policies = test_policies.get_policies(client) + for policy_key in policies["policies"]: + req, assignments = test_assignments.get_subject_assignment(client, policy_key) + for key in assignments["subject_assignments"]: + subject_key = assignments["subject_assignments"][key]["subject_id"] + cat_key = assignments["subject_assignments"][key]["category_id"] + data_keys = assignments["subject_assignments"][key]["assignments"] + for data_key in data_keys: + client.delete("/policies/{}/subject_assignments/{}/{}/{}".format(policy_key, subject_key, + cat_key, data_key)) + + +def clean_object_assignments(client): + req, policies = test_policies.get_policies(client) + for policy_key in policies["policies"]: + req, assignments = test_assignments.get_object_assignment(client, policy_key) + for key in assignments["object_assignments"]: + object_key = assignments["object_assignments"][key]["object_id"] + cat_key = assignments["object_assignments"][key]["category_id"] + data_keys = assignments["object_assignments"][key]["assignments"] + for data_key in data_keys: + client.delete("/policies/{}/object_assignments/{}/{}/{}".format(policy_key, object_key, + cat_key, data_key)) + + +def clean_action_assignments(client): + req, policies = test_policies.get_policies(client) + for policy_key in policies["policies"]: + req, assignments = test_assignments.get_action_assignment(client, policy_key) + for key in assignments["action_assignments"]: + action_key = assignments["action_assignments"][key]["action_id"] + cat_key = assignments["action_assignments"][key]["category_id"] + data_keys = assignments["action_assignments"][key]["assignments"] + for data_key in data_keys: + client.delete("/policies/{}/action_assignments/{}/{}/{}".format(policy_key, action_key, + cat_key, data_key)) + + +def clean_rules(client): + req, policies = test_policies.get_policies(client) + for policy_key in policies["policies"]: + req, rules = test_rules.get_rules(client, policy_key) + rules = rules["rules"]["rules"] + for rule_key in rules: + req = client.delete("/policies/{}/rules/{}".format(policy_key, rule_key["id"])) + + +def clean_all(client): + clean_rules(client) + + clean_subject_assignments(client) + clean_object_assignments(client) + clean_action_assignments(client) + + + clean_subject_data(client) + clean_object_data(client) + clean_action_data(client) + + clean_actions(client) + clean_objects(client) + clean_subjects(client) + + clean_subject_categories(client) + clean_object_categories(client) + clean_action_categories(client) + + + clean_policies(client) + clean_models(client) + clean_meta_rule(client)
\ No newline at end of file diff --git a/old/moon_manager/tests/unit_python/api/meta_data_test.py b/old/moon_manager/tests/unit_python/api/meta_data_test.py new file mode 100644 index 00000000..8609f0b5 --- /dev/null +++ b/old/moon_manager/tests/unit_python/api/meta_data_test.py @@ -0,0 +1,238 @@ +import json +import api.utilities as utilities + +#subject_categories_test + + +def get_subject_categories(client): + req = client.get("/subject_categories") + subject_categories = utilities.get_json(req.data) + return req, subject_categories + + +def add_subject_categories(client, name): + data = { + "name": name, + "description": "description of {}".format(name) + } + req = client.post("/subject_categories", data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + subject_categories = utilities.get_json(req.data) + return req, subject_categories + + +def delete_subject_categories(client, name): + request, subject_categories = get_subject_categories(client) + for key, value in subject_categories['subject_categories'].items(): + if value['name'] == name: + req = client.delete("/subject_categories/{}".format(key)) + break + return req + + +def delete_subject_categories_without_id(client): + req = client.delete("/subject_categories/{}".format("")) + return req + + +def test_get_subject_categories(): + client = utilities.register_client() + req, subject_categories = get_subject_categories(client) + assert req.status_code == 200 + assert isinstance(subject_categories, dict) + assert "subject_categories" in subject_categories + + +def test_add_subject_categories(): + client = utilities.register_client() + req, subject_categories = add_subject_categories(client, "testuser") + assert req.status_code == 200 + assert isinstance(subject_categories, dict) + value = list(subject_categories["subject_categories"].values())[0] + assert "subject_categories" in subject_categories + assert value['name'] == "testuser" + assert value['description'] == "description of {}".format("testuser") + + +def test_add_subject_categories_with_empty_user(): + client = utilities.register_client() + req, subject_categories = add_subject_categories(client, "") + assert req.status_code == 500 + assert json.loads(req.data)["message"] == "Empty String" + + +def test_add_subject_categories_with_user_contain_space(): + client = utilities.register_client() + req, subject_categories = add_subject_categories(client, "test user") + assert req.status_code == 500 + assert json.loads(req.data)["message"] == "String contains space" + + +def test_delete_subject_categories(): + client = utilities.register_client() + req = delete_subject_categories(client, "testuser") + assert req.status_code == 200 + + +def test_delete_subject_categories_without_id(): + client = utilities.register_client() + req = delete_subject_categories_without_id(client) + assert req.status_code == 500 + + +#--------------------------------------------------------------------------- +#object_categories_test + +def get_object_categories(client): + req = client.get("/object_categories") + object_categories = utilities.get_json(req.data) + return req, object_categories + + +def add_object_categories(client, name): + data = { + "name": name, + "description": "description of {}".format(name) + } + req = client.post("/object_categories", data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + object_categories = utilities.get_json(req.data) + return req, object_categories + + +def delete_object_categories(client, name): + request, object_categories = get_object_categories(client) + for key, value in object_categories['object_categories'].items(): + if value['name'] == name: + req = client.delete("/object_categories/{}".format(key)) + break + return req + + +def delete_object_categories_without_id(client): + req = client.delete("/object_categories/{}".format("")) + return req + + +def test_get_object_categories(): + client = utilities.register_client() + req, object_categories = get_object_categories(client) + assert req.status_code == 200 + assert isinstance(object_categories, dict) + assert "object_categories" in object_categories + + +def test_add_object_categories(): + client = utilities.register_client() + req, object_categories = add_object_categories(client, "testuser") + assert req.status_code == 200 + assert isinstance(object_categories, dict) + value = list(object_categories["object_categories"].values())[0] + assert "object_categories" in object_categories + assert value['name'] == "testuser" + assert value['description'] == "description of {}".format("testuser") + + +def test_add_object_categories_with_empty_user(): + client = utilities.register_client() + req, object_categories = add_object_categories(client, "") + assert req.status_code == 500 + assert json.loads(req.data)["message"] == "Empty String" + + +def test_add_object_categories_with_user_contain_space(): + client = utilities.register_client() + req, object_categories = add_object_categories(client, "test user") + assert req.status_code == 500 + assert json.loads(req.data)["message"] == "String contains space" + + +def test_delete_object_categories(): + client = utilities.register_client() + req = delete_object_categories(client, "testuser") + assert req.status_code == 200 + + +def test_delete_object_categories_without_id(): + client = utilities.register_client() + req = delete_object_categories_without_id(client) + assert req.status_code == 500 + + +#--------------------------------------------------------------------------- +#action_categories_test + +def get_action_categories(client): + req = client.get("/action_categories") + action_categories = utilities.get_json(req.data) + return req, action_categories + + +def add_action_categories(client, name): + data = { + "name": name, + "description": "description of {}".format(name) + } + req = client.post("/action_categories", data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + action_categories = utilities.get_json(req.data) + return req, action_categories + + +def delete_action_categories(client, name): + request, action_categories = get_action_categories(client) + for key, value in action_categories['action_categories'].items(): + if value['name'] == name: + req = client.delete("/action_categories/{}".format(key)) + break + return req + + +def delete_action_categories_without_id(client): + req = client.delete("/action_categories/{}".format("")) + return req + + +def test_get_action_categories(): + client = utilities.register_client() + req, action_categories = get_action_categories(client) + assert req.status_code == 200 + assert isinstance(action_categories, dict) + assert "action_categories" in action_categories + + +def test_add_action_categories(): + client = utilities.register_client() + req, action_categories = add_action_categories(client, "testuser") + assert req.status_code == 200 + assert isinstance(action_categories, dict) + value = list(action_categories["action_categories"].values())[0] + assert "action_categories" in action_categories + assert value['name'] == "testuser" + assert value['description'] == "description of {}".format("testuser") + + +def test_add_action_categories_with_empty_user(): + client = utilities.register_client() + req, action_categories = add_action_categories(client, "") + assert req.status_code == 500 + assert json.loads(req.data)["message"] == "Empty String" + + +def test_add_action_categories_with_user_contain_space(): + client = utilities.register_client() + req, action_categories = add_action_categories(client, "test user") + assert req.status_code == 500 + assert json.loads(req.data)["message"] == "String contains space" + + +def test_delete_action_categories(): + client = utilities.register_client() + req = delete_action_categories(client, "testuser") + assert req.status_code == 200 + + +def test_delete_action_categories_without_id(): + client = utilities.register_client() + req = delete_action_categories_without_id(client) + assert req.status_code == 500 diff --git a/old/moon_manager/tests/unit_python/api/meta_rules_test.py b/old/moon_manager/tests/unit_python/api/meta_rules_test.py new file mode 100644 index 00000000..a87c16f3 --- /dev/null +++ b/old/moon_manager/tests/unit_python/api/meta_rules_test.py @@ -0,0 +1,162 @@ +import json +import api.utilities as utilities + + +def get_meta_rules(client): + req = client.get("/meta_rules") + meta_rules = utilities.get_json(req.data) + return req, meta_rules + + +def add_meta_rules(client, name): + data = { + "name": name, + "subject_categories": ["subject_category_id1", + "subject_category_id2"], + "object_categories": ["object_category_id1"], + "action_categories": ["action_category_id1"] + } + req = client.post("/meta_rules", data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + meta_rules = utilities.get_json(req.data) + return req, meta_rules + + +def add_meta_rules_without_subject_category_ids(client, name): + data = { + "name": name, + "subject_categories": [], + "object_categories": ["object_category_id1"], + "action_categories": ["action_category_id1"] + } + req = client.post("/meta_rules", data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + meta_rules = utilities.get_json(req.data) + return req, meta_rules + + +def update_meta_rules(client, name, metaRuleId): + data = { + "name": name, + "subject_categories": ["subject_category_id1_update", + "subject_category_id2_update"], + "object_categories": ["object_category_id1_update"], + "action_categories": ["action_category_id1_update"] + } + req = client.patch("/meta_rules/{}".format(metaRuleId), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + meta_rules = utilities.get_json(req.data) + return req, meta_rules + + +def update_meta_rules_without_subject_category_ids(client, name): + data = { + "name": name, + "subject_categories": [], + "object_categories": ["object_category_id1"], + "action_categories": ["action_category_id1"] + } + req = client.post("/meta_rules", data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + meta_rules = utilities.get_json(req.data) + return req, meta_rules + + +def delete_meta_rules(client, name): + request, meta_rules = get_meta_rules(client) + for key, value in meta_rules['meta_rules'].items(): + if value['name'] == name: + req = client.delete("/meta_rules/{}".format(key)) + break + return req + + +def delete_meta_rules_without_id(client): + req = client.delete("/meta_rules/{}".format("")) + return req + + +def test_get_meta_rules(): + client = utilities.register_client() + req, meta_rules = get_meta_rules(client) + assert req.status_code == 200 + assert isinstance(meta_rules, dict) + assert "meta_rules" in meta_rules + + +def test_add_meta_rules(): + client = utilities.register_client() + req, meta_rules = add_meta_rules(client, "testuser") + assert req.status_code == 200 + assert isinstance(meta_rules, dict) + value = list(meta_rules["meta_rules"].values())[0] + assert "meta_rules" in meta_rules + assert value['name'] == "testuser" + assert value["subject_categories"][0] == "subject_category_id1" + assert value["object_categories"][0] == "object_category_id1" + assert value["action_categories"][0] == "action_category_id1" + + +def test_add_meta_rules_with_empty_user(): + client = utilities.register_client() + req, meta_rules = add_meta_rules(client, "") + assert req.status_code == 500 + assert json.loads(req.data)["message"] == "Empty String" + + +def test_add_meta_rules_with_user_contain_space(): + client = utilities.register_client() + req, meta_rules = add_meta_rules(client, "test user") + assert req.status_code == 500 + assert json.loads(req.data)["message"] == "String contains space" + + +def test_add_meta_rules_without_subject_categories(): + client = utilities.register_client() + req, meta_rules = add_meta_rules_without_subject_category_ids(client, "testuser") + assert req.status_code == 500 + assert json.loads(req.data)["message"] == 'Empty Container' + + +def test_delete_meta_rules(): + client = utilities.register_client() + req = delete_meta_rules(client, "testuser") + assert req.status_code == 200 + + +def test_delete_meta_rules_without_id(): + client = utilities.register_client() + req = delete_meta_rules_without_id(client) + assert req.status_code == 500 + + +def test_update_meta_rules(): + client = utilities.register_client() + req = add_meta_rules(client, "testuser") + meta_rule_id = list(req[1]['meta_rules'])[0] + req_update = update_meta_rules(client, "testuser", meta_rule_id) + assert req_update[0].status_code == 200 + value = list(req_update[1]["meta_rules"].values())[0] + assert value["subject_categories"][0] == "subject_category_id1_update" + delete_meta_rules(client, "testuser") + get_meta_rules(client) + + +def test_update_meta_rules_without_id(): + client = utilities.register_client() + req_update = update_meta_rules(client, "testuser", "") + assert req_update[0].status_code == 500 + + +def test_update_meta_rules_without_user(): + client = utilities.register_client() + req_update = update_meta_rules(client, "", "") + assert req_update[0].status_code == 500 + assert json.loads(req_update[0].data)["message"] == "Empty String" + + +def test_update_meta_rules_without_subject_categories(): + client = utilities.register_client() + req_update = update_meta_rules_without_subject_category_ids(client, "testuser") + assert req_update[0].status_code == 500 + assert json.loads(req_update[0].data)["message"] == "Empty Container" diff --git a/old/moon_manager/tests/unit_python/api/test_assignement.py b/old/moon_manager/tests/unit_python/api/test_assignement.py new file mode 100644 index 00000000..b56fb420 --- /dev/null +++ b/old/moon_manager/tests/unit_python/api/test_assignement.py @@ -0,0 +1,280 @@ +import api.utilities as utilities +import json +from helpers import data_builder as builder +from uuid import uuid4 + + +# subject_categories_test + + +def get_subject_assignment(client, policy_id): + req = client.get("/policies/{}/subject_assignments".format(policy_id)) + subject_assignment = utilities.get_json(req.data) + return req, subject_assignment + + +def add_subject_assignment(client): + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex) + subject_id = builder.create_subject(policy_id) + data_id = builder.create_subject_data(policy_id=policy_id, category_id=subject_category_id) + + data = { + "id": subject_id, + "category_id": subject_category_id, + "data_id": data_id + } + req = client.post("/policies/{}/subject_assignments".format(policy_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + subject_assignment = utilities.get_json(req.data) + return req, subject_assignment + + +def add_subject_assignment_without_cat_id(client): + + data = { + "id": "subject_id", + "category_id": "", + "data_id": "data_id" + } + req = client.post("/policies/{}/subject_assignments".format("1111"), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + subject_assignment = utilities.get_json(req.data) + return req, subject_assignment + + +def delete_subject_assignment(client, policy_id, sub_id, cat_id,data_id): + req = client.delete("/policies/{}/subject_assignments/{}/{}/{}".format(policy_id, sub_id, cat_id,data_id)) + return req + + +def test_add_subject_assignment(): + client = utilities.register_client() + req, subject_assignment = add_subject_assignment(client) + assert req.status_code == 200 + assert isinstance(subject_assignment, dict) + assert "subject_assignments" in subject_assignment + + +# def test_add_subject_assignment_without_cat_id(): +# client = utilities.register_client() +# req, subject_assignment = add_subject_assignment_without_cat_id(client) +# assert req.status_code == 400 +# assert json.loads(req.data)["message"] == "Key: 'category_id', [Empty String]" + + +def test_get_subject_assignment(): + client = utilities.register_client() + policy_id = builder.get_policy_id_with_subject_assignment() + req, subject_assignment = get_subject_assignment(client, policy_id) + assert req.status_code == 200 + assert isinstance(subject_assignment, dict) + assert "subject_assignments" in subject_assignment + + +def test_delete_subject_assignment(): + client = utilities.register_client() + policy_id = builder.get_policy_id_with_subject_assignment() + req, subject_assignment = get_subject_assignment(client, policy_id) + value = subject_assignment["subject_assignments"] + _id = list(value.keys())[0] + success_req = delete_subject_assignment(client, + policy_id, + value[_id]['subject_id'], + value[_id]['category_id'], + value[_id]['assignments'][0]) + assert success_req.status_code == 200 + + +def test_delete_subject_assignment_without_policy_id(): + client = utilities.register_client() + success_req = delete_subject_assignment(client, "", "id1", "111", "data_id1") + assert success_req.status_code == 404 + + +# --------------------------------------------------------------------------- +# object_categories_test + + +def get_object_assignment(client, policy_id): + req = client.get("/policies/{}/object_assignments".format(policy_id)) + object_assignment = utilities.get_json(req.data) + return req, object_assignment + + +def add_object_assignment(client): + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex) + object_id = builder.create_object(policy_id) + data_id = builder.create_object_data(policy_id=policy_id, category_id=object_category_id) + + data = { + "id": object_id, + "category_id": object_category_id, + "data_id": data_id + } + + req = client.post("/policies/{}/object_assignments".format(policy_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + object_assignment = utilities.get_json(req.data) + return req, object_assignment + + +def add_object_assignment_without_cat_id(client): + + data = { + "id": "object_id", + "category_id": "", + "data_id": "data_id" + } + req = client.post("/policies/{}/object_assignments".format("1111"), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + object_assignment = utilities.get_json(req.data) + return req, object_assignment + + +def delete_object_assignment(client, policy_id, obj_id, cat_id, data_id): + req = client.delete("/policies/{}/object_assignments/{}/{}/{}".format(policy_id, obj_id, cat_id, data_id)) + return req + + +def test_get_object_assignment(): + policy_id = builder.get_policy_id_with_object_assignment() + client = utilities.register_client() + req, object_assignment = get_object_assignment(client, policy_id) + assert req.status_code == 200 + assert isinstance(object_assignment, dict) + assert "object_assignments" in object_assignment + + +def test_add_object_assignment(): + client = utilities.register_client() + req, object_assignment = add_object_assignment(client) + assert req.status_code == 200 + assert "object_assignments" in object_assignment + + +# def test_add_object_assignment_without_cat_id(): +# client = utilities.register_client() +# req, object_assignment = add_object_assignment_without_cat_id(client) +# assert req.status_code == 400 +# assert json.loads(req.data)["message"] == "Key: 'category_id', [Empty String]" + + +def test_delete_object_assignment(): + client = utilities.register_client() + policy_id = builder.get_policy_id_with_object_assignment() + req, object_assignment = get_object_assignment(client, policy_id) + value = object_assignment["object_assignments"] + _id = list(value.keys())[0] + success_req = delete_object_assignment(client, + policy_id, + value[_id]['object_id'], + value[_id]['category_id'], + value[_id]['assignments'][0]) + assert success_req.status_code == 200 + + +def test_delete_object_assignment_without_policy_id(): + client = utilities.register_client() + success_req = delete_object_assignment(client, "", "id1", "111", "data_id1") + assert success_req.status_code == 404 + + +# --------------------------------------------------------------------------- +# action_categories_test + + +def get_action_assignment(client, policy_id): + req = client.get("/policies/{}/action_assignments".format(policy_id)) + action_assignment = utilities.get_json(req.data) + return req, action_assignment + + +def add_action_assignment(client): + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex) + action_id = builder.create_action(policy_id) + data_id = builder.create_action_data(policy_id=policy_id, category_id=action_category_id) + + data = { + "id": action_id, + "category_id": action_category_id, + "data_id": data_id + } + req = client.post("/policies/{}/action_assignments".format(policy_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + action_assignment = utilities.get_json(req.data) + return req, action_assignment + + +def add_action_assignment_without_cat_id(client): + + data = { + "id": "action_id", + "category_id": "", + "data_id": "data_id" + } + req = client.post("/policies/{}/action_assignments".format("1111"), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + action_assignment = utilities.get_json(req.data) + return req, action_assignment + + +def delete_action_assignment(client, policy_id, action_id, cat_id, data_id): + req = client.delete("/policies/{}/action_assignments/{}/{}/{}".format(policy_id, action_id, cat_id, data_id)) + return req + + +def test_get_action_assignment(): + policy_id = builder.get_policy_id_with_action_assignment() + client = utilities.register_client() + req, action_assignment = get_action_assignment(client, policy_id) + assert req.status_code == 200 + assert isinstance(action_assignment, dict) + assert "action_assignments" in action_assignment + + +def test_add_action_assignment(): + client = utilities.register_client() + req, action_assignment = add_action_assignment(client) + assert req.status_code == 200 + assert "action_assignments" in action_assignment + + +# def test_add_action_assignment_without_cat_id(): +# client = utilities.register_client() +# req, action_assignment = add_action_assignment_without_cat_id(client) +# assert req.status_code == 400 +# assert json.loads(req.data)["message"] == "Key: 'category_id', [Empty String]" + + +def test_delete_action_assignment(): + client = utilities.register_client() + policy_id = builder.get_policy_id_with_action_assignment() + req, action_assignment = get_action_assignment(client, policy_id) + value = action_assignment["action_assignments"] + id = list(value.keys())[0] + success_req = delete_action_assignment(client, + policy_id, + value[id]['action_id'], + value[id]['category_id'], + value[id]['assignments'][0]) + assert success_req.status_code == 200 + + +def test_delete_action_assignment_without_policy_id(): + client = utilities.register_client() + success_req = delete_action_assignment(client, "", "id1", "111", "data_id1") + assert success_req.status_code == 404 + +# --------------------------------------------------------------------------- diff --git a/old/moon_manager/tests/unit_python/api/test_assignemnt.py b/old/moon_manager/tests/unit_python/api/test_assignemnt.py new file mode 100644 index 00000000..22c727af --- /dev/null +++ b/old/moon_manager/tests/unit_python/api/test_assignemnt.py @@ -0,0 +1,270 @@ +import api.utilities as utilities +import json +from helpers import data_builder as builder +from uuid import uuid4 + + +# subject_categories_test + + +def get_subject_assignment(client, policy_id): + req = client.get("/policies/{}/subject_assignments".format(policy_id)) + subject_assignment = utilities.get_json(req.data) + return req, subject_assignment + + +def add_subject_assignment(client): + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex) + subject_id = builder.create_subject(policy_id) + data_id = builder.create_subject_data(policy_id=policy_id, category_id=subject_category_id) + + data = { + "id": subject_id, + "category_id": subject_category_id, + "data_id": data_id + } + req = client.post("/policies/{}/subject_assignments".format(policy_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + subject_assignment = utilities.get_json(req.data) + return req, subject_assignment + + +def add_subject_assignment_without_cat_id(client): + + data = { + "id": "subject_id", + "category_id": "", + "data_id": "data_id" + } + req = client.post("/policies/{}/subject_assignments".format("1111"), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + subject_assignment = utilities.get_json(req.data) + return req, subject_assignment + + +def delete_subject_assignment(client, policy_id, sub_id, cat_id,data_id): + req = client.delete("/policies/{}/subject_assignments/{}/{}/{}".format(policy_id, sub_id, cat_id,data_id)) + return req + + +def test_add_subject_assignment(): + client = utilities.register_client() + req, subject_assignment = add_subject_assignment(client) + assert req.status_code == 200 + assert isinstance(subject_assignment, dict) + assert "subject_assignments" in subject_assignment + + +def test_add_subject_assignment_without_cat_id(): + client = utilities.register_client() + req, subject_assignment = add_subject_assignment_without_cat_id(client) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'category_id', [Empty String]" + + +def test_get_subject_assignment(): + client = utilities.register_client() + policy_id = builder.get_policy_id_with_subject_assignment() + req, subject_assignment = get_subject_assignment(client, policy_id) + assert req.status_code == 200 + assert isinstance(subject_assignment, dict) + assert "subject_assignments" in subject_assignment + + +def test_delete_subject_assignment(): + client = utilities.register_client() + policy_id = builder.get_policy_id_with_subject_assignment() + req, subject_assignment = get_subject_assignment(client, policy_id) + value = subject_assignment["subject_assignments"] + id = list(value.keys())[0] + success_req = delete_subject_assignment(client, policy_id, value[id]['subject_id'], value[id]['category_id'],value[id]['assignments'][0]) + assert success_req.status_code == 200 + + +def test_delete_subject_assignment_without_policy_id(): + client = utilities.register_client() + success_req = delete_subject_assignment(client, "", "id1", "111" ,"data_id1") + assert success_req.status_code == 404 + + +# --------------------------------------------------------------------------- + +# object_categories_test + + +def get_object_assignment(client, policy_id): + req = client.get("/policies/{}/object_assignments".format(policy_id)) + object_assignment = utilities.get_json(req.data) + return req, object_assignment + + +def add_object_assignment(client): + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex) + object_id = builder.create_object(policy_id) + data_id = builder.create_object_data(policy_id=policy_id, category_id=object_category_id) + + data = { + "id": object_id, + "category_id": object_category_id, + "data_id": data_id + } + + req = client.post("/policies/{}/object_assignments".format(policy_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + object_assignment = utilities.get_json(req.data) + return req, object_assignment + + +def add_object_assignment_without_cat_id(client): + + data = { + "id": "object_id", + "category_id": "", + "data_id": "data_id" + } + req = client.post("/policies/{}/object_assignments".format("1111"), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + object_assignment = utilities.get_json(req.data) + return req, object_assignment + + +def delete_object_assignment(client, policy_id, obj_id, cat_id, data_id): + req = client.delete("/policies/{}/object_assignments/{}/{}/{}".format(policy_id, obj_id, cat_id, data_id)) + return req + + +def test_get_object_assignment(): + policy_id = builder.get_policy_id_with_object_assignment() + client = utilities.register_client() + req, object_assignment = get_object_assignment(client, policy_id) + assert req.status_code == 200 + assert isinstance(object_assignment, dict) + assert "object_assignments" in object_assignment + + +def test_add_object_assignment(): + client = utilities.register_client() + req, object_assignment = add_object_assignment(client) + assert req.status_code == 200 + assert "object_assignments" in object_assignment + + +def test_add_object_assignment_without_cat_id(): + client = utilities.register_client() + req, object_assignment = add_object_assignment_without_cat_id(client) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'category_id', [Empty String]" + + +def test_delete_object_assignment(): + client = utilities.register_client() + policy_id = builder.get_policy_id_with_object_assignment() + req, object_assignment = get_object_assignment(client, policy_id) + value = object_assignment["object_assignments"] + id = list(value.keys())[0] + success_req = delete_object_assignment(client, policy_id, value[id]['object_id'], value[id]['category_id'],value[id]['assignments'][0]) + assert success_req.status_code == 200 + + +def test_delete_object_assignment_without_policy_id(): + client = utilities.register_client() + success_req = delete_object_assignment(client, "", "id1", "111","data_id1") + assert success_req.status_code == 404 + + +# --------------------------------------------------------------------------- + +# action_categories_test + + +def get_action_assignment(client, policy_id): + req = client.get("/policies/{}/action_assignments".format(policy_id)) + action_assignment = utilities.get_json(req.data) + return req, action_assignment + + +def add_action_assignment(client): + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex) + action_id = builder.create_action(policy_id) + data_id = builder.create_action_data(policy_id=policy_id, category_id=action_category_id) + + data = { + "id": action_id, + "category_id": action_category_id, + "data_id": data_id + } + req = client.post("/policies/{}/action_assignments".format(policy_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + action_assignment = utilities.get_json(req.data) + return req, action_assignment + + +def add_action_assignment_without_cat_id(client): + + data = { + "id": "action_id", + "category_id": "", + "data_id": "data_id" + } + req = client.post("/policies/{}/action_assignments".format("1111"), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + action_assignment = utilities.get_json(req.data) + return req, action_assignment + + +def delete_action_assignment(client, policy_id, action_id, cat_id, data_id): + req = client.delete("/policies/{}/action_assignments/{}/{}/{}".format(policy_id, action_id, cat_id, data_id)) + return req + + +def test_get_action_assignment(): + policy_id = builder.get_policy_id_with_action_assignment() + client = utilities.register_client() + req, action_assignment = get_action_assignment(client, policy_id) + assert req.status_code == 200 + assert isinstance(action_assignment, dict) + assert "action_assignments" in action_assignment + + +def test_add_action_assignment(): + client = utilities.register_client() + req, action_assignment = add_action_assignment(client) + assert req.status_code == 200 + assert "action_assignments" in action_assignment + + +def test_add_action_assignment_without_cat_id(): + client = utilities.register_client() + req, action_assignment = add_action_assignment_without_cat_id(client) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'category_id', [Empty String]" + + +def test_delete_action_assignment(): + client = utilities.register_client() + policy_id = builder.get_policy_id_with_action_assignment() + req, action_assignment = get_action_assignment(client, policy_id) + value = action_assignment["action_assignments"] + id = list(value.keys())[0] + success_req = delete_action_assignment(client, policy_id, value[id]['action_id'], value[id]['category_id'],value[id]['assignments'][0]) + assert success_req.status_code == 200 + + +def test_delete_action_assignment_without_policy_id(): + client = utilities.register_client() + success_req = delete_action_assignment(client, "", "id1", "111" ,"data_id1") + assert success_req.status_code == 404 + +# --------------------------------------------------------------------------- diff --git a/old/moon_manager/tests/unit_python/api/test_data.py b/old/moon_manager/tests/unit_python/api/test_data.py new file mode 100644 index 00000000..433f69e6 --- /dev/null +++ b/old/moon_manager/tests/unit_python/api/test_data.py @@ -0,0 +1,239 @@ +# Copyright 2018 Open Platform for NFV Project, Inc. and its contributors +# This software is distributed under the terms and conditions of the 'Apache-2.0' +# license which can be found in the file 'LICENSE' in this package distribution +# or at 'http://www.apache.org/licenses/LICENSE-2.0'. + +import api.utilities as utilities +import json +from helpers import data_builder as builder +from uuid import uuid4 + +# subject_categories_test + + +def get_subject_data(client, policy_id, category_id=None): + if category_id is None: + req = client.get("/policies/{}/subject_data".format(policy_id)) + else: + req = client.get("/policies/{}/subject_data/{}".format(policy_id, category_id)) + subject_data = utilities.get_json(req.data) + return req, subject_data + + +def add_subject_data(client, name): + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex) + data = { + "name": name, + "description": "description of {}".format(name) + } + req = client.post("/policies/{}/subject_data/{}".format(policy_id, subject_category_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + subject_data = utilities.get_json(req.data) + return req, subject_data + + +def delete_subject_data(client, policy_id, category_id, data_id): + req = client.delete("/policies/{}/subject_data/{}/{}".format(policy_id,category_id,data_id)) + return req + + +def test_get_subject_data(): + policy_id = utilities.get_policy_id() + client = utilities.register_client() + req, subject_data = get_subject_data(client, policy_id) + assert req.status_code == 200 + assert isinstance(subject_data, dict) + assert "subject_data" in subject_data + + +def test_add_subject_data(): + client = utilities.register_client() + req, subject_data = add_subject_data(client, "testuser") + assert req.status_code == 200 + assert isinstance(subject_data, dict) + value = subject_data["subject_data"]['data'] + assert "subject_data" in subject_data + id = list(value.keys())[0] + assert value[id]['name'] == "testuser" + assert value[id]['description'] == "description of {}".format("testuser") + + +def test_delete_subject_data(): + client = utilities.register_client() + subject_category_id, object_category_id, action_category_id, meta_rule_id,policy_id = builder.create_new_policy() + data_id = builder.create_subject_data(policy_id,subject_category_id) + success_req = delete_subject_data(client, policy_id, subject_category_id, data_id ) + assert success_req.status_code == 200 + + +def test_add_subject_data_with_forbidden_char_in_user(): + client = utilities.register_client() + req, subject_data = add_subject_data(client, "<a>") + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]" + + +def test_delete_subject_data_without_policy_id(): + client = utilities.register_client() + success_req = delete_subject_data(client, "", "", "") + assert success_req.status_code == 404 + +# --------------------------------------------------------------------------- +# object_categories_test + + +def get_object_data(client, policy_id, category_id=None): + if category_id is None: + req = client.get("/policies/{}/object_data".format(policy_id)) + else: + req = client.get("/policies/{}/object_data/{}".format(policy_id, category_id)) + object_data = utilities.get_json(req.data) + return req, object_data + + +def add_object_data(client, name): + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex) + data = { + "name": name, + "description": "description of {}".format(name) + } + req = client.post("/policies/{}/object_data/{}".format(policy_id, object_category_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + object_data = utilities.get_json(req.data) + return req, object_data + + +def delete_object_data(client, policy_id, category_id, data_id): + req = client.delete("/policies/{}/object_data/{}/{}".format(policy_id, category_id, data_id)) + return req + + +def test_get_object_data(): + policy_id = utilities.get_policy_id() + client = utilities.register_client() + req, object_data = get_object_data(client, policy_id) + assert req.status_code == 200 + assert isinstance(object_data, dict) + assert "object_data" in object_data + + +def test_add_object_data(): + client = utilities.register_client() + req, object_data = add_object_data(client, "testuser") + assert req.status_code == 200 + assert isinstance(object_data, dict) + value = object_data["object_data"]['data'] + assert "object_data" in object_data + _id = list(value.keys())[0] + assert value[_id]['name'] == "testuser" + assert value[_id]['description'] == "description of {}".format("testuser") + + +def test_delete_object_data(): + client = utilities.register_client() + + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy() + data_id = builder.create_object_data(policy_id, object_category_id) + + success_req = delete_object_data(client, policy_id, data_id, object_category_id) + assert success_req.status_code == 200 + + +def test_add_object_data_with_forbidden_char_in_user(): + client = utilities.register_client() + req, subject_data = add_object_data(client, "<a>") + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]" + + +def test_delete_object_data_without_policy_id(): + client = utilities.register_client() + success_req = delete_object_data(client, "", "", "") + assert success_req.status_code == 404 + +# --------------------------------------------------------------------------- +# action_categories_test + + +def get_action_data(client, policy_id, category_id=None): + if category_id is None: + req = client.get("/policies/{}/action_data".format(policy_id)) + else: + req = client.get("/policies/{}/action_data/{}".format(policy_id, category_id)) + action_data = utilities.get_json(req.data) + return req, action_data + + +def add_action_data(client, name): + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex) + data = { + "name": name, + "description": "description of {}".format(name) + } + req = client.post("/policies/{}/action_data/{}".format(policy_id, action_category_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + action_data = utilities.get_json(req.data) + return req, action_data + + +def delete_action_data(client, policy_id, categorgy_id, data_id): + req = client.delete("/policies/{}/action_data/{}/{}".format(policy_id, categorgy_id, data_id)) + return req + + +def test_get_action_data(): + policy_id = utilities.get_policy_id() + client = utilities.register_client() + req, action_data = get_action_data(client, policy_id) + assert req.status_code == 200 + assert isinstance(action_data, dict) + assert "action_data" in action_data + + +def test_add_action_data(): + client = utilities.register_client() + req, action_data = add_action_data(client, "testuser") + assert req.status_code == 200 + assert isinstance(action_data, dict) + value = action_data["action_data"]['data'] + assert "action_data" in action_data + id = list(value.keys())[0] + assert value[id]['name'] == "testuser" + assert value[id]['description'] == "description of {}".format("testuser") + + +def test_delete_action_data(): + client = utilities.register_client() + + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy() + data_id = builder.create_action_data(policy_id, action_category_id) + + success_req = delete_action_data(client, policy_id, data_id, action_category_id) + + assert success_req.status_code == 200 + + +def test_add_action_data_with_forbidden_char_in_user(): + client = utilities.register_client() + req, action_data = add_action_data(client, "<a>") + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]" + + +def test_delete_action_data_without_policy_id(): + client = utilities.register_client() + success_req = delete_action_data(client, "", "", "") + assert success_req.status_code == 404 +# --------------------------------------------------------------------------- diff --git a/old/moon_manager/tests/unit_python/api/test_export.py b/old/moon_manager/tests/unit_python/api/test_export.py new file mode 100644 index 00000000..ac8e8d17 --- /dev/null +++ b/old/moon_manager/tests/unit_python/api/test_export.py @@ -0,0 +1,282 @@ +# Copyright 2018 Open Platform for NFV Project, Inc. and its contributors +# This software is distributed under the terms and conditions of the 'Apache-2.0' +# license which can be found in the file 'LICENSE' in this package distribution +# or at 'http://www.apache.org/licenses/LICENSE-2.0'. + +import json +import api.utilities as utilities +import api.import_export_utilities as import_export_utilities + + +MODEL_WITHOUT_META_RULES = {"models": [{"name": "test model", "description": "model description", "meta_rules": []}]} + +POLICIES = {"models": [{"name": "test model", "description": "", "meta_rules": []}], + "policies": [{"name": "test policy", "genre": "authz", "description": "policy description", "model": {"name" : "test model"}}]} + +SUBJECTS_OBJECTS_ACTIONS = {"models": [{"name": "test model", "description": "", "meta_rules": []}], + "policies": [{"name": "test policy", "genre": "authz", "description": "policy description", "model": {"name" : "test model"}}], + "subjects": [{"name": "testuser", "description": "description of the subject", "extra": {"field_extra_subject": "value extra subject"}, "policies": [{"name": "test policy"}]}], + "objects": [{"name": "test object", "description": "description of the object", "extra": {"field_extra_object": "value extra object"}, "policies": [{"name": "test policy"}]}], + "actions": [{"name": "test action", "description": "description of the action", "extra": {"field_extra_action": "value extra action"}, "policies": [{"name": "test policy"}]}]} + + +SUBJECT_OBJECT_ACTION_CATEGORIES = {"subject_categories": [{"name": "test subject categories", "description": "subject category description"}], + "object_categories": [{"name": "test object categories", "description": "object category description"}], + "action_categories": [{"name": "test action categories", "description": "action category description"}]} + +SUBJECT_OBJECT_ACTION_DATA = {"models": [{"name": "test model", "description": "", "meta_rules": [{"name": "meta rule"}]}], + "policies": [{"name": "test policy", "genre": "authz", "description": "policy description", "model": {"name" : "test model"}}], + "subject_categories": [{"name": "test subject categories", "description": "subject category description"}], + "object_categories": [{"name": "test object categories", "description": "object category description"}], + "action_categories": [{"name": "test action categories", "description": "action category description"}], + "subject_data": [{"name": "test subject data", "description": "subject data description", "policies": [{"name": "test policy"}], "category": {"name": "test subject categories"}}], + "object_data": [{"name": "test object data", "description": "object data description", "policies": [{"name": "test policy"}], "category": {"name": "test object categories"}}], + "action_data": [{"name": "test action data", "description": "action data description", "policies": [{"name": "test policy"}], "category": {"name": "test action categories"}}], + "meta_rules": [{"name": "meta rule", "description": "valid meta rule", "subject_categories": [{"name": "test subject categories"}], "object_categories": [{"name": "test object categories"}], "action_categories": [{"name": "test action categories"}]}]} + + +META_RULES = {"subject_categories": [{"name": "test subject categories", "description": "subject category description"}], + "object_categories": [{"name": "test object categories", "description": "object category description"}], + "action_categories": [{"name": "test action categories", "description": "object action description"}], + "meta_rules": [{"name": "meta rule", "description": "valid meta rule", "subject_categories": [{"name": "test subject categories"}], "object_categories": [{"name": "test object categories"}], "action_categories": [{"name": "test action categories"}]}]} + + +ASSIGNMENTS = {"models": [{"name": "test model", "description": "", "meta_rules": [{"name": "meta rule"}]}], + "policies": [{"name": "test policy", "genre": "authz", "description": "policy description", "model": {"name" : "test model"}}], + "subject_categories": [{"name": "test subject categories", "description": "subject category description"}], + "object_categories": [{"name": "test object categories", "description": "object category description"}], + "action_categories": [{"name": "test action categories", "description": "action category description"}], + "subject_data": [{"name": "test subject data", "description": "subject data description", "policies": [{"name": "test policy"}], "category": {"name": "test subject categories"}}], + "object_data": [{"name": "test object data", "description": "object data description", "policies": [{"name": "test policy"}], "category": {"name": "test object categories"}}], + "action_data": [{"name": "test action data", "description": "action data description", "policies": [{"name": "test policy"}], "category": {"name": "test action categories"}}], + "meta_rules": [{"name": "meta rule", "description": "valid meta rule", "subject_categories": [{"name": "test subject categories"}], "object_categories": [{"name": "test object categories"}], "action_categories": [{"name": "test action categories"}]}], + "subjects": [{"name": "testuser", "description": "description of the subject", "extra": {"field_extra_subject": "value extra subject"}, "policies": [{"name": "test policy"}]}], + "objects": [{"name": "test object e0", "description": "description of the object", "extra": {"field_extra_object": "value extra object"}, "policies": [{"name": "test policy"}]}], + "actions": [{"name": "test action e0", "description": "description of the action", "extra": {"field_extra_action": "value extra action"}, "policies": [{"name": "test policy"}]}], + "subject_assignments": [{"subject": {"name": "testuser"}, "category": {"name": "test subject categories"}, "assignments": [{"name": "test subject data"}]}], + "object_assignments": [{"object": {"name": "test object e0"}, "category": {"name": "test object categories"}, "assignments": [{"name": "test object data"}]}], + "action_assignments": [{"action": {"name": "test action e0"}, "category": {"name": "test action categories"}, "assignments": [{"name": "test action data"}]}]} + +RULES = {"models": [{"name": "test model", "description": "", "meta_rules": [{"name": "meta rule"}]}], + "policies": [{"name": "test policy", "genre": "authz", "description": "policy description", "model": {"name" : "test model"}}], + "subject_categories": [{"name": "test subject categories", "description": "subject category description"}], + "object_categories": [{"name": "test object categories", "description": "object category description"}], + "action_categories": [{"name": "test action categories", "description": "action category description"}], + "subject_data": [{"name": "test subject data", "description": "subject data description", "policies": [{"name": "test policy"}], "category": {"name": "test subject categories"}}], + "object_data": [{"name": "test object data", "description": "object data description", "policies": [{"name": "test policy"}], "category": {"name": "test object categories"}}], + "action_data": [{"name": "test action data", "description": "action data description", "policies": [{"name": "test policy"}], "category": {"name": "test action categories"}}], + "meta_rules": [{"name": "meta rule", "description": "valid meta rule", "subject_categories": [{"name": "test subject categories"}], "object_categories": [{"name": "test object categories"}], "action_categories": [{"name": "test action categories"}]}], + "subjects": [{"name": "testuser", "description": "description of the subject", "extra": {"field_extra_subject": "value extra subject"}, "policies": [{"name": "test policy"}]}], + "objects": [{"name": "test object e1", "description": "description of the object", "extra": {"field_extra_object": "value extra object"}, "policies": [{"name": "test policy"}]}], + "actions": [{"name": "test action e1", "description": "description of the action", "extra": {"field_extra_action": "value extra action"}, "policies": [{"name": "test policy"}]}], + "subject_assignments": [{"subject": {"name": "testuser"}, "category": {"name": "test subject categories"}, "assignments": [{"name": "test subject data"}]}], + "object_assignments": [{"object": {"name": "test object e1"}, "category": {"name": "test object categories"}, "assignments": [{"name": "test object data"}]}], + "action_assignments": [{"action": {"name": "test action e1"}, "category": {"name": "test action categories"}, "assignments": [{"name": "test action data"}]}], + "rules": [{"meta_rule": {"name": "meta rule"}, "rule": {"subject_data": [{"name": "test subject data"}], "object_data": [{"name": "test object data"}], "action_data": [{"name": "test action data"}]}, "policy": {"name":"test policy"}, "instructions": {"decision": "grant"}, "enabled": True}] + } + + +def test_export_models(): + client = utilities.register_client() + import_export_utilities.clean_all(client) + req = client.post("/import", content_type='application/json', data=json.dumps(MODEL_WITHOUT_META_RULES)) + data = utilities.get_json(req.data) + assert data == "Import ok !" + + req = client.get("/export") + assert req.status_code == 200 + data = utilities.get_json(req.data) + + assert "content" in data + assert "models" in data["content"] + assert isinstance(data["content"]["models"], list) + assert len(data["content"]["models"]) == 1 + model = data["content"]["models"][0] + assert model["name"] == "test model" + assert model["description"] == "model description" + assert isinstance(model["meta_rules"], list) + assert len(model["meta_rules"]) == 0 + + +def test_export_policies(): + client = utilities.register_client() + import_export_utilities.clean_all(client) + req = client.post("/import", content_type='application/json', data=json.dumps(POLICIES)) + data = utilities.get_json(req.data) + assert data == "Import ok !" + + req = client.get("/export") + assert req.status_code == 200 + data = utilities.get_json(req.data) + + assert "content" in data + assert "policies" in data["content"] + assert isinstance(data["content"]["policies"], list) + assert len(data["content"]["policies"]) == 1 + policy = data["content"]["policies"][0] + assert policy["name"] == "test policy" + assert policy["genre"] == "authz" + assert policy["description"] == "policy description" + assert "model" in policy + assert "name" in policy["model"] + model = policy["model"] + assert model["name"] == "test model" + + +def test_export_subject_object_action(): + client = utilities.register_client() + import_export_utilities.clean_all(client) + req = client.post("/import", content_type='application/json', data=json.dumps(SUBJECTS_OBJECTS_ACTIONS)) + data = utilities.get_json(req.data) + assert data == "Import ok !" + + req = client.get("/export") + assert req.status_code == 200 + data = utilities.get_json(req.data) + + assert "content" in data + type_elements = ["subject", "object", "action"] + for type_element in type_elements: + key = type_element + "s" + assert key in data["content"] + assert isinstance(data["content"][key], list) + assert len(data["content"][key]) == 1 + element = data["content"][key][0] + if type_element == "subject": + assert element["name"] == "testuser" + else: + assert element["name"] == "test "+ type_element + assert element["description"] == "description of the " + type_element + assert "policies" in element + assert isinstance(element["policies"], list) + assert len(element["policies"]) == 1 + assert isinstance(element["policies"][0], dict) + assert element["policies"][0]["name"] == "test policy" + assert isinstance(element["extra"], dict) + key_dict = "field_extra_" + type_element + value_dict = "value extra " + type_element + assert key_dict in element["extra"] + assert element["extra"][key_dict] == value_dict + + +def test_export_subject_object_action_categories(): + client = utilities.register_client() + import_export_utilities.clean_all(client) + req = client.post("/import", content_type='application/json', data=json.dumps(SUBJECT_OBJECT_ACTION_CATEGORIES)) + data = utilities.get_json(req.data) + assert data == "Import ok !" + + req = client.get("/export") + assert req.status_code == 200 + data = utilities.get_json(req.data) + assert "content" in data + type_elements = ["subject", "object", "action"] + for type_element in type_elements: + key = type_element + "_categories" + assert key in data["content"] + assert isinstance(data["content"][key], list) + assert len(data["content"][key]) == 1 + category = data["content"][key][0] + assert category["name"] == "test " + type_element + " categories" + assert category["description"] == type_element + " category description" + + +def test_export_subject_object_action_data(): + client = utilities.register_client() + import_export_utilities.clean_all(client) + req = client.post("/import", content_type='application/json', data=json.dumps(SUBJECT_OBJECT_ACTION_DATA)) + data = utilities.get_json(req.data) + assert data == "Import ok !" + + req = client.get("/export") + assert req.status_code == 200 + data = utilities.get_json(req.data) + assert "content" in data + type_elements = ["subject", "object", "action"] + for type_element in type_elements: + key = type_element + "_data" + assert key in data["content"] + assert isinstance(data["content"][key], list) + assert len(data["content"][key]) == 1 + data_elt = data["content"][key][0] + assert data_elt["name"] == "test " + type_element + " data" + assert data_elt["description"] == type_element + " data description" + assert isinstance(data_elt["policy"], dict) + assert data_elt["policy"]["name"] == "test policy" + assert isinstance(data_elt["category"], dict) + assert data_elt["category"]["name"] == "test " + type_element + " categories" + + +def test_export_assignments(): + client = utilities.register_client() + import_export_utilities.clean_all(client) + req = client.post("/import", content_type='application/json', data=json.dumps(ASSIGNMENTS)) + data = utilities.get_json(req.data) + assert data == "Import ok !" + + req = client.get("/export") + assert req.status_code == 200 + data = utilities.get_json(req.data) + assert "content" in data + type_elements = ["subject", "object", "action"] + for type_element in type_elements: + key = type_element + "_assignments" + assert key in data["content"] + assert isinstance(data["content"][key], list) + assert len(data["content"][key]) == 1 + assignment_elt = data["content"][key][0] + assert type_element in assignment_elt + assert isinstance(assignment_elt[type_element], dict) + if type_element == "subject": + assert assignment_elt[type_element]["name"] == "testuser" + else: + assert assignment_elt[type_element]["name"] == "test " + type_element + " e0" + assert "category" in assignment_elt + assert isinstance(assignment_elt["category"], dict) + assert assignment_elt["category"]["name"] == "test " + type_element + " categories" + assert "assignments" in assignment_elt + assert isinstance(assignment_elt["assignments"], list) + assert len(assignment_elt["assignments"]) == 1 + assert assignment_elt["assignments"][0]["name"] == "test " + type_element + " data" + + import_export_utilities.clean_all(client) + + +def test_export_rules(): + client = utilities.register_client() + import_export_utilities.clean_all(client) + req = client.post("/import", content_type='application/json', data=json.dumps(RULES)) + data = utilities.get_json(req.data) + assert data == "Import ok !" + + req = client.get("/export") + assert req.status_code == 200 + data = utilities.get_json(req.data) + assert "content" in data + assert "rules" in data["content"] + assert isinstance(data["content"]["rules"], list) + assert len(data["content"]["rules"]) == 1 + rule = data["content"]["rules"][0] + assert "instructions" in rule + assert "decision" in rule["instructions"] + assert rule["instructions"]["decision"] == "grant" + assert "enabled" in rule + assert rule["enabled"] + assert "meta_rule" in rule + assert rule["meta_rule"]["name"] == "meta rule" + assert "policy" in rule + assert rule["policy"]["name"] == "test policy" + assert "rule" in rule + rule = rule["rule"] + assert "subject_data" in rule + assert isinstance(rule["subject_data"], list) + assert len(rule["subject_data"]) == 1 + assert rule["subject_data"][0]["name"] == "test subject data" + assert "object_data" in rule + assert isinstance(rule["object_data"], list) + assert len(rule["object_data"]) == 1 + assert rule["object_data"][0]["name"] == "test object data" + assert "action_data" in rule + assert isinstance(rule["action_data"], list) + assert len(rule["action_data"]) == 1 + assert rule["action_data"][0]["name"] == "test action data" diff --git a/old/moon_manager/tests/unit_python/api/test_import.py b/old/moon_manager/tests/unit_python/api/test_import.py new file mode 100644 index 00000000..af5f753a --- /dev/null +++ b/old/moon_manager/tests/unit_python/api/test_import.py @@ -0,0 +1,510 @@ +# Copyright 2018 Open Platform for NFV Project, Inc. and its contributors +# This software is distributed under the terms and conditions of the 'Apache-2.0' +# license which can be found in the file 'LICENSE' in this package distribution +# or at 'http://www.apache.org/licenses/LICENSE-2.0'. + +import api.utilities as utilities +import api.test_unit_models as test_models +import api.test_policies as test_policies +import api.test_meta_data as test_categories +import api.test_data as test_data +import api.test_meta_rules as test_meta_rules +import api.test_assignement as test_assignments +import api.test_rules as test_rules +import api.import_export_utilities as import_export_utilities + +import json + + +MODEL_WITHOUT_META_RULES = [ + {"models": [{"name": "test model", "description": "", "meta_rules": []}]}, + {"models": [{"name": "test model", "description": "new description", "meta_rules": [], "override": True}]}, + {"models": [{"name": "test model", "description": "description not taken into account", "meta_rules": [], "override": False}]} + ] + +POLICIES = [ + {"policies": [{"name": "test policy", "genre": "authz", "description": "description", "model": {}, "mandatory": False}]}, + {"policies": [{"name": "test policy", "genre": "authz", "description": "new description not taken into account", "model": {"name" : "test model"}, "mandatory": True}]}, + {"policies": [{"name": "test policy", "genre": "not authz ?", "description": "generates an exception", "model": {"name" : "test model"}, "override": True}]}, + {"models": [{"name": "test model", "description": "", "meta_rules": []}], "policies": [{"name": "test policy", "genre": "not authz ?", "description": "changes taken into account", "model": {"name" : "test model"}, "override": True}]}, +] + +SUBJECTS = [{"subjects": [{"name": "testuser", "description": "description of the subject", "extra": {}, "policies": []}]}, + {"policies": [{"name": "test policy", "genre": "authz", "description": "description", "model": {}, "mandatory": False}], "subjects": [{"name": "testuser", "description": "description of the subject", "extra": {}, "policies": []}]}, + {"policies": [{"name": "test other policy", "genre": "authz", "description": "description", "model": {}, "mandatory": True}], "subjects": [{"name": "testuser", "description": "description of the subject", "extra": {}, "policies": []}]}, + {"subjects": [{"name": "testuser", "description": "new description of the subject", "extra": {"email": "new-email@test.com"}, "policies": [{"name": "test other policy"}]}]}, + {"policies": [{"name": "test policy", "genre": "authz", "description": "description", "model": {}, "mandatory": False}], "subjects": [{"name": "testuser", "description": "description of the subject", "extra": {}, "policies": [{"name": "test policy"}]}]}] + + +OBJECTS = [ + {"objects": [{"name": "test object", "description": "description of the object", "extra": {}, "policies": []}]}, + {"policies": [{"name": "test policy", "genre": "authz", "description": "description", "model": {}, "mandatory": False}], + "objects": [{"name": "test object", "description": "description of the object", "extra": {}, "policies": []}]}, + {"policies": [{"name": "test other policy", "genre": "authz", "description": "description", "model": {}, "mandatory": True}], + "objects": [{"name": "test object", "description": "description of the object", "extra": {}, "policies": []}]}, + {"objects": [{"name": "test object", "description": "new description of the object", + "extra": {"test": "test extra"}, + "policies": [{"name": "test other policy"}]}]}, + {"policies": [{"name": "test policy", "genre": "authz", "description": "description", "model": {}, "mandatory": False}], + "objects": [{"name": "test object", "description": "description of the object", "extra": {}, "policies": [{"name": "test policy"}]}]}, +] + + +ACTIONS = [{"actions": [{"name": "test action", "description": "description of the action", "extra": {}, "policies": []}]}, + {"policies": [{"name": "test policy", "genre": "authz", "description": "description", "model": {}, "mandatory": False}], "actions": [{"name": "test action", "description": "description of the action", "extra": {}, "policies": []}]}, + {"policies": [{"name": "test other policy", "genre": "authz", "description": "description", "model": {}, "mandatory": True}], "actions": [{"name": "test action", "description": "description of the action", "extra": {}, "policies": []}]}, + {"actions": [{"name": "test action", "description": "new description of the action", "extra": {"test": "test extra"}, "policies": [{"name": "test other policy"}]}]}, + {"policies": [{"name": "test policy", "genre": "authz", "description": "description", "model": {}, "mandatory": False}], "actions": [{"name": "test action", "description": "description of the action", "extra": {}, "policies": [{"name": "test policy"}]}]}] + + +SUBJECT_CATEGORIES = [{"subject_categories": [{"name": "test subject categories", "description": "subject category description"}]}, + {"subject_categories": [{"name": "test subject categories", "description": "new subject category description"}]}] + + +OBJECT_CATEGORIES = [{"object_categories": [{"name": "test object categories", "description": "object category description"}]}, + {"object_categories": [{"name": "test object categories", "description": "new object category description"}]}] + + +ACTION_CATEGORIES = [{"action_categories": [{"name": "test action categories", "description": "action category description"}]}, + {"action_categories": [{"name": "test action categories", "description": "new action category description"}]}] + +# meta_rules import is needed otherwise the search for data do not work !!! +PRE_DATA = {"models": [{"name": "test model", "description": "", "meta_rules": [{"name": "good meta rule"}, {"name": "other good meta rule"}]}], + "policies": [{"name": "test other policy", "genre": "authz", "description": "description", "model": {"name": "test model"}, "mandatory": True}], + "subject_categories": [{"name": "test subject categories", "description": "subject category description"}, {"name": "other test subject categories", "description": "subject category description"}], + "object_categories": [{"name": "test object categories", "description": "object category description"}, {"name": "other test object categories", "description": "object category description"}], + "action_categories": [{"name": "test action categories", "description": "action category description"}, {"name": "other test action categories", "description": "action category description"}], + "meta_rules": [{"name": "good meta rule", "description": "valid meta rule", "subject_categories": [{"name": "test subject categories"}], "object_categories": [{"name": "test object categories"}], "action_categories": [{"name": "test action categories"}]}, + {"name": "other good meta rule", "description": "valid meta rule", "subject_categories": [{"name": "other test subject categories"}], "object_categories": [{"name": "other test object categories"}], "action_categories": [{"name": "other test action categories"}]}]} + +SUBJECT_DATA = [{"subject_data": [{"name": "not valid subject data", "description": "", "policies": [{}], "category": {}}]}, + {"subject_data": [{"name": "not valid subject data", "description": "", "policies": [{}], "category": {"name": "test subject categories"}}]}, + {"policies": [{"name": "test policy", "genre": "authz", "description": "description", "model": {"name": "test model"}, "mandatory": True}], "subject_data": [{"name": "one valid subject data", "description": "description", "policies": [{}], "category": {"name": "test subject categories"}}]}, + {"subject_data": [{"name": "valid subject data", "description": "description", "policies": [{"name": "test policy"}], "category": {"name": "test subject categories"}}]}, + {"subject_data": [{"name": "valid subject data", "description": "new description", "policies": [{"name": "test other policy"}], "category": {"name": "test subject categories"}}]}] + +OBJECT_DATA = [{"object_data": [{"name": "not valid object data", "description": "", "policies": [{}], "category": {}}]}, + {"object_data": [{"name": "not valid object data", "description": "", "policies": [{}], "category": {"name": "test object categories"}}]}, + {"policies": [{"name": "test policy", "genre": "authz", "description": "description", "model": {"name": "test model"}, "mandatory": True}], "object_data": [{"name": "one valid object data", "description": "description", "policies": [{}], "category": {"name": "test object categories"}}]}, + {"object_data": [{"name": "valid object data", "description": "description", "policies": [{"name": "test policy"}], "category": {"name": "test object categories"}}]}, + {"object_data": [{"name": "valid object data", "description": "new description", "policies": [{"name": "test other policy"}], "category": {"name": "test object categories"}}]}] + + +ACTION_DATA = [{"action_data": [{"name": "not valid action data", "description": "", "policies": [{}], "category": {}}]}, + {"action_data": [{"name": "not valid action data", "description": "", "policies": [{}], "category": {"name": "test action categories"}}]}, + {"policies": [{"name": "test policy", "genre": "authz", "description": "description", "model": {"name": "test model"}, "mandatory": True}], "action_data": [{"name": "one valid action data", "description": "description", "policies": [{}], "category": {"name": "test action categories"}}]}, + {"action_data": [{"name": "valid action data", "description": "description", "policies": [{"name": "test policy"}], "category": {"name": "test action categories"}}]}, + {"action_data": [{"name": "valid action data", "description": "new description", "policies": [{"name": "test other policy"}], "category": {"name": "test action categories"}}]}] + + +PRE_META_RULES = {"subject_categories": [{"name": "test subject categories", "description": "subject category description"}], + "object_categories": [{"name": "test object categories", "description": "object category description"}], + "action_categories": [{"name": "test action categories", "description": "object action description"}]} + +META_RULES = [{"meta_rules" :[{"name": "bad meta rule", "description": "not valid meta rule", "subject_categories": [{"name": "not valid category"}], "object_categories": [{"name": "test object categories"}], "action_categories": [{"name": "test action categories"}]}]}, + {"meta_rules": [{"name": "bad meta rule", "description": "not valid meta rule", "subject_categories": [{"name": "test subject categories"}], "object_categories": [{"name": "not valid category"}], "action_categories": [{"name": "test action categories"}]}]}, + {"meta_rules": [{"name": "bad meta rule", "description": "not valid meta rule", "subject_categories": [{"name": "test subject categories"}], "object_categories": [{"name": "test object categories"}], "action_categories": [{"name": "not valid category"}]}]}, + {"meta_rules": [{"name": "good meta rule", "description": "valid meta rule", "subject_categories": [{"name": "test subject categories"}], "object_categories": [{"name": "test object categories"}], "action_categories": [{"name": "test action categories"}]}]}] + + +PRE_ASSIGNMENTS = {"models": [{"name": "test model", "description": "", "meta_rules": [{"name": "good meta rule"}]}], + "policies": [{"name": "test policy", "genre": "authz", "description": "description", "model": {"name" : "test model"}, "mandatory": True}], + "subject_categories": [{"name": "test subject categories", "description": "subject category description"}], + "object_categories": [{"name": "test object categories", "description": "object category description"}], + "action_categories": [{"name": "test action categories", "description": "object action description"}], + "subjects": [{"name": "testuser", "description": "description of the subject", "extra": {}, "policies": [{"name": "test policy"}]}], + "objects": [{"name": "test object", "description": "description of the object", "extra": {}, "policies": [{"name": "test policy"}]}], + "actions": [{"name": "test action", "description": "description of the action", "extra": {}, "policies": [{"name": "test policy"}]}], + "meta_rules": [{"name": "good meta rule", "description": "valid meta rule", "subject_categories": [{"name": "test subject categories"}], "object_categories": [{"name": "test object categories"}], "action_categories": [{"name": "test action categories"}]}], + "subject_data": [{"name": "subject data", "description": "test subject data", "policies": [{"name": "test policy"}], "category": {"name": "test subject categories"}}], + "object_data": [{"name": "object data", "description": "test object data", "policies": [{"name": "test policy"}], "category": {"name": "test object categories"}}], + "action_data": [{"name": "action data", "description": "test action data", "policies": [{"name": "test policy"}], "category": {"name": "test action categories"}}]} + + +SUBJECT_ASSIGNMENTS = [{"subject_assignments": [{"subject": {"name": "unknonw"}, "category" : {"name": "test subject categories"}, "assignments": [{"name": "subject data"}]}]}, + {"subject_assignments": [{"subject": {"name": "testuser"}, "category": {"name": "unknown"}, "assignments": [{"name": "subject data"}]}]}, + {"subject_assignments": [{"subject": {"name": "testuser"}, "category" : {"name": "test subject categories"}, "assignments": [{"name": "unknwon"}]}]}, + {"subject_assignments": [{"subject": {"name": "testuser"}, "category": {"name": "test subject categories"}, "assignments": [{"name": "subject data"}]}]}] + +OBJECT_ASSIGNMENTS = [{"object_assignments": [{"object": {"name": "unknown"}, "category" : {"name": "test object categories"}, "assignments": [{"name": "object data"}]}]}, + {"object_assignments": [{"object": {"name": "test object"}, "category" : {"name": "unknown"}, "assignments": [{"name": "object data"}]}]}, + {"object_assignments": [{"object": {"name": "test object"}, "category" : {"name": "test object categories"}, "assignments": [{"name": "unknown"}]}]}, + {"object_assignments": [{"object": {"name": "test object"}, "category" : {"name": "test object categories"}, "assignments": [{"name": "object data"}]}]}] + +ACTION_ASSIGNMENTS = [{"action_assignments": [{"action": {"name": "unknown"}, "category" : {"name": "test action categories"}, "assignments": [{"name": "action data"}]}]}, + {"action_assignments": [{"action": {"name": "test action"}, "category" : {"name": "unknown"}, "assignments": [{"name": "action data"}]}]}, + {"action_assignments": [{"action": {"name": "test action"}, "category" : {"name": "test action categories"}, "assignments": [{"name": "unknown"}]}]}, + {"action_assignments": [{"action": {"name": "test action"}, "category" : {"name": "test action categories"}, "assignments": [{"name": "action data"}]}]}] + +RULES = [{"rules": [{"meta_rule": {"name": "unknown meta rule"}, "policy": {"name": "test policy"}, "instructions": {"decision": "grant"}, "enabled": True, "rule": {"subject_data": [{"name": "subject data"}], "object_data": [{"name": "object data"}], "action_data": [{"name": "action data"}]}}]}, + {"rules": [{"meta_rule": {"name": "good meta rule"}, "policy": {"name": "unknown policy"}, "instructions": {"decision": "grant"}, "enabled": True, "rule": {"subject_data": [{"name": "subject data"}], "object_data": [{"name": "object data"}], "action_data": [{"name": "action data"}]}}]}, + {"rules": [{"meta_rule": {"name": "good meta rule"}, "policy": {"name": "test policy"}, "instructions": {"decision": "grant"}, "enabled": True, "rule": {"subject_data": [{"name": "unknown subject data"}], "object_data": [{"name": "object data"}], "action_data": [{"name": "action data"}]}}]}, + {"rules": [{"meta_rule": {"name": "good meta rule"}, "policy": {"name": "test policy"}, "instructions": {"decision": "grant"}, "enabled": True, "rule": {"subject_data": [{"name": "subject data"}], "object_data": [{"name": "unknown object data"}], "action_data": [{"name": "action data"}]}}]}, + {"rules": [{"meta_rule": {"name": "good meta rule"}, "policy": {"name": "test policy"}, "instructions": {"decision": "grant"}, "enabled": True, "rule": {"subject_data": [{"name": "subject data"}], "object_data": [{"name": "object data"}], "action_data": [{"name": "unknown action data"}]}}]}, + {"rules": [{"meta_rule": {"name": "good meta rule"}, "policy": {"name": "test policy"}, "instructions": {"decision": "grant"}, "enabled": True, "rule": {"subject_data": [{"name": "subject data"}], "object_data": [{"name": "object data"}], "action_data": [{"name": "action data"}]}}]}] + + +def test_import_models_without_new_meta_rules(): + client = utilities.register_client() + import_export_utilities.clean_all(client) + counter = 0 + for models_description in MODEL_WITHOUT_META_RULES: + req = client.post("/import", content_type='application/json', data=json.dumps(models_description)) + data = utilities.get_json(req.data) + assert data == "Import ok !" + req, models = test_models.get_models(client) + models = models["models"] + assert len(list(models.keys())) == 1 + values = list(models.values()) + assert values[0]["name"] == "test model" + if counter == 0: + assert len(values[0]["description"]) == 0 + if counter == 1 or counter == 2: + assert values[0]["description"] == "new description" + counter = counter + 1 + import_export_utilities.clean_all(client) + + +def test_import_policies(): + client = utilities.register_client() + import_export_utilities.clean_all(client) + counter = -1 + for policy_description in POLICIES: + counter = counter + 1 + req = client.post("/import", content_type='application/json', data=json.dumps(policy_description)) + try: + data = utilities.get_json(req.data) + assert data == "Import ok !" + except Exception: + assert counter == 2 # this is an expected failure + continue + + req, policies = test_policies.get_policies(client) + policies = policies["policies"] + assert len(list(policies.keys())) == 1 + values = list(policies.values()) + assert values[0]["name"] == "test policy" + if counter < 3: + assert values[0]["genre"] == "authz" + assert values[0]["description"] == "description" + else: + assert values[0]["genre"] == "not authz ?" + assert values[0]["description"] == "changes taken into account" + assert len(values[0]["model_id"]) > 0 + import_export_utilities.clean_all(client) + + +def test_import_subject_object_action(): + client = utilities.register_client() + type_elements = ["object", "action"] + + for type_element in type_elements: + import_export_utilities.clean_all(client) + counter = -1 + # set the getters and the comparison values + if type_element == "subject": + elements = SUBJECTS + clean_method = import_export_utilities.clean_subjects + name = "testuser" + key_extra = "email" + value_extra = "new-email@test.com" + elif type_element == "object": + elements = OBJECTS + clean_method = import_export_utilities.clean_objects + name = "test object" + key_extra = "test" + value_extra = "test extra" + else: + elements = ACTIONS + clean_method = import_export_utilities.clean_actions + name = "test action" + key_extra = "test" + value_extra = "test extra" + + for element in elements: + counter = counter + 1 + if counter == 2 or counter == 4: + clean_method(client) + + + if counter == 3: + req = client.patch("/{}s/{}".format(type_element,perimeter_id), content_type='application/json', + data=json.dumps( + element["{}s".format(type_element)][0])) + else : + req = client.post("/import", content_type='application/json', + data=json.dumps(element)) + if counter < 2: + assert req.status_code == 500 + continue + + try: + data = utilities.get_json(req.data) + except Exception as e: + assert False + #assert counter < 2 # Â this is an expected failure + #continue + + if counter != 3: + assert data == "Import ok !" + get_elements = utilities.get_json(client.get("/"+type_element + "s").data) + get_elements = get_elements[type_element + "s"] + + perimeter_id = list(get_elements.keys())[0] + + assert len(list(get_elements.keys())) == 1 + values = list(get_elements.values()) + assert values[0]["name"] == name + if counter == 2 or counter == 4: + assert values[0]["description"] == "description of the " + type_element + #assert not values[0]["extra"] + if counter == 3: + assert values[0]["description"] == "new description of the " + type_element + assert values[0]["extra"][key_extra] == value_extra + + #Â assert len(values[0]["policy_list"]) == 1 + import_export_utilities.clean_all(client) + + +def test_import_subject_object_action_categories(): + client = utilities.register_client() + type_elements = ["subject", "object", "action"] + + for type_element in type_elements: + import_export_utilities.clean_all(client) + counter = -1 + # set the getters and the comparison values + if type_element == "subject": + elements = SUBJECT_CATEGORIES + get_method = test_categories.get_subject_categories + elif type_element == "object": + elements = OBJECT_CATEGORIES + get_method = test_categories.get_object_categories + else: + elements = ACTION_CATEGORIES + get_method = test_categories.get_action_categories + + for element in elements: + req = client.post("/import", content_type='application/json', data=json.dumps(element)) + counter = counter + 1 + data = utilities.get_json(req.data) + assert data == "Import ok !" + req, get_elements = get_method(client) + get_elements = get_elements[type_element + "_categories"] + assert len(list(get_elements.keys())) == 1 + values = list(get_elements.values()) + assert values[0]["name"] == "test " + type_element + " categories" + assert values[0]["description"] == type_element + " category description" + + +def test_import_meta_rules(): + client = utilities.register_client() + import_export_utilities.clean_all(client) + # import some categories + req = client.post("/import", content_type='application/json', data=json.dumps(PRE_META_RULES)) + data = utilities.get_json(req.data) + assert data == "Import ok !" + + counter = -1 + for meta_rule in META_RULES: + counter = counter + 1 + req = client.post("/import", content_type='application/json', data=json.dumps(meta_rule)) + if counter != 3: + assert req.status_code == 500 + continue + else: + data = utilities.get_json(req.data) + assert data == "Import ok !" + assert req.status_code == 200 + + req, meta_rules = test_meta_rules.get_meta_rules(client) + meta_rules = meta_rules["meta_rules"] + key = list(meta_rules.keys())[0] + assert isinstance(meta_rules,dict) + assert meta_rules[key]["name"] == "good meta rule" + assert meta_rules[key]["description"] == "valid meta rule" + assert len(meta_rules[key]["subject_categories"]) == 1 + assert len(meta_rules[key]["object_categories"]) == 1 + assert len(meta_rules[key]["action_categories"]) == 1 + + subject_category_key = meta_rules[key]["subject_categories"][0] + object_category_key = meta_rules[key]["object_categories"][0] + action_category_key = meta_rules[key]["action_categories"][0] + + req, sub_cat = test_categories.get_subject_categories(client) + sub_cat = sub_cat["subject_categories"] + assert sub_cat[subject_category_key]["name"] == "test subject categories" + + req, ob_cat = test_categories.get_object_categories(client) + ob_cat = ob_cat["object_categories"] + assert ob_cat[object_category_key]["name"] == "test object categories" + + req, ac_cat = test_categories.get_action_categories(client) + ac_cat = ac_cat["action_categories"] + assert ac_cat[action_category_key]["name"] == "test action categories" + + import_export_utilities.clean_all(client) + + +def test_import_subject_object_action_assignments(): + client = utilities.register_client() + import_export_utilities.clean_all(client) + + req = client.post("/import", content_type='application/json', data=json.dumps(PRE_ASSIGNMENTS)) + data = utilities.get_json(req.data) + assert data == "Import ok !" + + type_elements = ["subject", "object", "action"] + + for type_element in type_elements: + counter = -1 + if type_element == "subject": + datas = SUBJECT_ASSIGNMENTS + get_method = test_assignments.get_subject_assignment + elif type_element == "object": + datas = OBJECT_ASSIGNMENTS + get_method = test_assignments.get_object_assignment + else: + datas = ACTION_ASSIGNMENTS + get_method = test_assignments.get_action_assignment + + for assignments in datas: + counter = counter + 1 + req = client.post("/import", content_type='application/json', data=json.dumps(assignments)) + if counter != 3: + assert req.status_code == 500 + continue + else: + assert data == "Import ok !" + assert req.status_code == 200 + req, policies = test_policies.get_policies(client) + for policy_key in policies["policies"]: + req, get_assignments = get_method(client, policy_key) + get_assignments = get_assignments[type_element+"_assignments"] + assert len(get_assignments) == 1 + + +def test_import_rules(): + client = utilities.register_client() + import_export_utilities.clean_all(client) + req = client.post("/import", content_type='application/json', data=json.dumps(PRE_ASSIGNMENTS)) + data = utilities.get_json(req.data) + assert data == "Import ok !" + + counter = -1 + for rule in RULES: + counter = counter + 1 + req = client.post("/import", content_type='application/json', data=json.dumps(rule)) + + if counter < 5: + assert req.status_code == 500 + continue + + assert req.status_code == 200 + + req, rules = test_rules.test_get_rules() + rules = rules["rules"] + rules = rules["rules"] + assert len(rules) == 1 + rules = rules[0] + assert rules["enabled"] + assert rules["instructions"]["decision"] == "grant" + + req, meta_rules = test_meta_rules.get_meta_rules(client) + assert meta_rules["meta_rules"][list(meta_rules["meta_rules"].keys())[0]]["name"] == "good meta rule" + + +def test_import_subject_object_action_data(): + client = utilities.register_client() + type_elements = ["subject", "object", "action"] + + for type_element in type_elements: + import_export_utilities.clean_all(client) + req = client.post("/import", content_type='application/json', data=json.dumps(PRE_DATA)) + counter = -1 + # set the getters and the comparison values + if type_element == "subject": + elements = SUBJECT_DATA + get_method = test_data.get_subject_data + get_categories = test_categories.get_subject_categories + elif type_element == "object": + elements = OBJECT_DATA + get_method = test_data.get_object_data + get_categories = test_categories.get_object_categories + else: + elements = ACTION_DATA + get_method = test_data.get_action_data + get_categories = test_categories.get_action_categories + + for element in elements: + req = client.post("/import", content_type='application/json', data=json.dumps(element)) + counter = counter + 1 + if counter == 0 or counter == 1: + assert req.status_code == 500 + continue + assert req.status_code == 200 + data = utilities.get_json(req.data) + assert data == "Import ok !" + + req, policies = test_policies.get_policies(client) + policies = policies["policies"] + req, categories = get_categories(client) + categories = categories[type_element + "_categories"] + case_tested = False + for policy_key in policies.keys(): + policy = policies[policy_key] + for category_key in categories: + req, get_elements = get_method(client, policy_id=policy_key, category_id=category_key) + if len(get_elements[type_element+"_data"]) == 0: + continue + + # do this because the backend gives an element with empty data if the policy_key, + # category_key couple does not have any data... + get_elements = get_elements[type_element+"_data"] + if len(get_elements[0]["data"]) == 0: + continue + + if policy["name"] == "test policy": + assert len(get_elements) == 1 + el = get_elements[0] + assert isinstance(el["data"], dict) + if counter == 2: + assert len(el["data"].keys()) == 1 + el = el["data"][list(el["data"].keys())[0]] + if "value" in el: + el = el["value"] + assert el["name"] == "one valid " + type_element + " data" + if counter == 3: + assert len(el["data"].keys()) == 2 + el1 = el["data"][list(el["data"].keys())[0]] + el2 = el["data"][list(el["data"].keys())[1]] + if "value" in el1: + el1 = el1["value"] + el2 = el2["value"] + assert (el1["name"] == "one valid " + type_element + " data" and el2["name"] == "valid " + type_element + " data") or (el2["name"] == "one valid " + type_element + " data" and el1["name"] == "valid " + type_element + " data") + assert el1["description"] == "description" + assert el2["description"] == "description" + + case_tested = True + + if policy["name"] == "test other policy": + if counter == 4: + assert len(get_elements) == 1 + el = get_elements[0] + assert isinstance(el["data"], dict) + assert len(el["data"].keys()) == 1 + el = el["data"][list(el["data"].keys())[0]] + if "value" in el: + el = el["value"] + assert el["name"] == "valid " + type_element + " data" + assert el["description"] == "new description" + case_tested = True + + assert case_tested is True + + +def test_clean(): + client = utilities.register_client() + import_export_utilities.clean_all(client) + #restore the database as previously + utilities.get_policy_id() diff --git a/old/moon_manager/tests/unit_python/api/test_meta_data.py b/old/moon_manager/tests/unit_python/api/test_meta_data.py new file mode 100644 index 00000000..e6cb0833 --- /dev/null +++ b/old/moon_manager/tests/unit_python/api/test_meta_data.py @@ -0,0 +1,305 @@ +import json +import api.utilities as utilities +from helpers import data_builder +from uuid import uuid4 + + +# subject_categories_test + + +def get_subject_categories(client): + req = client.get("/subject_categories") + subject_categories = utilities.get_json(req.data) + return req, subject_categories + + +def add_subject_categories(client, name): + data = { + "name": name, + "description": "description of {}".format(name) + } + req = client.post("/subject_categories", data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + subject_categories = utilities.get_json(req.data) + return req, subject_categories + + +def delete_subject_categories(client, name): + request, subject_categories = get_subject_categories(client) + for key, value in subject_categories['subject_categories'].items(): + if value['name'] == name: + return client.delete("/subject_categories/{}".format(key)) + + +def delete_subject_categories_without_id(client): + req = client.delete("/subject_categories/{}".format("")) + return req + + +def test_get_subject_categories(): + client = utilities.register_client() + req, subject_categories = get_subject_categories(client) + assert req.status_code == 200 + assert isinstance(subject_categories, dict) + assert "subject_categories" in subject_categories + + +def test_add_subject_categories(): + client = utilities.register_client() + req, subject_categories = add_subject_categories(client, "testuser") + assert req.status_code == 200 + assert isinstance(subject_categories, dict) + value = list(subject_categories["subject_categories"].values())[0] + assert "subject_categories" in subject_categories + assert value['name'] == "testuser" + assert value['description'] == "description of {}".format("testuser") + + +def test_add_subject_categories_with_existed_name(): + client = utilities.register_client() + name = uuid4().hex + req, subject_categories = add_subject_categories(client, name) + assert req.status_code == 200 + req, subject_categories = add_subject_categories(client, name) + assert req.status_code == 409 + assert json.loads(req.data)["message"] == '409: Subject Category Existing' + + +def test_add_subject_categories_name_contain_space(): + client = utilities.register_client() + req, subject_categories = add_subject_categories(client, " ") + assert req.status_code == 400 + assert json.loads(req.data)["message"] == '400: Category Name Invalid' + + +def test_add_subject_categories_with_empty_name(): + client = utilities.register_client() + req, subject_categories = add_subject_categories(client, "<a>") + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]" + + +def test_add_subject_categories_with_name_contain_space(): + client = utilities.register_client() + req, subject_categories = add_subject_categories(client, "test<z>user") + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]" + + +def test_delete_subject_categories(): + client = utilities.register_client() + req = delete_subject_categories(client, "testuser") + assert req.status_code == 200 + + +def test_delete_subject_categories_without_id(): + client = utilities.register_client() + req = delete_subject_categories_without_id(client) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "400: Subject Category Unknown" + + +# --------------------------------------------------------------------------- +# object_categories_test + +def get_object_categories(client): + req = client.get("/object_categories") + object_categories = utilities.get_json(req.data) + return req, object_categories + + +def add_object_categories(client, name): + data = { + "name": name, + "description": "description of {}".format(name) + } + req = client.post("/object_categories", data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + object_categories = utilities.get_json(req.data) + return req, object_categories + + +def delete_object_categories(client, name): + request, object_categories = get_object_categories(client) + for key, value in object_categories['object_categories'].items(): + if value['name'] == name: + return client.delete("/object_categories/{}".format(key)) + + +def delete_object_categories_without_id(client): + req = client.delete("/object_categories/{}".format("")) + return req + + +def test_get_object_categories(): + client = utilities.register_client() + req, object_categories = get_object_categories(client) + assert req.status_code == 200 + assert isinstance(object_categories, dict) + assert "object_categories" in object_categories + + +def test_add_object_categories(): + client = utilities.register_client() + req, object_categories = add_object_categories(client, "testuser") + assert req.status_code == 200 + assert isinstance(object_categories, dict) + value = list(object_categories["object_categories"].values())[0] + assert "object_categories" in object_categories + assert value['name'] == "testuser" + assert value['description'] == "description of {}".format("testuser") + + +def test_add_object_categories_with_existed_name(): + client = utilities.register_client() + name = uuid4().hex + req, object_categories = add_object_categories(client, name) + assert req.status_code == 200 + req, object_categories = add_object_categories(client, name) + assert req.status_code == 409 + assert json.loads(req.data)["message"] == '409: Object Category Existing' + + +def test_add_object_categories_name_contain_space(): + client = utilities.register_client() + req, subject_categories = add_object_categories(client, " ") + assert req.status_code == 400 + assert json.loads(req.data)["message"] == '400: Category Name Invalid' + + +def test_add_object_categories_with_empty_name(): + client = utilities.register_client() + req, object_categories = add_object_categories(client, "<a>") + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]" + + +def test_add_object_categories_with_name_contain_space(): + client = utilities.register_client() + req, object_categories = add_object_categories(client, "test<a>user") + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]" + + +def test_delete_object_categories(): + client = utilities.register_client() + req = delete_object_categories(client, "testuser") + assert req.status_code == 200 + + +def test_delete_object_categories_without_id(): + client = utilities.register_client() + req = delete_object_categories_without_id(client) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "400: Object Category Unknown" + + +# --------------------------------------------------------------------------- +# action_categories_test + +def get_action_categories(client): + req = client.get("/action_categories") + action_categories = utilities.get_json(req.data) + return req, action_categories + + +def add_action_categories(client, name): + data = { + "name": name, + "description": "description of {}".format(name) + } + req = client.post("/action_categories", data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + action_categories = utilities.get_json(req.data) + return req, action_categories + + +def delete_action_categories(client, name): + request, action_categories = get_action_categories(client) + for key, value in action_categories['action_categories'].items(): + if value['name'] == name: + return client.delete("/action_categories/{}".format(key)) + + +def delete_action_categories_without_id(client): + req = client.delete("/action_categories/{}".format("")) + return req + + +def test_get_action_categories(): + client = utilities.register_client() + req, action_categories = get_action_categories(client) + assert req.status_code == 200 + assert isinstance(action_categories, dict) + assert "action_categories" in action_categories + + +def test_add_action_categories(): + client = utilities.register_client() + req, action_categories = add_action_categories(client, "testuser") + assert req.status_code == 200 + assert isinstance(action_categories, dict) + value = list(action_categories["action_categories"].values())[0] + assert "action_categories" in action_categories + assert value['name'] == "testuser" + assert value['description'] == "description of {}".format("testuser") + + +def test_add_action_categories_with_existed_name(): + client = utilities.register_client() + name = uuid4().hex + req, action_categories = add_action_categories(client, name) + assert req.status_code == 200 + req, action_categories = add_action_categories(client, name) + assert req.status_code == 409 + assert json.loads(req.data)["message"] == '409: Action Category Existing' + + +def test_add_action_categories_name_contain_space(): + client = utilities.register_client() + req, subject_categories = add_action_categories(client, " ") + assert req.status_code == 400 + assert json.loads(req.data)["message"] == '400: Category Name Invalid' + + +def test_add_action_categories_with_empty_name(): + client = utilities.register_client() + req, action_categories = add_action_categories(client, "<a>") + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]" + + +def test_add_action_categories_with_name_contain_space(): + client = utilities.register_client() + req, action_categories = add_action_categories(client, "test<a>user") + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]" + + +def test_delete_action_categories(): + client = utilities.register_client() + req = delete_action_categories(client, "testuser") + assert req.status_code == 200 + + +def test_delete_action_categories_without_id(): + client = utilities.register_client() + req = delete_action_categories_without_id(client) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "400: Action Category Unknown" + + +def test_delete_data_categories_connected_to_meta_rule(): + subject_category_id, object_category_id, action_category_id, meta_rule_id = data_builder.create_new_meta_rule() + client = utilities.register_client() + req = client.delete("/subject_categories/{}".format(subject_category_id)) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == '400: Subject Category With Meta Rule Error' + + req = client.delete("/object_categories/{}".format(object_category_id)) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == '400: Object Category With Meta Rule Error' + + req = client.delete("/action_categories/{}".format(action_category_id)) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == '400: Action Category With Meta Rule Error' diff --git a/old/moon_manager/tests/unit_python/api/test_meta_rules.py b/old/moon_manager/tests/unit_python/api/test_meta_rules.py new file mode 100644 index 00000000..634f19da --- /dev/null +++ b/old/moon_manager/tests/unit_python/api/test_meta_rules.py @@ -0,0 +1,415 @@ +import json +import api.utilities as utilities +from helpers import category_helper +from helpers import data_builder +from uuid import uuid4 + + +def get_meta_rules(client): + req = client.get("/meta_rules") + meta_rules = utilities.get_json(req.data) + return req, meta_rules + + +def add_meta_rules(client, name, data=None): + if not data: + subject_category = category_helper.add_subject_category( + value={"name": "subject category name" + uuid4().hex, "description": "description 1"}) + subject_category_id = list(subject_category.keys())[0] + object_category = category_helper.add_object_category( + value={"name": "object category name" + uuid4().hex, "description": "description 1"}) + object_category_id = list(object_category.keys())[0] + action_category = category_helper.add_action_category( + value={"name": "action category name" + uuid4().hex, "description": "description 1"}) + action_category_id = list(action_category.keys())[0] + + data = { + "name": name, + "subject_categories": [subject_category_id], + "object_categories": [object_category_id], + "action_categories": [action_category_id] + } + req = client.post("/meta_rules", data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + meta_rules = utilities.get_json(req.data) + return req, meta_rules + + +def add_meta_rules_without_category_ids(client, name): + data = { + "name": name + uuid4().hex, + "subject_categories": [], + "object_categories": [], + "action_categories": [] + } + req = client.post("/meta_rules", data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + meta_rules = utilities.get_json(req.data) + return req, meta_rules + + +def update_meta_rules(client, name, metaRuleId, data=None): + if not data: + subject_category = category_helper.add_subject_category( + value={"name": "subject category name update" + uuid4().hex, + "description": "description 1"}) + subject_category_id = list(subject_category.keys())[0] + object_category = category_helper.add_object_category( + value={"name": "object category name update" + uuid4().hex, + "description": "description 1"}) + object_category_id = list(object_category.keys())[0] + action_category = category_helper.add_action_category( + value={"name": "action category name update" + uuid4().hex, + "description": "description 1"}) + action_category_id = list(action_category.keys())[0] + data = { + "name": name, + "subject_categories": [subject_category_id], + "object_categories": [object_category_id], + "action_categories": [action_category_id] + } + + req = client.patch("/meta_rules/{}".format(metaRuleId), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + meta_rules = utilities.get_json(req.data) + return req, meta_rules + + +def update_meta_rules_with_categories(client, name, data=None, meta_rule_id=None): + if not meta_rule_id: + subject_category_id, object_category_id, action_category_id, meta_rule_id = data_builder.create_new_meta_rule() + data = { + "name": name, + "subject_categories": [subject_category_id], + "object_categories": [object_category_id], + "action_categories": [action_category_id] + } + + req = client.patch("/meta_rules/{}".format(meta_rule_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + meta_rules = utilities.get_json(req.data) + return req, meta_rules + + +def delete_meta_rules(client, name): + request, meta_rules = get_meta_rules(client) + for key, value in meta_rules['meta_rules'].items(): + if value['name'] == name: + return client.delete("/meta_rules/{}".format(key)) + + +def delete_meta_rules_without_id(client): + req = client.delete("/meta_rules/{}".format("")) + return req + + +def test_get_meta_rules(): + client = utilities.register_client() + req, meta_rules = get_meta_rules(client) + assert req.status_code == 200 + assert isinstance(meta_rules, dict) + assert "meta_rules" in meta_rules + + +def test_add_meta_rules(): + client = utilities.register_client() + meta_rule_name = uuid4().hex + req, meta_rules = add_meta_rules(client, meta_rule_name) + assert req.status_code == 200 + assert isinstance(meta_rules, dict) + value = list(meta_rules["meta_rules"].values())[0] + assert "meta_rules" in meta_rules + assert value['name'] == meta_rule_name + + +def test_add_two_meta_rules_with_same_categories_combination(): + client = utilities.register_client() + meta_rule_name = uuid4().hex + req, meta_rules = add_meta_rules(client, meta_rule_name) + assert req.status_code == 200 + for meta_rule_id in meta_rules['meta_rules']: + if meta_rules['meta_rules'][meta_rule_id]['name'] == meta_rule_name: + data = meta_rules['meta_rules'][meta_rule_id] + + data['name'] = uuid4().hex + req, meta_rules = add_meta_rules(client, name=data['name'], data=data) + assert req.status_code == 409 + assert json.loads(req.data)["message"] == '409: Meta Rule Existing' + + +def test_add_three_meta_rules_with_different_combination_but_similar_items(): + client = utilities.register_client() + meta_rule_name1 = uuid4().hex + req, meta_rules = add_meta_rules(client, meta_rule_name1) + assert req.status_code == 200 + for meta_rule_id in meta_rules['meta_rules']: + if meta_rules['meta_rules'][meta_rule_id]['name'] == meta_rule_name1: + data = meta_rules['meta_rules'][meta_rule_id] + break + + meta_rule_name2 = uuid4().hex + + req, meta_rules = add_meta_rules(client, meta_rule_name2) + + for meta_rule_id in meta_rules['meta_rules']: + if meta_rules['meta_rules'][meta_rule_id]['name'] == meta_rule_name2: + data['subject_categories'] += meta_rules['meta_rules'][meta_rule_id][ + 'subject_categories'] + data['object_categories'] += meta_rules['meta_rules'][meta_rule_id]['object_categories'] + data['action_categories'] += meta_rules['meta_rules'][meta_rule_id]['action_categories'] + break + + data['name'] = uuid4().hex + + req, meta_rules = add_meta_rules(client, name=data['name'], data=data) + assert req.status_code == 200 + + +def test_add_two_meta_rules_with_different_combination_but_similar_items(): + client = utilities.register_client() + meta_rule_name1 = uuid4().hex + meta_rule_name2 = uuid4().hex + + subject_category = category_helper.add_subject_category( + value={"name": "subject category name" + uuid4().hex, "description": "description 1"}) + subject_category_id1 = list(subject_category.keys())[0] + + object_category = category_helper.add_object_category( + value={"name": "object category name" + uuid4().hex, "description": "description 1"}) + object_category_id1 = list(object_category.keys())[0] + + action_category = category_helper.add_action_category( + value={"name": "action category name" + uuid4().hex, "description": "description 1"}) + action_category_id1 = list(action_category.keys())[0] + + subject_category = category_helper.add_subject_category( + value={"name": "subject category name" + uuid4().hex, "description": "description 1"}) + subject_category_id2 = list(subject_category.keys())[0] + + object_category = category_helper.add_object_category( + value={"name": "object category name" + uuid4().hex, "description": "description 1"}) + object_category_id2 = list(object_category.keys())[0] + + action_category = category_helper.add_action_category( + value={"name": "action category name" + uuid4().hex, "description": "description 1"}) + action_category_id2 = list(action_category.keys())[0] + + data = { + "name": meta_rule_name1, + "subject_categories": [subject_category_id1, subject_category_id2], + "object_categories": [object_category_id1, object_category_id2], + "action_categories": [action_category_id1, action_category_id2] + } + req, meta_rules = add_meta_rules(client, meta_rule_name1, data=data) + assert req.status_code == 200 + data = { + "name": meta_rule_name2, + "subject_categories": [subject_category_id2], + "object_categories": [object_category_id1], + "action_categories": [action_category_id2] + } + + req, meta_rules = add_meta_rules(client, meta_rule_name1, data=data) + assert req.status_code == 200 + + +def test_add_meta_rule_with_existing_name_error(): + client = utilities.register_client() + name = uuid4().hex + req, meta_rules = add_meta_rules(client, name) + assert req.status_code == 200 + req, meta_rules = add_meta_rules(client, name) + assert req.status_code == 409 + assert json.loads(req.data)["message"] == '409: Meta Rule Existing' + + +def test_add_meta_rules_with_forbidden_char_in_name(): + client = utilities.register_client() + req, meta_rules = add_meta_rules(client, "<a>") + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]" + + +def test_add_meta_rules_with_blank_name(): + client = utilities.register_client() + req, meta_rules = add_meta_rules(client, "") + assert req.status_code == 400 + assert json.loads(req.data)["message"] == '400: Meta Rule Error' + + +def test_add_meta_rules_without_subject_categories(): + client = utilities.register_client() + name_meta_rule = uuid4().hex + req, meta_rules = add_meta_rules_without_category_ids(client, name_meta_rule) + assert req.status_code == 200 + + +def test_delete_meta_rules(): + client = utilities.register_client() + name_meta_rule = uuid4().hex + req, meta_rules = add_meta_rules_without_category_ids(client, name_meta_rule) + meta_rule_id = next(iter(meta_rules['meta_rules'])) + req = delete_meta_rules(client, meta_rules['meta_rules'][meta_rule_id]['name']) + assert req.status_code == 200 + + +def test_delete_meta_rules_without_id(): + client = utilities.register_client() + req = delete_meta_rules_without_id(client) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "400: Meta Rule Unknown" + + +def test_update_meta_rules(): + client = utilities.register_client() + req = add_meta_rules(client, "testuser") + meta_rule_id = list(req[1]['meta_rules'])[0] + req_update = update_meta_rules(client, "testuser", meta_rule_id) + assert req_update[0].status_code == 200 + delete_meta_rules(client, "testuser") + get_meta_rules(client) + + +def test_update_meta_rule_with_combination_existed(): + client = utilities.register_client() + meta_rule_name1 = uuid4().hex + req, meta_rules = add_meta_rules(client, meta_rule_name1) + meta_rule_id1 = next(iter(meta_rules['meta_rules'])) + data1 = meta_rules['meta_rules'][meta_rule_id1] + + meta_rule_name2 = uuid4().hex + req, meta_rules = add_meta_rules(client, meta_rule_name2) + meta_rule_id2 = next(iter(meta_rules['meta_rules'])) + data2 = meta_rules['meta_rules'][meta_rule_id2] + data1['name'] = data2['name'] + req_update = update_meta_rules(client, name=meta_rule_name2, metaRuleId=meta_rule_id2, + data=data1) + assert req_update[0].status_code == 409 + assert req_update[1]['message']== '409: Meta Rule Existing' + + +def test_update_meta_rule_with_different_combination_but_same_data(): + client = utilities.register_client() + meta_rule_name1 = uuid4().hex + subject_category = category_helper.add_subject_category( + value={"name": "subject category name" + uuid4().hex, "description": "description 1"}) + subject_category_id1 = list(subject_category.keys())[0] + object_category = category_helper.add_object_category( + value={"name": "object category name" + uuid4().hex, "description": "description 1"}) + object_category_id1 = list(object_category.keys())[0] + action_category = category_helper.add_action_category( + value={"name": "action category name" + uuid4().hex, "description": "description 1"}) + action_category_id1 = list(action_category.keys())[0] + subject_category = category_helper.add_subject_category( + value={"name": "subject category name" + uuid4().hex, "description": "description 1"}) + subject_category_id2 = list(subject_category.keys())[0] + object_category = category_helper.add_object_category( + value={"name": "object category name" + uuid4().hex, "description": "description 1"}) + object_category_id2 = list(object_category.keys())[0] + action_category = category_helper.add_action_category( + value={"name": "action category name" + uuid4().hex, "description": "description 1"}) + action_category_id2 = list(action_category.keys())[0] + + data = { + "name": meta_rule_name1, + "subject_categories": [subject_category_id1, subject_category_id2], + "object_categories": [object_category_id1, object_category_id2], + "action_categories": [action_category_id1, action_category_id2] + } + req, meta_rules = add_meta_rules(client, meta_rule_name1, data=data) + assert req.status_code == 200 + + meta_rule_name2 = uuid4().hex + req, meta_rules = add_meta_rules(client, meta_rule_name2) + meta_rule_id2 = next(iter(meta_rules['meta_rules'])) + data2 = { + "name": meta_rule_name2, + "subject_categories": [subject_category_id1, subject_category_id2], + "object_categories": [object_category_id1], + "action_categories": [action_category_id1,action_category_id2] + } + + req_update = update_meta_rules(client, name=meta_rule_name2, metaRuleId=meta_rule_id2, + data=data2) + assert req_update[0].status_code == 200 + + +def test_update_meta_rules_without_id(): + client = utilities.register_client() + req_update = update_meta_rules(client, "testuser", "") + assert req_update[0].status_code == 400 + assert json.loads(req_update[0].data)["message"] == "400: Meta Rule Unknown" + + +def test_update_meta_rules_without_name(): + client = utilities.register_client() + req_update = update_meta_rules(client, "<br/>", "1234567") + assert req_update[0].status_code == 400 + assert json.loads(req_update[0].data)[ + "message"] == "Key: 'name', [Forbidden characters in string]" + + +def test_update_meta_rules_without_categories(): + client = utilities.register_client() + req_update = update_meta_rules_with_categories(client, "testuser") + assert req_update[0].status_code == 200 + + +def test_update_meta_rules_with_empty_categories(): + client = utilities.register_client() + subject_category_id, object_category_id, action_category_id, meta_rule_id = data_builder.create_new_meta_rule() + data = { + "name": "testuser", + "subject_categories": [""], + "object_categories": [""], + "action_categories": [""] + } + req_update = update_meta_rules_with_categories(client, "testuser", data=data, + meta_rule_id=meta_rule_id) + assert req_update[0].status_code == 400 + assert req_update[1]['message'] == '400: Subject Category Unknown' + + +def test_update_meta_rules_with_empty_action_category(): + client = utilities.register_client() + subject_category_id, object_category_id, action_category_id, meta_rule_id = data_builder.create_new_meta_rule() + data = { + "name": "testuser", + "subject_categories": [subject_category_id], + "object_categories": [object_category_id], + "action_categories": [""] + } + req_update = update_meta_rules_with_categories(client, "testuser", data=data, + meta_rule_id=meta_rule_id) + assert req_update[0].status_code == 400 + assert req_update[1]['message'] == '400: Action Category Unknown' + + +def test_update_meta_rules_with_empty_object_category(): + client = utilities.register_client() + subject_category_id, object_category_id, action_category_id, meta_rule_id = data_builder.create_new_meta_rule() + data = { + "name": "testuser", + "subject_categories": [subject_category_id], + "object_categories": [""], + "action_categories": [action_category_id] + } + req_update = update_meta_rules_with_categories(client, "testuser", data=data, + meta_rule_id=meta_rule_id) + assert req_update[0].status_code == 400 + assert req_update[1]['message'] == '400: Object Category Unknown' + + +def test_update_meta_rules_with_categories_and_one_empty(): + client = utilities.register_client() + subject_category_id, object_category_id, action_category_id, meta_rule_id = data_builder.create_new_meta_rule() + data = { + "name": "testuser", + "subject_categories": [subject_category_id, ""], + "object_categories": [object_category_id, ""], + "action_categories": [action_category_id, ""] + } + req_update = update_meta_rules_with_categories(client, "testuser", data=data, + meta_rule_id=meta_rule_id) + assert req_update[0].status_code == 400 + assert req_update[1]['message'] == '400: Subject Category Unknown' diff --git a/old/moon_manager/tests/unit_python/api/test_pdp.py b/old/moon_manager/tests/unit_python/api/test_pdp.py new file mode 100644 index 00000000..53a87b21 --- /dev/null +++ b/old/moon_manager/tests/unit_python/api/test_pdp.py @@ -0,0 +1,164 @@ +import json +import api.utilities as utilities +from helpers import data_builder as builder +from uuid import uuid4 + + +def get_pdp(client): + req = client.get("/pdp") + pdp = utilities.get_json(req.data) + return req, pdp + + +def add_pdp(client, data): + req = client.post("/pdp", data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + pdp = utilities.get_json(req.data) + return req, pdp + + +def update_pdp(client, data, pdp_id): + req = client.patch("/pdp/{}".format(pdp_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + pdp = utilities.get_json(req.data) + return req, pdp + + +def delete_pdp(client, key): + req = client.delete("/pdp/{}".format(key)) + return req + + +def delete_pdp_without_id(client): + req = client.delete("/pdp/{}".format("")) + return req + + +def test_get_pdp(): + client = utilities.register_client() + req, pdp = get_pdp(client) + assert req.status_code == 200 + assert isinstance(pdp, dict) + assert "pdps" in pdp + + +def test_add_pdp(): + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex, + model_name="model1" + uuid4().hex) + data = { + "name": "testuser", + "security_pipeline": [policy_id], + "keystone_project_id": "keystone_project_id", + "description": "description of testuser" + } + client = utilities.register_client() + req, pdp = add_pdp(client, data) + assert req.status_code == 200 + assert isinstance(pdp, dict) + value = list(pdp["pdps"].values())[0] + assert "pdps" in pdp + assert value['name'] == "testuser" + assert value["description"] == "description of {}".format("testuser") + assert value["keystone_project_id"] == "keystone_project_id" + + +def test_delete_pdp(): + client = utilities.register_client() + request, pdp = get_pdp(client) + success_req = None + for key, value in pdp['pdps'].items(): + if value['name'] == "testuser": + success_req = delete_pdp(client, key) + break + assert success_req + assert success_req.status_code == 200 + + +def test_add_pdp_with_forbidden_char_in_user(): + data = { + "name": "<a>", + "security_pipeline": ["policy_id_1", "policy_id_2"], + "keystone_project_id": "keystone_project_id", + "description": "description of testuser" + } + client = utilities.register_client() + req, models = add_pdp(client, data) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]" + + +def test_add_pdp_with_forbidden_char_in_keystone(): + data = { + "name": "testuser", + "security_pipeline": ["policy_id_1", "policy_id_2"], + "keystone_project_id": "<a>", + "description": "description of testuser" + } + client = utilities.register_client() + req, meta_rules = add_pdp(client, data) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'keystone_project_id', [Forbidden characters in string]" + + +def test_update_pdp(): + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy( + subject_category_name="subject_category1"+uuid4().hex, + object_category_name="object_category1"+uuid4().hex, + action_category_name="action_category1"+uuid4().hex, + meta_rule_name="meta_rule_1"+uuid4().hex, + model_name="model1"+uuid4().hex) + data_add = { + "name": "testuser", + "security_pipeline": [policy_id], + "keystone_project_id": "keystone_project_id", + "description": "description of testuser" + } + + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id_update = builder.create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex, + model_name="model1" + uuid4().hex) + data_update = { + "name": "testuser", + "security_pipeline": [policy_id_update], + "keystone_project_id": "keystone_project_id_update", + "description": "description of testuser" + } + client = utilities.register_client() + req = add_pdp(client, data_add) + pdp_id = list(req[1]['pdps'])[0] + req_update = update_pdp(client, data_update, pdp_id) + assert req_update[0].status_code == 200 + value = list(req_update[1]["pdps"].values())[0] + assert value["keystone_project_id"] == "keystone_project_id_update" + request, pdp = get_pdp(client) + for key, value in pdp['pdps'].items(): + if value['name'] == "testuser": + delete_pdp(client, key) + break + + +def test_update_pdp_without_id(): + client = utilities.register_client() + req_update = update_pdp(client, "testuser", "") + assert req_update[0].status_code == 400 + assert json.loads(req_update[0].data)["message"] == 'Invalid Key :name not found' + + +def test_update_pdp_without_user(): + data = { + "name": "", + "security_pipeline": ["policy_id_1", "policy_id_2"], + "keystone_project_id": "keystone_project_id", + "description": "description of testuser" + } + client = utilities.register_client() + req_update = update_pdp(client, data, "<a>") + assert req_update[0].status_code == 400 + assert json.loads(req_update[0].data)["message"] == "Forbidden characters in string" diff --git a/old/moon_manager/tests/unit_python/api/test_perimeter.py b/old/moon_manager/tests/unit_python/api/test_perimeter.py new file mode 100644 index 00000000..ff7b09d7 --- /dev/null +++ b/old/moon_manager/tests/unit_python/api/test_perimeter.py @@ -0,0 +1,1028 @@ +# import moon_manager +# import moon_manager.api +import json +import api.utilities as utilities +from helpers import data_builder as builder +import helpers.policy_helper as policy_helper +from uuid import uuid4 + + +def get_subjects(client): + req = client.get("/subjects") + subjects = utilities.get_json(req.data) + return req, subjects + + +def add_subjects(client, policy_id, name, perimeter_id=None, data=None): + if not data: + name = name + uuid4().hex + data = { + "name": name, + "description": "description of {}".format(name), + "password": "password for {}".format(name), + "email": "{}@moon".format(name) + } + if not perimeter_id: + req = client.post("/policies/{}/subjects".format(policy_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + else: + req = client.post("/policies/{}/subjects/{}".format(policy_id, perimeter_id), + data=json.dumps( + data), + headers={'Content-Type': 'application/json'}) + subjects = utilities.get_json(req.data) + return req, subjects + + +def delete_subjects_without_perimeter_id(client): + req = client.delete("/subjects/{}".format("")) + return req + + +def test_perimeter_get_subject(): + client = utilities.register_client() + req, subjects = get_subjects(client) + assert req.status_code == 200 + assert isinstance(subjects, dict) + assert "subjects" in subjects + + +def test_perimeter_add_subject(): + client = utilities.register_client() + policies = policy_helper.add_policies() + policy_id = list(policies.keys())[0] + + req, subjects = add_subjects(client, policy_id, "testuser") + value = list(subjects["subjects"].values())[0] + assert req.status_code == 200 + assert value["name"] + assert value["email"] + + +def test_perimeter_add_same_subject_perimeter_id_with_new_policy_id(): + client = utilities.register_client() + policies1 = policy_helper.add_policies() + policy_id1 = list(policies1.keys())[0] + name = "testuser" + perimeter_id = uuid4().hex + data = { + "name": name + uuid4().hex, + "description": "description of {}".format(name), + "password": "password for {}".format(name), + "email": "{}@moon".format(name) + } + add_subjects(client, policy_id1, data['name'], perimeter_id=perimeter_id, data=data) + policies2 = policy_helper.add_policies() + policy_id2 = list(policies2.keys())[0] + req, subjects = add_subjects(client, policy_id2, data['name'], + perimeter_id=perimeter_id, data=data) + value = list(subjects["subjects"].values())[0] + assert req.status_code == 200 + assert value["name"] + assert value["email"] + assert len(value['policy_list']) == 2 + assert policy_id1 in value['policy_list'] + assert policy_id2 in value['policy_list'] + + +def test_perimeter_add_same_subject_perimeter_id_with_different_name(): + client = utilities.register_client() + policies1 = policy_helper.add_policies() + policy_id1 = list(policies1.keys())[0] + perimeter_id = uuid4().hex + add_subjects(client, policy_id1, "testuser", perimeter_id=perimeter_id) + policies2 = policy_helper.add_policies() + policy_id2 = list(policies2.keys())[0] + req, subjects = add_subjects(client, policy_id2, "testuser", perimeter_id=perimeter_id) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == '400: Perimeter content is invalid.' + + +def test_perimeter_add_same_subject_name_with_new_policy_id(): + client = utilities.register_client() + policies1 = policy_helper.add_policies() + policy_id1 = list(policies1.keys())[0] + perimeter_id = uuid4().hex + name = "testuser" + uuid4().hex + data = { + "name": name, + "description": "description of {}".format(name), + "password": "password for {}".format(name), + "email": "{}@moon".format(name) + } + req, subjects = add_subjects(client, policy_id1, None, perimeter_id=perimeter_id, + data=data) + policies2 = policy_helper.add_policies() + policy_id2 = list(policies2.keys())[0] + value = list(subjects["subjects"].values())[0] + data = { + "name": value['name'], + "description": "description of {}".format(value['name']), + "password": "password for {}".format(value['name']), + "email": "{}@moon".format(value['name']) + } + req, subjects = add_subjects(client, policy_id2, None, data=data) + value = list(subjects["subjects"].values())[0] + assert req.status_code == 200 + assert value["name"] + assert value["email"] + assert len(value['policy_list']) == 2 + assert policy_id1 in value['policy_list'] + assert policy_id2 in value['policy_list'] + + +def test_perimeter_add_same_subject_name_with_same_policy_id(): + client = utilities.register_client() + policies1 = policy_helper.add_policies() + policy_id1 = list(policies1.keys())[0] + perimeter_id = uuid4().hex + name = "testuser" + uuid4().hex + data = { + "name": name, + "description": "description of {}".format(name), + "password": "password for {}".format(name), + "email": "{}@moon".format(name) + } + req, subjects = add_subjects(client, policy_id1, None, perimeter_id=perimeter_id, + data=data) + value = list(subjects["subjects"].values())[0] + data = { + "name": value['name'], + "description": "description of {}".format(value['name']), + "password": "password for {}".format(value['name']), + "email": "{}@moon".format(value['name']) + } + req, subjects = add_subjects(client, policy_id1, None, data=data) + assert req.status_code == 409 + assert json.loads(req.data)["message"] == '409: Policy Already Exists' + + +def test_perimeter_add_same_subject_perimeter_id_with_existed_policy_id_in_list(): + client = utilities.register_client() + policies = policy_helper.add_policies() + policy_id = list(policies.keys())[0] + name = "testuser" + uuid4().hex + data = { + "name": name, + "description": "description of {}".format(name), + "password": "password for {}".format(name), + "email": "{}@moon".format(name) + } + req, subjects = add_subjects(client, policy_id, name, data=data) + perimeter_id = list(subjects["subjects"].values())[0]['id'] + req, subjects = add_subjects(client, policy_id, name, perimeter_id=perimeter_id, data=data) + assert req.status_code == 409 + assert json.loads(req.data)["message"] == '409: Policy Already Exists' + + +def test_perimeter_add_subject_invalid_policy_id(): + client = utilities.register_client() + policies = policy_helper.add_policies() + policy_id = list(policies.keys())[0] + name = "testuser" + data = { + "name": name + uuid4().hex, + "description": "description of {}".format(name), + "password": "password for {}".format(name), + "email": "{}@moon".format(name) + } + req, subjects = add_subjects(client, policy_id + "0", "testuser", data) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == '400: Policy Unknown' + + +def test_perimeter_add_subject_policy_id_none(): + client = utilities.register_client() + name = "testuser" + data = { + "name": name + uuid4().hex, + "description": "description of {}".format(name), + "password": "password for {}".format(name), + "email": "{}@moon".format(name) + } + req, subjects = add_subjects(client, None, "testuser", data) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == '400: Policy Unknown' + + +def test_perimeter_add_subject_with_forbidden_char_in_name(): + client = utilities.register_client() + data = { + "name": "<a>", + "description": "description of {}".format(""), + "password": "password for {}".format(""), + "email": "{}@moon".format("") + } + req = client.post("/policies/{}/subjects".format("111"), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]" + + +def test_perimeter_update_subject_name(): + client = utilities.register_client() + policies = policy_helper.add_policies() + policy_id = list(policies.keys())[0] + req, subjects = add_subjects(client, policy_id, "testuser") + value1 = list(subjects["subjects"].values())[0] + perimeter_id = value1['id'] + data = { + 'name': value1['name'] + "update" + } + req = client.patch("/subjects/{}".format(perimeter_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + subjects = utilities.get_json(req.data) + value2 = list(subjects["subjects"].values())[0] + assert req.status_code == 200 + assert value1['name'] + 'update' == value2['name'] + assert value1['id'] == value2['id'] + assert value1['description'] == value2['description'] + + +def test_perimeter_update_subject_description(): + client = utilities.register_client() + policies = policy_helper.add_policies() + policy_id = list(policies.keys())[0] + req, subjects = add_subjects(client, policy_id, "testuser") + value1 = list(subjects["subjects"].values())[0] + perimeter_id = value1['id'] + data = { + 'description': value1['description'] + "update", + } + req = client.patch("/subjects/{}".format(perimeter_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + subjects = utilities.get_json(req.data) + value2 = list(subjects["subjects"].values())[0] + assert req.status_code == 200 + assert value1['name'] == value2['name'] + assert value1['id'] == value2['id'] + assert value1['description'] + 'update' == value2['description'] + + +def test_perimeter_update_subject_description_and_name(): + client = utilities.register_client() + policies = policy_helper.add_policies() + policy_id = list(policies.keys())[0] + + req, subjects = add_subjects(client, policy_id, "testuser") + value1 = list(subjects["subjects"].values())[0] + perimeter_id = value1['id'] + data = { + 'description': value1['description'] + "update", + 'name': value1['name'] + "update" + } + req = client.patch("/subjects/{}".format(perimeter_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + subjects = utilities.get_json(req.data) + value2 = list(subjects["subjects"].values())[0] + assert req.status_code == 200 + assert value1['name'] + 'update' == value2['name'] + assert value1['id'] == value2['id'] + assert value1['description'] + 'update' == value2['description'] + + +def test_perimeter_update_subject_wrong_id(): + client = utilities.register_client() + name = 'testuser' + uuid4().hex + policies1 = policy_helper.add_policies() + policy_id1 = list(policies1.keys())[0] + data = { + "name": name, + "description": "description of {}".format('testuser'), + } + req, subjects = add_subjects(client, policy_id=policy_id1, name='testuser', data=data) + value1 = list(subjects["subjects"].values())[0] + perimeter_id = value1['id'] + data = { + 'name': value1['name'] + "update", + 'description': value1['description'] + "update" + } + req = client.patch("/subjects/{}".format(perimeter_id + "wrong"), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == '400: Perimeter content is invalid.' + + +def test_perimeter_update_subject_name_with_existed_one(): + client = utilities.register_client() + name1 = 'testuser' + uuid4().hex + policies1 = policy_helper.add_policies() + policy_id1 = list(policies1.keys())[0] + perimeter_id1 = uuid4().hex + req, subjects = add_subjects(client, policy_id=policy_id1, name=name1, + perimeter_id=perimeter_id1) + value1 = list(subjects["subjects"].values())[0] + perimeter_id2 = uuid4().hex + name2 = 'testuser' + uuid4().hex + req, subjects = add_subjects(client, policy_id=policy_id1, name=name2, + perimeter_id=perimeter_id2) + data = { + 'name': value1['name'], + } + req = client.patch("/subjects/{}".format(perimeter_id2), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + assert req.status_code == 409 + + +def test_perimeter_delete_subject(): + client = utilities.register_client() + policies = policy_helper.add_policies() + policy_id = list(policies.keys())[0] + req, subjects = add_subjects(client, policy_id, "testuser") + subject_id = list(subjects["subjects"].values())[0]["id"] + req = client.delete("/policies/{}/subjects/{}".format(policy_id, subject_id)) + assert req.status_code == 200 + + +def test_perimeter_delete_subjects_without_perimeter_id(): + client = utilities.register_client() + req = delete_subjects_without_perimeter_id(client) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "400: Subject Unknown" + + +def get_objects(client): + req = client.get("/objects") + objects = utilities.get_json(req.data) + return req, objects + + +def add_objects(client, name, policyId=None, data=None, perimeter_id=None): + if not policyId: + subject_category_id, object_category_id, action_category_id, meta_rule_id, policyId = builder.create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex, + model_name="model1" + uuid4().hex) + if not data: + data = { + "name": name + uuid4().hex, + "description": "description of {}".format(name), + } + if not perimeter_id: + req = client.post("/policies/{}/objects/".format(policyId), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + else: + req = client.post("/policies/{}/objects/{}".format(policyId, perimeter_id), + data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + objects = utilities.get_json(req.data) + return req, objects + + +def delete_objects_without_perimeter_id(client): + req = client.delete("/objects/{}".format("")) + return req + + +def test_perimeter_get_object(): + client = utilities.register_client() + req, objects = get_objects(client) + assert req.status_code == 200 + assert isinstance(objects, dict) + assert "objects" in objects + + +def test_perimeter_add_object(): + client = utilities.register_client() + req, objects = add_objects(client, "testuser") + value = list(objects["objects"].values())[0] + assert req.status_code == 200 + assert value['name'] + + +def test_perimeter_add_object_with_wrong_policy_id(): + client = utilities.register_client() + req, objects = add_objects(client, "testuser", policyId='wrong') + assert req.status_code == 400 + assert json.loads(req.data)["message"] == '400: Policy Unknown' + + +def test_perimeter_add_object_with_policy_id_none(): + client = utilities.register_client() + data = { + "name": "testuser" + uuid4().hex, + "description": "description of {}".format("testuser"), + } + req = client.post("/policies/{}/objects/".format(None), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == '400: Policy Unknown' + + +def test_perimeter_add_same_object_name_with_new_policy_id(): + client = utilities.register_client() + req, objects = add_objects(client, "testuser") + value1 = list(objects["objects"].values())[0] + policies1 = policy_helper.add_policies() + policy_id1 = list(policies1.keys())[0] + data = { + "name": value1['name'], + "description": "description of {}".format('testuser'), + } + req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data) + value2 = list(objects["objects"].values())[0] + assert req.status_code == 200 + assert value1['id'] == value2['id'] + assert value1['name'] == value2['name'] + + +def test_perimeter_add_same_object_perimeter_id_with_new_policy_id(): + client = utilities.register_client() + req, objects = add_objects(client, "testuser") + value1 = list(objects["objects"].values())[0] + policies1 = policy_helper.add_policies() + policy_id1 = list(policies1.keys())[0] + data = { + "name": value1['name'], + "description": "description of {}".format('testuser'), + } + req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data, + perimeter_id=value1['id']) + value2 = list(objects["objects"].values())[0] + assert req.status_code == 200 + assert value1['id'] == value2['id'] + assert value1['name'] == value2['name'] + + +def test_perimeter_add_same_object_perimeter_id_with_different_name(): + client = utilities.register_client() + req, objects = add_objects(client, "testuser") + value1 = list(objects["objects"].values())[0] + policies1 = policy_helper.add_policies() + policy_id1 = list(policies1.keys())[0] + data = { + "name": value1['name'] + 'different', + "description": "description of {}".format('testuser'), + } + req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data, + perimeter_id=value1['id']) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == '400: Perimeter content is invalid.' + + +def test_perimeter_add_same_object_name_with_same_policy_id(): + client = utilities.register_client() + name = 'testuser' + uuid4().hex + policies1 = policy_helper.add_policies() + policy_id1 = list(policies1.keys())[0] + data = { + "name": name, + "description": "description of {}".format('testuser'), + } + req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data) + value = list(objects["objects"].values())[0] + assert req.status_code == 200 + req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data) + assert req.status_code == 409 + assert json.loads(req.data)["message"] == '409: Policy Already Exists' + + +def test_perimeter_add_same_object_perimeter_id_with_existed_policy_id_in_list(): + client = utilities.register_client() + name = 'testuser' + uuid4().hex + policies1 = policy_helper.add_policies() + policy_id1 = list(policies1.keys())[0] + data = { + "name": name, + "description": "description of {}".format('testuser'), + } + req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data) + value = list(objects["objects"].values())[0] + req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data, + perimeter_id=value['id']) + assert req.status_code == 409 + assert json.loads(req.data)["message"] == '409: Policy Already Exists' + + +def test_perimeter_update_object_name(): + client = utilities.register_client() + name = 'testuser' + uuid4().hex + policies1 = policy_helper.add_policies() + policy_id1 = list(policies1.keys())[0] + data = { + "name": name, + "description": "description of {}".format('testuser'), + } + req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data) + + value1 = list(objects["objects"].values())[0] + perimeter_id = value1['id'] + data = { + 'name': value1['name'] + "update" + } + req = client.patch("/objects/{}".format(perimeter_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + + objects = utilities.get_json(req.data) + value2 = list(objects["objects"].values())[0] + assert req.status_code == 200 + assert value1['name'] + 'update' == value2['name'] + assert value1['id'] == value2['id'] + assert value1['description'] == value2['description'] + + +def test_perimeter_update_object_description(): + client = utilities.register_client() + name = 'testuser' + uuid4().hex + policies1 = policy_helper.add_policies() + policy_id1 = list(policies1.keys())[0] + data = { + "name": name, + "description": "description of {}".format('testuser'), + } + req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data) + + value1 = list(objects["objects"].values())[0] + perimeter_id = value1['id'] + data = { + 'description': value1['description'] + "update" + } + req = client.patch("/objects/{}".format(perimeter_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + + objects = utilities.get_json(req.data) + value2 = list(objects["objects"].values())[0] + assert req.status_code == 200 + assert value1['name'] == value2['name'] + assert value1['id'] == value2['id'] + assert value1['description'] + 'update' == value2['description'] + + +def test_perimeter_update_object_description_and_name(): + client = utilities.register_client() + name = 'testuser' + uuid4().hex + policies1 = policy_helper.add_policies() + policy_id1 = list(policies1.keys())[0] + data = { + "name": name, + "description": "description of {}".format('testuser'), + } + req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data) + + value1 = list(objects["objects"].values())[0] + perimeter_id = value1['id'] + data = { + 'name': value1['name'] + "update", + 'description': value1['description'] + "update" + } + req = client.patch("/objects/{}".format(perimeter_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + + objects = utilities.get_json(req.data) + value2 = list(objects["objects"].values())[0] + assert req.status_code == 200 + assert value1['name'] + 'update' == value2['name'] + assert value1['id'] == value2['id'] + assert value1['description'] + 'update' == value2['description'] + + +def test_perimeter_update_object_wrong_id(): + client = utilities.register_client() + name = 'testuser' + uuid4().hex + policies1 = policy_helper.add_policies() + policy_id1 = list(policies1.keys())[0] + data = { + "name": name, + "description": "description of {}".format('testuser'), + } + req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data) + + value1 = list(objects["objects"].values())[0] + perimeter_id = value1['id'] + data = { + 'name': value1['name'] + "update", + 'description': value1['description'] + "update" + } + req = client.patch("/objects/{}".format(perimeter_id + "wrong"), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + assert req.status_code == 400 + + +def test_perimeter_update_object_name_with_existed_one(): + client = utilities.register_client() + name = 'testuser' + uuid4().hex + policies1 = policy_helper.add_policies() + policy_id1 = list(policies1.keys())[0] + data1 = { + "name": name, + "description": "description of {}".format('testuser'), + } + req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data1) + value1 = list(objects["objects"].values())[0] + + name = 'testuser' + uuid4().hex + + data2 = { + "name": name, + "description": "description of {}".format('testuser'), + } + req, objects = add_objects(client, 'testuser', policyId=policy_id1, data=data2) + + value2 = list(objects["objects"].values())[0] + perimeter_id2 = value2['id'] + + data3 = { + 'name': value1['name'] + } + req = client.patch("/objects/{}".format(perimeter_id2), data=json.dumps(data3), + headers={'Content-Type': 'application/json'}) + assert req.status_code == 409 + assert json.loads(req.data)["message"] == '409: Object Existing' + + +def test_perimeter_add_object_without_name(): + client = utilities.register_client() + data = { + "name": "<br/>", + "description": "description of {}".format(""), + } + req = client.post("/policies/{}/objects/".format("111"), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]" + + +def test_perimeter_add_object_with_name_contain_spaces(): + client = utilities.register_client() + data = { + "name": "test<a>user", + "description": "description of {}".format("test user"), + } + req = client.post("/policies/{}/objects/".format("111"), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]" + + +def test_perimeter_delete_object(): + client = utilities.register_client() + policies = policy_helper.add_policies() + policy_id = list(policies.keys())[0] + object_id = builder.create_object(policy_id) + req = client.delete("/policies/{}/objects/{}".format(policy_id, object_id)) + assert req.status_code == 200 + + +def test_perimeter_delete_objects_without_perimeter_id(): + client = utilities.register_client() + req = delete_objects_without_perimeter_id(client) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "400: Object Unknown" + + +def get_actions(client): + req = client.get("/actions") + actions = utilities.get_json(req.data) + return req, actions + + +def add_actions(client, name, policy_id=None, data=None, perimeter_id=None): + if not policy_id: + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex, + model_name="model1" + uuid4().hex) + + if not data: + data = { + "name": name + uuid4().hex, + "description": "description of {}".format(name), + } + if not perimeter_id: + req = client.post("/policies/{}/actions/".format(policy_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + else: + req = client.post("/policies/{}/actions/{}".format(policy_id, perimeter_id), + data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + + actions = utilities.get_json(req.data) + return req, actions + + +def delete_actions_without_perimeter_id(client): + req = client.delete("/actions/{}".format("")) + return req + + +def test_perimeter_get_actions(): + client = utilities.register_client() + req, actions = get_actions(client) + assert req.status_code == 200 + assert isinstance(actions, dict) + assert "actions" in actions + + +def test_perimeter_add_actions(): + client = utilities.register_client() + req, actions = add_actions(client, "testuser") + value = list(actions["actions"].values())[0] + assert req.status_code == 200 + assert value['name'] + + +def test_perimeter_add_action_with_wrong_policy_id(): + client = utilities.register_client() + req, actions = add_actions(client, "testuser", policy_id="wrong") + assert req.status_code == 400 + assert json.loads(req.data)["message"] == '400: Policy Unknown' + + +def test_perimeter_add_action_with_policy_id_none(): + client = utilities.register_client() + data = { + "name": "testuser" + uuid4().hex, + "description": "description of {}".format("testuser"), + } + req = client.post("/policies/{}/actions/".format(None), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == '400: Policy Unknown' + + +def test_perimeter_add_same_action_name_with_new_policy_id(): + client = utilities.register_client() + req, action = add_actions(client, "testuser") + value1 = list(action["actions"].values())[0] + policies1 = policy_helper.add_policies() + policy_id1 = list(policies1.keys())[0] + data = { + "name": value1['name'], + "description": "description of {}".format('testuser'), + } + req, action = add_actions(client, 'testuser', policy_id=policy_id1, data=data) + value2 = list(action["actions"].values())[0] + assert req.status_code == 200 + assert value1['id'] == value2['id'] + assert value1['name'] == value2['name'] + + +def test_perimeter_add_same_action_perimeter_id_with_new_policy_id(): + client = utilities.register_client() + req, action = add_actions(client, "testuser") + value1 = list(action["actions"].values())[0] + policies1 = policy_helper.add_policies() + policy_id1 = list(policies1.keys())[0] + data = { + "name": value1['name'], + "description": "description of {}".format('testuser'), + } + req, action = add_actions(client, 'testuser', policy_id=policy_id1, data=data, + perimeter_id=value1['id']) + value2 = list(action["actions"].values())[0] + assert req.status_code == 200 + assert value1['id'] == value2['id'] + assert value1['name'] == value2['name'] + + +def test_perimeter_add_same_action_perimeter_id_with_different_name(): + client = utilities.register_client() + req, action = add_actions(client, "testuser") + value1 = list(action["actions"].values())[0] + policies1 = policy_helper.add_policies() + policy_id1 = list(policies1.keys())[0] + data = { + "name": value1['name'] + 'different', + "description": "description of {}".format('testuser'), + } + req, action = add_actions(client, 'testuser', policy_id=policy_id1, data=data, + perimeter_id=value1['id']) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == '400: Perimeter content is invalid.' + + +def test_perimeter_add_same_action_name_with_same_policy_id(): + client = utilities.register_client() + policies1 = policy_helper.add_policies() + policy_id1 = list(policies1.keys())[0] + req, action = add_actions(client, "testuser", policy_id=policy_id1) + value1 = list(action["actions"].values())[0] + data = { + "name": value1['name'], + "description": "description of {}".format('testuser'), + } + req, action = add_actions(client, 'testuser', policy_id=policy_id1, data=data) + assert req.status_code == 409 + assert json.loads(req.data)["message"] == '409: Policy Already Exists' + + +def test_perimeter_add_same_action_perimeter_id_with_existed_policy_id_in_list(): + client = utilities.register_client() + policies1 = policy_helper.add_policies() + policy_id1 = list(policies1.keys())[0] + req, action = add_actions(client, "testuser", policy_id=policy_id1) + value1 = list(action["actions"].values())[0] + data = { + "name": value1['name'], + "description": "description of {}".format('testuser'), + } + req, action = add_actions(client, 'testuser', policy_id=policy_id1, data=data, + perimeter_id=value1['id']) + assert req.status_code == 409 + assert json.loads(req.data)["message"] == '409: Policy Already Exists' + + +def test_perimeter_add_actions_without_name(): + client = utilities.register_client() + data = { + "name": "<a>", + "description": "description of {}".format(""), + } + req = client.post("/policies/{}/actions".format("111"), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]" + + +def test_perimeter_add_actions_with_name_contain_spaces(): + client = utilities.register_client() + data = { + "name": "test<a>user", + "description": "description of {}".format("test user"), + } + req = client.post("/policies/{}/actions".format("111"), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]" + + +def test_add_subjects_without_policy_id(): + client = utilities.register_client() + data = { + "name": "testuser", + "description": "description of {}".format("test user"), + } + req = client.post("/policies/{}/subjects".format("111"), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "400: Policy Unknown" + + +def test_add_objects_without_policy_id(): + client = utilities.register_client() + data = { + "name": "testuser", + "description": "description of {}".format("test user"), + } + req = client.post("/policies/{}/objects".format("111"), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "400: Policy Unknown" + + +def test_add_action_without_policy_id(): + client = utilities.register_client() + data = { + "name": "testuser", + "description": "description of {}".format("test user"), + } + req = client.post("/policies/{}/actions".format("111"), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "400: Policy Unknown" + + +def test_perimeter_update_action_name(): + client = utilities.register_client() + req, actions = add_actions(client, "testuser") + value1 = list(actions["actions"].values())[0] + perimeter_id = value1['id'] + data = { + 'name': value1['name'] + "update" + } + req = client.patch("/actions/{}".format(perimeter_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + subjects = utilities.get_json(req.data) + value2 = list(subjects["actions"].values())[0] + assert req.status_code == 200 + assert value1['name'] + 'update' == value2['name'] + assert value1['id'] == value2['id'] + assert value1['description'] == value2['description'] + + +def test_perimeter_update_actions_description(): + client = utilities.register_client() + req, actions = add_actions(client, "testuser") + value1 = list(actions["actions"].values())[0] + perimeter_id = value1['id'] + data = { + 'description': value1['description'] + "update" + } + req = client.patch("/actions/{}".format(perimeter_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + subjects = utilities.get_json(req.data) + value2 = list(subjects["actions"].values())[0] + assert req.status_code == 200 + assert value1['name'] == value2['name'] + assert value1['id'] == value2['id'] + assert value1['description'] + 'update' == value2['description'] + + +def test_perimeter_update_actions_description_and_name(): + client = utilities.register_client() + req, actions = add_actions(client, "testuser") + value1 = list(actions["actions"].values())[0] + perimeter_id = value1['id'] + data = { + 'name': value1['name'] + "update", + 'description': value1['description'] + "update" + } + req = client.patch("/actions/{}".format(perimeter_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + subjects = utilities.get_json(req.data) + value2 = list(subjects["actions"].values())[0] + assert req.status_code == 200 + assert value1['name'] + 'update' == value2['name'] + assert value1['id'] == value2['id'] + assert value1['description'] + 'update' == value2['description'] + + +def test_perimeter_update_action_wrong_id(): + client = utilities.register_client() + req, actions = add_actions(client, "testuser") + value1 = list(actions["actions"].values())[0] + perimeter_id = value1['id'] + data = { + 'name': value1['name'] + "update", + 'description': value1['description'] + "update" + } + req = client.patch("/actions/{}".format(perimeter_id + "wrong"), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == '400: Perimeter content is invalid.' + + +def test_perimeter_update_action_name_with_existed_one(): + client = utilities.register_client() + req, actions = add_actions(client, "testuser") + value1 = list(actions["actions"].values())[0] + req, actions = add_actions(client, "testuser") + value2 = list(actions["actions"].values())[0] + perimeter_id2 = value2['id'] + data = { + 'name': value1['name'], + } + req = client.patch("/actions/{}".format(perimeter_id2), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + assert req.status_code == 409 + assert json.loads(req.data)["message"] == '409: Action Existing' + + +def test_perimeter_delete_actions(): + client = utilities.register_client() + + policies = policy_helper.add_policies() + policy_id = list(policies.keys())[0] + action_id = builder.create_action(policy_id) + req = client.delete("/policies/{}/actions/{}".format(policy_id, action_id)) + assert req.status_code == 200 + + +def test_delete_subject_without_policy(): + client = utilities.register_client() + + policies = policy_helper.add_policies() + policy_id = list(policies.keys())[0] + + action_id = builder.create_action(policy_id) + + req = client.delete("/subjects/{}".format(action_id)) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "400: Policy Unknown" + + +def test_delete_objects_without_policy(): + client = utilities.register_client() + + policies = policy_helper.add_policies() + policy_id = list(policies.keys())[0] + + action_id = builder.create_action(policy_id) + + req = client.delete("/objects/{}".format(action_id)) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "400: Policy Unknown" + + +def test_delete_actions_without_policy(): + client = utilities.register_client() + + policies = policy_helper.add_policies() + policy_id = list(policies.keys())[0] + + action_id = builder.create_action(policy_id) + + req = client.delete("/actions/{}".format(action_id)) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "400: Policy Unknown" + + +def test_perimeter_delete_actions_without_perimeter_id(): + client = utilities.register_client() + req = delete_actions_without_perimeter_id(client) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "400: Action Unknown" diff --git a/old/moon_manager/tests/unit_python/api/test_policies.py b/old/moon_manager/tests/unit_python/api/test_policies.py new file mode 100644 index 00000000..76161d53 --- /dev/null +++ b/old/moon_manager/tests/unit_python/api/test_policies.py @@ -0,0 +1,342 @@ +# Copyright 2018 Open Platform for NFV Project, Inc. and its contributors +# This software is distributed under the terms and conditions of the 'Apache-2.0' +# license which can be found in the file 'LICENSE' in this package distribution +# or at 'http://www.apache.org/licenses/LICENSE-2.0'. + +import json +from uuid import uuid4 +import api.utilities as utilities +from helpers import model_helper +from helpers import policy_helper +from helpers import data_builder + + +def get_policies(client): + req = client.get("/policies") + policies = utilities.get_json(req.data) + return req, policies + + +def add_policies(client, name): + req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex) + model_id = list(req.keys())[0] + data = { + "name": name, + "description": "description of {}".format(name), + "model_id": model_id, + "genre": "genre" + } + req = client.post("/policies", data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + policies = utilities.get_json(req.data) + return req, policies + + +def delete_policies_without_id(client): + req = client.delete("/policies/{}".format("")) + return req + + +def test_get_policies(): + client = utilities.register_client() + req, policies = get_policies(client) + assert req.status_code == 200 + assert isinstance(policies, dict) + assert "policies" in policies + + +def test_add_policies(): + policy_name = "testuser" + uuid4().hex + client = utilities.register_client() + req, policies = add_policies(client, policy_name) + assert req.status_code == 200 + assert isinstance(policies, dict) + value = list(policies["policies"].values())[0] + assert "policies" in policies + assert value['name'] == policy_name + assert value["description"] == "description of {}".format(policy_name) + + +def test_add_policies_without_model(): + policy_name = "testuser" + uuid4().hex + client = utilities.register_client() + data = { + "name": policy_name, + "description": "description of {}".format(policy_name), + "model_id": "", + "genre": "genre" + } + req = client.post("/policies/", data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + + assert req.status_code == 200 + + +def test_add_policies_with_same_name(): + name = uuid4().hex + policy_name = name + client = utilities.register_client() + req, policies = add_policies(client, policy_name) + assert req.status_code == 200 + assert isinstance(policies, dict) + value = list(policies["policies"].values())[0] + assert "policies" in policies + assert value['name'] == policy_name + assert value["description"] == "description of {}".format(policy_name) + client = utilities.register_client() + req, policies = add_policies(client, policy_name) + assert req.status_code == 409 + assert json.loads(req.data)["message"] == '409: Policy Already Exists' + + +def test_add_policy_with_empty_name(): + policy_name = "" + client = utilities.register_client() + req, policies = add_policies(client, policy_name) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == '400: Policy Content Error' + + +def test_update_policies_with_model(): + policy_name = "testuser" + uuid4().hex + client = utilities.register_client() + data = { + "name": policy_name, + "description": "description of {}".format(policy_name), + "model_id": "", + "genre": "genre" + } + req = client.post("/policies/", data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + policy_id = next(iter(utilities.get_json(req.data)['policies'])) + req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex) + model_id = list(req.keys())[0] + data = { + "name": policy_name + "-2", + "description": "description of {}".format(policy_name), + "model_id": model_id, + "genre": "genre" + } + req = client.patch("/policies/{}".format(policy_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + assert req.status_code == 200 + assert json.loads(req.data)['policies'][policy_id]['name'] == policy_name + '-2' + + +def test_update_policies_name_success(): + policy_name = "testuser" + uuid4().hex + client = utilities.register_client() + req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex) + model_id = list(req.keys())[0] + data = { + "name": policy_name, + "description": "description of {}".format(policy_name), + "model_id": model_id, + "genre": "genre" + } + req = client.post("/policies/", data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + policy_id = next(iter(utilities.get_json(req.data)['policies'])) + + data = { + "name": policy_name + "-2", + "description": "description of {}".format(policy_name), + "model_id": model_id, + "genre": "genre" + } + req = client.patch("/policies/{}".format(policy_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + assert req.status_code == 200 + assert json.loads(req.data)['policies'][policy_id]['name'] == policy_name + '-2' + + +def test_update_policies_model_unused(): + policy_name = uuid4().hex + client = utilities.register_client() + req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex) + model_id = list(req.keys())[0] + data = { + "name": policy_name, + "description": "description of {}".format(policy_name), + "model_id": model_id, + "genre": "genre" + } + req = client.post("/policies/", data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + policy_id = next(iter(utilities.get_json(req.data)['policies'])) + req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex) + model_id = list(req.keys())[0] + data = { + "name": policy_name, + "description": "description of {}".format(policy_name), + "model_id": model_id, + "genre": "genre" + } + req = client.patch("/policies/{}".format(policy_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + assert req.status_code == 200 + + +def test_update_policy_name_with_existed_one(): + policy_name1 = "testuser" + uuid4().hex + client = utilities.register_client() + req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex) + model_id = list(req.keys())[0] + data = { + "name": policy_name1, + "description": "description of {}".format(policy_name1), + "model_id": model_id, + "genre": "genre" + } + req = client.post("/policies/", data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + policy_id1 = next(iter(utilities.get_json(req.data)['policies'])) + + policy_name2 = "testuser" + uuid4().hex + client = utilities.register_client() + req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex) + model_id = list(req.keys())[0] + data = { + "name": policy_name2, + "description": "description of {}".format(policy_name2), + "model_id": model_id, + "genre": "genre" + } + req = client.post("/policies/", data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + policy_id2 = next(iter(utilities.get_json(req.data)['policies'])) + + data = { + "name": policy_name1, + "description": "description of {}".format(policy_name1), + "model_id": model_id, + "genre": "genre" + } + req = client.patch("/policies/{}".format(policy_id2), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + assert req.status_code == 409 + assert json.loads(req.data)["message"] == '409: Policy Already Exists' + + +def test_update_policies_with_empty_name(): + policy_name = "testuser" + uuid4().hex + client = utilities.register_client() + req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex) + model_id = list(req.keys())[0] + data = { + "name": policy_name, + "description": "description of {}".format(policy_name), + "model_id": model_id, + "genre": "genre" + } + req = client.post("/policies/", data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + policy_id = next(iter(utilities.get_json(req.data)['policies'])) + + data = { + "name": "", + "description": "description of {}".format(policy_name), + "model_id": model_id, + "genre": "genre" + } + req = client.patch("/policies/{}".format(policy_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == '400: Policy Content Error' + + +def test_update_policies_with_blank_model(): + policy_name = "testuser" + uuid4().hex + client = utilities.register_client() + req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex) + model_id = list(req.keys())[0] + data = { + "name": policy_name, + "description": "description of {}".format(policy_name), + "model_id": model_id, + "genre": "genre" + } + req = client.post("/policies/", data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + policy_id = next(iter(utilities.get_json(req.data)['policies'])) + + data = { + "name": policy_name, + "description": "description of {}".format(policy_name), + "model_id": "", + "genre": "genre" + } + + req = client.patch("/policies/{}".format(policy_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + assert req.status_code == 200 + + +def test_update_policies_connected_to_rules_with_blank_model(): + client = utilities.register_client() + req, rules, policy_id = data_builder.add_rules(client) + req = client.get("/policies") + data = utilities.get_json(req.data) + for policy_obj_id in data['policies']: + if policy_obj_id == policy_id: + policy = data['policies'][policy_obj_id] + policy['model_id'] = '' + req = client.patch("/policies/{}".format(policy_id), data=json.dumps(policy), + headers={'Content-Type': 'application/json'}) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == '400: Policy update error' + + +def test_delete_policies(): + client = utilities.register_client() + + policy = policy_helper.add_policies() + policy_id = list(policy.keys())[0] + + req = client.delete("/policies/{}".format(policy_id)) + assert req.status_code == 200 + + +def test_delete_policy_with_dependencies_rule(): + client = utilities.register_client() + req, rules, policy_id = data_builder.add_rules(client) + req = client.delete("/policies/{}".format(policy_id)) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == '400: Policy With Rule Error' + + +def test_delete_policy_with_dependencies_subject_data(): + client = utilities.register_client() + req, rules, policy_id = data_builder.add_rules(client) + req = client.delete("/policies/{}/rules/{}".format(policy_id, next(iter(rules['rules'])))) + assert req.status_code == 200 + req = client.delete("/policies/{}".format(policy_id)) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == '400: Policy With Data Error' + + +def test_delete_policy_with_dependencies_perimeter(): + client = utilities.register_client() + policy = policy_helper.add_policies() + policy_id = next(iter(policy)) + + data = { + "name": 'testuser'+uuid4().hex, + "description": "description of {}".format(uuid4().hex), + "password": "password for {}".format(uuid4().hex), + "email": "{}@moon".format(uuid4().hex) + } + req = client.post("/policies/{}/subjects".format(policy_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + + assert req.status_code == 200 + req = client.delete("/policies/{}".format(policy_id)) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == '400: Policy With Perimeter Error' + + +def test_delete_policies_without_id(): + client = utilities.register_client() + req = delete_policies_without_id(client) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == '400: Policy Unknown' diff --git a/old/moon_manager/tests/unit_python/api/test_rules.py b/old/moon_manager/tests/unit_python/api/test_rules.py new file mode 100644 index 00000000..a3c21839 --- /dev/null +++ b/old/moon_manager/tests/unit_python/api/test_rules.py @@ -0,0 +1,129 @@ +import api.utilities as utilities +import json +from helpers import data_builder as builder +from uuid import uuid4 +from helpers import policy_helper + + +def get_rules(client, policy_id): + req = client.get("/policies/{}/rules".format(policy_id)) + rules = utilities.get_json(req.data) + return req, rules + + +def add_rules_without_policy_id(client): + subject_category_id, object_category_id, action_category_id, meta_rule_id = builder.create_new_meta_rule() + data = { + "meta_rule_id": meta_rule_id, + "rule": [subject_category_id, object_category_id, action_category_id], + "instructions": ( + {"decision": "grant"}, + ), + "enabled": True + } + req = client.post("/policies/{}/rules".format(None), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + rules = utilities.get_json(req.data) + return req, rules + + +def add_rules_without_meta_rule_id(client, policy_id): + data = { + "meta_rule_id": "", + "rule": ["subject_data_id2", "object_data_id2", "action_data_id2"], + "instructions": ( + {"decision": "grant"}, + ), + "enabled": True + } + req = client.post("/policies/{}/rules".format(policy_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + rules = utilities.get_json(req.data) + return req, rules + + +def add_rules_without_rule(client, policy_id): + data = { + "meta_rule_id": "meta_rule_id1", + "instructions": ( + {"decision": "grant"}, + ), + "enabled": True + } + req = client.post("/policies/{}/rules".format(policy_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + rules = utilities.get_json(req.data) + return req, rules + + +def delete_rules(client, policy_id, meta_rule_id): + req = client.delete("/policies/{}/rules/{}".format(policy_id, meta_rule_id)) + return req + + +def test_get_rules(): + policy_id = utilities.get_policy_id() + client = utilities.register_client() + req, rules = get_rules(client, policy_id) + assert req.status_code == 200 + assert isinstance(rules, dict) + assert "rules" in rules + return req, rules + + +def test_add_rules(): + client = utilities.register_client() + req, rules, policy = builder.add_rules(client, ) + assert req.status_code == 200 + + +def test_add_rules_without_policy_id(): + client = utilities.register_client() + req, rules = add_rules_without_policy_id(client) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "400: Policy Unknown" + +# +# def test_add_rules_without_meta_rule_id(): +# policy_id = utilities.get_policy_id() +# client = utilities.register_client() +# req, rules = add_rules_without_meta_rule_id(client, policy_id) +# assert req.status_code == 400 +# assert json.loads(req.data)["message"] == "Key: 'meta_rule_id', [Empty String]" + + +def test_add_rules_without_rule(): + policy_id = utilities.get_policy_id() + client = utilities.register_client() + req, rules = add_rules_without_rule(client, policy_id) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == 'Invalid Key :rule not found' + + +def test_delete_rules_with_invalid_parameters(): + client = utilities.register_client() + req = delete_rules(client, "", "") + assert req.status_code == 404 + # assert json.loads(req.data)["message"] == 'Invalid Key :rule not found' + + +def test_delete_rules_without_policy_id(): + client = utilities.register_client() + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = builder.create_new_policy() + sub_data_id = builder.create_subject_data(policy_id, subject_category_id) + obj_data_id = builder.create_object_data(policy_id, object_category_id) + act_data_id = builder.create_action_data(policy_id, action_category_id) + data = { + "meta_rule_id": meta_rule_id, + "rule": [sub_data_id, obj_data_id, act_data_id], + "instructions": ( + {"decision": "grant"}, + ), + "enabled": True + } + client.post("/policies/{}/rules".format(policy_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + req, added_rules = get_rules(client, policy_id) + id = list(added_rules["rules"]["rules"])[0]["id"] + rules = delete_rules(client, None, id) + assert rules.status_code == 200 diff --git a/old/moon_manager/tests/unit_python/api/test_unit_models.py b/old/moon_manager/tests/unit_python/api/test_unit_models.py new file mode 100644 index 00000000..6e93ed28 --- /dev/null +++ b/old/moon_manager/tests/unit_python/api/test_unit_models.py @@ -0,0 +1,352 @@ +# Copyright 2018 Open Platform for NFV Project, Inc. and its contributors +# This software is distributed under the terms and conditions of the 'Apache-2.0' +# license which can be found in the file 'LICENSE' in this package distribution +# or at 'http://www.apache.org/licenses/LICENSE-2.0'. + +import json +import api.utilities as utilities +from helpers import data_builder as builder +from helpers import policy_helper +from helpers import model_helper +from uuid import uuid4 + + +def get_models(client): + req = client.get("/models") + models = utilities.get_json(req.data) + return req, models + + +def add_models(client, name, data=None): + subject_category_id, object_category_id, action_category_id, meta_rule_id = builder.create_new_meta_rule() + + if not data: + data = { + "name": name, + "description": "description of {}".format(name), + "meta_rules": [meta_rule_id] + } + req = client.post("/models", data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + models = utilities.get_json(req.data) + return req, models + + +def update_model(client, name, model_id): + subject_category_id, object_category_id, action_category_id, meta_rule_id = builder.create_new_meta_rule() + + data = { + "name": name, + "description": "description of {}".format(name), + "meta_rules": [meta_rule_id] + } + req = client.patch("/models/{}".format(model_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + models = utilities.get_json(req.data) + return req, models + + +def add_model_without_meta_rules_ids(client, name): + data = { + "name": name, + "description": "description of {}".format(name), + "meta_rules": [] + } + req = client.post("/models", data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + models = utilities.get_json(req.data) + return req, models + + +def add_model_with_empty_meta_rule_id(client, name): + data = { + "name": name, + "description": "description of {}".format(name), + "meta_rules": [""] + } + req = client.post("/models", data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + models = utilities.get_json(req.data) + return req, models + + +def update_model_without_meta_rules_ids(client, model_id): + name = "model_id" + uuid4().hex + data = { + "name": name, + "description": "description of {}".format(name), + "meta_rules": [] + } + req = client.patch("/models/{}".format(model_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + models = utilities.get_json(req.data) + return req, models + + +def delete_models(client, name): + request, models = get_models(client) + for key, value in models['models'].items(): + if value['name'] == name: + req = client.delete("/models/{}".format(key)) + break + return req + + +def delete_models_without_id(client): + req = client.delete("/models/{}".format("")) + return req + + +def test_delete_model_assigned_to_policy(): + policy_name = "testuser" + uuid4().hex + client = utilities.register_client() + req = model_helper.add_model(model_id="mls_model_id" + uuid4().hex) + model_id = list(req.keys())[0] + data = { + "name": policy_name, + "description": "description of {}".format(policy_name), + "model_id": model_id, + "genre": "genre" + } + req = client.post("/policies", data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + req = client.delete("/models/{}".format(model_id)) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == '400: Model With Policy Error' + + +def clean_models(): + client = utilities.register_client() + req, models = get_models(client) + for key, value in models['models'].items(): + print(key) + print(value) + client.delete("/models/{}".format(key)) + + +def test_get_models(): + client = utilities.register_client() + req, models = get_models(client) + assert req.status_code == 200 + assert isinstance(models, dict) + assert "models" in models + + +def test_add_models(): + clean_models() + client = utilities.register_client() + req, models = add_models(client, "testuser") + assert req.status_code == 200 + assert isinstance(models, dict) + model_id = list(models["models"])[0] + assert "models" in models + assert models['models'][model_id]['name'] == "testuser" + assert models['models'][model_id]["description"] == "description of {}".format("testuser") + + +def test_delete_models(): + client = utilities.register_client() + req = delete_models(client, "testuser") + assert req.status_code == 200 + + +def test_update_models_with_assigned_policy(): + client = utilities.register_client() + + model = model_helper.add_model(model_id="mls_model_id" + uuid4().hex) + model_id = list(model.keys())[0] + value = { + "name": "test_policy" + uuid4().hex, + "model_id": model_id, + "description": "test", + } + policy = policy_helper.add_policies(value=value) + data = { + "name": "model_" + uuid4().hex, + "description": "description of model_2", + "meta_rules": [] + } + req = client.patch("/models/{}".format(model_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "400: Model With Policy Error" + + +def test_update_models_with_no_assigned_policy(): + client = utilities.register_client() + + model = model_helper.add_model(model_id="mls_model_id" + uuid4().hex) + model_id = list(model.keys())[0] + + data = { + "name": "model_" + uuid4().hex, + "description": "description of model_2", + "meta_rules": [] + } + req = client.patch("/models/{}".format(model_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + + assert req.status_code == 200 + + +def test_add_models_with_meta_rule_key(): + client = utilities.register_client() + + model = model_helper.add_model(model_id="mls_model_id" + uuid4().hex) + model_id = list(model.keys())[0] + + data = { + "name": "model_" + uuid4().hex, + "description": "description of model_2", + + } + req = client.patch("/models/{}".format(model_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Invalid Key :meta_rules not found" + + +def test_delete_models_without_id(): + client = utilities.register_client() + req = delete_models_without_id(client) + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "400: Model Unknown" + + +def test_add_model_with_empty_name(): + clean_models() + client = utilities.register_client() + req, models = add_models(client, "<br/>") + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]" + + +def test_add_model_with_name_contain_space(): + clean_models() + client = utilities.register_client() + req, models = add_models(client, "test<br>user") + assert req.status_code == 400 + assert json.loads(req.data)["message"] == "Key: 'name', [Forbidden characters in string]" + + +def test_add_model_with_name_space(): + clean_models() + client = utilities.register_client() + req, models = add_models(client, " ") + assert req.status_code == 400 + assert json.loads(req.data)["message"] == '400: Model Unknown' + + +def test_add_model_with_empty_meta_rule_id(): + clean_models() + client = utilities.register_client() + req, meta_rules = add_model_with_empty_meta_rule_id(client, "testuser") + assert req.status_code == 400 + assert json.loads(req.data)["message"] == '400: Meta Rule Unknown' + + +def test_add_model_with_existed_name(): + clean_models() + client = utilities.register_client() + name = uuid4().hex + req, models = add_models(client, name) + assert req.status_code == 200 + req, models = add_models(client, name) + assert req.status_code == 409 + assert json.loads(req.data)["message"] == '409: Model Error' + + +def test_add_model_with_existed_meta_rules_list(): + clean_models() + client = utilities.register_client() + name = uuid4().hex + + subject_category_id, object_category_id, action_category_id, meta_rule_id = builder.create_new_meta_rule() + data = { + "name": name, + "description": "description of {}".format(name), + "meta_rules": [meta_rule_id] + } + name = uuid4().hex + req, models = add_models(client=client, name=name, data=data) + assert req.status_code == 200 + + data = { + "name": name, + "description": "description of {}".format(name), + "meta_rules": [meta_rule_id] + } + req, models = add_models(client=client, name=name, data=data) + assert req.status_code == 409 + assert json.loads(req.data)["message"] == '409: Model Error' + + +def test_add_model_without_meta_rules(): + clean_models() + client = utilities.register_client() + req, meta_rules = add_model_without_meta_rules_ids(client, "testuser") + assert req.status_code == 200 + # assert json.loads(req.data)["message"] == "Key: 'meta_rules', [Empty Container]" + + +def test_update_model(): + clean_models() + client = utilities.register_client() + req = add_models(client, "testuser") + model_id = list(req[1]['models'])[0] + req_update = update_model(client, "testuser", model_id) + assert req_update[0].status_code == 200 + model_id = list(req_update[1]["models"])[0] + assert req_update[1]["models"][model_id]["meta_rules"][0] is not None + delete_models(client, "testuser") + + +def test_update_model_name_with_space(): + clean_models() + client = utilities.register_client() + req = add_models(client, "testuser") + model_id = list(req[1]['models'])[0] + req_update = update_model(client, " ", model_id) + assert req_update[0].status_code == 400 + assert req_update[1]["message"] == '400: Model Unknown' + + +def test_update_model_with_empty_name(): + clean_models() + client = utilities.register_client() + req = add_models(client, "testuser") + model_id = list(req[1]['models'])[0] + req_update = update_model(client, "", model_id) + assert req_update[0].status_code == 400 + assert req_update[1]['message'] == '400: Model Unknown' + + +def test_update_meta_rules_without_id(): + clean_models() + client = utilities.register_client() + req_update = update_model(client, "testuser", "") + assert req_update[0].status_code == 400 + assert json.loads(req_update[0].data)["message"] == "400: Model Unknown" + + +def test_update_meta_rules_without_name(): + client = utilities.register_client() + req_update = update_model(client, "<a></a>", "1234567") + assert req_update[0].status_code == 400 + assert json.loads(req_update[0].data)[ + "message"] == "Key: 'name', [Forbidden characters in string]" + + +def test_update_meta_rules_without_meta_rules(): + value = { + "name": "mls_model_id" + uuid4().hex, + "description": "test", + "meta_rules": [] + } + model = model_helper.add_model(value=value) + model_id = list(model.keys())[0] + client = utilities.register_client() + req_update = update_model_without_meta_rules_ids(client, model_id) + assert req_update[0].status_code == 200 diff --git a/old/moon_manager/tests/unit_python/api/utilities.py b/old/moon_manager/tests/unit_python/api/utilities.py new file mode 100644 index 00000000..2e51fec8 --- /dev/null +++ b/old/moon_manager/tests/unit_python/api/utilities.py @@ -0,0 +1,26 @@ +import json +from uuid import uuid4 + +def get_json(data): + return json.loads(data.decode("utf-8")) + + +def register_client(): + import moon_manager.server + server = moon_manager.server.create_server() + client = server.app.test_client() + return client + + +def get_policy_id(): + from helpers import policy_helper + value = { + "name": "test_policy"+uuid4().hex, + "model_id": "", + "genre": "authz", + "description": "test", + } + policy_helper.add_policies(value=value) + req = policy_helper.get_policies() + policy_id = list(req.keys())[0] + return policy_id diff --git a/old/moon_manager/tests/unit_python/conftest.py b/old/moon_manager/tests/unit_python/conftest.py new file mode 100644 index 00000000..90a27e54 --- /dev/null +++ b/old/moon_manager/tests/unit_python/conftest.py @@ -0,0 +1,254 @@ +import base64 +import json +import logging +import pytest +import requests_mock + +CONF = { + "openstack": { + "keystone": { + "url": "http://keystone:5000/v3", + "user": "admin", + "check_token": False, + "password": "p4ssw0rd", + "domain": "default", + "certificate": False, + "project": "admin" + } + }, + "components": { + "wrapper": { + "bind": "0.0.0.0", + "port": 8080, + "container": "wukongsun/moon_wrapper:v4.3", + "timeout": 5, + "hostname": "wrapper" + }, + "manager": { + "bind": "0.0.0.0", + "port": 8082, + "container": "wukongsun/moon_manager:v4.3", + "hostname": "manager" + }, + "port_start": 31001, + "orchestrator": { + "bind": "0.0.0.0", + "port": 8083, + "container": "wukongsun/moon_orchestrator:v4.3", + "hostname": "orchestrator" + }, + "pipeline": { + "interface": { + "bind": "0.0.0.0", + "port": 8080, + "container": "wukongsun/moon_interface:v4.3", + "hostname": "interface" + }, + "authz": { + "bind": "0.0.0.0", + "port": 8081, + "container": "wukongsun/moon_authz:v4.3", + "hostname": "authz" + }, + } + }, + "logging": { + "handlers": { + "file": { + "filename": "/tmp/moon.log", + "class": "logging.handlers.RotatingFileHandler", + "level": "DEBUG", + "formatter": "custom", + "backupCount": 3, + "maxBytes": 1048576 + }, + "console": { + "class": "logging.StreamHandler", + "formatter": "brief", + "level": "INFO", + "stream": "ext://sys.stdout" + } + }, + "formatters": { + "brief": { + "format": "%(levelname)s %(name)s %(message)-30s" + }, + "custom": { + "format": "%(asctime)-15s %(levelname)s %(name)s %(message)s" + } + }, + "root": { + "handlers": [ + "console" + ], + "level": "ERROR" + }, + "version": 1, + "loggers": { + "moon": { + "handlers": [ + "console", + "file" + ], + "propagate": False, + "level": "DEBUG" + } + } + }, + "slave": { + "name": None, + "master": { + "url": None, + "login": None, + "password": None + } + }, + "docker": { + "url": "tcp://172.88.88.1:2376", + "network": "moon" + }, + "database": { + "url": "sqlite:///database.db", + # "url": "mysql+pymysql://moon:p4sswOrd1@db/moon", + "driver": "sql" + }, + "messenger": { + "url": "rabbit://moon:p4sswOrd1@messenger:5672/moon" + }, +} + +COMPONENTS = ( + "logging", + "openstack/keystone", + "database", + "slave", + "components/manager", + "components/orchestrator" +) + +PODS = { + "pods": { + "721760dd-de5f-11e7-8001-3863bbb766f3": [ + { + "pdp_id": "b3d3e18abf3340e8b635fd49e6634ccd", + "port": 8080, + "genre": "interface", + "name": "interface-paltry", + "keystone_project_id": "a64beb1cc224474fb4badd43173e7101", + "namespace": "moon", + "container": "wukongsun/moon_interface:v4.3" + }, + { + "pdp_id": "b3d3e18abf3340e8b635fd49e6634ccd", + "meta_rule_id": "f8f49a779ceb47b3ac810f01ef71b4e0", + "port": 8081, + "genre": "authz", + "name": "authz-economic", + "policy_id": "f8f49a779ceb47b3ac810f01ef71b4e0", + "keystone_project_id": "a64beb1cc224474fb4badd43173e7101", + "namespace": "moon", + "container": "wukongsun/moon_authz:v4.3" + } + ] + } +} + +SLAVES = { + "slaves": [ + { + "context": + { + "cluster": "kubernetes", + "user": "kubernetes-admin" + }, + "name": "kubernetes-admin@kubernetes", + "configured": True, + "wrapper_name": "mywrapper", + "ip": "NC", + "port": 31002, + "internal_port": 8080 + } + ] +} + + +def get_b64_conf(component=None): + if component in CONF: + return base64.b64encode( + json.dumps( + CONF[component]).encode('utf-8') + b"\n").decode('utf-8') + elif "/" in component: + key1, _, key2 = component.partition("/") + return base64.b64encode( + json.dumps( + CONF[key1][key2]).encode('utf-8') + b"\n").decode('utf-8') + else: + return base64.b64encode( + json.dumps(CONF).encode('utf-8') + b"\n").decode('utf-8') + + +@pytest.fixture(autouse=True) +def no_requests(monkeypatch): + """ Modify the response from Requests module + """ + with requests_mock.Mocker(real_http=True) as m: + for component in COMPONENTS: + m.register_uri( + 'GET', 'http://consul:8500/v1/kv/{}'.format(component), + json=[{'Key': component, 'Value': get_b64_conf(component)}] + ) + m.register_uri( + 'POST', 'http://keystone:5000/v3/auth/tokens', + headers={'X-Subject-Token': "111111111"} + ) + m.register_uri( + 'DELETE', 'http://keystone:5000/v3/auth/tokens', + headers={'X-Subject-Token': "111111111"} + ) + + def match_request_text(request): + # request.url may be None, or '' prevents a TypeError. + return 'http://keystone:5000/v3/users?name=testuser' in request.url + + m.register_uri( + requests_mock.ANY, '/v3/users', + additional_matcher=match_request_text, + json={"users": {}} + ) + m.register_uri( + 'POST', 'http://keystone:5000/v3/users/', + json={"users": [{"id": "1111111111111"}]} + ) + m.register_uri( + 'POST', 'http://orchestrator:8083/pods', + json=PODS, + headers={"content-type": "application/json"} + ) + m.register_uri( + 'GET', 'http://orchestrator:8083/pods', + json=PODS + ) + m.register_uri( + 'GET', 'http://localhost/slaves', + json=SLAVES + ) + m.register_uri( + 'DELETE', 'http://orchestrator:8083/pods/{}'.format(list([PODS['pods'].keys()])[0]), + headers={"content-type": "application/json"} + ) + + print("Start populating the DB.") + from python_moondb.db_manager import init_engine, main + engine = init_engine() + print("engine={}".format(engine)) + main("upgrade", logging.getLogger("db_manager"), engine) + print("End populating the DB.") + yield m + +# @pytest.fixture(autouse=True, scope="session") +# def manage_database(): +# from moon_db.db_manager import init_engine, run +# engine = init_engine() +# run("upgrade", logging.getLogger("db_manager"), engine) +# yield +# print("Will close the DB") diff --git a/old/moon_manager/tests/unit_python/helpers/__init__.py b/old/moon_manager/tests/unit_python/helpers/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/old/moon_manager/tests/unit_python/helpers/__init__.py diff --git a/old/moon_manager/tests/unit_python/helpers/assignment_helper.py b/old/moon_manager/tests/unit_python/helpers/assignment_helper.py new file mode 100644 index 00000000..22a56e38 --- /dev/null +++ b/old/moon_manager/tests/unit_python/helpers/assignment_helper.py @@ -0,0 +1,49 @@ +# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors +# This software is distributed under the terms and conditions of the 'Apache-2.0' +# license which can be found in the file 'LICENSE' in this package distribution +# or at 'http://www.apache.org/licenses/LICENSE-2.0'. + +def get_action_assignments(policy_id, action_id=None, category_id=None): + from python_moondb.core import PolicyManager + return PolicyManager.get_action_assignments("", policy_id, action_id, category_id) + + +def add_action_assignment(policy_id, action_id, category_id, data_id): + from python_moondb.core import PolicyManager + return PolicyManager.add_action_assignment("", policy_id, action_id, category_id, data_id) + + +def delete_action_assignment(policy_id, action_id, category_id, data_id): + from python_moondb.core import PolicyManager + PolicyManager.delete_action_assignment("", policy_id, action_id, category_id, data_id) + + +def get_object_assignments(policy_id, object_id=None, category_id=None): + from python_moondb.core import PolicyManager + return PolicyManager.get_object_assignments("", policy_id, object_id, category_id) + + +def add_object_assignment(policy_id, object_id, category_id, data_id): + from python_moondb.core import PolicyManager + return PolicyManager.add_object_assignment("", policy_id, object_id, category_id, data_id) + + +def delete_object_assignment(policy_id, object_id, category_id, data_id): + from python_moondb.core import PolicyManager + PolicyManager.delete_object_assignment("", policy_id, object_id, category_id, data_id) + + +def get_subject_assignments(policy_id, subject_id=None, category_id=None): + from python_moondb.core import PolicyManager + return PolicyManager.get_subject_assignments("", policy_id, subject_id, category_id) + + +def add_subject_assignment(policy_id, subject_id, category_id, data_id): + from python_moondb.core import PolicyManager + return PolicyManager.add_subject_assignment("", policy_id, subject_id, category_id, data_id) + + +def delete_subject_assignment(policy_id, subject_id, category_id, data_id): + from python_moondb.core import PolicyManager + PolicyManager.delete_subject_assignment("", policy_id, subject_id, category_id, data_id) + diff --git a/old/moon_manager/tests/unit_python/helpers/category_helper.py b/old/moon_manager/tests/unit_python/helpers/category_helper.py new file mode 100644 index 00000000..6c419ca8 --- /dev/null +++ b/old/moon_manager/tests/unit_python/helpers/category_helper.py @@ -0,0 +1,40 @@ +# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors +# This software is distributed under the terms and conditions of the 'Apache-2.0' +# license which can be found in the file 'LICENSE' in this package distribution +# or at 'http://www.apache.org/licenses/LICENSE-2.0'. + + +def add_subject_category(cat_id=None, value=None): + from python_moondb.core import ModelManager + category = ModelManager.add_subject_category(user_id=None, category_id=cat_id, value=value) + return category + + +def get_subject_category(cat_id=None): + from python_moondb.core import ModelManager + category = ModelManager.get_subject_categories(user_id=None, category_id=cat_id) + return category + + +def add_object_category(cat_id=None, value=None): + from python_moondb.core import ModelManager + category = ModelManager.add_object_category(user_id=None, category_id=cat_id, value=value) + return category + + +def get_object_category(cat_id=None): + from python_moondb.core import ModelManager + category = ModelManager.get_object_categories(user_id=None, category_id=cat_id) + return category + + +def add_action_category(cat_id=None, value=None): + from python_moondb.core import ModelManager + category = ModelManager.add_action_category(user_id=None, category_id=cat_id, value=value) + return category + + +def get_action_category(cat_id=None): + from python_moondb.core import ModelManager + category = ModelManager.get_action_categories(user_id=None, category_id=cat_id) + return category diff --git a/old/moon_manager/tests/unit_python/helpers/data_builder.py b/old/moon_manager/tests/unit_python/helpers/data_builder.py new file mode 100644 index 00000000..91808cbe --- /dev/null +++ b/old/moon_manager/tests/unit_python/helpers/data_builder.py @@ -0,0 +1,260 @@ +# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors +# This software is distributed under the terms and conditions of the 'Apache-2.0' +# license which can be found in the file 'LICENSE' in this package distribution +# or at 'http://www.apache.org/licenses/LICENSE-2.0'. + +from .category_helper import * +from .policy_helper import * +from .data_helper import * +from helpers import model_helper +from .meta_rule_helper import * +import api.utilities as utilities +import json +from uuid import uuid4 + + +def create_subject_category(name): + subject_category = add_subject_category( + value={"name": name + uuid4().hex, "description": "description 1"}) + return list(subject_category.keys())[0] + + +def create_object_category(name): + object_category = add_object_category( + value={"name": name + uuid4().hex, "description": "description 1"}) + return list(object_category.keys())[0] + + +def create_action_category(name): + action_category = add_action_category( + value={"name": name + uuid4().hex, "description": "description 1"}) + return list(action_category.keys())[0] + + +def create_model(meta_rule_id, model_name="test_model"): + value = { + "name": model_name + uuid4().hex, + "description": "test", + "meta_rules": [meta_rule_id] + + } + return value + + +def create_policy(model_id, policy_name="policy_1"): + value = { + "name": policy_name, + "model_id": model_id, + "genre": "authz", + "description": "test", + } + return value + + +def create_pdp(policies_ids): + value = { + "name": "test_pdp", + "security_pipeline": policies_ids, + "keystone_project_id": "keystone_project_id1", + "description": "...", + } + return value + + +def create_new_policy(subject_category_name=None, object_category_name=None, + action_category_name=None, model_name=None, policy_name=None, + meta_rule_name=None): + if not subject_category_name: + subject_category_name = "subjectCategory_" + uuid4().hex + if not object_category_name: + object_category_name = "objectCategory_" + uuid4().hex + if not action_category_name: + action_category_name = "actionCategory_" + uuid4().hex + + if not meta_rule_name: + meta_rule_name = "meta_rule_" + uuid4().hex + + if not model_name: + model_name = "model_name_" + uuid4().hex + if not policy_name: + policy_name = "policy_name_" + uuid4().hex + + subject_category_id, object_category_id, action_category_id, meta_rule_id = create_new_meta_rule( + subject_category_name=subject_category_name + uuid4().hex, + object_category_name=object_category_name + uuid4().hex, + action_category_name=action_category_name + uuid4().hex, + meta_rule_name=meta_rule_name + uuid4().hex + ) + + model = model_helper.add_model(value=create_model(meta_rule_id, model_name + uuid4().hex)) + model_id = list(model.keys())[0] + value = create_policy(model_id, policy_name + uuid4().hex) + policy = add_policies(value=value) + assert policy + policy_id = list(policy.keys())[0] + return subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id + + +def create_new_meta_rule(subject_category_name=None, object_category_name=None, + action_category_name=None, meta_rule_name=None): + if not subject_category_name: + subject_category_name = "subjectCategory_" + uuid4().hex + if not object_category_name: + object_category_name = "objectCategory_" + uuid4().hex + if not action_category_name: + action_category_name = "actionCategory_" + uuid4().hex + + if not meta_rule_name: + meta_rule_name = "meta_rule_" + uuid4().hex + + subject_category_id = create_subject_category(subject_category_name) + object_category_id = create_object_category(object_category_name) + action_category_id = create_action_category(action_category_name) + value = {"name": meta_rule_name, + "description": "name of the meta rule algorithm", + "subject_categories": [subject_category_id], + "object_categories": [object_category_id], + "action_categories": [action_category_id] + } + meta_rule = add_meta_rule(value=value) + return subject_category_id, object_category_id, action_category_id, list(meta_rule.keys())[0] + + +def create_subject(policy_id): + value = { + "name": "testuser" + uuid4().hex, + "description": "test", + } + subject = add_subject(policy_id=policy_id, value=value) + return list(subject.keys())[0] + + +def create_object(policy_id): + value = { + "name": "testobject" + uuid4().hex, + "description": "test", + } + object = add_object(policy_id=policy_id, value=value) + return list(object.keys())[0] + + +def create_action(policy_id): + value = { + "name": "testaction" + uuid4().hex, + "description": "test", + } + action = add_action(policy_id=policy_id, value=value) + return list(action.keys())[0] + + +def create_subject_data(policy_id, category_id): + value = { + "name": "subject-security-level", + "description": {"low": "", "medium": "", "high": ""}, + } + subject_data = add_subject_data(policy_id=policy_id, category_id=category_id, value=value).get( + 'data') + assert subject_data + return list(subject_data.keys())[0] + + +def create_object_data(policy_id, category_id): + value = { + "name": "object-security-level", + "description": {"low": "", "medium": "", "high": ""}, + } + object_data = add_object_data(policy_id=policy_id, category_id=category_id, value=value).get( + 'data') + return list(object_data.keys())[0] + + +def create_action_data(policy_id, category_id): + value = { + "name": "action-type", + "description": {"vm-action": "", "storage-action": "", }, + } + action_data = add_action_data(policy_id=policy_id, category_id=category_id, value=value).get( + 'data') + return list(action_data.keys())[0] + + +def get_policy_id_with_subject_assignment(): + client = utilities.register_client() + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex) + subject_id = create_subject(policy_id) + data_id = create_subject_data(policy_id=policy_id, category_id=subject_category_id) + + data = { + "id": subject_id, + "category_id": subject_category_id, + "data_id": data_id + } + client.post("/policies/{}/subject_assignments".format(policy_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + return policy_id + + +def get_policy_id_with_object_assignment(): + client = utilities.register_client() + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex) + object_id = create_object(policy_id) + data_id = create_object_data(policy_id=policy_id, category_id=object_category_id) + + data = { + "id": object_id, + "category_id": object_category_id, + "data_id": data_id + } + + client.post("/policies/{}/object_assignments".format(policy_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + return policy_id + + +def get_policy_id_with_action_assignment(): + client = utilities.register_client() + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = create_new_policy( + subject_category_name="subject_category1" + uuid4().hex, + object_category_name="object_category1" + uuid4().hex, + action_category_name="action_category1" + uuid4().hex, + meta_rule_name="meta_rule_1" + uuid4().hex) + action_id = create_action(policy_id) + data_id = create_action_data(policy_id=policy_id, category_id=action_category_id) + + data = { + "id": action_id, + "category_id": action_category_id, + "data_id": data_id + } + client.post("/policies/{}/action_assignments".format(policy_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + return policy_id + + +def add_rules(client): + sub_id, obj_id, act_id, meta_rule_id, policy_id = create_new_policy("sub_cat" + uuid4().hex, + "obj_cat" + uuid4().hex, + "act_cat" + uuid4().hex) + sub_data_id = create_subject_data(policy_id, sub_id) + obj_data_id = create_object_data(policy_id, obj_id) + act_data_id = create_action_data(policy_id, act_id) + data = { + "meta_rule_id": meta_rule_id, + "rule": [sub_data_id, obj_data_id, act_data_id], + "instructions": ( + {"decision": "grant"}, + ), + "enabled": True + } + req = client.post("/policies/{}/rules".format(policy_id), data=json.dumps(data), + headers={'Content-Type': 'application/json'}) + rules = utilities.get_json(req.data) + return req, rules, policy_id diff --git a/old/moon_manager/tests/unit_python/helpers/data_helper.py b/old/moon_manager/tests/unit_python/helpers/data_helper.py new file mode 100644 index 00000000..e1c05640 --- /dev/null +++ b/old/moon_manager/tests/unit_python/helpers/data_helper.py @@ -0,0 +1,99 @@ +# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors +# This software is distributed under the terms and conditions of the 'Apache-2.0' +# license which can be found in the file 'LICENSE' in this package distribution +# or at 'http://www.apache.org/licenses/LICENSE-2.0'. + + +def get_action_data(policy_id, data_id=None, category_id=None): + from python_moondb.core import PolicyManager + return PolicyManager.get_action_data("", policy_id, data_id, category_id) + + +def add_action_data(policy_id, data_id=None, category_id=None, value=None): + from python_moondb.core import PolicyManager + return PolicyManager.add_action_data("", policy_id, data_id, category_id, value) + + +def delete_action_data(policy_id, data_id): + from python_moondb.core import PolicyManager + PolicyManager.delete_action_data("", policy_id=policy_id, data_id=data_id) + + +def get_object_data(policy_id, data_id=None, category_id=None): + from python_moondb.core import PolicyManager + return PolicyManager.get_object_data("", policy_id, data_id, category_id) + + +def add_object_data(policy_id, data_id=None, category_id=None, value=None): + from python_moondb.core import PolicyManager + return PolicyManager.add_object_data("", policy_id, data_id, category_id, value) + + +def delete_object_data(policy_id, data_id): + from python_moondb.core import PolicyManager + PolicyManager.delete_object_data("", policy_id=policy_id, data_id=data_id) + + +def get_subject_data(policy_id, data_id=None, category_id=None): + from python_moondb.core import PolicyManager + return PolicyManager.get_subject_data("", policy_id, data_id, category_id) + + +def add_subject_data(policy_id, data_id=None, category_id=None, value=None): + from python_moondb.core import PolicyManager + return PolicyManager.set_subject_data("", policy_id, data_id, category_id, value) + + +def delete_subject_data(policy_id, data_id): + from python_moondb.core import PolicyManager + PolicyManager.delete_subject_data("", policy_id=policy_id, data_id=data_id) + + +def get_actions(policy_id, perimeter_id=None): + from python_moondb.core import PolicyManager + return PolicyManager.get_actions("", policy_id, perimeter_id) + + +def add_action(policy_id, perimeter_id=None, value=None): + from python_moondb.core import PolicyManager + return PolicyManager.add_action("", policy_id, perimeter_id, value) + + +def delete_action(policy_id, perimeter_id): + from python_moondb.core import PolicyManager + PolicyManager.delete_action("", policy_id, perimeter_id) + + +def get_objects(policy_id, perimeter_id=None): + from python_moondb.core import PolicyManager + return PolicyManager.get_objects("", policy_id, perimeter_id) + + +def add_object(policy_id, perimeter_id=None, value=None): + from python_moondb.core import PolicyManager + return PolicyManager.add_object("", policy_id, perimeter_id, value) + + +def delete_object(policy_id, perimeter_id): + from python_moondb.core import PolicyManager + PolicyManager.delete_object("", policy_id, perimeter_id) + + +def get_subjects(policy_id, perimeter_id=None): + from python_moondb.core import PolicyManager + return PolicyManager.get_subjects("", policy_id, perimeter_id) + + +def add_subject(policy_id, perimeter_id=None, value=None): + from python_moondb.core import PolicyManager + return PolicyManager.add_subject("", policy_id, perimeter_id, value) + + +def delete_subject(policy_id, perimeter_id): + from python_moondb.core import PolicyManager + PolicyManager.delete_subject("", policy_id, perimeter_id) + + +def get_available_metadata(policy_id): + from python_moondb.core import PolicyManager + return PolicyManager.get_available_metadata("", policy_id) diff --git a/old/moon_manager/tests/unit_python/helpers/meta_rule_helper.py b/old/moon_manager/tests/unit_python/helpers/meta_rule_helper.py new file mode 100644 index 00000000..e882706b --- /dev/null +++ b/old/moon_manager/tests/unit_python/helpers/meta_rule_helper.py @@ -0,0 +1,49 @@ +# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors +# This software is distributed under the terms and conditions of the 'Apache-2.0' +# license which can be found in the file 'LICENSE' in this package distribution +# or at 'http://www.apache.org/licenses/LICENSE-2.0'. + +from helpers import data_builder as builder +from uuid import uuid4 + + +def set_meta_rule(meta_rule_id, value=None): + from python_moondb.core import ModelManager + if not value: + action_category_id = builder.create_action_category("action_category_id1"+uuid4().hex) + subject_category_id = builder.create_subject_category("subject_category_id1"+uuid4().hex) + object_category_id = builder.create_object_category("object_category_id1"+uuid4().hex) + value = { + "name": "MLS_meta_rule", + "description": "test", + "subject_categories": [subject_category_id], + "object_categories": [object_category_id], + "action_categories": [action_category_id] + } + return ModelManager.set_meta_rule(user_id=None, meta_rule_id=meta_rule_id, value=value) + + +def add_meta_rule(meta_rule_id=None, value=None): + from python_moondb.core import ModelManager + if not value: + action_category_id = builder.create_action_category("action_category_id1"+uuid4().hex) + subject_category_id = builder.create_subject_category("subject_category_id1"+uuid4().hex) + object_category_id = builder.create_object_category("object_category_id1"+uuid4().hex) + value = { + "name": "MLS_meta_rule"+uuid4().hex, + "description": "test", + "subject_categories": [subject_category_id], + "object_categories": [object_category_id], + "action_categories": [action_category_id] + } + return ModelManager.add_meta_rule(user_id=None, meta_rule_id=meta_rule_id, value=value) + + +def get_meta_rules(meta_rule_id=None): + from python_moondb.core import ModelManager + return ModelManager.get_meta_rules(user_id=None, meta_rule_id=meta_rule_id) + + +def delete_meta_rules(meta_rule_id=None): + from python_moondb.core import ModelManager + ModelManager.delete_meta_rule(user_id=None, meta_rule_id=meta_rule_id) diff --git a/old/moon_manager/tests/unit_python/helpers/model_helper.py b/old/moon_manager/tests/unit_python/helpers/model_helper.py new file mode 100644 index 00000000..73808e03 --- /dev/null +++ b/old/moon_manager/tests/unit_python/helpers/model_helper.py @@ -0,0 +1,48 @@ +# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors +# This software is distributed under the terms and conditions of the 'Apache-2.0' +# license which can be found in the file 'LICENSE' in this package distribution +# or at 'http://www.apache.org/licenses/LICENSE-2.0'. + +from helpers import data_builder as builder +from uuid import uuid4 + + +def get_models(model_id=None): + from python_moondb.core import ModelManager + return ModelManager.get_models(user_id=None, model_id=model_id) + + +def add_model(model_id=None, value=None): + from python_moondb.core import ModelManager + if not value: + subject_category_id, object_category_id, action_category_id, meta_rule_id = builder.create_new_meta_rule() + name = "MLS"+uuid4().hex if model_id is None else "MLS " + model_id + value = { + "name": name, + "description": "test", + "meta_rules": [meta_rule_id] + } + return ModelManager.add_model(user_id=None, model_id=model_id, value=value) + + +def delete_models(uuid=None, name=None): + from python_moondb.core import ModelManager + if not uuid: + for model_id, model_value in get_models(): + if name == model_value['name']: + uuid = model_id + break + ModelManager.delete_model(user_id=None, model_id=uuid) + + +def delete_all_models(): + from python_moondb.core import ModelManager + models_values = get_models() + print(models_values) + for model_id, model_value in models_values.items(): + ModelManager.delete_model(user_id=None, model_id=model_id) + + +def update_model(model_id=None, value=None): + from python_moondb.core import ModelManager + return ModelManager.update_model(user_id=None, model_id=model_id, value=value) diff --git a/old/moon_manager/tests/unit_python/helpers/pdp_helper.py b/old/moon_manager/tests/unit_python/helpers/pdp_helper.py new file mode 100644 index 00000000..3d169b06 --- /dev/null +++ b/old/moon_manager/tests/unit_python/helpers/pdp_helper.py @@ -0,0 +1,23 @@ +# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors +# This software is distributed under the terms and conditions of the 'Apache-2.0' +# license which can be found in the file 'LICENSE' in this package distribution +# or at 'http://www.apache.org/licenses/LICENSE-2.0'. + +def update_pdp(pdp_id, value): + from python_moondb.core import PDPManager + return PDPManager.update_pdp("", pdp_id, value) + + +def delete_pdp(pdp_id): + from python_moondb.core import PDPManager + PDPManager.delete_pdp("", pdp_id) + + +def add_pdp(pdp_id=None, value=None): + from python_moondb.core import PDPManager + return PDPManager.add_pdp("", pdp_id, value) + + +def get_pdp(pdp_id=None): + from python_moondb.core import PDPManager + return PDPManager.get_pdp("", pdp_id) diff --git a/old/moon_manager/tests/unit_python/helpers/policy_helper.py b/old/moon_manager/tests/unit_python/helpers/policy_helper.py new file mode 100644 index 00000000..eddd0b8d --- /dev/null +++ b/old/moon_manager/tests/unit_python/helpers/policy_helper.py @@ -0,0 +1,63 @@ +# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors +# This software is distributed under the terms and conditions of the 'Apache-2.0' +# license which can be found in the file 'LICENSE' in this package distribution +# or at 'http://www.apache.org/licenses/LICENSE-2.0'. + +from uuid import uuid4 + +def get_policies(): + from python_moondb.core import PolicyManager + return PolicyManager.get_policies("admin") + + +def add_policies(policy_id=None, value=None): + from python_moondb.core import PolicyManager + if not value: + value = { + "name": "test_policy"+ uuid4().hex, + "model_id": "", + "genre": "authz", + "description": "test", + } + return PolicyManager.add_policy("admin", policy_id=policy_id, value=value) + + +def delete_policies(uuid=None, name=None): + from python_moondb.core import PolicyManager + if not uuid: + for policy_id, policy_value in get_policies(): + if name == policy_value['name']: + uuid = policy_id + break + PolicyManager.delete_policy("admin", uuid) + + +def update_policy(policy_id, value): + from python_moondb.core import PolicyManager + return PolicyManager.update_policy("admin", policy_id, value) + + +def get_policy_from_meta_rules(meta_rule_id): + from python_moondb.core import PolicyManager + return PolicyManager.get_policy_from_meta_rules("admin", meta_rule_id) + + +def get_rules(policy_id=None, meta_rule_id=None, rule_id=None): + from python_moondb.core import PolicyManager + return PolicyManager.get_rules("", policy_id, meta_rule_id, rule_id) + + +def add_rule(policy_id=None, meta_rule_id=None, value=None): + from python_moondb.core import PolicyManager + if not value: + value = { + "rule": ("high", "medium", "vm-action"), + "instructions": ({"decision": "grant"}), + "enabled": "", + } + return PolicyManager.add_rule("", policy_id, meta_rule_id, value) + + +def delete_rule(policy_id=None, rule_id=None): + from python_moondb.core import PolicyManager + PolicyManager.delete_rule("", policy_id, rule_id) diff --git a/old/moon_manager/tests/unit_python/requirements.txt b/old/moon_manager/tests/unit_python/requirements.txt new file mode 100644 index 00000000..d6f190e4 --- /dev/null +++ b/old/moon_manager/tests/unit_python/requirements.txt @@ -0,0 +1,5 @@ +flask +flask_cors +flask_restful +python_moondb==1.2.20 +python_moonutilities==1.4.20 |