diff options
author | Asteroide <thomas.duval@orange.com> | 2018-10-05 15:01:17 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@opnfv.org> | 2018-10-05 15:01:17 +0000 |
commit | cbea4e360e9bfaa9698cf7c61c83c96a1ba89b8c (patch) | |
tree | a8bf6a7bfb06605ed5bfab77570afbe1e46cff4b /python_moondb/tests/unit_python/policies | |
parent | a3f68df52836676b23ac0f5e3d8c40c957ee80a7 (diff) | |
parent | 2e35a7e46f0929438c1c206e3116caa829f07dc6 (diff) |
Merge "Update code to 4.6 official version"
Diffstat (limited to 'python_moondb/tests/unit_python/policies')
3 files changed, 593 insertions, 75 deletions
diff --git a/python_moondb/tests/unit_python/policies/test_assignments.py b/python_moondb/tests/unit_python/policies/test_assignments.py index 675c2ff9..24a3a7b0 100755 --- a/python_moondb/tests/unit_python/policies/test_assignments.py +++ b/python_moondb/tests/unit_python/policies/test_assignments.py @@ -19,7 +19,8 @@ def test_get_action_assignments(db): data_id = mock_data.create_action_data(policy_id=policy_id, category_id=action_category_id) assignment_helper.add_action_assignment(policy_id, action_id, action_category_id, data_id) - act_assignments = assignment_helper.get_action_assignments(policy_id, action_id, action_category_id) + act_assignments = assignment_helper.get_action_assignments(policy_id, action_id, + action_category_id) action_id_1 = list(act_assignments.keys())[0] assert act_assignments[action_id_1]["policy_id"] == policy_id assert act_assignments[action_id_1]["action_id"] == action_id @@ -36,7 +37,8 @@ def test_add_action_assignments(db): meta_rule_name="meta_rule_1") action_id = mock_data.create_action(policy_id) data_id = mock_data.create_action_data(policy_id=policy_id, category_id=action_category_id) - action_assignments = assignment_helper.add_action_assignment(policy_id, action_id, action_category_id, data_id) + action_assignments = assignment_helper.add_action_assignment(policy_id, action_id, + action_category_id, data_id) assert action_assignments action_id_1 = list(action_assignments.keys())[0] assert action_assignments[action_id_1]["policy_id"] == policy_id @@ -47,6 +49,8 @@ def test_add_action_assignments(db): with pytest.raises(ActionAssignmentExisting) as exception_info: assignment_helper.add_action_assignment(policy_id, action_id, action_category_id, data_id) + assert str(exception_info.value) == '409: Action Assignment Existing' + assert str(exception_info.value.description) == 'The given action assignment value is existing.' def test_delete_action_assignment(db): @@ -79,7 +83,8 @@ def test_get_object_assignments(db): object_id = mock_data.create_object(policy_id) data_id = mock_data.create_object_data(policy_id=policy_id, category_id=object_category_id) assignment_helper.add_object_assignment(policy_id, object_id, object_category_id, data_id) - obj_assignments = assignment_helper.get_object_assignments(policy_id, object_id, object_category_id) + obj_assignments = assignment_helper.get_object_assignments(policy_id, object_id, + object_category_id) object_id_1 = list(obj_assignments.keys())[0] assert obj_assignments[object_id_1]["policy_id"] == policy_id assert obj_assignments[object_id_1]["object_id"] == object_id @@ -109,7 +114,8 @@ def test_add_object_assignments(db): meta_rule_name="meta_rule_1") object_id = mock_data.create_object(policy_id) data_id = mock_data.create_object_data(policy_id=policy_id, category_id=object_category_id) - object_assignments = assignment_helper.add_object_assignment(policy_id, object_id, object_category_id, data_id) + object_assignments = assignment_helper.add_object_assignment(policy_id, object_id, + object_category_id, data_id) assert object_assignments object_id_1 = list(object_assignments.keys())[0] assert object_assignments[object_id_1]["policy_id"] == policy_id @@ -118,8 +124,10 @@ def test_add_object_assignments(db): assert len(object_assignments[object_id_1].get("assignments")) == 1 assert data_id in object_assignments[object_id_1].get("assignments") - with pytest.raises(ObjectAssignmentExisting): + with pytest.raises(ObjectAssignmentExisting) as exception_info: assignment_helper.add_object_assignment(policy_id, object_id, object_category_id, data_id) + assert str(exception_info.value) == '409: Object Assignment Existing' + assert str(exception_info.value.description) == 'The given object assignment value is existing.' def test_delete_object_assignment(db): @@ -132,7 +140,8 @@ def test_delete_object_assignment(db): data_id = mock_data.create_object_data(policy_id=policy_id, category_id=object_category_id) assignment_helper.add_object_assignment(policy_id, object_id, object_category_id, data_id) - assignment_helper.delete_object_assignment(policy_id, object_id, object_category_id, data_id=data_id) + assignment_helper.delete_object_assignment(policy_id, object_id, object_category_id, + data_id=data_id) assignments = assignment_helper.get_object_assignments(policy_id) assert len(assignments) == 0 @@ -154,7 +163,8 @@ def test_get_subject_assignments(db): data_id = mock_data.create_subject_data(policy_id=policy_id, category_id=subject_category_id) assignment_helper.add_subject_assignment(policy_id, subject_id, subject_category_id, data_id) - subj_assignments = assignment_helper.get_subject_assignments(policy_id, subject_id, subject_category_id) + subj_assignments = assignment_helper.get_subject_assignments(policy_id, subject_id, + subject_category_id) subject_id_1 = list(subj_assignments.keys())[0] assert subj_assignments[subject_id_1]["policy_id"] == policy_id assert subj_assignments[subject_id_1]["subject_id"] == subject_id @@ -186,7 +196,8 @@ def test_add_subject_assignments(db): subject_id = mock_data.create_subject(policy_id) data_id = mock_data.create_subject_data(policy_id=policy_id, category_id=subject_category_id) - subject_assignments = assignment_helper.add_subject_assignment(policy_id, subject_id, subject_category_id, data_id) + subject_assignments = assignment_helper.add_subject_assignment(policy_id, subject_id, + subject_category_id, data_id) assert subject_assignments subject_id_1 = list(subject_assignments.keys())[0] assert subject_assignments[subject_id_1]["policy_id"] == policy_id @@ -195,8 +206,12 @@ def test_add_subject_assignments(db): assert len(subject_assignments[subject_id_1].get("assignments")) == 1 assert data_id in subject_assignments[subject_id_1].get("assignments") - with pytest.raises(SubjectAssignmentExisting): - assignment_helper.add_subject_assignment(policy_id, subject_id, subject_category_id, data_id) + with pytest.raises(SubjectAssignmentExisting) as exception_info: + assignment_helper.add_subject_assignment(policy_id, subject_id, subject_category_id, + data_id) + assert str(exception_info.value) == '409: Subject Assignment Existing' + assert str( + exception_info.value.description) == 'The given subject assignment value is existing.' def test_delete_subject_assignment(db): diff --git a/python_moondb/tests/unit_python/policies/test_data.py b/python_moondb/tests/unit_python/policies/test_data.py index fa3f8c06..8ce1ac00 100755 --- a/python_moondb/tests/unit_python/policies/test_data.py +++ b/python_moondb/tests/unit_python/policies/test_data.py @@ -6,7 +6,9 @@ import helpers.mock_data as mock_data import policies.mock_data import helpers.data_helper as data_helper +import helpers.assignment_helper as assignment_helper import pytest +from uuid import uuid4 import logging from python_moonutilities.exceptions import * @@ -56,6 +58,21 @@ def test_add_action_data(db): assert len(action_data['data']) == 1 +def test_add_action_data_duplicate(db): + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy( + subject_category_name="subject_category1", + object_category_name="object_category1", + action_category_name="action_category1", + meta_rule_name="meta_rule_1") + value = { + "name": "action-type", + "description": {"vm-action": "", "storage-action": "", }, + } + action_data = data_helper.add_action_data(policy_id=policy_id, category_id=action_category_id, value=value) + with pytest.raises(ActionScopeExisting) as exception_info: + action_data = data_helper.add_action_data(policy_id=policy_id, category_id=action_category_id, value=value) + assert str(exception_info.value) == '409: Action Scope Existing' + def test_add_action_data_with_invalid_category_id(db): subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy( subject_category_name="subject_category1", @@ -66,9 +83,9 @@ def test_add_action_data_with_invalid_category_id(db): "name": "action-type", "description": {"vm-action": "", "storage-action": "", }, } - with pytest.raises(Exception) as exception_info: + with pytest.raises(ActionCategoryUnknown) as exception_info: data_helper.add_action_data(policy_id=policy_id, value=value).get('data') - assert str(exception_info.value) == 'Invalid category id' + assert str(exception_info.value) == '400: Action Category Unknown' def test_delete_action_data(db): @@ -84,7 +101,7 @@ def test_delete_action_data(db): } action_data = data_helper.add_action_data(policy_id=policy_id, category_id=action_category_id, value=value) data_id = list(action_data["data"])[0] - data_helper.delete_action_data(policy_id, data_id) + data_helper.delete_action_data(policy_id=policy_id, data_id=data_id) new_action_data = data_helper.get_action_data(policy_id) assert len(new_action_data[0]['data']) == 0 @@ -144,9 +161,27 @@ def test_add_object_data_with_invalid_category_id(db): "name": "object-security-level", "description": {"low": "", "medium": "", "high": ""}, } - with pytest.raises(MetaDataUnknown) as exception_info: + with pytest.raises(ObjectCategoryUnknown) as exception_info: data_helper.add_object_data(policy_id=policy_id, category_id="invalid", value=value).get('data') - assert str(exception_info.value) == '400: Meta data Unknown' + assert str(exception_info.value) == '400: Object Category Unknown' + + +def test_add_object_data_duplicate(db): + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy( + subject_category_name="subject_category1", + object_category_name="object_category1", + action_category_name="action_category1", + meta_rule_name="meta_rule_1") + value = { + "name": "object-security-level", + "description": {"low": "", "medium": "", "high": ""}, + } + object_data = data_helper.add_object_data(policy_id=policy_id, category_id=object_category_id, value=value).get( + 'data') + with pytest.raises(ObjectScopeExisting) as exception_info: + data_helper.add_object_data(policy_id=policy_id, category_id=object_category_id, value=value).get( + 'data') + assert str(exception_info.value) == '409: Object Scope Existing' def test_delete_object_data(db): @@ -229,9 +264,28 @@ def test_add_subject_data_with_no_category_id(db): "name": "subject-security-level", "description": {"low": "", "medium": "", "high": ""}, } - with pytest.raises(Exception) as exception_info: + with pytest.raises(SubjectCategoryUnknown) as exception_info: data_helper.add_subject_data(policy_id=policy_id, data_id=subject_category_id, value=value).get('data') - assert str(exception_info.value) == 'Invalid category id' + assert str(exception_info.value) == '400: Subject Category Unknown' + + +def test_add_subject_data_duplicate(db): + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy( + subject_category_name="subject_category1", + object_category_name="object_category1", + action_category_name="action_category1", + meta_rule_name="meta_rule_1") + value = { + "name": "subject-security-level", + "description": {"low": "", "medium": "", "high": ""}, + } + subject_data = data_helper.add_subject_data(policy_id=policy_id, category_id=subject_category_id, value=value).get( + 'data') + + with pytest.raises(SubjectScopeExisting) as exception_info: + subject_data = data_helper.add_subject_data(policy_id=policy_id, category_id=subject_category_id, + value=value).get('data') + assert str(exception_info.value) == '409: Subject Scope Existing' def test_delete_subject_data(db): @@ -247,7 +301,7 @@ def test_delete_subject_data(db): subject_data = data_helper.add_subject_data(policy_id=policy_id, category_id=subject_category_id, value=value).get( 'data') subject_data_id = list(subject_data.keys())[0] - data_helper.delete_subject_data(subject_data[subject_data_id].get('policy_id'), subject_data_id) + data_helper.delete_subject_data(policy_id=subject_data[subject_data_id].get('policy_id'), data_id=subject_data_id) new_subject_data = data_helper.get_subject_data(policy_id) assert len(new_subject_data[0]['data']) == 0 @@ -297,8 +351,9 @@ def test_add_action_twice(db): "description": "test", } data_helper.add_action(policy_id=policy_id, value=value) - with pytest.raises(ActionExisting): + with pytest.raises(PolicyExisting) as exception_info: data_helper.add_action(policy_id=policy_id, value=value) + assert str(exception_info.value) == '409: Policy Already Exists' def test_add_action_blank_name(db): @@ -307,9 +362,9 @@ def test_add_action_blank_name(db): "name": "", "description": "test", } - with pytest.raises(Exception) as exception_info: + with pytest.raises(PerimeterContentError) as exception_info: data_helper.add_action(policy_id=policy_id, value=value) - assert str(exception_info.value) == '400: Perimeter Name is Invalid' + assert str(exception_info.value) == '400: Perimeter content is invalid.' def test_add_action_with_name_space(db): @@ -318,9 +373,9 @@ def test_add_action_with_name_space(db): "name": " ", "description": "test", } - with pytest.raises(Exception) as exception_info: + with pytest.raises(PerimeterContentError) as exception_info: data_helper.add_action(policy_id=policy_id, value=value) - assert str(exception_info.value) == '400: Perimeter Name is Invalid' + assert str(exception_info.value) == '400: Perimeter content is invalid.' def test_add_action_multiple_times(db): @@ -377,7 +432,7 @@ def test_delete_action(db): def test_delete_action_with_invalid_perimeter_id(db): policy_id = "invalid" perimeter_id = "invalid" - with pytest.raises(Exception) as exception_info: + with pytest.raises(PolicyUnknown) as exception_info: data_helper.delete_action(policy_id, perimeter_id) assert str(exception_info.value) == '400: Policy Unknown' @@ -400,7 +455,7 @@ def test_get_objects(db): assert objects[object_id].get('policy_list')[0] == policy_id -def test_add_object(db): +def test_add_object_with_same_policy_twice(db): subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy( subject_category_name="subject_category1", object_category_name="object_category1", @@ -415,8 +470,9 @@ def test_add_object(db): object_id = list(added_object.keys())[0] assert len(added_object[object_id].get('policy_list')) == 1 - with pytest.raises(ObjectExisting): + with pytest.raises(PolicyExisting) as exception_info: data_helper.add_object(policy_id=policy_id, value=value) + assert str(exception_info.value) == '409: Policy Already Exists' def test_add_objects_multiple_times(db): @@ -470,7 +526,7 @@ def test_delete_object(db): def test_delete_object_with_invalid_perimeter_id(db): policy_id = "invalid" perimeter_id = "invalid" - with pytest.raises(Exception) as exception_info: + with pytest.raises(PolicyUnknown) as exception_info: data_helper.delete_object(policy_id, perimeter_id) assert str(exception_info.value) == '400: Policy Unknown' @@ -504,11 +560,12 @@ def test_get_subjects_with_invalid_policy_id(db): "description": "test", } data_helper.add_subject(policy_id=policy_id, value=value) - with pytest.raises(PolicyUnknown): + with pytest.raises(PolicyUnknown) as exception_info: data_helper.get_subjects(policy_id="invalid") + assert str(exception_info.value) == '400: Policy Unknown' -def test_add_subject(db): +def test_add_subject_with_same_policy_twice(db): subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy( subject_category_name="subject_category1", object_category_name="object_category1", @@ -522,9 +579,9 @@ def test_add_subject(db): assert subject subject_id = list(subject.keys())[0] assert len(subject[subject_id].get('policy_list')) == 1 - with pytest.raises(SubjectExisting) as exception_info: + with pytest.raises(PolicyExisting) as exception_info: data_helper.add_subject(policy_id=policy_id, value=value) - assert str(exception_info.value) == '409: Subject Existing' + assert str(exception_info.value) == '409: Policy Already Exists' def test_add_subjects_multiple_times(db): @@ -578,11 +635,59 @@ def test_delete_subject(db): def test_delete_subject_with_invalid_perimeter_id(db): policy_id = "invalid" perimeter_id = "invalid" - with pytest.raises(Exception) as exception_info: + with pytest.raises(PolicyUnknown) as exception_info: data_helper.delete_subject(policy_id, perimeter_id) assert str(exception_info.value) == '400: Policy Unknown' +def test_delete_subject_with_assignment(db): + + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy( + subject_category_name="subject_category"+uuid4().hex, + object_category_name="object_category"+uuid4().hex, + action_category_name="action_category"+uuid4().hex, + meta_rule_name="meta_rule_"+uuid4().hex) + + subject_id = mock_data.create_subject(policy_id) + data_id = mock_data.create_subject_data(policy_id=policy_id, category_id=subject_category_id) + assignment_helper.add_subject_assignment(policy_id, subject_id, subject_category_id, data_id) + + with pytest.raises(DeletePerimeterWithAssignment) as exception_info: + data_helper.delete_subject(policy_id, subject_id) + assert '400: Perimeter With Assignment Error' == str(exception_info.value) + + +def test_delete_object_with_assignment(db): + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy( + subject_category_name="subject_category" + uuid4().hex, + object_category_name="object_category" + uuid4().hex, + action_category_name="action_category" + uuid4().hex, + meta_rule_name="meta_rule_" + uuid4().hex) + + object_id = mock_data.create_object(policy_id) + data_id = mock_data.create_object_data(policy_id=policy_id, category_id=object_category_id) + assignment_helper.add_object_assignment(policy_id, object_id, object_category_id, data_id) + + with pytest.raises(DeletePerimeterWithAssignment) as exception_info: + data_helper.delete_object(policy_id, object_id) + assert '400: Perimeter With Assignment Error' == str(exception_info.value) + +def test_delete_action_with_assignment(db): + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy( + subject_category_name="subject_category" + uuid4().hex, + object_category_name="object_category" + uuid4().hex, + action_category_name="action_category" + uuid4().hex, + meta_rule_name="meta_rule_" + uuid4().hex) + + action_id = mock_data.create_action(policy_id) + data_id = mock_data.create_action_data(policy_id=policy_id, category_id=action_category_id) + assignment_helper.add_action_assignment(policy_id, action_id, action_category_id, data_id) + + with pytest.raises(DeletePerimeterWithAssignment) as exception_info: + data_helper.delete_action(policy_id, action_id) + assert '400: Perimeter With Assignment Error' == str(exception_info.value) + + def test_get_available_metadata(db): subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy( subject_category_name="subject_category1", @@ -597,6 +702,6 @@ def test_get_available_metadata(db): def test_get_available_metadata_with_invalid_policy_id(db): - with pytest.raises(Exception) as exception_info: + with pytest.raises(PolicyUnknown) as exception_info: data_helper.get_available_metadata(policy_id='invalid') assert '400: Policy Unknown' == str(exception_info.value) diff --git a/python_moondb/tests/unit_python/policies/test_policies.py b/python_moondb/tests/unit_python/policies/test_policies.py index 07ee87fd..b2394203 100755 --- a/python_moondb/tests/unit_python/policies/test_policies.py +++ b/python_moondb/tests/unit_python/policies/test_policies.py @@ -6,8 +6,13 @@ import pytest import helpers.mock_data as mock_data import helpers.policy_helper as policy_helper +import helpers.model_helper as model_helper +import helpers.model_helper as model_helper from python_moonutilities.exceptions import * import helpers.pdp_helper as pdp_helper +import helpers.data_helper as data_helper +import helpers.assignment_helper as assignment_helper +from uuid import uuid4 def test_get_policies(db): @@ -17,9 +22,11 @@ def test_get_policies(db): def test_add_policies(db): + model = model_helper.add_model(model_id=uuid4().hex) + model_id = next(iter(model)) value = { "name": "test_policy", - "model_id": "", + "model_id": model_id, "genre": "authz", "description": "test", } @@ -35,43 +42,53 @@ def test_add_policies(db): def test_add_policies_twice_with_same_id(db): policy_id = 'policy_id_1' + model = model_helper.add_model(model_id=uuid4().hex) + model_id = next(iter(model)) value = { "name": "test_policy", - "model_id": "", + "model_id": model_id, "genre": "authz", "description": "test", } policy_helper.add_policies(policy_id, value) with pytest.raises(PolicyExisting) as exception_info: policy_helper.add_policies(policy_id, value) - # assert str(exception_info.value) == '409: Policy Error' + assert str(exception_info.value) == '409: Policy Already Exists' def test_add_policies_twice_with_same_name(db): + model = model_helper.add_model(model_id=uuid4().hex) + model_id = next(iter(model)) + policy_name=uuid4().hex value = { - "name": "test_policy", - "model_id": "", + "name": policy_name, + "model_id": model_id, "genre": "authz", "description": "test", } policy_helper.add_policies(value=value) with pytest.raises(Exception) as exception_info: policy_helper.add_policies(value=value) - # assert str(exception_info.value) == '409: Policy Error' + assert str(exception_info.value) == '409: Policy Already Exists' + assert str(exception_info.value.description)== 'Policy name Existed' def test_delete_policies(db): + model = model_helper.add_model(model_id=uuid4().hex) + model_id = next(iter(model)) + policy_name1 = uuid4().hex value = { - "name": "test_policy1", - "model_id": "", + "name": policy_name1, + "model_id": model_id, "genre": "authz", "description": "test", } policies = policy_helper.add_policies(value=value) policy_id1 = list(policies.keys())[0] + policy_name2 = uuid4().hex value = { - "name": "test_policy2", - "model_id": "", + "name": policy_name2, + "model_id": model_id, "genre": "authz", "description": "test", } @@ -95,7 +112,7 @@ def test_update_policy(db): policy_id = list(policies.keys())[0] value = { "name": "test_policy4", - "model_id": "", + "model_id": policies[policy_id]['model_id'], "genre": "authz", "description": "test-3", } @@ -106,6 +123,24 @@ def test_update_policy(db): assert updated_policy[policy_id][key] == value[key] +def test_update_policy_name_with_existed_one(db): + policies = policy_helper.add_policies() + policy_id1 = list(policies.keys())[0] + policy_name = uuid4().hex + value = { + "name": policy_name, + "model_id": policies[policy_id1]['model_id'], + "genre": "authz", + "description": "test-3", + } + policy_helper.add_policies(value=value) + with pytest.raises(PolicyExisting) as exception_info: + policy_helper.update_policy(policy_id=policy_id1,value=value) + + assert str(exception_info.value) == '409: Policy Already Exists' + assert str(exception_info.value.description)== 'Policy name Existed' + + def test_update_policy_with_invalid_id(db): policy_id = 'invalid-id' value = { @@ -116,7 +151,7 @@ def test_update_policy_with_invalid_id(db): } with pytest.raises(PolicyUnknown) as exception_info: policy_helper.update_policy(policy_id, value) - # assert str(exception_info.value) == '400: Policy Unknown' + assert str(exception_info.value) == '400: Policy Unknown' def test_get_policy_from_meta_rules(db): @@ -154,18 +189,10 @@ def test_get_rules(db): action_category_name="action_category12", meta_rule_name="meta_rule_12", model_name="model12") - value = { - "rule": ("low", "medium", "vm-action"), - "instructions": ({"decision": "grant"}), - "enabled": "", - } - policy_helper.add_rule(policy_id=policy_id, meta_rule_id=meta_rule_id, value=value) - value = { - "rule": ("low", "low", "vm-action"), - "instructions": ({"decision": "grant"}), - "enabled": "", - } - policy_helper.add_rule(policy_id=policy_id, meta_rule_id=meta_rule_id, value=value) + + policy_helper.add_rule(policy_id=policy_id, meta_rule_id=meta_rule_id) + + policy_helper.add_rule(policy_id=policy_id, meta_rule_id=meta_rule_id) rules = policy_helper.get_rules(policy_id=policy_id, meta_rule_id=meta_rule_id) assert isinstance(rules, dict) assert rules @@ -179,15 +206,22 @@ def test_get_rules_with_invalid_policy_id_failure(db): assert len(rules.get('rules')) == 0 -def test_add_rule(db): +def test_add_rule_existing(db): subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy( subject_category_name="subject_category1", object_category_name="object_category1", action_category_name="action_category1", meta_rule_name="meta_rule_1", model_name="model1") + subject_data_id = mock_data.create_subject_data(policy_id=policy_id, + category_id=subject_category_id) + object_data_id = mock_data.create_object_data(policy_id=policy_id, + category_id=object_category_id) + action_data_id = mock_data.create_action_data(policy_id=policy_id, + category_id=action_category_id) + value = { - "rule": ("high", "medium", "vm-action"), + "rule": (subject_data_id, object_data_id, action_data_id), "instructions": ({"decision": "grant"}), "enabled": "", } @@ -201,23 +235,263 @@ def test_add_rule(db): assert key in rules[rule_id] assert rules[rule_id][key] == value[key] - with pytest.raises(RuleExisting): + with pytest.raises(RuleExisting) as exception_info: policy_helper.add_rule(policy_id=policy_id, meta_rule_id=meta_rule_id, value=value) + assert str(exception_info.value) == '409: Rule Existing' -def test_delete_rule(db): +def test_check_existing_rule_valid_request(db): subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy( - subject_category_name="subject_category14", - object_category_name="object_category14", - action_category_name="action_category14", - meta_rule_name="meta_rule_14", - model_name="model14") + subject_category_name="subject_category1", + object_category_name="object_category1", + action_category_name="action_category1", + meta_rule_name="meta_rule_1", + model_name="model1") + subject_data_id = mock_data.create_subject_data(policy_id=policy_id, + category_id=subject_category_id) + object_data_id = mock_data.create_object_data(policy_id=policy_id, + category_id=object_category_id) + action_data_id = mock_data.create_action_data(policy_id=policy_id, + category_id=action_category_id) value = { - "rule": ("low", "low", "vm-action"), + "rule": (subject_data_id, object_data_id, action_data_id), "instructions": ({"decision": "grant"}), "enabled": "", } - rules = policy_helper.add_rule(policy_id, meta_rule_id, value) + + rules = policy_helper.add_rule(policy_id=policy_id, meta_rule_id=meta_rule_id, value=value) + assert rules + assert len(rules) == 1 + assert isinstance(rules, dict) + rule_id = list(rules.keys())[0] + for key in ("rule", "instructions", "enabled"): + assert key in rules[rule_id] + assert rules[rule_id][key] == value[key] + + with pytest.raises(RuleExisting) as exception_info: + policy_helper.add_rule(policy_id=policy_id, meta_rule_id=meta_rule_id, value=value) + assert str(exception_info.value) == '409: Rule Existing' + + +def test_check_existing_rule_valid_multiple__data(db): + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy( + subject_category_name="subject_category1", + object_category_name="object_category1", + action_category_name="action_category1", + meta_rule_name="meta_rule_1", + model_name="model1") + subject_data_id1 = mock_data.create_subject_data(policy_id=policy_id, + category_id=subject_category_id) + subject_data_id2 = mock_data.create_subject_data(policy_id=policy_id, + category_id=subject_category_id) + object_data_id1 = mock_data.create_object_data(policy_id=policy_id, + category_id=object_category_id) + object_data_id2 = mock_data.create_object_data(policy_id=policy_id, + category_id=object_category_id) + action_data_id1 = mock_data.create_action_data(policy_id=policy_id, + category_id=action_category_id) + action_data_id2 = mock_data.create_action_data(policy_id=policy_id, + category_id=action_category_id) + value = { + "rule": ( + subject_data_id1, object_data_id2, action_data_id1), + "instructions": ({"decision": "grant"}), + "enabled": "", + } + + rules = policy_helper.add_rule(policy_id=policy_id, meta_rule_id=meta_rule_id, value=value) + assert rules + assert len(rules) == 1 + assert isinstance(rules, dict) + rule_id = list(rules.keys())[0] + for key in ("rule", "instructions", "enabled"): + assert key in rules[rule_id] + assert rules[rule_id][key] == value[key] + + with pytest.raises(RuleExisting) as exception_info: + policy_helper.add_rule(policy_id=policy_id, meta_rule_id=meta_rule_id, value=value) + assert str(exception_info.value) == '409: Rule Existing' + + +def test_check_existing_rule_missing_data(db): + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy( + subject_category_name="subject_category1", + object_category_name="object_category1", + action_category_name="action_category1", + meta_rule_name="meta_rule_1", + model_name="model1") + subject_data_id = mock_data.create_subject_data(policy_id=policy_id, + category_id=subject_category_id) + object_data_id = mock_data.create_object_data(policy_id=policy_id, + category_id=object_category_id) + action_data_id = mock_data.create_action_data(policy_id=policy_id, + category_id=action_category_id) + value = { + "rule": (object_data_id, action_data_id), + "instructions": ({"decision": "grant"}), + "enabled": "", + } + + with pytest.raises(RuleContentError) as exception_info: + policy_helper.add_rule(policy_id=policy_id, meta_rule_id=meta_rule_id, value=value) + assert str(exception_info.value) == '400: Rule Error' + assert exception_info.value.description== "Missing Data" + + +def test_check_existing_rule_meta_rule_missing_data(db): + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy( + subject_category_name="subject_category1", + object_category_name="object_category1", + action_category_name="action_category1", + meta_rule_name="meta_rule_1", + model_name="model1") + subject_data_id = mock_data.create_subject_data(policy_id=policy_id, + category_id=subject_category_id) + object_data_id = mock_data.create_object_data(policy_id=policy_id, + category_id=object_category_id) + action_data_id = mock_data.create_action_data(policy_id=policy_id, + category_id=action_category_id) + value = { + "rule": (subject_data_id, object_data_id, action_data_id, action_data_id), + "instructions": ({"decision": "grant"}), + "enabled": "", + } + + with pytest.raises(MetaRuleContentError) as exception_info: + policy_helper.add_rule(policy_id=policy_id, meta_rule_id=meta_rule_id, value=value) + assert str(exception_info.value) == '400: Meta Rule Error' + assert exception_info.value.description == "Missing Data" + + +def test_check_existing_rule_invalid_data_id_order(db): + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy( + subject_category_name="subject_category1", + object_category_name="object_category1", + action_category_name="action_category1", + meta_rule_name="meta_rule_1", + model_name="model1") + subject_data_id = mock_data.create_subject_data(policy_id=policy_id, + category_id=subject_category_id) + object_data_id = mock_data.create_object_data(policy_id=policy_id, + category_id=object_category_id) + action_data_id = mock_data.create_action_data(policy_id=policy_id, + category_id=action_category_id) + value = { + "rule": (object_data_id, action_data_id, subject_data_id), + "instructions": ({"decision": "grant"}), + "enabled": "", + } + + with pytest.raises(RuleContentError) as exception_info: + policy_helper.add_rule(policy_id=policy_id, meta_rule_id=meta_rule_id, value=value) + assert str(exception_info.value) == '400: Rule Error' + assert "Missing Subject_category" in exception_info.value.description + + +def test_check_existing_rule_invalid_data_id_order_scenrio_2(db): + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy( + subject_category_name="subject_category1", + object_category_name="object_category1", + action_category_name="action_category1", + meta_rule_name="meta_rule_1", + model_name="model1") + subject_data_id = mock_data.create_subject_data(policy_id=policy_id, + category_id=subject_category_id) + object_data_id = mock_data.create_object_data(policy_id=policy_id, + category_id=object_category_id) + action_data_id = mock_data.create_action_data(policy_id=policy_id, + category_id=action_category_id) + value = { + "rule": (subject_data_id, action_data_id, object_data_id), + "instructions": ({"decision": "grant"}), + "enabled": "", + } + + with pytest.raises(RuleContentError) as exception_info: + policy_helper.add_rule(policy_id=policy_id, meta_rule_id=meta_rule_id, value=value) + assert str(exception_info.value) == '400: Rule Error' + assert "Missing Object_category" in exception_info.value.description + + +def test_check_existing_rule_wrong_subject_data_id(db): + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy( + subject_category_name="subject_category1", + object_category_name="object_category1", + action_category_name="action_category1", + meta_rule_name="meta_rule_1", + model_name="model1") + subject_data_id = mock_data.create_subject_data(policy_id=policy_id, + category_id=subject_category_id) + object_data_id = mock_data.create_object_data(policy_id=policy_id, + category_id=object_category_id) + action_data_id = mock_data.create_action_data(policy_id=policy_id, + category_id=action_category_id) + value = { + "rule": (uuid4().hex, object_data_id, action_data_id), + "instructions": ({"decision": "grant"}), + "enabled": "", + } + + with pytest.raises(RuleContentError) as exception_info: + policy_helper.add_rule(policy_id=policy_id, meta_rule_id=meta_rule_id, value=value) + assert str(exception_info.value) == '400: Rule Error' + assert "Missing Subject_category" in exception_info.value.description + + +def test_check_existing_rule_wrong_object_data_id(db): + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy( + subject_category_name="subject_category1", + object_category_name="object_category1", + action_category_name="action_category1", + meta_rule_name="meta_rule_1", + model_name="model1") + subject_data_id = mock_data.create_subject_data(policy_id=policy_id, + category_id=subject_category_id) + object_data_id = mock_data.create_object_data(policy_id=policy_id, + category_id=object_category_id) + action_data_id = mock_data.create_action_data(policy_id=policy_id, + category_id=action_category_id) + value = { + "rule": (subject_data_id, uuid4().hex, action_data_id), + "instructions": ({"decision": "grant"}), + "enabled": "", + } + + with pytest.raises(RuleContentError) as exception_info: + policy_helper.add_rule(policy_id=policy_id, meta_rule_id=meta_rule_id, value=value) + assert str(exception_info.value) == '400: Rule Error' + assert "Missing Object_category" in exception_info.value.description + + +def test_check_existing_rule_wrong_action_data_id(db): + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy( + subject_category_name="subject_category1", + object_category_name="object_category1", + action_category_name="action_category1", + meta_rule_name="meta_rule_1", + model_name="model1") + subject_data_id = mock_data.create_subject_data(policy_id=policy_id, + category_id=subject_category_id) + object_data_id = mock_data.create_object_data(policy_id=policy_id, + category_id=object_category_id) + action_data_id = mock_data.create_action_data(policy_id=policy_id, + category_id=action_category_id) + value = { + "rule": (subject_data_id, object_data_id, uuid4().hex), + "instructions": ({"decision": "grant"}), + "enabled": "", + } + + with pytest.raises(RuleContentError) as exception_info: + policy_helper.add_rule(policy_id=policy_id, meta_rule_id=meta_rule_id, value=value) + assert str(exception_info.value) == '400: Rule Error' + assert "Missing Action_category" in exception_info.value.description + + +def test_delete_rule(db): + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy() + + rules = policy_helper.add_rule(policy_id, meta_rule_id) rule_id = list(rules.keys())[0] policy_helper.delete_rule(policy_id, rule_id) rules = policy_helper.get_rules(policy_id, meta_rule_id) @@ -225,13 +499,7 @@ def test_delete_rule(db): def test_delete_policies_with_pdp(db): - value = { - "name": "test_policy1", - "model_id": "", - "genre": "authz", - "description": "test", - } - policies = policy_helper.add_policies(value=value) + policies = policy_helper.add_policies() policy_id1 = list(policies.keys())[0] pdp_id = "pdp_id1" value = { @@ -243,3 +511,133 @@ def test_delete_policies_with_pdp(db): pdp_helper.add_pdp(pdp_id=pdp_id, value=value) with pytest.raises(DeletePolicyWithPdp) as exception_info: policy_helper.delete_policies(policy_id1) + assert str(exception_info.value) == '400: Policy With PDP Error' + assert 'Cannot delete policy with pdp' == exception_info.value.description + + +def test_delete_policies_with_subject_perimeter(db): + policies = policy_helper.add_policies() + policy_id1 = list(policies.keys())[0] + + value = { + "name": "testuser", + "security_pipeline": [policy_id1], + "keystone_project_id": "keystone_project_id1", + "description": "...", + } + data_helper.add_subject(policy_id=policy_id1, value=value) + with pytest.raises(DeletePolicyWithPerimeter) as exception_info: + policy_helper.delete_policies(policy_id1) + assert str(exception_info.value) == '400: Policy With Perimeter Error' + assert 'Cannot delete policy with perimeter'== exception_info.value.description + + +def test_delete_policies_with_object_perimeter(db): + policies = policy_helper.add_policies() + policy_id1 = list(policies.keys())[0] + + value = { + "name": "test_obj", + "security_pipeline": [policy_id1], + "keystone_project_id": "keystone_project_id1", + "description": "...", + } + data_helper.add_object(policy_id=policy_id1, value=value) + with pytest.raises(DeletePolicyWithPerimeter) as exception_info: + policy_helper.delete_policies(policy_id1) + assert str(exception_info.value) == '400: Policy With Perimeter Error' + assert 'Cannot delete policy with perimeter'== exception_info.value.description + + +def test_delete_policies_with_action_perimeter(db): + policies = policy_helper.add_policies() + policy_id1 = list(policies.keys())[0] + + value = { + "name": "test_act", + "security_pipeline": [policy_id1], + "keystone_project_id": "keystone_project_id1", + "description": "...", + } + data_helper.add_action(policy_id=policy_id1, value=value) + with pytest.raises(DeletePolicyWithPerimeter) as exception_info: + policy_helper.delete_policies(policy_id1) + assert '400: Policy With Perimeter Error' == str(exception_info.value) + + +def test_delete_policies_with_subject_assignment(db): + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy() + + subject_id = mock_data.create_subject(policy_id) + data_id = mock_data.create_subject_data(policy_id=policy_id, category_id=subject_category_id) + assignment_helper.add_subject_assignment(policy_id, subject_id, subject_category_id, data_id) + + with pytest.raises(DeletePolicyWithPerimeter) as exception_info: + policy_helper.delete_policies(policy_id) + + assert '400: Policy With Perimeter Error' == str(exception_info.value) + + +def test_delete_policies_with_object_assignment(db): + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy() + + object_id = mock_data.create_object(policy_id) + data_id = mock_data.create_object_data(policy_id=policy_id, category_id=object_category_id) + assignment_helper.add_object_assignment(policy_id, object_id, object_category_id, data_id) + + with pytest.raises(DeletePolicyWithPerimeter) as exception_info: + policy_helper.delete_policies(policy_id) + assert '400: Policy With Perimeter Error' == str(exception_info.value) + + +def test_delete_policies_with_action_assignment(db): + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy() + + action_id = mock_data.create_action(policy_id) + data_id = mock_data.create_action_data(policy_id=policy_id, category_id=action_category_id) + assignment_helper.add_action_assignment(policy_id, action_id, action_category_id, data_id) + + with pytest.raises(DeletePolicyWithPerimeter) as exception_info: + policy_helper.delete_policies(policy_id) + assert '400: Policy With Perimeter Error' == str(exception_info.value) + + +def test_delete_policies_with_subject_data(db): + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy() + + data_id = mock_data.create_subject_data(policy_id=policy_id, category_id=subject_category_id) + + with pytest.raises(DeletePolicyWithData) as exception_info: + policy_helper.delete_policies(policy_id) + + assert '400: Policy With Data Error' == str(exception_info.value) + + +def test_delete_policies_with_object_data(db): + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy() + + data_id = mock_data.create_object_data(policy_id=policy_id, category_id=object_category_id) + + with pytest.raises(DeletePolicyWithData) as exception_info: + policy_helper.delete_policies(policy_id) + assert '400: Policy With Data Error' == str(exception_info.value) + + +def test_delete_policies_with_action_data(db): + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy() + + data_id = mock_data.create_action_data(policy_id=policy_id, category_id=action_category_id) + + with pytest.raises(DeletePolicyWithData) as exception_info: + policy_helper.delete_policies(policy_id) + assert '400: Policy With Data Error' == str(exception_info.value) + + +def test_delete_policies_with_rule(db): + subject_category_id, object_category_id, action_category_id, meta_rule_id, policy_id = mock_data.create_new_policy() + + rules = policy_helper.add_rule(policy_id, meta_rule_id) + + with pytest.raises(DeletePolicyWithRules) as exception_info: + policy_helper.delete_policies(policy_id) + assert '400: Policy With Rule Error' == str(exception_info.value) |