aboutsummaryrefslogtreecommitdiffstats
path: root/python_moonutilities/tests/unit_python/test_cache.py
diff options
context:
space:
mode:
Diffstat (limited to 'python_moonutilities/tests/unit_python/test_cache.py')
-rw-r--r--python_moonutilities/tests/unit_python/test_cache.py452
1 files changed, 0 insertions, 452 deletions
diff --git a/python_moonutilities/tests/unit_python/test_cache.py b/python_moonutilities/tests/unit_python/test_cache.py
deleted file mode 100644
index bef10a21..00000000
--- a/python_moonutilities/tests/unit_python/test_cache.py
+++ /dev/null
@@ -1,452 +0,0 @@
-import pytest
-import mock_repo.data as data_mock
-import mock_repo.urls as register_urls
-import requests_mock
-
-
-def test_authz_request():
- from python_moonutilities import cache
- c = cache.Cache()
- assert isinstance(c.authz_requests, dict)
-
-
-# tests for get (subject) in cache
-# ================================================
-def test_get_subject_success():
- from python_moonutilities import cache
- cache_obj = cache.Cache()
- name = 'subject_name'
- subject_id = cache_obj.get_subject(data_mock.shared_ids["policy"]["policy_id_1"], name)
- assert subject_id is not None
-
-def test_get_subject_no_policy():
- from python_moonutilities import cache
- cache_obj = cache.Cache()
- with pytest.raises(Exception) as exception_info:
- cache_obj.get_subject(None, "")
- assert str(exception_info.value) == '400: Policy Unknown'
-
-def test_get_subject_invalid_name():
- from python_moonutilities import cache
- cache_obj = cache.Cache()
- name = 'invalid name'
- with pytest.raises(Exception) as exception_info:
- cache_obj.get_subject(data_mock.shared_ids["policy"]["policy_id_1"], name)
- assert str(exception_info.value) == '400: Subject Unknown'
-
-def test_get_subject_invalid_response():
- from python_moonutilities import cache
- cache_obj = cache.Cache()
- name = 'policy_id_invalid_response'
- with pytest.raises(Exception) as exception_info:
- cache_obj.get_subject(data_mock.shared_ids["policy"]["policy_id_invalid_response"], name)
- assert str(exception_info.value) == '400: Subject Unknown'
-
-# tests for get (object) in cache
-# ================================================
-def test_get_object_success():
- from python_moonutilities import cache
- cache_obj = cache.Cache()
- name = 'object_name'
- object_id = cache_obj.get_object(data_mock.shared_ids["policy"]["policy_id_1"], name)
- assert object_id is not None
-
-def test_get_object_no_policy():
- from python_moonutilities import cache
- cache_obj = cache.Cache()
- with pytest.raises(Exception) as exception_info:
- cache_obj.get_object(None, "")
- assert str(exception_info.value) == '400: Policy Unknown'
-
-def test_get_object_invalid_name():
- from python_moonutilities import cache
- cache_obj = cache.Cache()
- name = 'invalid name'
- with pytest.raises(Exception) as exception_info:
- cache_obj.get_object(data_mock.shared_ids["policy"]["policy_id_1"], name)
- assert str(exception_info.value) == '400: Object Unknown'
-
-def test_get_object_invalid_response():
- from python_moonutilities import cache
- cache_obj = cache.Cache()
- name = 'policy_id_invalid_response'
- with pytest.raises(Exception) as exception_info:
- cache_obj.get_object(data_mock.shared_ids["policy"]["policy_id_invalid_response"], name)
- assert str(exception_info.value) == '400: Object Unknown'
-
-# tests for get (action) in cache
-# ================================================
-def test_get_action_success():
- from python_moonutilities import cache
- cache_obj = cache.Cache()
- name = 'action_name'
- action_id = cache_obj.get_action(data_mock.shared_ids["policy"]["policy_id_1"], name)
- assert action_id is not None
-
-
-def test_get_action_no_policy():
- from python_moonutilities import cache
- cache_obj = cache.Cache()
- with pytest.raises(Exception) as exception_info:
- cache_obj.get_action(None, "")
- assert str(exception_info.value) == '400: Policy Unknown'
-
-def test_get_action_invalid_name():
- from python_moonutilities import cache
- cache_obj = cache.Cache()
- name = 'invalid name'
- with pytest.raises(Exception) as exception_info:
- cache_obj.get_action(data_mock.shared_ids["policy"]["policy_id_1"], name)
- assert str(exception_info.value) == '400: Action Unknown'
-
-def test_get_action_invalid_response():
- from python_moonutilities import cache
- cache_obj = cache.Cache()
- name = 'policy_id_invalid_response'
- with pytest.raises(Exception) as exception_info:
- cache_obj.get_action(data_mock.shared_ids["policy"]["policy_id_invalid_response"], name)
- assert str(exception_info.value) == '400: Action Unknown'
-
-# ====================================================================================================
-
-# tests for get (subject_assignment) in cache
-# =================================================================================
-
-def test_get_subject_assignment_success():
- from python_moonutilities import cache
- cache_obj = cache.Cache()
- subject_assignments = cache_obj.get_subject_assignments(data_mock.shared_ids["policy"]["policy_id_1"],
- data_mock.shared_ids["perimeter"]["perimeter_id_1"],
- data_mock.shared_ids["category"]["category_id_1"])
- assert subject_assignments is not None
-
-def test_get_subject_assignment_no_policy():
- from python_moonutilities import cache
- cache_obj = cache.Cache()
- with pytest.raises(Exception) as exception_info:
- cache_obj.get_subject_assignments(None,
- data_mock.shared_ids["perimeter"]["perimeter_id_1"],
- data_mock.shared_ids["category"]["category_id_1"])
- assert str(exception_info.value) == '400: Policy Unknown'
-
-
-@requests_mock.Mocker(kw='mock')
-def test_get_subject_assignment_invalid_subject_id(**kwargs):
- from python_moonutilities import cache
-
- register_urls.register_components(kwargs['mock'])
-
- kwargs['mock'].get('http://manager:8082/policies/{}/subject_assignments/{}'
- .format(data_mock.shared_ids["subject"]["invalid_subject_id"],
- data_mock.shared_ids["perimeter"]["perimeter_id_1"]),
- json={'subject_assignments': data_mock.subject_assignment_mock_invalid_subject_id}
- )
- cache_obj = cache.Cache()
- subject_assignments = cache_obj.get_subject_assignments(data_mock.shared_ids["subject"]["invalid_subject_id"],
- data_mock.shared_ids["perimeter"]["perimeter_id_1"],
- data_mock.shared_ids["category"]["category_id_1"])
- assert len(subject_assignments) == 0
-
-
-@requests_mock.Mocker(kw='mock')
-def test_get_subject_assignment_invalid_category_id(**kwargs):
- from python_moonutilities import cache
-
- register_urls.register_components(kwargs['mock'])
- kwargs['mock'].get('http://manager:8082/policies/{}/subject_assignments/{}'
- .format(data_mock.shared_ids["subject"]["invalid_category_id"],
- data_mock.shared_ids["perimeter"]["perimeter_id_1"]),
- json={'subject_assignments': data_mock.subject_assignment_mock_invalid_category_id}
- )
- cache_obj = cache.Cache()
- subject_assignments = cache_obj.get_subject_assignments(data_mock.shared_ids["subject"]["invalid_category_id"],
- data_mock.shared_ids["perimeter"]["perimeter_id_1"],
- data_mock.shared_ids["category"]["category_id_1"])
- assert len(subject_assignments) == 0
-
-
-@requests_mock.Mocker(kw='mock')
-def test_get_subject_assignment_invalid_assignment_id(**kwargs):
- from python_moonutilities import cache
-
- register_urls.register_components(kwargs['mock'])
- kwargs['mock'].get('http://manager:8082/policies/{}/subject_assignments/{}'
- .format(data_mock.shared_ids["subject"]["invalid_assignment_id"],
- data_mock.shared_ids["perimeter"]["perimeter_id_1"]),
- json={'subject_assignments': data_mock.subject_assignment_mock_invalid_assignment_id}
- )
-
- cache_obj = cache.Cache()
- subject_assignments = cache_obj.get_subject_assignments(data_mock.shared_ids["subject"]["invalid_assignment_id"],
- data_mock.shared_ids["perimeter"]["perimeter_id_1"],
- data_mock.shared_ids["category"]["category_id_1"])
- assert len(subject_assignments) == 0
-
-
-def test_get_subject_assignment_empty_perimeter():
- from python_moonutilities import cache
- cache_obj = cache.Cache()
- subject_assignments = cache_obj.get_subject_assignments(data_mock.shared_ids["policy"]["policy_id_2"],
- None,
- data_mock.shared_ids["category"]["category_id_1"])
- assert len(subject_assignments) == 0
-
-
-def test_get_subject_assignment_invalid_category_failure():
- from python_moonutilities import cache
- cache_obj = cache.Cache()
- subject_assignments = cache_obj.get_subject_assignments(data_mock.shared_ids["policy"]["policy_id_1"],
- data_mock.shared_ids["perimeter"]["perimeter_id_1"],
- data_mock.shared_ids["category"]["invalid_category_id_1"])
- assert len(subject_assignments) == 0
-
-# tests for get (object_assignment) in cache
-# ==========================================
-def test_get_object_assignment_success():
- from python_moonutilities import cache
- cache_obj = cache.Cache()
- object_assignments = cache_obj.get_object_assignments(data_mock.shared_ids["policy"]["policy_id_1"],
- data_mock.shared_ids["perimeter"]["perimeter_id_2"],
- data_mock.shared_ids["category"]["category_id_1"])
- assert object_assignments is not None
-
-
-def test_get_object_assignment_no_policy():
- from python_moonutilities import cache
- cache_obj = cache.Cache()
- with pytest.raises(Exception) as exception_info:
- cache_obj.get_object_assignments(None, data_mock.shared_ids["perimeter"]["perimeter_id_2"],
- data_mock.shared_ids["category"]["category_id_1"])
- assert str(exception_info.value) == '400: Policy Unknown'
-
-
-@requests_mock.Mocker(kw='mock')
-def test_get_object_assignment_invalid_object_id(**kwargs):
- from python_moonutilities import cache
-
- register_urls.register_components(kwargs['mock'])
-
- kwargs['mock'].get('http://manager:8082/policies/{}/object_assignments/{}'
- .format(data_mock.shared_ids["object"]["invalid_object_id"],
- data_mock.shared_ids["perimeter"]["perimeter_id_1"]),
- json={'object_assignments': data_mock.object_assignment_mock_invalid_object_id}
- )
- cache_obj = cache.Cache()
- object_assignments = cache_obj.get_object_assignments(data_mock.shared_ids["object"]["invalid_object_id"],
- data_mock.shared_ids["perimeter"]["perimeter_id_1"],
- data_mock.shared_ids["category"]["category_id_1"])
- assert len(object_assignments) == 0
-
-
-@requests_mock.Mocker(kw='mock')
-def test_get_object_assignment_invalid_category_id(**kwargs):
- from python_moonutilities import cache
-
- register_urls.register_components(kwargs['mock'])
- kwargs['mock'].get('http://manager:8082/policies/{}/object_assignments/{}'
- .format(data_mock.shared_ids["object"]["invalid_category_id"],
- data_mock.shared_ids["perimeter"]["perimeter_id_1"]),
- json={'object_assignments': data_mock.object_assignment_mock_invalid_category_id}
- )
- cache_obj = cache.Cache()
- object_assignments = cache_obj.get_object_assignments(data_mock.shared_ids["object"]["invalid_category_id"],
- data_mock.shared_ids["perimeter"]["perimeter_id_1"],
- data_mock.shared_ids["category"]["category_id_1"])
- assert len(object_assignments) == 0
-
-
-@requests_mock.Mocker(kw='mock')
-def test_get_object_assignment_invalid_assignment_id(**kwargs):
- from python_moonutilities import cache
-
- register_urls.register_components(kwargs['mock'])
- kwargs['mock'].get('http://manager:8082/policies/{}/object_assignments/{}'
- .format(data_mock.shared_ids["object"]["invalid_assignment_id"],
- data_mock.shared_ids["perimeter"]["perimeter_id_1"]),
- json={'object_assignments': data_mock.object_assignment_mock_invalid_assignment_id}
- )
-
- cache_obj = cache.Cache()
- object_assignments = cache_obj.get_object_assignments(data_mock.shared_ids["object"]["invalid_assignment_id"],
- data_mock.shared_ids["perimeter"]["perimeter_id_1"],
- data_mock.shared_ids["category"]["category_id_1"])
- assert len(object_assignments) == 0
-
-def test_get_object_assignment_none_perimeter():
- from python_moonutilities import cache
- cache_obj = cache.Cache()
- object_assignments = cache_obj.get_object_assignments(data_mock.shared_ids["policy"]["policy_id_2"],
- None,
- data_mock.shared_ids["category"]["category_id_1"])
- assert len(object_assignments) == 0
-
-
-def test_get_object_assignment_invalid_category_failure():
- from python_moonutilities import cache
- cache_obj = cache.Cache()
- object_assignments = cache_obj.get_object_assignments(data_mock.shared_ids["policy"]["policy_id_1"],
- data_mock.shared_ids["perimeter"]["perimeter_id_1"],
- data_mock.shared_ids["category"]["invalid_category_id_1"])
- assert len(object_assignments) == 0
-
-# tests for get (action_assignment) in cache
-# ==========================================
-def test_get_action_assignment_success():
- from python_moonutilities import cache
- cache_obj = cache.Cache()
- action_assignments = cache_obj.get_action_assignments(data_mock.shared_ids["policy"]["policy_id_1"],
- data_mock.shared_ids["perimeter"]["perimeter_id_3"],
- data_mock.shared_ids["category"]["category_id_1"])
- assert action_assignments is not None
-
-
-def test_get_action_assignment_no_policy():
- from python_moonutilities import cache
- cache_obj = cache.Cache()
- with pytest.raises(Exception) as exception_info:
- cache_obj.get_action_assignments(None, data_mock.shared_ids["perimeter"]["perimeter_id_2"],
- data_mock.shared_ids["category"]["category_id_1"])
- assert str(exception_info.value) == '400: Policy Unknown'
-
-
-@requests_mock.Mocker(kw='mock')
-def test_get_action_assignment_invalid_object_id(**kwargs):
- from python_moonutilities import cache
-
- register_urls.register_components(kwargs['mock'])
-
- kwargs['mock'].get('http://manager:8082/policies/{}/action_assignments/{}'
- .format(data_mock.shared_ids["action"]["invalid_action_id"],
- data_mock.shared_ids["perimeter"]["perimeter_id_1"]),
- json={'action_assignments': data_mock.action_assignment_mock_invalid_action_id}
- )
- cache_obj = cache.Cache()
- action_assignments = cache_obj.get_action_assignments(data_mock.shared_ids["action"]["invalid_action_id"],
- data_mock.shared_ids["perimeter"]["perimeter_id_1"],
- data_mock.shared_ids["category"]["category_id_1"])
- assert len(action_assignments) == 0
-
-
-@requests_mock.Mocker(kw='mock')
-def test_get_action_assignment_invalid_category_id(**kwargs):
- from python_moonutilities import cache
-
- register_urls.register_components(kwargs['mock'])
- kwargs['mock'].get('http://manager:8082/policies/{}/action_assignments/{}'
- .format(data_mock.shared_ids["action"]["invalid_category_id"],
- data_mock.shared_ids["perimeter"]["perimeter_id_1"]),
- json={'action_assignments': data_mock.action_assignment_mock_invalid_category_id}
- )
- cache_obj = cache.Cache()
- action_assignments = cache_obj.get_action_assignments(data_mock.shared_ids["action"]["invalid_category_id"],
- data_mock.shared_ids["perimeter"]["perimeter_id_1"],
- data_mock.shared_ids["category"]["category_id_1"])
- assert len(action_assignments) == 0
-
-
-@requests_mock.Mocker(kw='mock')
-def test_get_action_assignment_invalid_assignment_id(**kwargs):
- from python_moonutilities import cache
-
- register_urls.register_components(kwargs['mock'])
- kwargs['mock'].get('http://manager:8082/policies/{}/action_assignments/{}'
- .format(data_mock.shared_ids["action"]["invalid_assignment_id"],
- data_mock.shared_ids["perimeter"]["perimeter_id_1"]),
- json={'action_assignments': data_mock.action_assignment_mock_invalid_assignment_id}
- )
-
- cache_obj = cache.Cache()
- action_assignments = cache_obj.get_action_assignments(data_mock.shared_ids["action"]["invalid_assignment_id"],
- data_mock.shared_ids["perimeter"]["perimeter_id_1"],
- data_mock.shared_ids["category"]["category_id_1"])
- assert len(action_assignments) == 0
-
-def test_get_action_assignment_none_perimeter():
- from python_moonutilities import cache
- cache_obj = cache.Cache()
- action_assignments = cache_obj.get_action_assignments(data_mock.shared_ids["policy"]["policy_id_2"],
- None,
- data_mock.shared_ids["category"]["category_id_1"])
- assert len(action_assignments) == 0
-
-
-def test_get_action_assignment_invalid_category_failure():
- from python_moonutilities import cache
- cache_obj = cache.Cache()
- action_assignments = cache_obj.get_action_assignments(data_mock.shared_ids["policy"]["policy_id_1"],
- data_mock.shared_ids["perimeter"]["perimeter_id_1"],
- data_mock.shared_ids["category"]["invalid_category_id_1"])
- assert len(action_assignments) == 0
-
-
-# ====================================================================================================
-
-# tests for helper function in cache
-# ==================================
-def test_get_policy_from_meta_rules_success():
- from python_moonutilities import cache
- cache_obj = cache.Cache()
- policy_id = cache_obj.get_policy_from_meta_rules(data_mock.shared_ids["meta_rule"]["meta_rule_id_1"])
- assert policy_id is not None
-
-''' tests for containers function , security pipline in cache which not used for now
- need to mock pdp object, /pods correctly
-'''
-
-# def test_get_policy_from_meta_rules_failure():
-# from python_moonutilities import cache
-# cache_obj = cache.Cache()
-# meta_rule_id = 'meta_rule_id3'
-# policy_id = cache_obj.get_policy_from_meta_rules(meta_rule_id)
-# assert policy_id is None
-
-# def test_get_pdp_from_keystone_project_success():
-# from python_moonutilities import cache
-# cache_obj = cache.Cache()
-# keystone_project_id = 'keystone_project_id1'
-# pdp_key = cache_obj.get_pdp_from_keystone_project(keystone_project_id)
-# assert pdp_key is not None
-#
-#
-# def test_get_pdp_from_keystone_project_failure():
-# from python_moonutilities import cache
-# cache_obj = cache.Cache()
-# keystone_project_id = 'keystone_project_id2'
-# pdp_key = cache_obj.get_pdp_from_keystone_project(keystone_project_id)
-# assert pdp_key is None
-#
-#
-# def test_get_keystone_project_id_from_policy_id_success():
-# from python_moonutilities import cache
-# cache_obj = cache.Cache()
-# keystone_project_id = cache_obj.get_keystone_project_id_from_policy_id(
-# data_mock.shared_ids["policy"]["policy_id_1"])
-# assert keystone_project_id is not None
-#
-#
-# def test_get_keystone_project_id_from_policy_id_failure():
-# from python_moonutilities import cache
-# cache_obj = cache.Cache()
-# policy_id = 'policy_id_3'
-# keystone_project_id = cache_obj.get_keystone_project_id_from_policy_id(policy_id)
-# assert keystone_project_id is None
-
-
-# def test_get_containers_from_keystone_project_id_success():
-# from python_moonutilities import cache
-# cache_obj = cache.Cache()
-# keystone_project_id = 1
-# meta_rule_id = 'meta_rule_id1'
-# container_id, container_value = cache_obj.get_containers_from_keystone_project_id(keystone_project_id, meta_rule_id)
-# assert container_id, container_value is not None
-
-
-def test_cache_manager():
- from python_moonutilities import cache
- cache_obj = cache.Cache()
-# assert cache_obj.pdp is not None
- assert cache_obj.meta_rules is not None
- assert len(cache_obj.meta_rules) == 2
- assert cache_obj.policies is not None
- assert len(cache_obj.policies) == 1
- assert cache_obj.models is not None