diff options
6 files changed, 964 insertions, 77 deletions
diff --git a/python_moondb/tests/unit_python/policies/__init__.py b/python_moondb/tests/unit_python/policies/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/python_moondb/tests/unit_python/policies/__init__.py diff --git a/python_moondb/tests/unit_python/policies/mock_data.py b/python_moondb/tests/unit_python/policies/mock_data.py new file mode 100644 index 00000000..b2642979 --- /dev/null +++ b/python_moondb/tests/unit_python/policies/mock_data.py @@ -0,0 +1,45 @@ +def create_meta_rule(): + meta_rule_value = { + "name": "meta_rule1", + "algorithm": "name of the meta rule algorithm", + "subject_categories": ["subject_category_id1", + "subject_category_id2"], + "object_categories": ["object_category_id1"], + "action_categories": ["action_category_id1"] + } + return meta_rule_value + + +def create_model(meta_rule_id): + value = { + "name": "test_model", + "description": "test", + "meta_rules": [meta_rule_id] + + } + return value + + +def create_policy(model_id): + value = { + "name": "policy_1", + "model_id": model_id, + "genre": "authz", + "description": "test", + } + return value + + +def get_policy_id(): + import policies.test_policies as test_policies + import models.test_models as test_models + import models.test_meta_rules as test_meta_rules + meta_rule = test_meta_rules.add_meta_rule(value=create_meta_rule()) + meta_rule_id = list(meta_rule.keys())[0] + model = test_models.add_model(value=create_model(meta_rule_id)) + model_id = list(model.keys())[0] + value = create_policy(model_id) + policy = test_policies.add_policies(value) + assert policy + policy_id = list(policy.keys())[0] + return policy_id diff --git a/python_moondb/tests/unit_python/policies/test_assignments.py b/python_moondb/tests/unit_python/policies/test_assignments.py new file mode 100755 index 00000000..ccac205a --- /dev/null +++ b/python_moondb/tests/unit_python/policies/test_assignments.py @@ -0,0 +1,245 @@ +def get_action_assignments(policy_id, action_id=None, category_id=None): + from python_moondb.core import PolicyManager + return PolicyManager.get_action_assignments("", policy_id, action_id, category_id) + + +def add_action_assignment(policy_id, action_id, category_id, data_id): + from python_moondb.core import PolicyManager + return PolicyManager.add_action_assignment("", policy_id, action_id, category_id, data_id) + + +def delete_action_assignment(policy_id, action_id, category_id, data_id): + from python_moondb.core import PolicyManager + PolicyManager.delete_action_assignment("", policy_id, action_id, category_id, data_id) + + +def get_object_assignments(policy_id, object_id=None, category_id=None): + from python_moondb.core import PolicyManager + return PolicyManager.get_object_assignments("", policy_id, object_id, category_id) + + +def add_object_assignment(policy_id, object_id, category_id, data_id): + from python_moondb.core import PolicyManager + return PolicyManager.add_object_assignment("", policy_id, object_id, category_id, data_id) + + +def delete_object_assignment(policy_id, object_id, category_id, data_id): + from python_moondb.core import PolicyManager + PolicyManager.delete_object_assignment("", policy_id, object_id, category_id, data_id) + + +def get_subject_assignments(policy_id, subject_id=None, category_id=None): + from python_moondb.core import PolicyManager + return PolicyManager.get_subject_assignments("", policy_id, subject_id, category_id) + + +def add_subject_assignment(policy_id, subject_id, category_id, data_id): + from python_moondb.core import PolicyManager + return PolicyManager.add_subject_assignment("", policy_id, subject_id, category_id, data_id) + + +def delete_subject_assignment(policy_id, subject_id, category_id, data_id): + from python_moondb.core import PolicyManager + PolicyManager.delete_subject_assignment("", policy_id, subject_id, category_id, data_id) + + +def test_get_action_assignments(db): + policy_id = "admin" + action_id = "action_id_1" + category_id = "category_id_1" + data_id = "data_id_1" + add_action_assignment(policy_id, action_id, category_id, data_id) + act_assignments = get_action_assignments(policy_id, action_id, category_id) + action_id_1 = list(act_assignments.keys())[0] + assert act_assignments[action_id_1]["policy_id"] == policy_id + assert act_assignments[action_id_1]["action_id"] == action_id + assert act_assignments[action_id_1]["category_id"] == category_id + assert len(act_assignments[action_id_1].get("assignments")) == 1 + assert data_id in act_assignments[action_id_1].get("assignments") + + +def test_get_action_assignments_by_policy_id(db): + policy_id = "admin" + action_id = "action_id_1" + category_id = "category_id_1" + data_id = "data_id_1" + add_action_assignment(policy_id, action_id, category_id, data_id) + data_id = "data_id_2" + add_action_assignment(policy_id, action_id, category_id, data_id) + data_id = "data_id_3" + add_action_assignment(policy_id, action_id, category_id, data_id) + act_assignments = get_action_assignments(policy_id) + action_id_1 = list(act_assignments.keys())[0] + assert act_assignments[action_id_1]["policy_id"] == policy_id + assert act_assignments[action_id_1]["action_id"] == action_id + assert act_assignments[action_id_1]["category_id"] == category_id + assert len(act_assignments[action_id_1].get("assignments")) == 3 + + +def test_add_action_assignments(db): + policy_id = "admin" + action_id = "action_id_1" + category_id = "category_id_1" + data_id = "data_id_1" + action_assignments = add_action_assignment(policy_id, action_id, category_id, data_id) + assert action_assignments + action_id_1 = list(action_assignments.keys())[0] + assert action_assignments[action_id_1]["policy_id"] == policy_id + assert action_assignments[action_id_1]["action_id"] == action_id + assert action_assignments[action_id_1]["category_id"] == category_id + assert len(action_assignments[action_id_1].get("assignments")) == 1 + assert data_id in action_assignments[action_id_1].get("assignments") + + +def test_delete_action_assignment(db): + policy_id = "admin_1" + add_action_assignment(policy_id, "", "", "") + policy_id = "admin_2" + action_id = "action_id_2" + category_id = "category_id_2" + data_id = "data_id_2" + add_action_assignment(policy_id, action_id, category_id, data_id) + delete_action_assignment(policy_id, "", "", "") + assignments = get_action_assignments(policy_id, ) + assert len(assignments) == 1 + + +def test_delete_action_assignment_with_invalid_policy_id(db): + policy_id = "invalid_id" + delete_action_assignment(policy_id, "", "", "") + assignments = get_action_assignments(policy_id, ) + assert len(assignments) == 0 + + +def test_get_object_assignments(db): + policy_id = "admin" + object_id = "object_id_1" + category_id = "category_id_1" + data_id = "data_id_1" + add_object_assignment(policy_id, object_id, category_id, data_id) + obj_assignments = get_object_assignments(policy_id, object_id, category_id) + object_id_1 = list(obj_assignments.keys())[0] + assert obj_assignments[object_id_1]["policy_id"] == policy_id + assert obj_assignments[object_id_1]["object_id"] == object_id + assert obj_assignments[object_id_1]["category_id"] == category_id + assert len(obj_assignments[object_id_1].get("assignments")) == 1 + assert data_id in obj_assignments[object_id_1].get("assignments") + + +def test_get_object_assignments_by_policy_id(db): + policy_id = "admin" + object_id_1 = "object_id_1" + category_id_1 = "category_id_1" + data_id = "data_id_1" + add_action_assignment(policy_id, object_id_1, category_id_1, data_id) + object_id_2 = "object_id_2" + category_id_2 = "category_id_2" + data_id = "data_id_2" + add_action_assignment(policy_id, object_id_2, category_id_2, data_id) + object_id_3 = "object_id_3" + category_id_3 = "category_id_3" + data_id = "data_id_3" + add_action_assignment(policy_id, object_id_3, category_id_3, data_id) + act_assignments = get_action_assignments(policy_id) + assert len(act_assignments) == 3 + + +def test_add_object_assignments(db): + policy_id = "admin" + object_id = "object_id_1" + category_id = "category_id_1" + data_id = "data_id_1" + object_assignments = add_object_assignment(policy_id, object_id, category_id, data_id) + assert object_assignments + object_id_1 = list(object_assignments.keys())[0] + assert object_assignments[object_id_1]["policy_id"] == policy_id + assert object_assignments[object_id_1]["object_id"] == object_id + assert object_assignments[object_id_1]["category_id"] == category_id + assert len(object_assignments[object_id_1].get("assignments")) == 1 + assert data_id in object_assignments[object_id_1].get("assignments") + + +def test_delete_object_assignment(db): + policy_id = "admin_1" + add_object_assignment(policy_id, "", "", "") + object_id = "action_id_2" + category_id = "category_id_2" + data_id = "data_id_2" + add_object_assignment(policy_id, object_id, category_id, data_id) + delete_object_assignment(policy_id, "", "", "") + assignments = get_object_assignments(policy_id, ) + assert len(assignments) == 1 + + +def test_delete_object_assignment_with_invalid_policy_id(db): + policy_id = "invalid_id" + delete_object_assignment(policy_id, "", "", "") + assignments = get_object_assignments(policy_id, ) + assert len(assignments) == 0 + + +def test_get_subject_assignments(db): + policy_id = "admin" + subject_id = "object_id_1" + category_id = "category_id_1" + data_id = "data_id_1" + add_subject_assignment(policy_id, subject_id, category_id, data_id) + subj_assignments = get_subject_assignments(policy_id, subject_id, category_id) + subject_id_1 = list(subj_assignments.keys())[0] + assert subj_assignments[subject_id_1]["policy_id"] == policy_id + assert subj_assignments[subject_id_1]["subject_id"] == subject_id + assert subj_assignments[subject_id_1]["category_id"] == category_id + assert len(subj_assignments[subject_id_1].get("assignments")) == 1 + assert data_id in subj_assignments[subject_id_1].get("assignments") + + +def test_get_subject_assignments_by_policy_id(db): + policy_id = "admin" + subject_id_1 = "subject_id_1" + category_id_1 = "category_id_1" + data_id = "data_id_1" + add_subject_assignment(policy_id, subject_id_1, category_id_1, data_id) + subject_id_2 = "subject_id_2" + category_id_2 = "category_id_2" + data_id = "data_id_2" + add_subject_assignment(policy_id, subject_id_2, category_id_2, data_id) + subject_id_3 = "subject_id_3" + category_id_3 = "category_id_3" + data_id = "data_id_3" + add_subject_assignment(policy_id, subject_id_3, category_id_3, data_id) + subj_assignments = get_subject_assignments(policy_id) + assert len(subj_assignments) == 3 + + +def test_add_subject_assignments(db): + policy_id = "admin" + subject_id = "subject_id_1" + category_id = "category_id_1" + data_id = "data_id_1" + subject_assignments = add_subject_assignment(policy_id, subject_id, category_id, data_id) + assert subject_assignments + subject_id_1 = list(subject_assignments.keys())[0] + assert subject_assignments[subject_id_1]["policy_id"] == policy_id + assert subject_assignments[subject_id_1]["subject_id"] == subject_id + assert subject_assignments[subject_id_1]["category_id"] == category_id + assert len(subject_assignments[subject_id_1].get("assignments")) == 1 + assert data_id in subject_assignments[subject_id_1].get("assignments") + + +def test_delete_subject_assignment(db): + policy_id = "admin_1" + add_subject_assignment(policy_id, "", "", "") + subject_id = "subject_id_2" + category_id = "category_id_2" + data_id = "data_id_2" + add_subject_assignment(policy_id, subject_id, category_id, data_id) + delete_subject_assignment(policy_id, "", "", "") + assignments = get_subject_assignments(policy_id, ) + assert len(assignments) == 1 + + +def test_delete_subject_assignment_with_invalid_policy_id(db): + policy_id = "invalid_id" + delete_subject_assignment(policy_id, "", "", "") + assignments = get_subject_assignments(policy_id, ) + assert len(assignments) == 0 diff --git a/python_moondb/tests/unit_python/policies/test_data.py b/python_moondb/tests/unit_python/policies/test_data.py new file mode 100755 index 00000000..68b1d2a0 --- /dev/null +++ b/python_moondb/tests/unit_python/policies/test_data.py @@ -0,0 +1,513 @@ +import policies.mock_data as mock_data +import pytest + + +def get_action_data(policy_id, data_id=None, category_id=None): + from python_moondb.core import PolicyManager + return PolicyManager.get_action_data("", policy_id, data_id, category_id) + + +def add_action_data(policy_id, data_id=None, category_id=None, value=None): + from python_moondb.core import PolicyManager + return PolicyManager.add_action_data("", policy_id, data_id, category_id, value) + + +def delete_action_data(policy_id, data_id): + from python_moondb.core import PolicyManager + PolicyManager.delete_action_data("", policy_id, data_id) + + +def get_object_data(policy_id, data_id=None, category_id=None): + from python_moondb.core import PolicyManager + return PolicyManager.get_object_data("", policy_id, data_id, category_id) + + +def add_object_data(policy_id, data_id=None, category_id=None, value=None): + from python_moondb.core import PolicyManager + return PolicyManager.add_object_data("", policy_id, data_id, category_id, value) + + +def delete_object_data(policy_id, data_id): + from python_moondb.core import PolicyManager + PolicyManager.delete_object_data("", policy_id, data_id) + + +def get_subject_data(policy_id, data_id=None, category_id=None): + from python_moondb.core import PolicyManager + return PolicyManager.get_subject_data("", policy_id, data_id, category_id) + + +def add_subject_data(policy_id, data_id=None, category_id=None, value=None): + from python_moondb.core import PolicyManager + return PolicyManager.set_subject_data("", policy_id, data_id, category_id, value) + + +def delete_subject_data(policy_id, data_id): + from python_moondb.core import PolicyManager + PolicyManager.delete_subject_data("", policy_id, data_id) + + +def get_actions(policy_id, perimeter_id=None): + from python_moondb.core import PolicyManager + return PolicyManager.get_actions("", policy_id, perimeter_id) + + +def add_action(policy_id, perimeter_id=None, value=None): + from python_moondb.core import PolicyManager + return PolicyManager.add_action("", policy_id, perimeter_id, value) + + +def delete_action(policy_id, perimeter_id): + from python_moondb.core import PolicyManager + PolicyManager.delete_action("", policy_id, perimeter_id) + + +def get_objects(policy_id, perimeter_id=None): + from python_moondb.core import PolicyManager + return PolicyManager.get_objects("", policy_id, perimeter_id) + + +def add_object(policy_id, perimeter_id=None, value=None): + from python_moondb.core import PolicyManager + return PolicyManager.add_object("", policy_id, perimeter_id, value) + + +def delete_object(policy_id, perimeter_id): + from python_moondb.core import PolicyManager + PolicyManager.delete_object("", policy_id, perimeter_id) + + +def get_subjects(policy_id, perimeter_id=None): + from python_moondb.core import PolicyManager + return PolicyManager.get_subjects("", policy_id, perimeter_id) + + +def add_subject(policy_id, perimeter_id=None, value=None): + from python_moondb.core import PolicyManager + return PolicyManager.add_subject("", policy_id, perimeter_id, value) + + +def delete_subject(policy_id, perimeter_id): + from python_moondb.core import PolicyManager + PolicyManager.delete_subject("", policy_id, perimeter_id) + + +def get_available_metadata(policy_id): + from python_moondb.core import PolicyManager + return PolicyManager.get_available_metadata("", policy_id) + + +def test_get_action_data(db): + policy_id = mock_data.get_policy_id() + get_available_metadata(policy_id) + + policy_id = policy_id + data_id = "data_id_1" + category_id = "action_category_id1" + value = { + "name": "action-type", + "description": {"vm-action": "", "storage-action": "", }, + } + add_action_data(policy_id, data_id, category_id, value) + action_data = get_action_data(policy_id, data_id, category_id) + assert action_data + assert len(action_data[0]['data']) == 1 + + +def test_get_action_data_with_invalid_category_id(db): + policy_id = mock_data.get_policy_id() + get_available_metadata(policy_id) + + policy_id = policy_id + data_id = "data_id_1" + category_id = "action_category_id1" + value = { + "name": "action-type", + "description": {"vm-action": "", "storage-action": "", }, + } + add_action_data(policy_id, data_id, category_id, value) + action_data = get_action_data(policy_id) + assert action_data + assert len(action_data[0]['data']) == 1 + + +def test_add_action_data(db): + policy_id = "policy_id_1" + data_id = "data_id_1" + category_id = "category_id_1" + value = { + "name": "action-type", + "description": {"vm-action": "", "storage-action": "", }, + } + action_data = add_action_data(policy_id, data_id, category_id, value).get('data') + assert action_data + action_data_id = list(action_data.keys())[0] + assert action_data[action_data_id].get('policy_id') == policy_id + + +def test_delete_action_data(db): + policy_id = mock_data.get_policy_id() + get_available_metadata(policy_id) + data_id = "data_id_1" + category_id = "category_id_1" + value = { + "name": "action-type", + "description": {"vm-action": "", "storage-action": "", }, + } + action_data = add_action_data(policy_id, data_id, category_id, value).get('data') + action_data_id = list(action_data.keys())[0] + delete_action_data(action_data[action_data_id].get('policy_id'), None) + new_action_data = get_action_data(policy_id) + assert len(new_action_data[0]['data']) == 0 + + +def test_get_object_data(db): + policy_id = mock_data.get_policy_id() + get_available_metadata(policy_id) + + policy_id = policy_id + data_id = "data_id_1" + category_id = "object_category_id1" + value = { + "name": "object-security-level", + "description": {"low": "", "medium": "", "high": ""}, + } + add_object_data(policy_id, data_id, category_id, value) + object_data = get_object_data(policy_id, data_id, category_id) + assert object_data + assert len(object_data[0]['data']) == 1 + + +def test_get_object_data_with_invalid_category_id(db): + policy_id = mock_data.get_policy_id() + get_available_metadata(policy_id) + + policy_id = policy_id + data_id = "data_id_1" + category_id = "object_category_id1" + value = { + "name": "object-security-level", + "description": {"low": "", "medium": "", "high": ""}, + } + add_object_data(policy_id, data_id, category_id, value) + object_data = get_object_data(policy_id) + assert object_data + assert len(object_data[0]['data']) == 1 + + +def test_add_object_data(db): + policy_id = "policy_id_1" + data_id = "data_id_1" + category_id = "object_category_id1" + value = { + "name": "object-security-level", + "description": {"low": "", "medium": "", "high": ""}, + } + object_data = add_object_data(policy_id, data_id, category_id, value).get('data') + assert object_data + object_data_id = list(object_data.keys())[0] + assert object_data[object_data_id].get('policy_id') == policy_id + + +def test_delete_object_data(db): + policy_id = mock_data.get_policy_id() + get_available_metadata(policy_id) + data_id = "data_id_1" + category_id = "object_category_id1" + value = { + "name": "object-security-level", + "description": {"low": "", "medium": "", "high": ""}, + } + object_data = add_object_data(policy_id, data_id, category_id, value).get('data') + object_data_id = list(object_data.keys())[0] + delete_object_data(object_data[object_data_id].get('policy_id'), data_id) + new_object_data = get_object_data(policy_id) + assert len(new_object_data[0]['data']) == 0 + + +def test_get_subject_data(db): + policy_id = mock_data.get_policy_id() + get_available_metadata(policy_id) + + policy_id = policy_id + data_id = "data_id_1" + category_id = "subject_category_id1" + value = { + "name": "subject-security-level", + "description": {"low": "", "medium": "", "high": ""}, + } + add_subject_data(policy_id, data_id, category_id, value) + subject_data = get_subject_data(policy_id, data_id, category_id) + assert subject_data + assert len(subject_data[0]['data']) == 1 + + +def test_get_subject_data_with_invalid_category_id(db): + policy_id = mock_data.get_policy_id() + get_available_metadata(policy_id) + + policy_id = policy_id + data_id = "data_id_1" + category_id = "subject_category_id1" + value = { + "name": "subject-security-level", + "description": {"low": "", "medium": "", "high": ""}, + } + add_subject_data(policy_id, data_id, category_id, value) + subject_data = get_subject_data(policy_id) + assert subject_data + assert len(subject_data[0]['data']) == 1 + + +def test_add_subject_data(db): + policy_id = "policy_id_1" + data_id = "data_id_1" + category_id = "subject_category_id1" + value = { + "name": "subject-security-level", + "description": {"low": "", "medium": "", "high": ""}, + } + subject_data = add_object_data(policy_id, data_id, category_id, value).get('data') + assert subject_data + subject_data_id = list(subject_data.keys())[0] + assert subject_data[subject_data_id].get('policy_id') == policy_id + + +def test_delete_subject_data(db): + policy_id = mock_data.get_policy_id() + get_available_metadata(policy_id) + data_id = "data_id_1" + category_id = "subject_category_id1" + value = { + "name": "subject-security-level", + "description": {"low": "", "medium": "", "high": ""}, + } + subject_data = add_subject_data(policy_id, data_id, category_id, value).get('data') + subject_data_id = list(subject_data.keys())[0] + delete_subject_data(subject_data[subject_data_id].get('policy_id'), data_id) + new_subject_data = get_subject_data(policy_id) + assert len(new_subject_data[0]['data']) == 0 + + +def test_get_actions(db): + policy_id = "policy_id_1" + value = { + "name": "test_action", + "description": "test", + } + add_action(policy_id=policy_id, value=value) + actions = get_actions(policy_id, ) + assert actions + assert len(actions) == 1 + action_id = list(actions.keys())[0] + assert actions[action_id].get('policy_list')[0] == policy_id + + +def test_add_action(db): + policy_id = "policy_id_1" + value = { + "name": "test_action", + "description": "test", + } + action = add_action(policy_id=policy_id, value=value) + assert action + action_id = list(action.keys())[0] + assert len(action[action_id].get('policy_list')) == 1 + + +def test_add_action_multiple_times(db): + policy_id = "policy_id_1" + value = { + "name": "test_action", + "description": "test", + } + action = add_action(policy_id=policy_id, value=value) + action_id = list(action.keys())[0] + perimeter_id = action[action_id].get('id') + assert action + value = { + "name": "test_action", + "description": "test", + "policy_list": ['policy_id_3', 'policy_id_4'] + } + action = add_action('policy_id_7', perimeter_id, value) + assert action + action_id = list(action.keys())[0] + assert len(action[action_id].get('policy_list')) == 2 + + +def test_delete_action(db): + policy_id = "policy_id_1" + value = { + "name": "test_action", + "description": "test", + } + action = add_action(policy_id=policy_id, value=value) + action_id = list(action.keys())[0] + delete_action(policy_id, action_id) + actions = get_actions(policy_id, ) + assert not actions + + +def test_delete_action_with_invalid_perimeter_id(db): + policy_id = "invalid" + perimeter_id = "invalid" + with pytest.raises(Exception) as exception_info: + delete_action(policy_id, perimeter_id) + assert str(exception_info.value) == '400: Action Unknown' + + +def test_get_objects(db): + policy_id = "policy_id_1" + value = { + "name": "test_object", + "description": "test", + } + add_object(policy_id=policy_id, value=value) + objects = get_objects(policy_id, ) + assert objects + assert len(objects) == 1 + object_id = list(objects.keys())[0] + assert objects[object_id].get('policy_list')[0] == policy_id + + +def test_add_object(db): + policy_id = "policy_id_1" + value = { + "name": "test_object", + "description": "test", + } + added_object = add_object(policy_id=policy_id, value=value) + assert added_object + object_id = list(added_object.keys())[0] + assert len(added_object[object_id].get('policy_list')) == 1 + + +def test_add_objects_multiple_times(db): + policy_id = "policy_id_1" + value = { + "name": "test_object", + "description": "test", + } + added_object = add_object(policy_id=policy_id, value=value) + object_id = list(added_object.keys())[0] + perimeter_id = added_object[object_id].get('id') + assert added_object + value = { + "name": "test_object", + "description": "test", + "policy_list": ['policy_id_3', 'policy_id_4'] + } + added_object = add_object('policy_id_7', perimeter_id, value) + assert added_object + object_id = list(added_object.keys())[0] + assert len(added_object[object_id].get('policy_list')) == 2 + + +def test_delete_object(db): + policy_id = "policy_id_1" + value = { + "name": "test_object", + "description": "test", + } + added_object = add_object(policy_id=policy_id, value=value) + object_id = list(added_object.keys())[0] + delete_object(policy_id, object_id) + objects = get_objects(policy_id, ) + assert not objects + + +def test_delete_object_with_invalid_perimeter_id(db): + policy_id = "invalid" + perimeter_id = "invalid" + with pytest.raises(Exception) as exception_info: + delete_object(policy_id, perimeter_id) + assert str(exception_info.value) == '400: Object Unknown' + + +def test_get_subjects(db): + policy_id = "policy_id_1" + value = { + "name": "testuser", + "description": "test", + } + add_subject(policy_id=policy_id, value=value) + subjects = get_subjects(policy_id, ) + assert subjects + assert len(subjects) == 1 + subject_id = list(subjects.keys())[0] + assert subjects[subject_id].get('policy_list')[0] == policy_id + + +def test_add_subject(db): + policy_id = "policy_id_1" + value = { + "name": "testuser", + "description": "test", + } + subject = add_subject(policy_id=policy_id, value=value) + assert subject + subject_id = list(subject.keys())[0] + assert len(subject[subject_id].get('policy_list')) == 1 + + +def test_add_subjects_multiple_times(db): + policy_id = "policy_id_1" + value = { + "name": "testuser", + "description": "test", + } + subject = add_subject(policy_id=policy_id, value=value) + subject_id = list(subject.keys())[0] + perimeter_id = subject[subject_id].get('id') + assert subject + value = { + "name": "testuser", + "description": "test", + "policy_list": ['policy_id_3', 'policy_id_4'] + } + subject = add_subject('policy_id_7', perimeter_id, value) + assert subject + subject_id = list(subject.keys())[0] + assert len(subject[subject_id].get('policy_list')) == 2 + + +def test_delete_subject(db): + policy_id = "policy_id_1" + value = { + "name": "testuser", + "description": "test", + } + subject = add_subject(policy_id=policy_id, value=value) + subject_id = list(subject.keys())[0] + delete_subject(policy_id, subject_id) + subjects = get_subjects(policy_id, ) + assert not subjects + + +def test_delete_subject_with_invalid_perimeter_id(db): + policy_id = "invalid" + perimeter_id = "invalid" + with pytest.raises(Exception) as exception_info: + delete_subject(policy_id, perimeter_id) + assert str(exception_info.value) == '400: Subject Unknown' + + +def test_get_available_metadata(db): + policy_id = mock_data.get_policy_id() + metadata = get_available_metadata(policy_id) + assert metadata + assert metadata['object'][0] == "object_category_id1" + assert metadata['subject'][0] == "subject_category_id1" + assert metadata['subject'][1] == "subject_category_id2" + + +def test_get_available_metadata_empty_model(db): + import policies.test_policies as test_policies + policy_id = mock_data.get_policy_id() + value = mock_data.create_policy("invalid") + policy = test_policies.add_policies(value) + assert policy + policy_id = list(policy.keys())[0] + metadata = get_available_metadata(policy_id) + assert metadata
\ No newline at end of file diff --git a/python_moondb/tests/unit_python/policies/test_policies.py b/python_moondb/tests/unit_python/policies/test_policies.py new file mode 100755 index 00000000..acd5d7a8 --- /dev/null +++ b/python_moondb/tests/unit_python/policies/test_policies.py @@ -0,0 +1,161 @@ +# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors +# This software is distributed under the terms and conditions of the 'Apache-2.0' +# license which can be found in the file 'LICENSE' in this package distribution +# or at 'http://www.apache.org/licenses/LICENSE-2.0'. + + +def get_policies(): + from python_moondb.core import PolicyManager + return PolicyManager.get_policies("admin") + + +def add_policies(value=None): + from python_moondb.core import PolicyManager + if not value: + value = { + "name": "test_policiy", + "model_id": "", + "genre": "authz", + "description": "test", + } + return PolicyManager.add_policy("admin", value=value) + + +def delete_policies(uuid=None, name=None): + from python_moondb.core import PolicyManager + if not uuid: + for policy_id, policy_value in get_policies(): + if name == policy_value['name']: + uuid = policy_id + break + PolicyManager.delete_policy("admin", uuid) + + +def get_rules(policy_id=None, meta_rule_id=None, rule_id=None): + from python_moondb.core import PolicyManager + return PolicyManager.get_rules("", policy_id, meta_rule_id, rule_id) + + +def add_rule(policy_id=None, meta_rule_id=None, value=None): + from python_moondb.core import PolicyManager + if not value: + value = { + "rule": ("high", "medium", "vm-action"), + "instructions": ({"decision": "grant"}), + "enabled": "", + } + return PolicyManager.add_rule("", policy_id, meta_rule_id, value) + + +def delete_rule(policy_id=None, rule_id=None): + from python_moondb.core import PolicyManager + PolicyManager.delete_rule("", policy_id, rule_id) + + +def test_get_policies(db): + policies = get_policies() + assert isinstance(policies, dict) + assert not policies + + +def test_add_policies(db): + value = { + "name": "test_policy", + "model_id": "", + "genre": "authz", + "description": "test", + } + policies = add_policies(value) + assert isinstance(policies, dict) + assert policies + assert len(policies.keys()) == 1 + policy_id = list(policies.keys())[0] + for key in ("genre", "name", "model_id", "description"): + assert key in policies[policy_id] + assert policies[policy_id][key] == value[key] + + +def test_delete_policies(db): + value = { + "name": "test_policy1", + "model_id": "", + "genre": "authz", + "description": "test", + } + policies = add_policies(value) + policy_id1 = list(policies.keys())[0] + value = { + "name": "test_policy2", + "model_id": "", + "genre": "authz", + "description": "test", + } + policies = add_policies(value) + policy_id2 = list(policies.keys())[0] + assert policy_id1 != policy_id2 + delete_policies(policy_id1) + policies = get_policies() + assert policy_id1 not in policies + + +def test_get_rules(db): + value = { + "rule": ("low", "medium", "vm-action"), + "instructions": ({"decision": "grant"}), + "enabled": "", + } + policy_id = "1" + meta_rule_id = "1" + add_rule(policy_id, meta_rule_id, value) + value = { + "rule": ("low", "low", "vm-action"), + "instructions": ({"decision": "grant"}), + "enabled": "", + } + policy_id = "1" + meta_rule_id = "1" + add_rule(policy_id, meta_rule_id, value) + rules = get_rules(policy_id, meta_rule_id) + assert isinstance(rules, dict) + assert rules + obj = rules.get('rules') + assert len(obj) == 2 + + +def test_get_rules_with_invalid_policy_id_failure(db): + rules = get_rules("invalid_policy_id", "meta_rule_id") + assert not rules.get('meta_rule-id') + assert len(rules.get('rules')) == 0 + + +def test_add_rule(db): + value = { + "rule": ("high", "medium", "vm-action"), + "instructions": ({"decision": "grant"}), + "enabled": "", + } + policy_id = "1" + meta_rule_id = "1" + rules = add_rule(policy_id, meta_rule_id, value) + assert rules + assert len(rules) == 1 + assert isinstance(rules, dict) + rule_id = list(rules.keys())[0] + for key in ("rule", "instructions", "enabled"): + assert key in rules[rule_id] + assert rules[rule_id][key] == value[key] + + +def test_delete_rule(db): + value = { + "rule": ("low", "low", "vm-action"), + "instructions": ({"decision": "grant"}), + "enabled": "", + } + policy_id = "2" + meta_rule_id = "2" + rules = add_rule(policy_id, meta_rule_id, value) + rule_id = list(rules.keys())[0] + delete_rule(policy_id, rule_id) + rules = get_rules(policy_id, meta_rule_id) + assert not rules.get('rules') diff --git a/python_moondb/tests/unit_python/test_policies.py b/python_moondb/tests/unit_python/test_policies.py deleted file mode 100644 index 2d654660..00000000 --- a/python_moondb/tests/unit_python/test_policies.py +++ /dev/null @@ -1,77 +0,0 @@ -# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors -# This software is distributed under the terms and conditions of the 'Apache-2.0' -# license which can be found in the file 'LICENSE' in this package distribution -# or at 'http://www.apache.org/licenses/LICENSE-2.0'. - - -def get_policies(): - from python_moondb.core import PolicyManager - return PolicyManager.get_policies("admin") - - -def add_policies(value=None): - from python_moondb.core import PolicyManager - if not value: - value = { - "name": "test_policiy", - "model_id": "", - "genre": "authz", - "description": "test", - } - return PolicyManager.add_policy("admin", value=value) - - -def delete_policies(uuid=None, name=None): - from python_moondb.core import PolicyManager - if not uuid: - for policy_id, policy_value in get_policies(): - if name == policy_value['name']: - uuid = policy_id - break - PolicyManager.delete_policy("admin", uuid) - - -def test_get_policies(db): - policies = get_policies() - assert isinstance(policies, dict) - assert not policies - - -def test_add_policies(db): - value = { - "name": "test_policy", - "model_id": "", - "genre": "authz", - "description": "test", - } - policies = add_policies(value) - assert isinstance(policies, dict) - assert policies - assert len(policies.keys()) == 1 - policy_id = list(policies.keys())[0] - for key in ("genre", "name", "model_id", "description"): - assert key in policies[policy_id] - assert policies[policy_id][key] == value[key] - - -def test_delete_policies(db): - value = { - "name": "test_policy1", - "model_id": "", - "genre": "authz", - "description": "test", - } - policies = add_policies(value) - policy_id1 = list(policies.keys())[0] - value = { - "name": "test_policy2", - "model_id": "", - "genre": "authz", - "description": "test", - } - policies = add_policies(value) - policy_id2 = list(policies.keys())[0] - assert policy_id1 != policy_id2 - delete_policies(policy_id1) - policies = get_policies() - assert policy_id1 not in policies |