aboutsummaryrefslogtreecommitdiffstats
path: root/moon_utilities/tests/unit_python/helpers/import_export_cache_helper.py
diff options
context:
space:
mode:
Diffstat (limited to 'moon_utilities/tests/unit_python/helpers/import_export_cache_helper.py')
-rw-r--r--moon_utilities/tests/unit_python/helpers/import_export_cache_helper.py234
1 files changed, 234 insertions, 0 deletions
diff --git a/moon_utilities/tests/unit_python/helpers/import_export_cache_helper.py b/moon_utilities/tests/unit_python/helpers/import_export_cache_helper.py
new file mode 100644
index 00000000..ff22abe2
--- /dev/null
+++ b/moon_utilities/tests/unit_python/helpers/import_export_cache_helper.py
@@ -0,0 +1,234 @@
+# Software Name: MOON
+
+# Version: 5.4
+
+# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
+# SPDX-License-Identifier: Apache-2.0
+
+# This software is distributed under the 'Apache License 2.0',
+# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+# or see the "LICENSE" file for more details.
+
+
+import logging
+
+logger = logging.getLogger("moon.manager.test.api." + __name__)
+
+
+def clean_models():
+ from moon_cache.cache import Cache
+ _cache = Cache.getInstance()
+ keys = list(_cache.models.keys())
+ for key in keys:
+ _cache.delete_model(key)
+
+
+def clean_policies():
+ from moon_cache.cache import Cache
+ _cache = Cache.getInstance()
+ keys = list(_cache.policies.keys())
+ for key in keys:
+ _cache.delete_policy(key)
+
+
+def clean_subjects():
+ from moon_cache.cache import Cache
+ _cache = Cache.getInstance()
+
+ policy_keys = list(_cache.policies.keys())
+ subjects = _cache.subjects
+ for policy_key in policy_keys:
+ for key in subjects:
+ try:
+ _cache.delete_subject(policy_key, key)
+ except AttributeError:
+ pass
+ _cache.delete_subject()
+
+
+def clean_objects():
+ from moon_cache.cache import Cache
+ _cache = Cache.getInstance()
+
+ policy_keys = list(_cache.policies.keys())
+ objects = _cache.objects
+ for policy_key in policy_keys:
+ for key in objects:
+ try:
+ _cache.delete_object(policy_key, key)
+ except AttributeError:
+ pass
+ _cache.delete_object()
+
+
+def clean_actions():
+ from moon_cache.cache import Cache
+ _cache = Cache.getInstance()
+
+ policy_keys = list(_cache.policies.keys())
+ actions = _cache.actions
+ for policy_key in policy_keys:
+ for key in actions:
+ try:
+ _cache.delete_action(policy_key, key)
+ except AttributeError:
+ pass
+ _cache.delete_action()
+
+
+def clean_subject_categories():
+ from moon_cache.cache import Cache
+ _cache = Cache.getInstance()
+
+ categories = list(_cache.subject_categories.keys())
+ for key in categories:
+ _cache.delete_subject_category(key)
+
+
+def clean_object_categories():
+ from moon_cache.cache import Cache
+ _cache = Cache.getInstance()
+
+ categories = list(_cache.object_categories.keys())
+ for key in categories:
+ _cache.delete_object_category(key)
+
+
+def clean_action_categories():
+ from moon_cache.cache import Cache
+ _cache = Cache.getInstance()
+
+ categories = list(_cache.action_categories.keys())
+ for key in categories:
+ _cache.delete_action_category(key)
+
+
+def clean_subject_data():
+ from moon_cache.cache import Cache
+ _cache = Cache.getInstance()
+ _cache.delete_subject_data()
+
+
+def clean_object_data():
+ from moon_cache.cache import Cache
+ _cache = Cache.getInstance()
+ _cache.delete_object_data()
+
+
+def clean_action_data():
+ from moon_cache.cache import Cache
+ _cache = Cache.getInstance()
+ _cache.delete_action_data()
+
+
+def clean_meta_rule():
+ from moon_cache.cache import Cache
+ _cache = Cache.getInstance()
+
+ categories = list(_cache.meta_rules.keys())
+ for key in categories:
+ _cache.delete_meta_rule(key)
+
+
+def clean_subject_assignments():
+ from moon_cache.cache import Cache
+ _cache = Cache.getInstance()
+
+ if not _cache.subject_assignments:
+ return
+ for policy_key in _cache.policies.keys():
+ if policy_key not in _cache.subject_assignments:
+ continue
+ for key in _cache.subject_assignments[policy_key]:
+ # policy_key = _cache.subject_assignments[policy_key][key]["policy_id"]
+ subject_key = _cache.subject_assignments[policy_key][key]["subject_id"]
+ cat_key = _cache.subject_assignments[policy_key][key]["category_id"]
+ data_keys = _cache.subject_assignments[policy_key][key]["assignments"]
+ for data_key in data_keys:
+ _cache.delete_subject_assignment(
+ policy_id=policy_key,
+ perimeter_id=subject_key,
+ category_id=cat_key,
+ data_id=data_key
+ )
+ _cache.delete_subject_assignment()
+
+
+def clean_object_assignments():
+ from moon_cache.cache import Cache
+ _cache = Cache.getInstance()
+
+ if not _cache.object_assignments:
+ return
+ for policy_key in _cache.policies.keys():
+ if policy_key not in _cache.object_assignments:
+ continue
+ for key in _cache.object_assignments[policy_key]:
+ # policy_key = _cache.object_assignments[policy_key][key]["policy_id"]
+ object_key = _cache.object_assignments[policy_key][key]["object_id"]
+ cat_key = _cache.object_assignments[policy_key][key]["category_id"]
+ data_keys = _cache.object_assignments[policy_key][key]["assignments"]
+ for data_key in data_keys:
+ _cache.delete_object_assignment(
+ policy_id=policy_key,
+ perimeter_id=object_key,
+ category_id=cat_key,
+ data_id=data_key
+ )
+ _cache.delete_object_assignment()
+
+
+def clean_action_assignments():
+ from moon_cache.cache import Cache
+ _cache = Cache.getInstance()
+
+ if not _cache.action_assignments:
+ return
+ for policy_key in _cache.policies.keys():
+ if policy_key not in _cache.action_assignments:
+ continue
+ for key in _cache.action_assignments[policy_key]:
+ action_key = _cache.action_assignments[policy_key][key]["action_id"]
+ cat_key = _cache.action_assignments[policy_key][key]["category_id"]
+ data_keys = _cache.action_assignments[policy_key][key]["assignments"]
+ for data_key in data_keys:
+ _cache.delete_action_assignment(
+ policy_id=policy_key,
+ perimeter_id=action_key,
+ category_id=cat_key,
+ data_id=data_key
+ )
+ _cache.delete_action_assignment()
+
+
+def clean_rules():
+ from moon_cache.cache import Cache
+ _cache = Cache.getInstance()
+
+ rules = list(_cache.rules.keys())
+ for key in rules:
+ _cache.delete_rule(key)
+
+
+def clean_all():
+ clean_rules()
+
+ clean_subject_assignments()
+ clean_object_assignments()
+ clean_action_assignments()
+
+ clean_subject_data()
+ clean_object_data()
+ clean_action_data()
+
+ clean_actions()
+ clean_objects()
+ clean_subjects()
+
+ clean_policies()
+ clean_models()
+ clean_meta_rule()
+
+ clean_subject_categories()
+ clean_object_categories()
+ clean_action_categories()