diff options
Diffstat (limited to 'keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_authz.py')
-rw-r--r-- | keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_authz.py | 861 |
1 files changed, 861 insertions, 0 deletions
diff --git a/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_authz.py b/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_authz.py new file mode 100644 index 00000000..d08ecf39 --- /dev/null +++ b/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_authz.py @@ -0,0 +1,861 @@ +# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors +# This software is distributed under the terms and conditions of the 'Apache-2.0' +# license which can be found in the file 'LICENSE' in this package distribution +# or at 'http://www.apache.org/licenses/LICENSE-2.0'. + +"""Unit tests for core IntraExtensionAuthzManager""" + +import json +import os +import uuid +from oslo_config import cfg +from keystone.tests import unit as tests +from keystone.contrib.moon.core import IntraExtensionAdminManager, IntraExtensionAuthzManager +from keystone.tests.unit.ksfixtures import database +from keystone import resource +from keystone.contrib.moon.exception import * +from keystone.tests.unit import default_fixtures +from keystone.contrib.moon.core import LogManager, TenantManager + +CONF = cfg.CONF + +USER_ADMIN = { + 'name': 'admin', + 'domain_id': "default", + 'password': 'admin' +} + +IE = { + "name": "test IE", + "policymodel": "policy_rbac_authz", + "description": "a simple description." +} + +class TestIntraExtensionAuthzManagerAuthz(tests.TestCase): + + def setUp(self): + self.useFixture(database.Database()) + super(TestIntraExtensionAuthzManager, self).setUp() + self.load_backends() + self.load_fixtures(default_fixtures) + self.manager = IntraExtensionAuthzManager() + self.admin_manager = IntraExtensionAdminManager() + + def __get_key_from_value(self, value, values_dict): + return filter(lambda v: v[1] == value, values_dict.iteritems())[0][0] + + def load_extra_backends(self): + return { + "moonlog_api": LogManager(), + "tenant_api": TenantManager(), + # "resource_api": resource.Manager(), + } + + def config_overrides(self): + super(TestIntraExtensionAuthzManager, self).config_overrides() + self.policy_directory = '../../../examples/moon/policies' + self.config_fixture.config( + group='moon', + intraextension_driver='keystone.contrib.moon.backends.sql.IntraExtensionConnector') + self.config_fixture.config( + group='moon', + policy_directory=self.policy_directory) + + +class TestIntraExtensionAuthzManager(tests.TestCase): + + def setUp(self): + self.useFixture(database.Database()) + super(TestIntraExtensionAuthzManager, self).setUp() + self.load_backends() + self.load_fixtures(default_fixtures) + self.manager = IntraExtensionAuthzManager() + self.admin_manager = IntraExtensionAdminManager() + + def __get_key_from_value(self, value, values_dict): + return filter(lambda v: v[1] == value, values_dict.iteritems())[0][0] + + def load_extra_backends(self): + return { + "moonlog_api": LogManager(), + "tenant_api": TenantManager(), + # "resource_api": resource.Manager(), + } + + def config_overrides(self): + super(TestIntraExtensionAuthzManager, self).config_overrides() + self.policy_directory = '../../../examples/moon/policies' + self.config_fixture.config( + group='moon', + intraextension_driver='keystone.contrib.moon.backends.sql.IntraExtensionConnector') + self.config_fixture.config( + group='moon', + policy_directory=self.policy_directory) + + def create_intra_extension(self, policy_model="policy_rbac_authz"): + # Create the admin user because IntraExtension needs it + self.admin = self.identity_api.create_user(USER_ADMIN) + IE["policymodel"] = policy_model + self.ref = self.admin_manager.load_intra_extension(IE) + self.assertIsInstance(self.ref, dict) + self.create_tenant(self.ref["id"]) + + def create_tenant(self, authz_uuid): + tenant = { + "id": uuid.uuid4().hex, + "name": "TestIntraExtensionAuthzManager", + "enabled": True, + "description": "", + "domain_id": "default" + } + project = self.resource_api.create_project(tenant["id"], tenant) + mapping = self.tenant_api.set_tenant_dict(project["id"], project["name"], authz_uuid, None) + self.assertIsInstance(mapping, dict) + self.assertIn("authz", mapping) + self.assertEqual(mapping["authz"], authz_uuid) + return mapping + + def create_user(self, username="TestIntraExtensionAuthzManagerUser"): + user = { + "id": uuid.uuid4().hex, + "name": username, + "enabled": True, + "description": "", + "domain_id": "default" + } + _user = self.identity_api.create_user(user) + return _user + + def delete_admin_intra_extension(self): + self.assertRaises( + AuthIntraExtensionModificationNotAuthorized, + self.manager.delete_intra_extension, + self.ref["id"]) + + def test_subjects(self): + self.create_intra_extension() + + subjects = self.manager.get_subject_dict("admin", self.ref["id"]) + self.assertIsInstance(subjects, dict) + self.assertIn("subjects", subjects) + self.assertIn("id", subjects) + self.assertIn("intra_extension_uuid", subjects) + self.assertEqual(self.ref["id"], subjects["intra_extension_uuid"]) + self.assertIsInstance(subjects["subjects"], dict) + + new_subject = self.create_user() + new_subjects = dict() + new_subjects[new_subject["id"]] = new_subject["name"] + self.assertRaises( + AuthIntraExtensionModificationNotAuthorized, + self.manager.set_subject_dict, + "admin", self.ref["id"], new_subjects) + + # Delete the new subject + self.assertRaises( + AuthIntraExtensionModificationNotAuthorized, + self.manager.del_subject, + "admin", self.ref["id"], new_subject["id"]) + + # Add a particular subject + self.assertRaises( + AuthIntraExtensionModificationNotAuthorized, + self.manager.add_subject_dict, + "admin", self.ref["id"], new_subject["id"]) + + def test_objects(self): + self.create_intra_extension() + + objects = self.manager.get_object_dict("admin", self.ref["id"]) + self.assertIsInstance(objects, dict) + self.assertIn("objects", objects) + self.assertIn("id", objects) + self.assertIn("intra_extension_uuid", objects) + self.assertEqual(self.ref["id"], objects["intra_extension_uuid"]) + self.assertIsInstance(objects["objects"], dict) + + new_object = self.create_user() + new_objects = dict() + new_objects[new_object["id"]] = new_object["name"] + self.assertRaises( + AuthIntraExtensionModificationNotAuthorized, + self.manager.set_object_dict, + "admin", self.ref["id"], new_object["id"]) + + # Delete the new object + self.assertRaises( + AuthIntraExtensionModificationNotAuthorized, + self.manager.del_object, + "admin", self.ref["id"], new_object["id"]) + + # Add a particular object + self.assertRaises( + AuthIntraExtensionModificationNotAuthorized, + self.manager.add_object_dict, + "admin", self.ref["id"], new_object["name"]) + + def test_actions(self): + self.create_intra_extension() + + actions = self.manager.get_action_dict("admin", self.ref["id"]) + self.assertIsInstance(actions, dict) + self.assertIn("actions", actions) + self.assertIn("id", actions) + self.assertIn("intra_extension_uuid", actions) + self.assertEqual(self.ref["id"], actions["intra_extension_uuid"]) + self.assertIsInstance(actions["actions"], dict) + + new_action = self.create_user() + new_actions = dict() + new_actions[new_action["id"]] = new_action["name"] + self.assertRaises( + AuthIntraExtensionModificationNotAuthorized, + self.manager.set_action_dict, + "admin", self.ref["id"], new_actions) + + # Delete the new action + self.assertRaises( + AuthIntraExtensionModificationNotAuthorized, + self.manager.del_action, + "admin", self.ref["id"], new_action["id"]) + + # Add a particular action + self.assertRaises( + AuthIntraExtensionModificationNotAuthorized, + self.manager.add_action_dict, + "admin", self.ref["id"], new_action["id"]) + + def test_subject_categories(self): + self.create_intra_extension() + + subject_categories = self.manager.get_subject_category_dict("admin", self.ref["id"]) + self.assertIsInstance(subject_categories, dict) + self.assertIn("subject_categories", subject_categories) + self.assertIn("id", subject_categories) + self.assertIn("intra_extension_uuid", subject_categories) + self.assertEqual(self.ref["id"], subject_categories["intra_extension_uuid"]) + self.assertIsInstance(subject_categories["subject_categories"], dict) + + new_subject_category = {"id": uuid.uuid4().hex, "name": "subject_category_test"} + new_subject_categories = dict() + new_subject_categories[new_subject_category["id"]] = new_subject_category["name"] + self.assertRaises( + AuthIntraExtensionModificationNotAuthorized, + self.manager.set_subject_category_dict, + "admin", self.ref["id"], new_subject_categories) + + # Delete the new subject_category + self.assertRaises( + AuthIntraExtensionModificationNotAuthorized, + self.manager.del_subject_category, + "admin", self.ref["id"], new_subject_category["id"]) + + # Add a particular subject_category + self.assertRaises( + AuthIntraExtensionModificationNotAuthorized, + self.manager.add_subject_category_dict, + "admin", self.ref["id"], new_subject_category["name"]) + + def test_object_categories(self): + self.create_intra_extension() + + object_categories = self.manager.get_object_category_dict("admin", self.ref["id"]) + self.assertIsInstance(object_categories, dict) + self.assertIn("object_categories", object_categories) + self.assertIn("id", object_categories) + self.assertIn("intra_extension_uuid", object_categories) + self.assertEqual(self.ref["id"], object_categories["intra_extension_uuid"]) + self.assertIsInstance(object_categories["object_categories"], dict) + + new_object_category = {"id": uuid.uuid4().hex, "name": "object_category_test"} + new_object_categories = dict() + new_object_categories[new_object_category["id"]] = new_object_category["name"] + self.assertRaises( + AuthIntraExtensionModificationNotAuthorized, + self.manager.set_object_category_dict, + "admin", self.ref["id"], new_object_categories) + + # Delete the new object_category + self.assertRaises( + AuthIntraExtensionModificationNotAuthorized, + self.manager.del_object_category, + "admin", self.ref["id"], new_object_category["id"]) + + # Add a particular object_category + self.assertRaises( + AuthIntraExtensionModificationNotAuthorized, + self.manager.add_object_category_dict, + "admin", self.ref["id"], new_object_category["name"]) + + def test_action_categories(self): + self.create_intra_extension() + + action_categories = self.manager.get_action_category_dict("admin", self.ref["id"]) + self.assertIsInstance(action_categories, dict) + self.assertIn("action_categories", action_categories) + self.assertIn("id", action_categories) + self.assertIn("intra_extension_uuid", action_categories) + self.assertEqual(self.ref["id"], action_categories["intra_extension_uuid"]) + self.assertIsInstance(action_categories["action_categories"], dict) + + new_action_category = {"id": uuid.uuid4().hex, "name": "action_category_test"} + new_action_categories = dict() + new_action_categories[new_action_category["id"]] = new_action_category["name"] + self.assertRaises( + AuthIntraExtensionModificationNotAuthorized, + self.manager.set_action_category_dict, + "admin", self.ref["id"], new_action_categories) + + # Delete the new action_category + self.assertRaises( + AuthIntraExtensionModificationNotAuthorized, + self.manager.del_action_category, + "admin", self.ref["id"], new_action_category["id"]) + + # Add a particular action_category + self.assertRaises( + AuthIntraExtensionModificationNotAuthorized, + self.manager.add_action_category_dict, + "admin", self.ref["id"], new_action_category["name"]) + + def test_subject_category_scope(self): + self.create_intra_extension() + + subject_categories = self.admin_manager.set_subject_category_dict( + "admin", + self.ref["id"], + { + uuid.uuid4().hex: "admin", + uuid.uuid4().hex: "dev", + } + ) + + for subject_category in subject_categories["subject_categories"]: + subject_category_scope = self.manager.get_subject_category_scope_dict( + "admin", + self.ref["id"], + subject_category) + self.assertIsInstance(subject_category_scope, dict) + self.assertIn("subject_category_scope", subject_category_scope) + self.assertIn("id", subject_category_scope) + self.assertIn("intra_extension_uuid", subject_category_scope) + self.assertEqual(self.ref["id"], subject_category_scope["intra_extension_uuid"]) + self.assertIsInstance(subject_category_scope["subject_category_scope"], dict) + + new_subject_category_scope = dict() + new_subject_category_scope_uuid = uuid.uuid4().hex + new_subject_category_scope[new_subject_category_scope_uuid] = "new_subject_category_scope" + self.assertRaises( + AuthIntraExtensionModificationNotAuthorized, + self.manager.set_subject_category_scope_dict, + "admin", self.ref["id"], subject_category, new_subject_category_scope) + + # Delete the new subject_category_scope + self.assertRaises( + AuthIntraExtensionModificationNotAuthorized, + self.manager.del_subject_category_scope, + "admin", self.ref["id"], subject_category, new_subject_category_scope_uuid) + + # Add a particular subject_category_scope + self.assertRaises( + AuthIntraExtensionModificationNotAuthorized, + self.manager.add_subject_category_scope_dict, + "admin", self.ref["id"], subject_category, new_subject_category_scope[new_subject_category_scope_uuid]) + + def test_object_category_scope(self): + self.create_intra_extension() + + object_categories = self.admin_manager.set_object_category_dict( + "admin", + self.ref["id"], + { + uuid.uuid4().hex: "id", + uuid.uuid4().hex: "domain", + } + ) + + for object_category in object_categories["object_categories"]: + object_category_scope = self.manager.get_object_category_scope_dict( + "admin", + self.ref["id"], + object_category) + self.assertIsInstance(object_category_scope, dict) + self.assertIn("object_category_scope", object_category_scope) + self.assertIn("id", object_category_scope) + self.assertIn("intra_extension_uuid", object_category_scope) + self.assertEqual(self.ref["id"], object_category_scope["intra_extension_uuid"]) + self.assertIsInstance(object_category_scope["object_category_scope"], dict) + + new_object_category_scope = dict() + new_object_category_scope_uuid = uuid.uuid4().hex + new_object_category_scope[new_object_category_scope_uuid] = "new_object_category_scope" + self.assertRaises( + AuthIntraExtensionModificationNotAuthorized, + self.manager.set_object_category_scope_dict, + "admin", self.ref["id"], object_category, new_object_category_scope) + + # Delete the new object_category_scope + self.assertRaises( + AuthIntraExtensionModificationNotAuthorized, + self.manager.del_object_category_scope, + "admin", self.ref["id"], object_category, new_object_category_scope_uuid) + + # Add a particular object_category_scope + self.assertRaises( + AuthIntraExtensionModificationNotAuthorized, + self.manager.add_object_category_scope_dict, + "admin", self.ref["id"], object_category, new_object_category_scope[new_object_category_scope_uuid]) + + def test_action_category_scope(self): + self.create_intra_extension() + + action_categories = self.admin_manager.set_action_category_dict( + "admin", + self.ref["id"], + { + uuid.uuid4().hex: "compute", + uuid.uuid4().hex: "identity", + } + ) + + for action_category in action_categories["action_categories"]: + action_category_scope = self.manager.get_action_category_scope_dict( + "admin", + self.ref["id"], + action_category) + self.assertIsInstance(action_category_scope, dict) + self.assertIn("action_category_scope", action_category_scope) + self.assertIn("id", action_category_scope) + self.assertIn("intra_extension_uuid", action_category_scope) + self.assertEqual(self.ref["id"], action_category_scope["intra_extension_uuid"]) + self.assertIsInstance(action_category_scope["action_category_scope"], dict) + + new_action_category_scope = dict() + new_action_category_scope_uuid = uuid.uuid4().hex + new_action_category_scope[new_action_category_scope_uuid] = "new_action_category_scope" + self.assertRaises( + AuthIntraExtensionModificationNotAuthorized, + self.manager.set_action_category_scope_dict, + "admin", self.ref["id"], action_category, new_action_category_scope) + + # Delete the new action_category_scope + self.assertRaises( + AuthIntraExtensionModificationNotAuthorized, + self.manager.del_action_category_scope, + "admin", self.ref["id"], action_category, new_action_category_scope_uuid) + + # Add a particular action_category_scope + self.assertRaises( + AuthIntraExtensionModificationNotAuthorized, + self.manager.add_action_category_scope_dict, + "admin", self.ref["id"], action_category, new_action_category_scope[new_action_category_scope_uuid]) + + def test_subject_category_assignment(self): + self.create_intra_extension() + + new_subject = self.create_user() + new_subjects = dict() + new_subjects[new_subject["id"]] = new_subject["name"] + subjects = self.admin_manager.set_subject_dict("admin", self.ref["id"], new_subjects) + + new_subject_category_uuid = uuid.uuid4().hex + new_subject_category_value = "role" + subject_categories = self.admin_manager.set_subject_category_dict( + "admin", + self.ref["id"], + { + new_subject_category_uuid: new_subject_category_value + } + ) + + for subject_category in subject_categories["subject_categories"]: + subject_category_scope = self.admin_manager.get_subject_category_scope_dict( + "admin", + self.ref["id"], + subject_category) + self.assertIsInstance(subject_category_scope, dict) + self.assertIn("subject_category_scope", subject_category_scope) + self.assertIn("id", subject_category_scope) + self.assertIn("intra_extension_uuid", subject_category_scope) + self.assertEqual(self.ref["id"], subject_category_scope["intra_extension_uuid"]) + self.assertIsInstance(subject_category_scope["subject_category_scope"], dict) + + new_subject_category_scope = dict() + new_subject_category_scope_uuid = uuid.uuid4().hex + new_subject_category_scope[new_subject_category_scope_uuid] = "admin" + subject_category_scope = self.admin_manager.set_subject_category_scope_dict( + "admin", + self.ref["id"], + subject_category, + new_subject_category_scope) + self.assertIsInstance(subject_category_scope, dict) + self.assertIn("subject_category_scope", subject_category_scope) + self.assertIn("id", subject_category_scope) + self.assertIn("intra_extension_uuid", subject_category_scope) + self.assertEqual(self.ref["id"], subject_category_scope["intra_extension_uuid"]) + self.assertIn(new_subject_category_scope[new_subject_category_scope_uuid], + subject_category_scope["subject_category_scope"][subject_category].values()) + + new_subject_category_scope2 = dict() + new_subject_category_scope2_uuid = uuid.uuid4().hex + new_subject_category_scope2[new_subject_category_scope2_uuid] = "dev" + subject_category_scope = self.admin_manager.set_subject_category_scope_dict( + "admin", + self.ref["id"], + subject_category, + new_subject_category_scope2) + self.assertIsInstance(subject_category_scope, dict) + self.assertIn("subject_category_scope", subject_category_scope) + self.assertIn("id", subject_category_scope) + self.assertIn("intra_extension_uuid", subject_category_scope) + self.assertEqual(self.ref["id"], subject_category_scope["intra_extension_uuid"]) + self.assertIn(new_subject_category_scope2[new_subject_category_scope2_uuid], + subject_category_scope["subject_category_scope"][subject_category].values()) + + subject_category_assignments = self.manager.get_subject_category_assignment_dict( + "admin", + self.ref["id"], + new_subject["id"] + ) + self.assertIsInstance(subject_category_assignments, dict) + self.assertIn("subject_category_assignments", subject_category_assignments) + self.assertIn("id", subject_category_assignments) + self.assertIn("intra_extension_uuid", subject_category_assignments) + self.assertEqual(self.ref["id"], subject_category_assignments["intra_extension_uuid"]) + self.assertEqual({}, subject_category_assignments["subject_category_assignments"][new_subject["id"]]) + + self.assertRaises( + AuthIntraExtensionModificationNotAuthorized, + self.manager.set_subject_category_assignment_dict, + "admin", self.ref["id"], new_subject["id"], + { + new_subject_category_uuid: [new_subject_category_scope_uuid, new_subject_category_scope2_uuid], + }) + + self.assertRaises( + AuthIntraExtensionModificationNotAuthorized, + self.manager.del_subject_category_assignment, + "admin", self.ref["id"], new_subject["id"], + new_subject_category_uuid, + new_subject_category_scope_uuid) + + self.assertRaises( + AuthIntraExtensionModificationNotAuthorized, + self.manager.add_subject_category_assignment_dict, + "admin", self.ref["id"], new_subject["id"], + new_subject_category_uuid, + new_subject_category_scope_uuid) + + def test_object_category_assignment(self): + self.create_intra_extension() + + new_object = self.create_user() + new_objects = dict() + new_objects[new_object["id"]] = new_object["name"] + objects = self.admin_manager.set_object_dict("admin", self.ref["id"], new_objects) + + new_object_category_uuid = uuid.uuid4().hex + new_object_category_value = "role" + object_categories = self.admin_manager.set_object_category_dict( + "admin", + self.ref["id"], + { + new_object_category_uuid: new_object_category_value + } + ) + + for object_category in object_categories["object_categories"]: + object_category_scope = self.admin_manager.get_object_category_scope_dict( + "admin", + self.ref["id"], + object_category) + self.assertIsInstance(object_category_scope, dict) + self.assertIn("object_category_scope", object_category_scope) + self.assertIn("id", object_category_scope) + self.assertIn("intra_extension_uuid", object_category_scope) + self.assertEqual(self.ref["id"], object_category_scope["intra_extension_uuid"]) + self.assertIsInstance(object_category_scope["object_category_scope"], dict) + + new_object_category_scope = dict() + new_object_category_scope_uuid = uuid.uuid4().hex + new_object_category_scope[new_object_category_scope_uuid] = "admin" + object_category_scope = self.admin_manager.set_object_category_scope_dict( + "admin", + self.ref["id"], + object_category, + new_object_category_scope) + self.assertIsInstance(object_category_scope, dict) + self.assertIn("object_category_scope", object_category_scope) + self.assertIn("id", object_category_scope) + self.assertIn("intra_extension_uuid", object_category_scope) + self.assertEqual(self.ref["id"], object_category_scope["intra_extension_uuid"]) + self.assertIn(new_object_category_scope[new_object_category_scope_uuid], + object_category_scope["object_category_scope"][object_category].values()) + + new_object_category_scope2 = dict() + new_object_category_scope2_uuid = uuid.uuid4().hex + new_object_category_scope2[new_object_category_scope2_uuid] = "dev" + object_category_scope = self.admin_manager.set_object_category_scope_dict( + "admin", + self.ref["id"], + object_category, + new_object_category_scope2) + self.assertIsInstance(object_category_scope, dict) + self.assertIn("object_category_scope", object_category_scope) + self.assertIn("id", object_category_scope) + self.assertIn("intra_extension_uuid", object_category_scope) + self.assertEqual(self.ref["id"], object_category_scope["intra_extension_uuid"]) + self.assertIn(new_object_category_scope2[new_object_category_scope2_uuid], + object_category_scope["object_category_scope"][object_category].values()) + + object_category_assignments = self.manager.get_object_category_assignment_dict( + "admin", + self.ref["id"], + new_object["id"] + ) + self.assertIsInstance(object_category_assignments, dict) + self.assertIn("object_category_assignments", object_category_assignments) + self.assertIn("id", object_category_assignments) + self.assertIn("intra_extension_uuid", object_category_assignments) + self.assertEqual(self.ref["id"], object_category_assignments["intra_extension_uuid"]) + self.assertEqual({}, object_category_assignments["object_category_assignments"][new_object["id"]]) + + self.assertRaises( + AuthIntraExtensionModificationNotAuthorized, + self.manager.set_object_category_assignment_dict, + "admin", self.ref["id"], new_object["id"], + { + new_object_category_uuid: [new_object_category_scope_uuid, new_object_category_scope2_uuid], + }) + + self.assertRaises( + AuthIntraExtensionModificationNotAuthorized, + self.manager.del_object_category_assignment, + "admin", self.ref["id"], new_object["id"], + new_object_category_uuid, + new_object_category_scope_uuid) + + self.assertRaises( + AuthIntraExtensionModificationNotAuthorized, + self.manager.add_object_category_assignment_dict, + "admin", self.ref["id"], new_object["id"], + new_object_category_uuid, + new_object_category_scope_uuid) + + def test_action_category_assignment(self): + self.create_intra_extension() + + new_action = self.create_user() + new_actions = dict() + new_actions[new_action["id"]] = new_action["name"] + actions = self.admin_manager.set_action_dict("admin", self.ref["id"], new_actions) + + new_action_category_uuid = uuid.uuid4().hex + new_action_category_value = "role" + action_categories = self.admin_manager.set_action_category_dict( + "admin", + self.ref["id"], + { + new_action_category_uuid: new_action_category_value + } + ) + + for action_category in action_categories["action_categories"]: + action_category_scope = self.admin_manager.get_action_category_scope_dict( + "admin", + self.ref["id"], + action_category) + self.assertIsInstance(action_category_scope, dict) + self.assertIn("action_category_scope", action_category_scope) + self.assertIn("id", action_category_scope) + self.assertIn("intra_extension_uuid", action_category_scope) + self.assertEqual(self.ref["id"], action_category_scope["intra_extension_uuid"]) + self.assertIsInstance(action_category_scope["action_category_scope"], dict) + + new_action_category_scope = dict() + new_action_category_scope_uuid = uuid.uuid4().hex + new_action_category_scope[new_action_category_scope_uuid] = "admin" + action_category_scope = self.admin_manager.set_action_category_scope_dict( + "admin", + self.ref["id"], + action_category, + new_action_category_scope) + self.assertIsInstance(action_category_scope, dict) + self.assertIn("action_category_scope", action_category_scope) + self.assertIn("id", action_category_scope) + self.assertIn("intra_extension_uuid", action_category_scope) + self.assertEqual(self.ref["id"], action_category_scope["intra_extension_uuid"]) + self.assertIn(new_action_category_scope[new_action_category_scope_uuid], + action_category_scope["action_category_scope"][action_category].values()) + + new_action_category_scope2 = dict() + new_action_category_scope2_uuid = uuid.uuid4().hex + new_action_category_scope2[new_action_category_scope2_uuid] = "dev" + action_category_scope = self.admin_manager.set_action_category_scope_dict( + "admin", + self.ref["id"], + action_category, + new_action_category_scope2) + self.assertIsInstance(action_category_scope, dict) + self.assertIn("action_category_scope", action_category_scope) + self.assertIn("id", action_category_scope) + self.assertIn("intra_extension_uuid", action_category_scope) + self.assertEqual(self.ref["id"], action_category_scope["intra_extension_uuid"]) + self.assertIn(new_action_category_scope2[new_action_category_scope2_uuid], + action_category_scope["action_category_scope"][action_category].values()) + + action_category_assignments = self.manager.get_action_category_assignment_dict( + "admin", + self.ref["id"], + new_action["id"] + ) + self.assertIsInstance(action_category_assignments, dict) + self.assertIn("action_category_assignments", action_category_assignments) + self.assertIn("id", action_category_assignments) + self.assertIn("intra_extension_uuid", action_category_assignments) + self.assertEqual(self.ref["id"], action_category_assignments["intra_extension_uuid"]) + self.assertEqual({}, action_category_assignments["action_category_assignments"][new_action["id"]]) + + self.assertRaises( + AuthIntraExtensionModificationNotAuthorized, + self.manager.set_action_category_assignment_dict, + "admin", self.ref["id"], new_action["id"], + { + new_action_category_uuid: [new_action_category_scope_uuid, new_action_category_scope2_uuid], + }) + + self.assertRaises( + AuthIntraExtensionModificationNotAuthorized, + self.manager.del_action_category_assignment, + "admin", self.ref["id"], new_action["id"], + new_action_category_uuid, + new_action_category_scope_uuid) + + self.assertRaises( + AuthIntraExtensionModificationNotAuthorized, + self.manager.add_action_category_assignment_dict, + "admin", self.ref["id"], new_action["id"], + new_action_category_uuid, + new_action_category_scope_uuid) + + def test_sub_meta_rules(self): + self.create_intra_extension() + + aggregation_algorithms = self.manager.get_aggregation_algorithms("admin", self.ref["id"]) + self.assertIsInstance(aggregation_algorithms, dict) + self.assertIsInstance(aggregation_algorithms["aggregation_algorithms"], list) + self.assertIn("and_true_aggregation", aggregation_algorithms["aggregation_algorithms"]) + self.assertIn("test_aggregation", aggregation_algorithms["aggregation_algorithms"]) + + aggregation_algorithm = self.manager.get_aggregation_algorithm("admin", self.ref["id"]) + self.assertIsInstance(aggregation_algorithm, dict) + self.assertIn("aggregation", aggregation_algorithm) + self.assertIn(aggregation_algorithm["aggregation"], aggregation_algorithms["aggregation_algorithms"]) + + _aggregation_algorithm = list(aggregation_algorithms["aggregation_algorithms"]) + _aggregation_algorithm.remove(aggregation_algorithm["aggregation"]) + self.assertRaises( + AuthIntraExtensionModificationNotAuthorized, + self.manager.set_aggregation_algorithm, + "admin", self.ref["id"], _aggregation_algorithm[0]) + + sub_meta_rules = self.manager.get_sub_meta_rule("admin", self.ref["id"]) + self.assertIsInstance(sub_meta_rules, dict) + self.assertIn("sub_meta_rules", sub_meta_rules) + sub_meta_rules_conf = json.load(open(os.path.join(self.policy_directory, self.ref["model"], "metarule.json"))) + metarule = dict() + categories = { + "subject_categories": self.manager.get_subject_category_dict("admin", self.ref["id"]), + "object_categories": self.manager.get_object_category_dict("admin", self.ref["id"]), + "action_categories": self.manager.get_action_category_dict("admin", self.ref["id"]) + } + for relation in sub_meta_rules_conf["sub_meta_rules"]: + metarule[relation] = dict() + for item in ("subject_categories", "object_categories", "action_categories"): + metarule[relation][item] = list() + for element in sub_meta_rules_conf["sub_meta_rules"][relation][item]: + metarule[relation][item].append(self.__get_key_from_value( + element, + categories[item][item] + )) + + for relation in sub_meta_rules["sub_meta_rules"]: + self.assertIn(relation, metarule) + for item in ("subject_categories", "object_categories", "action_categories"): + self.assertEqual( + sub_meta_rules["sub_meta_rules"][relation][item], + metarule[relation][item] + ) + + new_subject_category = {"id": uuid.uuid4().hex, "name": "subject_category_test"} + # Add a particular subject_category + data = self.admin_manager.add_subject_category_dict( + "admin", + self.ref["id"], + new_subject_category["name"]) + new_subject_category["id"] = data["subject_category"]["uuid"] + subject_categories = self.manager.get_subject_category_dict( + "admin", + self.ref["id"]) + self.assertIsInstance(subject_categories, dict) + self.assertIn("subject_categories", subject_categories) + self.assertIn("id", subject_categories) + self.assertIn("intra_extension_uuid", subject_categories) + self.assertEqual(self.ref["id"], subject_categories["intra_extension_uuid"]) + self.assertIn(new_subject_category["id"], subject_categories["subject_categories"]) + metarule[relation]["subject_categories"].append(new_subject_category["id"]) + self.assertRaises( + AuthIntraExtensionModificationNotAuthorized, + self.manager.set_sub_meta_rule, + "admin", self.ref["id"], metarule) + + def test_sub_rules(self): + self.create_intra_extension() + + sub_meta_rules = self.manager.get_sub_meta_rule("admin", self.ref["id"]) + self.assertIsInstance(sub_meta_rules, dict) + self.assertIn("sub_meta_rules", sub_meta_rules) + + sub_rules = self.manager.get_sub_rules("admin", self.ref["id"]) + self.assertIsInstance(sub_rules, dict) + self.assertIn("rules", sub_rules) + rules = dict() + for relation in sub_rules["rules"]: + self.assertIn(relation, self.manager.get_sub_meta_rule_relations("admin", self.ref["id"])["sub_meta_rule_relations"]) + rules[relation] = list() + for rule in sub_rules["rules"][relation]: + for cat, cat_func, func_name in ( + ("subject_categories", self.manager.get_subject_category_scope_dict, "subject_category_scope"), + ("action_categories", self.manager.get_action_category_scope_dict, "action_category_scope"), + ("object_categories", self.manager.get_object_category_scope_dict, "object_category_scope"), + ): + for cat_value in sub_meta_rules["sub_meta_rules"][relation][cat]: + scope = cat_func( + "admin", + self.ref["id"], + cat_value + ) + a_scope = rule.pop(0) + self.assertIn(a_scope, scope[func_name][cat_value]) + + # add a new subrule + + relation = sub_rules["rules"].keys()[0] + sub_rule = [] + for cat, cat_func, func_name in ( + ("subject_categories", self.manager.get_subject_category_scope_dict, "subject_category_scope"), + ("action_categories", self.manager.get_action_category_scope_dict, "action_category_scope"), + ("object_categories", self.manager.get_object_category_scope_dict, "object_category_scope"), + ): + for cat_value in sub_meta_rules["sub_meta_rules"][relation][cat]: + scope = cat_func( + "admin", + self.ref["id"], + cat_value + ) + sub_rule.append(scope[func_name][cat_value].keys()[0]) + + self.assertRaises( + AuthIntraExtensionModificationNotAuthorized, + self.manager.set_sub_rule, + "admin", self.ref["id"], relation, sub_rule) |