diff options
Diffstat (limited to 'keystone-moon')
-rw-r--r-- | keystone-moon/doc/source/extensions/moon/ExceptionHierarchy.pptx | bin | 34574 -> 34626 bytes | |||
-rw-r--r-- | keystone-moon/keystone/contrib/moon/core.py | 90 | ||||
-rw-r--r-- | keystone-moon/keystone/contrib/moon/exception.py | 15 | ||||
-rw-r--r-- | keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_admin.py | 1549 | ||||
-rw-r--r-- | keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_authz.py | 713 |
5 files changed, 2075 insertions, 292 deletions
diff --git a/keystone-moon/doc/source/extensions/moon/ExceptionHierarchy.pptx b/keystone-moon/doc/source/extensions/moon/ExceptionHierarchy.pptx Binary files differindex f0f020fd..af18d231 100644 --- a/keystone-moon/doc/source/extensions/moon/ExceptionHierarchy.pptx +++ b/keystone-moon/doc/source/extensions/moon/ExceptionHierarchy.pptx diff --git a/keystone-moon/keystone/contrib/moon/core.py b/keystone-moon/keystone/contrib/moon/core.py index aa7fd884..69e8585b 100644 --- a/keystone-moon/keystone/contrib/moon/core.py +++ b/keystone-moon/keystone/contrib/moon/core.py @@ -245,6 +245,16 @@ class IntraExtensionManager(manager.Manager): :param obj: object of the request :param act: action of the request :return: True or False or raise an exception + :raises: (in that order) + SubjectUnknown + ObjectUnknown + ActionUnknown + SubjectCategoryAssignmentOutOfScope + ActionCategoryAssignmentOutOfScope + ObjectCategoryAssignmentOutOfScope + SubjectCategoryAssignmentUnknown + ObjectCategoryAssignmentUnknown + ActionCategoryAssignmentUnknown """ if not self.driver.get_intra_extension(uuid): raise IntraExtensionNotFound() @@ -1244,124 +1254,124 @@ class IntraExtensionAuthzManager(IntraExtensionManager): raise AdminException() def set_subject_dict(self, user_name, intra_extension_uuid, subject_dict): - raise AdminException() + raise SubjectAddNotAuthorized() def add_subject_dict(self, user_name, intra_extension_uuid, subject_uuid): - raise AdminException() + raise SubjectAddNotAuthorized() def del_subject(self, user_name, intra_extension_uuid, subject_uuid): - raise AdminException() + raise SubjectDelNotAuthorized() def set_object_dict(self, user_name, intra_extension_uuid, object_dict): - raise AdminException() + raise ObjectAddNotAuthorized() def add_object_dict(self, user_name, intra_extension_uuid, object_name): - raise AdminException() + raise ObjectAddNotAuthorized() def del_object(self, user_name, intra_extension_uuid, object_uuid): - raise AdminException() + raise ObjectDelNotAuthorized() def set_action_dict(self, user_name, intra_extension_uuid, action_dict): - raise AdminException() + raise ActionAddNotAuthorized() def add_action_dict(self, user_name, intra_extension_uuid, action_name): - raise AdminException() + raise ActionAddNotAuthorized() def del_action(self, user_name, intra_extension_uuid, action_uuid): - raise AdminException() + raise ActionDelNotAuthorized() def set_subject_category_dict(self, user_name, intra_extension_uuid, subject_category): - raise AdminException() + raise SubjectCategoryAddNotAuthorized() def add_subject_category_dict(self, user_name, intra_extension_uuid, subject_category_name): - raise AdminException() + raise SubjectCategoryAddNotAuthorized() def del_subject_category(self, user_name, intra_extension_uuid, subject_uuid): - raise AdminException() + raise SubjectCategoryDelNotAuthorized() def set_object_category_dict(self, user_name, intra_extension_uuid, object_category): - raise AdminException() + raise ObjectCategoryAddNotAuthorized() def add_object_category_dict(self, user_name, intra_extension_uuid, object_category_name): - raise AdminException() + raise ObjectCategoryAddNotAuthorized() def del_object_category(self, user_name, intra_extension_uuid, object_uuid): - raise AdminException() + raise ObjectCategoryDelNotAuthorized() def set_action_category_dict(self, user_name, intra_extension_uuid, action_category): - raise AdminException() + raise ActionCategoryAddNotAuthorized() def add_action_category_dict(self, user_name, intra_extension_uuid, action_category_name): - raise AdminException() + raise ActionCategoryAddNotAuthorized() def del_action_category(self, user_name, intra_extension_uuid, action_uuid): - raise AdminException() + raise ActionCategoryDelNotAuthorized() def set_subject_category_scope_dict(self, user_name, intra_extension_uuid, category, scope): - raise AdminException() + raise SubjectCategoryScopeAddNotAuthorized() def add_subject_category_scope_dict(self, user_name, intra_extension_uuid, subject_category, scope_name): - raise AdminException() + raise SubjectCategoryScopeAddNotAuthorized() def del_subject_category_scope(self, user_name, intra_extension_uuid, subject_category, subject_category_scope): - raise AdminException() + raise SubjectCategoryScopeDelNotAuthorized() def set_object_category_scope_dict(self, user_name, intra_extension_uuid, category, scope): - raise AdminException() + raise ObjectCategoryScopeAddNotAuthorized() def add_object_category_scope_dict(self, user_name, intra_extension_uuid, object_category, scope_name): - raise AdminException() + raise ObjectCategoryScopeAddNotAuthorized() def del_object_category_scope(self, user_name, intra_extension_uuid, object_category, object_category_scope): - raise AdminException() + raise ObjectCategoryScopeDelNotAuthorized() def set_action_category_scope_dict(self, user_name, intra_extension_uuid, category, scope): - raise AdminException() + raise ActionCategoryScopeAddNotAuthorized() def add_action_category_scope_dict(self, user_name, intra_extension_uuid, action_category, scope_name): - raise AdminException() + raise ActionCategoryScopeAddNotAuthorized() def del_action_category_scope(self, user_name, intra_extension_uuid, action_category, action_category_scope): - raise AdminException() + raise ActionCategoryScopeDelNotAuthorized() def set_subject_category_assignment_dict(self, user_name, intra_extension_uuid, subject_uuid, assignment_dict): - raise AdminException() + raise SubjectCategoryAssignmentAddNotAuthorized() def del_subject_category_assignment(self, user_name, intra_extension_uuid, subject_uuid, category_uuid, scope_uuid): - raise AdminException() + raise SubjectCategoryAssignmentAddNotAuthorized() def add_subject_category_assignment_dict(self, user_name, intra_extension_uuid, subject_uuid, category_uuid, scope_uuid): - raise AdminException() + raise SubjectCategoryAssignmentDelNotAuthorized() def set_object_category_assignment_dict(self, user_name, intra_extension_uuid, object_uuid, assignment_dict): - raise AdminException() + raise ObjectCategoryAssignmentAddNotAuthorized() def del_object_category_assignment(self, user_name, intra_extension_uuid, object_uuid, category_uuid, scope_uuid): - raise AdminException() + raise ObjectCategoryAssignmentAddNotAuthorized() def add_object_category_assignment_dict(self, user_name, intra_extension_uuid, object_uuid, category_uuid, scope_uuid): - raise AdminException() + raise ObjectCategoryAssignmentDelNotAuthorized() def set_action_category_assignment_dict(self, user_name, intra_extension_uuid, action_uuid, assignment_dict): - raise AdminException() + raise ActionCategoryAssignmentAddNotAuthorized() def del_action_category_assignment(self, user_name, intra_extension_uuid, action_uuid, category_uuid, scope_uuid): - raise AdminException() + raise ActionCategoryAssignmentAddNotAuthorized() def add_action_category_assignment_dict(self, user_name, intra_extension_uuid, action_uuid, category_uuid, scope_uuid): - raise AdminException() + raise ActionCategoryAssignmentDelNotAuthorized() def set_aggregation_algorithm(self, user_name, intra_extension_uuid, aggregation_algorithm): - raise AdminException() + raise MetaRuleAddNotAuthorized() def set_sub_meta_rule(self, user_name, intra_extension_uuid, sub_meta_rules): - raise AdminException() + raise MetaRuleAddNotAuthorized() def set_sub_rule(self, user_name, intra_extension_uuid, relation, sub_rule): - raise AdminException() + raise RuleAddNotAuthorized() def del_sub_rule(self, user_name, intra_extension_uuid, relation_name, rule): - raise AdminException() + raise RuleAddNotAuthorized() @dependency.provider('admin_api') @dependency.requires('identity_api', 'moonlog_api', 'tenant_api') diff --git a/keystone-moon/keystone/contrib/moon/exception.py b/keystone-moon/keystone/contrib/moon/exception.py index b0ec740b..b206fc76 100644 --- a/keystone-moon/keystone/contrib/moon/exception.py +++ b/keystone-moon/keystone/contrib/moon/exception.py @@ -239,6 +239,9 @@ class AdminAssignment(AuthzException): class AdminRule(AuthzException): title = 'Rule Exception' +class AdminMetaRule(AuthzException): + title = 'MetaRule Exception' + class SubjectReadNotAuthorized(AdminPerimeter): title = 'Subject Read Not Authorized' @@ -395,3 +398,15 @@ class RuleAddNotAuthorized(AdminRule): class RuleDelNotAuthorized(AdminRule): title = 'Rule Del Not Authorized' + + +class MetaRuleReadNotAuthorized(AdminRule): + title = 'MetaRule Read Not Authorized' + + +class MetaRuleAddNotAuthorized(AdminRule): + title = 'MetaRule Add Not Authorized' + + +class MetaRuleDelNotAuthorized(AdminRule): + title = 'MetaRule Del Not Authorized' diff --git a/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_admin.py b/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_admin.py index 29ebe7a2..6426bf84 100644 --- a/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_admin.py +++ b/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_admin.py @@ -19,7 +19,7 @@ from keystone.contrib.moon.core import LogManager, TenantManager CONF = cfg.CONF -USER_ADMIN = { +USER = { 'name': 'admin', 'domain_id': "default", 'password': 'admin' @@ -31,11 +31,11 @@ IE = { "description": "a simple description." } -class TestIntraExtensionAdminManager(tests.TestCase): +class TestIntraExtensionAdminManagerOK(tests.TestCase): def setUp(self): self.useFixture(database.Database()) - super(TestIntraExtensionAdminManager, self).setUp() + super(TestIntraExtensionAdminManagerOK, self).setUp() self.load_backends() self.load_fixtures(default_fixtures) self.manager = IntraExtensionAdminManager() @@ -51,7 +51,7 @@ class TestIntraExtensionAdminManager(tests.TestCase): } def config_overrides(self): - super(TestIntraExtensionAdminManager, self).config_overrides() + super(TestIntraExtensionAdminManagerOK, self).config_overrides() self.policy_directory = 'examples/moon/policies' self.config_fixture.config( group='moon', @@ -62,7 +62,7 @@ class TestIntraExtensionAdminManager(tests.TestCase): def create_intra_extension(self, policy_model="policy_rbac_admin"): # Create the admin user because IntraExtension needs it - self.admin = self.identity_api.create_user(USER_ADMIN) + self.admin = self.identity_api.create_user(USER) IE["policymodel"] = policy_model self.ref = self.manager.load_intra_extension(IE) self.assertIsInstance(self.ref, dict) @@ -1225,5 +1225,1544 @@ class TestIntraExtensionAdminManager(tests.TestCase): self.assertIn(a_scope, scope[func_name][cat_value]) +class TestIntraExtensionAdminManagerKO(tests.TestCase): + def setUp(self): + self.useFixture(database.Database()) + super(TestIntraExtensionAdminManagerKO, self).setUp() + self.load_backends() + self.load_fixtures(default_fixtures) + self.manager = IntraExtensionAuthzManager() + self.admin_manager = IntraExtensionAdminManager() + + def __get_key_from_value(self, value, values_dict): + return filter(lambda v: v[1] == value, values_dict.iteritems())[0][0] + + def load_extra_backends(self): + return { + "moonlog_api": LogManager(), + "tenant_api": TenantManager(), + "resource_api": resource.Manager(), + } + + def config_overrides(self): + super(TestIntraExtensionAdminManagerKO, self).config_overrides() + self.policy_directory = 'examples/moon/policies' + self.config_fixture.config( + group='moon', + intraextension_driver='keystone.contrib.moon.backends.sql.IntraExtensionConnector') + self.config_fixture.config( + group='moon', + policy_directory=self.policy_directory) + + def create_tenant(self): + tenant = { + "id": uuid.uuid4().hex, + "name": "TestIntraExtensionAuthzManager", + "enabled": True, + "description": "", + "domain_id": "default" + } + return self.resource_api.create_project(tenant["id"], tenant) + + def create_mapping(self, tenant, authz_uuid=None, admin_uuid=None): + + mapping = self.tenant_api.set_tenant_dict(tenant["id"], tenant["name"], authz_uuid, admin_uuid) + self.assertIsInstance(mapping, dict) + self.assertIn("authz", mapping) + self.assertEqual(mapping["authz"], authz_uuid) + return mapping + + def create_user(self, username="admin"): + + _USER = dict(USER) + _USER["name"] = username + return self.identity_api.create_user(_USER) + + def create_intra_extension(self, policy_model="policy_rbac_authz"): + + IE["policymodel"] = policy_model + ref = self.admin_manager.load_intra_extension(IE) + self.assertIsInstance(self.ref, dict) + return ref + + def test_subjects(self): + admin_user = self.create_user("admin") + ref = self.create_intra_extension() + demo_user = self.create_user("demo") + + self.assertRaises( + SubjectReadNotAuthorized, + self.manager.get_subject_dict, + demo_user["id"], ref["id"]) + + subjects = self.manager.get_subject_dict(admin_user["id"], ref["id"]) + self.assertIsInstance(subjects, dict) + self.assertIn("subjects", subjects) + self.assertIn("id", subjects) + self.assertIn("intra_extension_uuid", subjects) + self.assertEqual(ref["id"], subjects["intra_extension_uuid"]) + self.assertIsInstance(subjects["subjects"], dict) + + new_subject = self.create_user("new_user") + new_subjects = dict() + new_subjects[new_subject["id"]] = new_subject["name"] + + self.assertRaises( + SubjectAddNotAuthorized, + self.manager.set_subject_dict, + demo_user["id"], ref["id"], new_subjects) + + subjects = self.manager.set_subject_dict(admin_user["id"], ref["id"], new_subjects) + self.assertIsInstance(subjects, dict) + self.assertIn("subjects", subjects) + self.assertIn("id", subjects) + self.assertIn("intra_extension_uuid", subjects) + self.assertEqual(ref["id"], subjects["intra_extension_uuid"]) + self.assertEqual(subjects["subjects"], new_subjects) + self.assertIn(new_subject["id"], subjects["subjects"]) + + # Delete the new subject + self.assertRaises( + SubjectDelNotAuthorized, + self.manager.del_subject_dict, + demo_user["id"], ref["id"], new_subject["id"]) + + self.manager.del_subject(admin_user["id"], ref["id"], new_subject["id"]) + subjects = self.manager.get_subject_dict(admin_user["id"], ref["id"]) + self.assertIsInstance(subjects, dict) + self.assertIn("subjects", subjects) + self.assertIn("id", subjects) + self.assertIn("intra_extension_uuid", subjects) + self.assertEqual(ref["id"], subjects["intra_extension_uuid"]) + self.assertNotIn(new_subject["id"], subjects["subjects"]) + + # Add a particular subject + self.assertRaises( + SubjectAddNotAuthorized, + self.manager.add_subject_dict, + demo_user["id"], ref["id"], new_subject["id"]) + + subjects = self.manager.add_subject_dict(admin_user["id"], ref["id"], new_subject["id"]) + self.assertIsInstance(subjects, dict) + self.assertIn("subject", subjects) + self.assertIn("uuid", subjects["subject"]) + self.assertEqual(new_subject["name"], subjects["subject"]["name"]) + subjects = self.manager.get_subject_dict(admin_user["id"], ref["id"]) + self.assertIsInstance(subjects, dict) + self.assertIn("subjects", subjects) + self.assertIn("id", subjects) + self.assertIn("intra_extension_uuid", subjects) + self.assertEqual(ref["id"], subjects["intra_extension_uuid"]) + self.assertIn(new_subject["id"], subjects["subjects"]) + + def test_objects(self): + admin_user = self.create_user("admin") + ref = self.create_intra_extension() + demo_user = self.create_user("demo") + + self.assertRaises( + ObjectReadNotAuthorized, + self.manager.get_object_dict, + demo_user["id"], ref["id"]) + + objects = self.manager.get_object_dict(admin_user["id"], ref["id"]) + self.assertIsInstance(objects, dict) + self.assertIn("objects", objects) + self.assertIn("id", objects) + self.assertIn("intra_extension_uuid", objects) + self.assertEqual(ref["id"], objects["intra_extension_uuid"]) + self.assertIsInstance(objects["objects"], dict) + + new_object = {"id": uuid.uuid4().hex, "name": "my_object"} + new_objects = dict() + new_objects[new_object["id"]] = new_object["name"] + + self.assertRaises( + ObjectAddNotAuthorized, + self.manager.set_object_dict, + demo_user["id"], ref["id"], new_objects) + + objects = self.manager.set_object_dict(admin_user["id"], ref["id"], new_objects) + self.assertIsInstance(objects, dict) + self.assertIn("objects", objects) + self.assertIn("id", objects) + self.assertIn("intra_extension_uuid", objects) + self.assertEqual(ref["id"], objects["intra_extension_uuid"]) + self.assertEqual(objects["objects"], new_objects) + self.assertIn(new_object["id"], objects["objects"]) + + # Delete the new object + self.assertRaises( + ObjectDelNotAuthorized, + self.manager.del_object_dict, + demo_user["id"], ref["id"], new_object["id"]) + + self.manager.del_object(admin_user["id"], ref["id"], new_object["id"]) + objects = self.manager.get_object_dict(admin_user["id"], ref["id"]) + self.assertIsInstance(objects, dict) + self.assertIn("objects", objects) + self.assertIn("id", objects) + self.assertIn("intra_extension_uuid", objects) + self.assertEqual(ref["id"], objects["intra_extension_uuid"]) + self.assertNotIn(new_object["id"], objects["objects"]) + + # Add a particular object + self.assertRaises( + ObjectAddNotAuthorized, + self.manager.add_object_dict, + demo_user["id"], ref["id"], new_object["name"]) + + objects = self.manager.add_object_dict(admin_user["id"], ref["id"], new_object["name"]) + self.assertIsInstance(objects, dict) + self.assertIn("object", objects) + self.assertIn("uuid", objects["object"]) + self.assertEqual(new_object["name"], objects["object"]["name"]) + new_object["id"] = objects["object"]["uuid"] + objects = self.manager.get_object_dict(admin_user["id"], ref["id"]) + self.assertIsInstance(objects, dict) + self.assertIn("objects", objects) + self.assertIn("id", objects) + self.assertIn("intra_extension_uuid", objects) + self.assertEqual(ref["id"], objects["intra_extension_uuid"]) + self.assertIn(new_object["id"], objects["objects"]) + + def test_actions(self): + admin_user = self.create_user("admin") + ref = self.create_intra_extension() + demo_user = self.create_user("demo") + + self.assertRaises( + ActionReadNotAuthorized, + self.manager.get_action_dict, + demo_user["id"], ref["id"]) + + actions = self.manager.get_action_dict(admin_user["id"], ref["id"]) + self.assertIsInstance(actions, dict) + self.assertIn("actions", actions) + self.assertIn("id", actions) + self.assertIn("intra_extension_uuid", actions) + self.assertEqual(ref["id"], actions["intra_extension_uuid"]) + self.assertIsInstance(actions["actions"], dict) + + new_action = {"id": uuid.uuid4().hex, "name": "my_action"} + new_actions = dict() + new_actions[new_action["id"]] = new_action["name"] + + self.assertRaises( + ActionAddNotAuthorized, + self.manager.set_action_dict, + demo_user["id"], ref["id"], new_actions) + + actions = self.manager.set_action_dict(admin_user["id"], ref["id"], new_actions) + self.assertIsInstance(actions, dict) + self.assertIn("actions", actions) + self.assertIn("id", actions) + self.assertIn("intra_extension_uuid", actions) + self.assertEqual(ref["id"], actions["intra_extension_uuid"]) + self.assertEqual(actions["actions"], new_actions) + self.assertIn(new_action["id"], actions["actions"]) + + # Delete the new action + self.assertRaises( + ActionDelNotAuthorized, + self.manager.del_action_dict, + demo_user["id"], ref["id"], new_action["id"]) + + self.manager.del_action(admin_user["id"], ref["id"], new_action["id"]) + actions = self.manager.get_action_dict(admin_user["id"], ref["id"]) + self.assertIsInstance(actions, dict) + self.assertIn("actions", actions) + self.assertIn("id", actions) + self.assertIn("intra_extension_uuid", actions) + self.assertEqual(ref["id"], actions["intra_extension_uuid"]) + self.assertNotIn(new_action["id"], actions["actions"]) + + # Add a particular action + self.assertRaises( + ActionAddNotAuthorized, + self.manager.add_action_dict, + demo_user["id"], ref["id"], new_action["name"]) + + actions = self.manager.add_action_dict(admin_user["id"], ref["id"], new_action["name"]) + self.assertIsInstance(actions, dict) + self.assertIn("action", actions) + self.assertIn("uuid", actions["action"]) + self.assertEqual(new_action["name"], actions["action"]["name"]) + new_action["id"] = actions["action"]["uuid"] + actions = self.manager.get_action_dict(admin_user["id"], ref["id"]) + self.assertIsInstance(actions, dict) + self.assertIn("actions", actions) + self.assertIn("id", actions) + self.assertIn("intra_extension_uuid", actions) + self.assertEqual(ref["id"], actions["intra_extension_uuid"]) + self.assertIn(new_action["id"], actions["actions"]) + + def test_subject_categories(self): + admin_user = self.create_user("admin") + ref = self.create_intra_extension() + demo_user = self.create_user("demo") + + self.assertRaises( + SubjectCategoryReadNotAuthorized, + self.manager.get_subject_category_dict, + demo_user["id"], ref["id"]) + + subject_categories = self.manager.get_subject_category_dict(admin_user["id"], ref["id"]) + self.assertIsInstance(subject_categories, dict) + self.assertIn("subject_categories", subject_categories) + self.assertIn("id", subject_categories) + self.assertIn("intra_extension_uuid", subject_categories) + self.assertEqual(ref["id"], subject_categories["intra_extension_uuid"]) + self.assertIsInstance(subject_categories["subject_categories"], dict) + + new_subject_category = {"id": uuid.uuid4().hex, "name": "subject_category_test"} + new_subject_categories = dict() + new_subject_categories[new_subject_category["id"]] = new_subject_category["name"] + + self.assertRaises( + SubjectCategoryAddNotAuthorized, + self.manager.set_subject_category_dict, + demo_user["id"], ref["id"], new_subject_categories) + + subject_categories = self.manager.set_subject_category_dict(admin_user["id"], ref["id"], new_subject_categories) + self.assertIsInstance(subject_categories, dict) + self.assertIn("subject_categories", subject_categories) + self.assertIn("id", subject_categories) + self.assertIn("intra_extension_uuid", subject_categories) + self.assertEqual(ref["id"], subject_categories["intra_extension_uuid"]) + self.assertEqual(subject_categories["subject_categories"], new_subject_categories) + self.assertIn(new_subject_category["id"], subject_categories["subject_categories"]) + + # Delete the new subject_category + self.assertRaises( + SubjectCategoryDelNotAuthorized, + self.manager.del_subject_category_dict, + demo_user["id"], ref["id"], new_subject_category["id"]) + + self.manager.del_subject_category(admin_user["id"], ref["id"], new_subject_category["id"]) + subject_categories = self.manager.get_subject_category_dict(admin_user["id"], ref["id"]) + self.assertIsInstance(subject_categories, dict) + self.assertIn("subject_categories", subject_categories) + self.assertIn("id", subject_categories) + self.assertIn("intra_extension_uuid", subject_categories) + self.assertEqual(ref["id"], subject_categories["intra_extension_uuid"]) + self.assertNotIn(new_subject_category["id"], subject_categories["subject_categories"]) + + # Add a particular subject_category + self.assertRaises( + SubjectCategoryAddNotAuthorized, + self.manager.add_subject_category_dict, + demo_user["id"], ref["id"], new_subject_category["name"]) + + subject_categories = self.manager.add_subject_category_dict( + admin_user["id"], + ref["id"], + new_subject_category["name"]) + self.assertIsInstance(subject_categories, dict) + self.assertIn("subject_category", subject_categories) + self.assertIn("uuid", subject_categories["subject_category"]) + self.assertEqual(new_subject_category["name"], subject_categories["subject_category"]["name"]) + new_subject_category["id"] = subject_categories["subject_category"]["uuid"] + subject_categories = self.manager.get_subject_category_dict( + admin_user["id"], + ref["id"]) + self.assertIsInstance(subject_categories, dict) + self.assertIn("subject_categories", subject_categories) + self.assertIn("id", subject_categories) + self.assertIn("intra_extension_uuid", subject_categories) + self.assertEqual(ref["id"], subject_categories["intra_extension_uuid"]) + self.assertIn(new_subject_category["id"], subject_categories["subject_categories"]) + + def test_object_categories(self): + admin_user = self.create_user("admin") + ref = self.create_intra_extension() + demo_user = self.create_user("demo") + + self.assertRaises( + ObjectCategoryReadNotAuthorized, + self.manager.get_object_category_dict, + demo_user["id"], ref["id"]) + + object_categories = self.manager.get_object_category_dict(admin_user["id"], ref["id"]) + self.assertIsInstance(object_categories, dict) + self.assertIn("object_categories", object_categories) + self.assertIn("id", object_categories) + self.assertIn("intra_extension_uuid", object_categories) + self.assertEqual(ref["id"], object_categories["intra_extension_uuid"]) + self.assertIsInstance(object_categories["object_categories"], dict) + + new_object_category = {"id": uuid.uuid4().hex, "name": "object_category_test"} + new_object_categories = dict() + new_object_categories[new_object_category["id"]] = new_object_category["name"] + + self.assertRaises( + ObjectCategoryAddNotAuthorized, + self.manager.set_object_category_dict, + demo_user["id"], ref["id"], new_object_categories) + + object_categories = self.manager.set_object_category_dict(admin_user["id"], ref["id"], new_object_categories) + self.assertIsInstance(object_categories, dict) + self.assertIn("object_categories", object_categories) + self.assertIn("id", object_categories) + self.assertIn("intra_extension_uuid", object_categories) + self.assertEqual(ref["id"], object_categories["intra_extension_uuid"]) + self.assertEqual(object_categories["object_categories"], new_object_categories) + self.assertIn(new_object_category["id"], object_categories["object_categories"]) + + # Delete the new object_category + self.assertRaises( + ObjectCategoryDelNotAuthorized, + self.manager.del_object_category, + demo_user["id"], ref["id"], new_object_category["id"]) + + self.manager.del_object_category(admin_user["id"], ref["id"], new_object_category["id"]) + object_categories = self.manager.get_object_category_dict(admin_user["id"], ref["id"]) + self.assertIsInstance(object_categories, dict) + self.assertIn("object_categories", object_categories) + self.assertIn("id", object_categories) + self.assertIn("intra_extension_uuid", object_categories) + self.assertEqual(ref["id"], object_categories["intra_extension_uuid"]) + self.assertNotIn(new_object_category["id"], object_categories["object_categories"]) + + # Add a particular object_category + self.assertRaises( + ObjectCategoryAddNotAuthorized, + self.manager.add_object_category, + demo_user["id"], ref["id"], new_object_category["name"]) + + object_categories = self.manager.add_object_category_dict( + admin_user["id"], + ref["id"], + new_object_category["name"]) + self.assertIsInstance(object_categories, dict) + self.assertIn("object_category", object_categories) + self.assertIn("uuid", object_categories["object_category"]) + self.assertEqual(new_object_category["name"], object_categories["object_category"]["name"]) + new_object_category["id"] = object_categories["object_category"]["uuid"] + object_categories = self.manager.get_object_category_dict( + admin_user["id"], + ref["id"]) + self.assertIsInstance(object_categories, dict) + self.assertIn("object_categories", object_categories) + self.assertIn("id", object_categories) + self.assertIn("intra_extension_uuid", object_categories) + self.assertEqual(ref["id"], object_categories["intra_extension_uuid"]) + self.assertIn(new_object_category["id"], object_categories["object_categories"]) + + def test_action_categories(self): + admin_user = self.create_user("admin") + ref = self.create_intra_extension() + demo_user = self.create_user("demo") + + self.assertRaises( + ActionCategoryReadNotAuthorized, + self.manager.get_action_category_dict, + demo_user["id"], ref["id"]) + + action_categories = self.manager.get_action_category_dict(admin_user["id"], ref["id"]) + self.assertIsInstance(action_categories, dict) + self.assertIn("action_categories", action_categories) + self.assertIn("id", action_categories) + self.assertIn("intra_extension_uuid", action_categories) + self.assertEqual(ref["id"], action_categories["intra_extension_uuid"]) + self.assertIsInstance(action_categories["action_categories"], dict) + + new_action_category = {"id": uuid.uuid4().hex, "name": "action_category_test"} + new_action_categories = dict() + new_action_categories[new_action_category["id"]] = new_action_category["name"] + self.assertRaises( + ActionCategoryAddNotAuthorized, + self.manager.set_action_category_dict, + demo_user["id"], ref["id"], new_action_categories) + + action_categories = self.manager.set_action_category_dict(admin_user["id"], ref["id"], new_action_categories) + self.assertIsInstance(action_categories, dict) + self.assertIn("action_categories", action_categories) + self.assertIn("id", action_categories) + self.assertIn("intra_extension_uuid", action_categories) + self.assertEqual(ref["id"], action_categories["intra_extension_uuid"]) + self.assertEqual(action_categories["action_categories"], new_action_categories) + self.assertIn(new_action_category["id"], action_categories["action_categories"]) + + # Delete the new action_category + self.assertRaises( + ActionCategoryDelNotAuthorized, + self.manager.del_action_category_dict, + demo_user["id"], ref["id"], new_action_category["id"]) + + self.manager.del_action_category(admin_user["id"], ref["id"], new_action_category["id"]) + action_categories = self.manager.get_action_category_dict(admin_user["id"], ref["id"]) + self.assertIsInstance(action_categories, dict) + self.assertIn("action_categories", action_categories) + self.assertIn("id", action_categories) + self.assertIn("intra_extension_uuid", action_categories) + self.assertEqual(ref["id"], action_categories["intra_extension_uuid"]) + self.assertNotIn(new_action_category["id"], action_categories["action_categories"]) + + # Add a particular action_category + self.assertRaises( + ActionCategoryAddNotAuthorized, + self.manager.add_action_category_dict, + demo_user["id"], ref["id"], new_action_category["name"]) + + action_categories = self.manager.add_action_category_dict( + admin_user["id"], + ref["id"], + new_action_category["name"]) + self.assertIsInstance(action_categories, dict) + self.assertIn("action_category", action_categories) + self.assertIn("uuid", action_categories["action_category"]) + self.assertEqual(new_action_category["name"], action_categories["action_category"]["name"]) + new_action_category["id"] = action_categories["action_category"]["uuid"] + action_categories = self.manager.get_action_category_dict( + admin_user["id"], + ref["id"]) + self.assertIsInstance(action_categories, dict) + self.assertIn("action_categories", action_categories) + self.assertIn("id", action_categories) + self.assertIn("intra_extension_uuid", action_categories) + self.assertEqual(ref["id"], action_categories["intra_extension_uuid"]) + self.assertIn(new_action_category["id"], action_categories["action_categories"]) + + def test_subject_category_scope(self): + admin_user = self.create_user("admin") + ref = self.create_intra_extension() + demo_user = self.create_user("demo") + + subject_categories = self.manager.set_subject_category_dict( + admin_user["id"], + ref["id"], + { + uuid.uuid4().hex: admin_user["id"], + uuid.uuid4().hex: "dev", + } + ) + + for subject_category in subject_categories["subject_categories"]: + self.assertRaises( + SubjectCategoryScopeReadNotAuthorized, + self.manager.get_subject_category_scope_dict, + demo_user["id"], ref["id"], subject_category) + + subject_category_scope = self.manager.get_subject_category_scope_dict( + admin_user["id"], + ref["id"], + subject_category) + self.assertIsInstance(subject_category_scope, dict) + self.assertIn("subject_category_scope", subject_category_scope) + self.assertIn("id", subject_category_scope) + self.assertIn("intra_extension_uuid", subject_category_scope) + self.assertEqual(ref["id"], subject_category_scope["intra_extension_uuid"]) + self.assertIsInstance(subject_category_scope["subject_category_scope"], dict) + + new_subject_category_scope = dict() + new_subject_category_scope_uuid = uuid.uuid4().hex + new_subject_category_scope[new_subject_category_scope_uuid] = "new_subject_category_scope" + + self.assertRaises( + SubjectCategoryScopeAddNotAuthorized, + self.manager.set_subject_category_scope_dict, + demo_user["id"], ref["id"], subject_category, new_subject_category_scope) + + subject_category_scope = self.manager.set_subject_category_scope_dict( + admin_user["id"], + ref["id"], + subject_category, + new_subject_category_scope) + self.assertIsInstance(subject_category_scope, dict) + self.assertIn("subject_category_scope", subject_category_scope) + self.assertIn("id", subject_category_scope) + self.assertIn("intra_extension_uuid", subject_category_scope) + self.assertEqual(ref["id"], subject_category_scope["intra_extension_uuid"]) + self.assertIn(new_subject_category_scope[new_subject_category_scope_uuid], + subject_category_scope["subject_category_scope"][subject_category].values()) + + # Delete the new subject_category_scope + self.assertRaises( + SubjectCategoryScopeDelNotAuthorized, + self.manager.del_subject_category_scope_dict, + demo_user["id"], ref["id"], subject_category, new_subject_category_scope_uuid) + + self.manager.del_subject_category_scope( + admin_user["id"], + ref["id"], + subject_category, + new_subject_category_scope_uuid) + subject_category_scope = self.manager.get_subject_category_scope_dict( + admin_user["id"], + ref["id"], + subject_category) + self.assertIsInstance(subject_category_scope, dict) + self.assertIn("subject_category_scope", subject_category_scope) + self.assertIn("id", subject_category_scope) + self.assertIn("intra_extension_uuid", subject_category_scope) + self.assertEqual(ref["id"], subject_category_scope["intra_extension_uuid"]) + self.assertNotIn(new_subject_category_scope_uuid, subject_category_scope["subject_category_scope"]) + + # Add a particular subject_category_scope + self.assertRaises( + SubjectCategoryScopeAddNotAuthorized, + self.manager.add_subject_category_scope_dict, + demo_user["id"], ref["id"], subject_category, + new_subject_category_scope[new_subject_category_scope_uuid]) + + subject_category_scope = self.manager.add_subject_category_scope_dict( + admin_user["id"], + ref["id"], + subject_category, + new_subject_category_scope[new_subject_category_scope_uuid]) + self.assertIsInstance(subject_category_scope, dict) + self.assertIn("subject_category_scope", subject_category_scope) + self.assertIn("uuid", subject_category_scope["subject_category_scope"]) + self.assertEqual(new_subject_category_scope[new_subject_category_scope_uuid], + subject_category_scope["subject_category_scope"]["name"]) + subject_category_scope = self.manager.get_subject_category_scope_dict( + admin_user["id"], + ref["id"], + subject_category) + self.assertIsInstance(subject_category_scope, dict) + self.assertIn("subject_category_scope", subject_category_scope) + self.assertIn("id", subject_category_scope) + self.assertIn("intra_extension_uuid", subject_category_scope) + self.assertEqual(ref["id"], subject_category_scope["intra_extension_uuid"]) + self.assertNotIn(new_subject_category_scope_uuid, subject_category_scope["subject_category_scope"]) + + def test_object_category_scope(self): + admin_user = self.create_user("admin") + ref = self.create_intra_extension() + demo_user = self.create_user("demo") + + object_categories = self.manager.set_object_category_dict( + admin_user["id"], + ref["id"], + { + uuid.uuid4().hex: "id", + uuid.uuid4().hex: "domain", + } + ) + + for object_category in object_categories["object_categories"]: + self.assertRaises( + ObjectCategoryScopeReadNotAuthorized, + self.manager.get_object_category_scope_dict, + demo_user["id"], ref["id"], object_category) + + object_category_scope = self.manager.get_object_category_scope_dict( + admin_user["id"], + ref["id"], + object_category) + self.assertIsInstance(object_category_scope, dict) + self.assertIn("object_category_scope", object_category_scope) + self.assertIn("id", object_category_scope) + self.assertIn("intra_extension_uuid", object_category_scope) + self.assertEqual(ref["id"], object_category_scope["intra_extension_uuid"]) + self.assertIsInstance(object_category_scope["object_category_scope"], dict) + + new_object_category_scope = dict() + new_object_category_scope_uuid = uuid.uuid4().hex + new_object_category_scope[new_object_category_scope_uuid] = "new_object_category_scope" + + self.assertRaises( + ObjectCategoryScopeAddNotAuthorized, + self.manager.set_object_category_scope_dict, + demo_user["id"], ref["id"], object_category, new_object_category_scope) + + object_category_scope = self.manager.set_object_category_scope_dict( + admin_user["id"], + ref["id"], + object_category, + new_object_category_scope) + self.assertIsInstance(object_category_scope, dict) + self.assertIn("object_category_scope", object_category_scope) + self.assertIn("id", object_category_scope) + self.assertIn("intra_extension_uuid", object_category_scope) + self.assertEqual(ref["id"], object_category_scope["intra_extension_uuid"]) + self.assertIn(new_object_category_scope[new_object_category_scope_uuid], + object_category_scope["object_category_scope"][object_category].values()) + + # Delete the new object_category_scope + self.assertRaises( + ObjectCategoryScopeDelNotAuthorized, + self.manager.del_object_category_scope_dict, + demo_user["id"], ref["id"], object_category, new_object_category_scope) + + self.manager.del_object_category_scope( + admin_user["id"], + ref["id"], + object_category, + new_object_category_scope_uuid) + object_category_scope = self.manager.get_object_category_scope_dict( + admin_user["id"], + ref["id"], + object_category) + self.assertIsInstance(object_category_scope, dict) + self.assertIn("object_category_scope", object_category_scope) + self.assertIn("id", object_category_scope) + self.assertIn("intra_extension_uuid", object_category_scope) + self.assertEqual(ref["id"], object_category_scope["intra_extension_uuid"]) + self.assertNotIn(new_object_category_scope_uuid, object_category_scope["object_category_scope"]) + + # Add a particular object_category_scope + self.assertRaises( + ObjectCategoryScopeAddNotAuthorized, + self.manager.add_object_category_scope_dict, + demo_user["id"], ref["id"], object_category, + new_object_category_scope[new_object_category_scope_uuid] + ) + + object_category_scope = self.manager.add_object_category_scope_dict( + admin_user["id"], + ref["id"], + object_category, + new_object_category_scope[new_object_category_scope_uuid]) + self.assertIsInstance(object_category_scope, dict) + self.assertIn("object_category_scope", object_category_scope) + self.assertIn("uuid", object_category_scope["object_category_scope"]) + self.assertEqual(new_object_category_scope[new_object_category_scope_uuid], + object_category_scope["object_category_scope"]["name"]) + object_category_scope = self.manager.get_object_category_scope_dict( + admin_user["id"], + ref["id"], + object_category) + self.assertIsInstance(object_category_scope, dict) + self.assertIn("object_category_scope", object_category_scope) + self.assertIn("id", object_category_scope) + self.assertIn("intra_extension_uuid", object_category_scope) + self.assertEqual(ref["id"], object_category_scope["intra_extension_uuid"]) + self.assertNotIn(new_object_category_scope_uuid, object_category_scope["object_category_scope"]) + + def test_action_category_scope(self): + admin_user = self.create_user("admin") + ref = self.create_intra_extension() + demo_user = self.create_user("demo") + + action_categories = self.manager.set_action_category_dict( + admin_user["id"], + ref["id"], + { + uuid.uuid4().hex: "compute", + uuid.uuid4().hex: "identity", + } + ) + + for action_category in action_categories["action_categories"]: + self.assertRaises( + ActionCategoryScopeReadNotAuthorized, + self.manager.get_object_category_scope_dict, + demo_user["id"], ref["id"], action_category) + + action_category_scope = self.manager.get_action_category_scope_dict( + admin_user["id"], + ref["id"], + action_category) + self.assertIsInstance(action_category_scope, dict) + self.assertIn("action_category_scope", action_category_scope) + self.assertIn("id", action_category_scope) + self.assertIn("intra_extension_uuid", action_category_scope) + self.assertEqual(ref["id"], action_category_scope["intra_extension_uuid"]) + self.assertIsInstance(action_category_scope["action_category_scope"], dict) + + new_action_category_scope = dict() + new_action_category_scope_uuid = uuid.uuid4().hex + new_action_category_scope[new_action_category_scope_uuid] = "new_action_category_scope" + + self.assertRaises( + ActionCategoryScopeAddNotAuthorized, + self.manager.set_action_category_scope_dict, + demo_user["id"], ref["id"], action_category, new_action_category_scope) + + action_category_scope = self.manager.set_action_category_scope_dict( + admin_user["id"], + ref["id"], + action_category, + new_action_category_scope) + self.assertIsInstance(action_category_scope, dict) + self.assertIn("action_category_scope", action_category_scope) + self.assertIn("id", action_category_scope) + self.assertIn("intra_extension_uuid", action_category_scope) + self.assertEqual(ref["id"], action_category_scope["intra_extension_uuid"]) + self.assertIn(new_action_category_scope[new_action_category_scope_uuid], + action_category_scope["action_category_scope"][action_category].values()) + + # Delete the new action_category_scope + self.assertRaises( + ActionCategoryScopeDelNotAuthorized, + self.manager.del_action_category_scope_dict, + demo_user["id"], ref["id"], action_category, + new_action_category_scope_uuid + ) + + self.manager.del_action_category_scope( + admin_user["id"], + ref["id"], + action_category, + new_action_category_scope_uuid) + action_category_scope = self.manager.get_action_category_scope_dict( + admin_user["id"], + ref["id"], + action_category) + self.assertIsInstance(action_category_scope, dict) + self.assertIn("action_category_scope", action_category_scope) + self.assertIn("id", action_category_scope) + self.assertIn("intra_extension_uuid", action_category_scope) + self.assertEqual(ref["id"], action_category_scope["intra_extension_uuid"]) + self.assertNotIn(new_action_category_scope_uuid, action_category_scope["action_category_scope"]) + + # Add a particular action_category_scope + self.assertRaises( + ActionCategoryScopeAddNotAuthorized, + self.manager.add_action_category_scope_dict, + demo_user["id"], ref["id"], action_category, + new_action_category_scope[new_action_category_scope_uuid] + ) + + action_category_scope = self.manager.add_action_category_scope_dict( + admin_user["id"], + ref["id"], + action_category, + new_action_category_scope[new_action_category_scope_uuid]) + self.assertIsInstance(action_category_scope, dict) + self.assertIn("action_category_scope", action_category_scope) + self.assertIn("uuid", action_category_scope["action_category_scope"]) + self.assertEqual(new_action_category_scope[new_action_category_scope_uuid], + action_category_scope["action_category_scope"]["name"]) + action_category_scope = self.manager.get_action_category_scope_dict( + admin_user["id"], + ref["id"], + action_category) + self.assertIsInstance(action_category_scope, dict) + self.assertIn("action_category_scope", action_category_scope) + self.assertIn("id", action_category_scope) + self.assertIn("intra_extension_uuid", action_category_scope) + self.assertEqual(ref["id"], action_category_scope["intra_extension_uuid"]) + self.assertNotIn(new_action_category_scope_uuid, action_category_scope["action_category_scope"]) + + def test_subject_category_assignment(self): + admin_user = self.create_user("admin") + ref = self.create_intra_extension() + demo_user = self.create_user("demo") + + new_subject = self.create_user() + new_subjects = dict() + new_subjects[new_subject["id"]] = new_subject["name"] + subjects = self.manager.set_subject_dict(admin_user["id"], ref["id"], new_subjects) + + new_subject_category_uuid = uuid.uuid4().hex + new_subject_category_value = "role" + subject_categories = self.manager.set_subject_category_dict( + admin_user["id"], + ref["id"], + { + new_subject_category_uuid: new_subject_category_value + } + ) + + for subject_category in subject_categories["subject_categories"]: + subject_category_scope = self.manager.get_subject_category_scope_dict( + admin_user["id"], + ref["id"], + subject_category) + self.assertIsInstance(subject_category_scope, dict) + self.assertIn("subject_category_scope", subject_category_scope) + self.assertIn("id", subject_category_scope) + self.assertIn("intra_extension_uuid", subject_category_scope) + self.assertEqual(ref["id"], subject_category_scope["intra_extension_uuid"]) + self.assertIsInstance(subject_category_scope["subject_category_scope"], dict) + + new_subject_category_scope = dict() + new_subject_category_scope_uuid = uuid.uuid4().hex + new_subject_category_scope[new_subject_category_scope_uuid] = admin_user["id"] + subject_category_scope = self.manager.set_subject_category_scope_dict( + admin_user["id"], + ref["id"], + subject_category, + new_subject_category_scope) + self.assertIsInstance(subject_category_scope, dict) + self.assertIn("subject_category_scope", subject_category_scope) + self.assertIn("id", subject_category_scope) + self.assertIn("intra_extension_uuid", subject_category_scope) + self.assertEqual(ref["id"], subject_category_scope["intra_extension_uuid"]) + self.assertIn(new_subject_category_scope[new_subject_category_scope_uuid], + subject_category_scope["subject_category_scope"][subject_category].values()) + + new_subject_category_scope2 = dict() + new_subject_category_scope2_uuid = uuid.uuid4().hex + new_subject_category_scope2[new_subject_category_scope2_uuid] = "dev" + subject_category_scope = self.manager.set_subject_category_scope_dict( + admin_user["id"], + ref["id"], + subject_category, + new_subject_category_scope2) + self.assertIsInstance(subject_category_scope, dict) + self.assertIn("subject_category_scope", subject_category_scope) + self.assertIn("id", subject_category_scope) + self.assertIn("intra_extension_uuid", subject_category_scope) + self.assertEqual(ref["id"], subject_category_scope["intra_extension_uuid"]) + self.assertIn(new_subject_category_scope2[new_subject_category_scope2_uuid], + subject_category_scope["subject_category_scope"][subject_category].values()) + + self.assertRaises( + SubjectCategoryAssignmentReadNotAuthorized, + self.manager.get_subject_category_assignment_dict, + demo_user["id"], ref["id"], new_subject["id"]) + + subject_category_assignments = self.manager.get_subject_category_assignment_dict( + admin_user["id"], + ref["id"], + new_subject["id"] + ) + self.assertIsInstance(subject_category_assignments, dict) + self.assertIn("subject_category_assignments", subject_category_assignments) + self.assertIn("id", subject_category_assignments) + self.assertIn("intra_extension_uuid", subject_category_assignments) + self.assertEqual(ref["id"], subject_category_assignments["intra_extension_uuid"]) + self.assertEqual({}, subject_category_assignments["subject_category_assignments"][new_subject["id"]]) + + self.assertRaises( + SubjectCategoryAssignmentAddNotAuthorized, + self.manager.set_subject_category_assignment_dict, + demo_user["id"], ref["id"], new_subject["id"], + { + new_subject_category_uuid: [new_subject_category_scope_uuid, new_subject_category_scope2_uuid], + } + ) + + subject_category_assignments = self.manager.set_subject_category_assignment_dict( + admin_user["id"], + ref["id"], + new_subject["id"], + { + new_subject_category_uuid: [new_subject_category_scope_uuid, new_subject_category_scope2_uuid], + } + ) + self.assertIsInstance(subject_category_assignments, dict) + self.assertIn("subject_category_assignments", subject_category_assignments) + self.assertIn("id", subject_category_assignments) + self.assertIn("intra_extension_uuid", subject_category_assignments) + self.assertEqual(ref["id"], subject_category_assignments["intra_extension_uuid"]) + self.assertEqual( + {new_subject_category_uuid: [new_subject_category_scope_uuid, new_subject_category_scope2_uuid]}, + subject_category_assignments["subject_category_assignments"][new_subject["id"]]) + subject_category_assignments = self.manager.get_subject_category_assignment_dict( + admin_user["id"], + ref["id"], + new_subject["id"] + ) + self.assertIsInstance(subject_category_assignments, dict) + self.assertIn("subject_category_assignments", subject_category_assignments) + self.assertIn("id", subject_category_assignments) + self.assertIn("intra_extension_uuid", subject_category_assignments) + self.assertEqual(ref["id"], subject_category_assignments["intra_extension_uuid"]) + self.assertEqual( + {new_subject_category_uuid: [new_subject_category_scope_uuid, new_subject_category_scope2_uuid]}, + subject_category_assignments["subject_category_assignments"][new_subject["id"]]) + + self.assertRaises( + SubjectCategoryAssignmentDelNotAuthorized, + self.manager.del_subject_category_assignment_dict, + demo_user["id"], ref["id"], new_subject["id"], + new_subject_category_uuid, + new_subject_category_scope_uuid + ) + + self.manager.del_subject_category_assignment( + admin_user["id"], + ref["id"], + new_subject["id"], + new_subject_category_uuid, + new_subject_category_scope_uuid + ) + subject_category_assignments = self.manager.get_subject_category_assignment_dict( + admin_user["id"], + ref["id"], + new_subject["id"] + ) + self.assertIsInstance(subject_category_assignments, dict) + self.assertIn("subject_category_assignments", subject_category_assignments) + self.assertIn("id", subject_category_assignments) + self.assertIn("intra_extension_uuid", subject_category_assignments) + self.assertEqual(ref["id"], subject_category_assignments["intra_extension_uuid"]) + self.assertEqual( + {new_subject_category_uuid: [new_subject_category_scope2_uuid, ]}, + subject_category_assignments["subject_category_assignments"][new_subject["id"]]) + + self.assertRaises( + SubjectCategoryAssignmentAddNotAuthorized, + self.manager.add_subject_category_assignment_dict, + demo_user["id"], ref["id"], new_subject["id"], + new_subject_category_uuid, + new_subject_category_scope_uuid + ) + + data = self.manager.add_subject_category_assignment_dict( + admin_user["id"], + ref["id"], + new_subject["id"], + new_subject_category_uuid, + new_subject_category_scope_uuid + ) + + subject_category_assignments = self.manager.get_subject_category_assignment_dict( + admin_user["id"], + ref["id"], + new_subject["id"] + ) + self.assertIsInstance(subject_category_assignments, dict) + self.assertIn("subject_category_assignments", subject_category_assignments) + self.assertIn("id", subject_category_assignments) + self.assertIn("intra_extension_uuid", subject_category_assignments) + self.assertEqual(ref["id"], subject_category_assignments["intra_extension_uuid"]) + self.assertEqual( + {new_subject_category_uuid: [new_subject_category_scope2_uuid, new_subject_category_scope_uuid]}, + subject_category_assignments["subject_category_assignments"][new_subject["id"]]) + + def test_object_category_assignment(self): + admin_user = self.create_user("admin") + ref = self.create_intra_extension() + demo_user = self.create_user("demo") + + new_object = self.create_user() + new_objects = dict() + new_objects[new_object["id"]] = new_object["name"] + objects = self.manager.set_object_dict(admin_user["id"], ref["id"], new_objects) + + new_object_category_uuid = uuid.uuid4().hex + new_object_category_value = "role" + object_categories = self.manager.set_object_category_dict( + admin_user["id"], + ref["id"], + { + new_object_category_uuid: new_object_category_value + } + ) + + for object_category in object_categories["object_categories"]: + object_category_scope = self.manager.get_object_category_scope_dict( + admin_user["id"], + ref["id"], + object_category) + self.assertIsInstance(object_category_scope, dict) + self.assertIn("object_category_scope", object_category_scope) + self.assertIn("id", object_category_scope) + self.assertIn("intra_extension_uuid", object_category_scope) + self.assertEqual(ref["id"], object_category_scope["intra_extension_uuid"]) + self.assertIsInstance(object_category_scope["object_category_scope"], dict) + + new_object_category_scope = dict() + new_object_category_scope_uuid = uuid.uuid4().hex + new_object_category_scope[new_object_category_scope_uuid] = admin_user["id"] + object_category_scope = self.manager.set_object_category_scope_dict( + admin_user["id"], + ref["id"], + object_category, + new_object_category_scope) + self.assertIsInstance(object_category_scope, dict) + self.assertIn("object_category_scope", object_category_scope) + self.assertIn("id", object_category_scope) + self.assertIn("intra_extension_uuid", object_category_scope) + self.assertEqual(ref["id"], object_category_scope["intra_extension_uuid"]) + self.assertIn(new_object_category_scope[new_object_category_scope_uuid], + object_category_scope["object_category_scope"][object_category].values()) + + new_object_category_scope2 = dict() + new_object_category_scope2_uuid = uuid.uuid4().hex + new_object_category_scope2[new_object_category_scope2_uuid] = "dev" + object_category_scope = self.manager.set_object_category_scope_dict( + admin_user["id"], + ref["id"], + object_category, + new_object_category_scope2) + self.assertIsInstance(object_category_scope, dict) + self.assertIn("object_category_scope", object_category_scope) + self.assertIn("id", object_category_scope) + self.assertIn("intra_extension_uuid", object_category_scope) + self.assertEqual(ref["id"], object_category_scope["intra_extension_uuid"]) + self.assertIn(new_object_category_scope2[new_object_category_scope2_uuid], + object_category_scope["object_category_scope"][object_category].values()) + + self.assertRaises( + ObjectCategoryAssignmentReadNotAuthorized, + self.manager.get_object_category_assignment_dict, + demo_user["id"], ref["id"], new_object["id"] + ) + + object_category_assignments = self.manager.get_object_category_assignment_dict( + admin_user["id"], + ref["id"], + new_object["id"] + ) + self.assertIsInstance(object_category_assignments, dict) + self.assertIn("object_category_assignments", object_category_assignments) + self.assertIn("id", object_category_assignments) + self.assertIn("intra_extension_uuid", object_category_assignments) + self.assertEqual(ref["id"], object_category_assignments["intra_extension_uuid"]) + self.assertEqual({}, object_category_assignments["object_category_assignments"][new_object["id"]]) + + self.assertRaises( + ObjectCategoryAssignmentAddNotAuthorized, + self.manager.set_object_category_assignment_dict, + demo_user["id"], ref["id"], new_object["id"], + { + new_object_category_uuid: [new_object_category_scope_uuid, new_object_category_scope2_uuid], + } + ) + + object_category_assignments = self.manager.set_object_category_assignment_dict( + admin_user["id"], + ref["id"], + new_object["id"], + { + new_object_category_uuid: [new_object_category_scope_uuid, new_object_category_scope2_uuid], + } + ) + self.assertIsInstance(object_category_assignments, dict) + self.assertIn("object_category_assignments", object_category_assignments) + self.assertIn("id", object_category_assignments) + self.assertIn("intra_extension_uuid", object_category_assignments) + self.assertEqual(ref["id"], object_category_assignments["intra_extension_uuid"]) + self.assertEqual( + {new_object_category_uuid: [new_object_category_scope_uuid, new_object_category_scope2_uuid]}, + object_category_assignments["object_category_assignments"][new_object["id"]]) + object_category_assignments = self.manager.get_object_category_assignment_dict( + admin_user["id"], + ref["id"], + new_object["id"] + ) + self.assertIsInstance(object_category_assignments, dict) + self.assertIn("object_category_assignments", object_category_assignments) + self.assertIn("id", object_category_assignments) + self.assertIn("intra_extension_uuid", object_category_assignments) + self.assertEqual(ref["id"], object_category_assignments["intra_extension_uuid"]) + self.assertEqual( + {new_object_category_uuid: [new_object_category_scope_uuid, new_object_category_scope2_uuid]}, + object_category_assignments["object_category_assignments"][new_object["id"]]) + + self.assertRaises( + ObjectCategoryAssignmentDelNotAuthorized, + self.manager.del_object_category_assignment_dict, + demo_user["id"], ref["id"], new_object["id"], + new_object_category_uuid, + new_object_category_scope_uuid + ) + + self.manager.del_object_category_assignment( + admin_user["id"], + ref["id"], + new_object["id"], + new_object_category_uuid, + new_object_category_scope_uuid + ) + object_category_assignments = self.manager.get_object_category_assignment_dict( + admin_user["id"], + ref["id"], + new_object["id"] + ) + self.assertIsInstance(object_category_assignments, dict) + self.assertIn("object_category_assignments", object_category_assignments) + self.assertIn("id", object_category_assignments) + self.assertIn("intra_extension_uuid", object_category_assignments) + self.assertEqual(ref["id"], object_category_assignments["intra_extension_uuid"]) + self.assertEqual( + {new_object_category_uuid: [new_object_category_scope2_uuid, ]}, + object_category_assignments["object_category_assignments"][new_object["id"]]) + + self.assertRaises( + ObjectCategoryAssignmentAddNotAuthorized, + self.manager.add_object_category_assignment_dict, + demo_user["id"], ref["id"], new_object["id"], + new_object_category_uuid, + new_object_category_scope_uuid + ) + + self.manager.add_object_category_assignment_dict( + admin_user["id"], + ref["id"], + new_object["id"], + new_object_category_uuid, + new_object_category_scope_uuid + ) + + object_category_assignments = self.manager.get_object_category_assignment_dict( + admin_user["id"], + ref["id"], + new_object["id"] + ) + self.assertIsInstance(object_category_assignments, dict) + self.assertIn("object_category_assignments", object_category_assignments) + self.assertIn("id", object_category_assignments) + self.assertIn("intra_extension_uuid", object_category_assignments) + self.assertEqual(ref["id"], object_category_assignments["intra_extension_uuid"]) + self.assertEqual( + {new_object_category_uuid: [new_object_category_scope2_uuid, new_object_category_scope_uuid]}, + object_category_assignments["object_category_assignments"][new_object["id"]]) + + def test_action_category_assignment(self): + admin_user = self.create_user("admin") + ref = self.create_intra_extension() + demo_user = self.create_user("demo") + + new_action = self.create_user() + new_actions = dict() + new_actions[new_action["id"]] = new_action["name"] + actions = self.manager.set_action_dict(admin_user["id"], ref["id"], new_actions) + + new_action_category_uuid = uuid.uuid4().hex + new_action_category_value = "role" + action_categories = self.manager.set_action_category_dict( + admin_user["id"], + ref["id"], + { + new_action_category_uuid: new_action_category_value + } + ) + + for action_category in action_categories["action_categories"]: + action_category_scope = self.manager.get_action_category_scope_dict( + admin_user["id"], + ref["id"], + action_category) + self.assertIsInstance(action_category_scope, dict) + self.assertIn("action_category_scope", action_category_scope) + self.assertIn("id", action_category_scope) + self.assertIn("intra_extension_uuid", action_category_scope) + self.assertEqual(ref["id"], action_category_scope["intra_extension_uuid"]) + self.assertIsInstance(action_category_scope["action_category_scope"], dict) + + new_action_category_scope = dict() + new_action_category_scope_uuid = uuid.uuid4().hex + new_action_category_scope[new_action_category_scope_uuid] = admin_user["id"] + action_category_scope = self.manager.set_action_category_scope_dict( + admin_user["id"], + ref["id"], + action_category, + new_action_category_scope) + self.assertIsInstance(action_category_scope, dict) + self.assertIn("action_category_scope", action_category_scope) + self.assertIn("id", action_category_scope) + self.assertIn("intra_extension_uuid", action_category_scope) + self.assertEqual(ref["id"], action_category_scope["intra_extension_uuid"]) + self.assertIn(new_action_category_scope[new_action_category_scope_uuid], + action_category_scope["action_category_scope"][action_category].values()) + + new_action_category_scope2 = dict() + new_action_category_scope2_uuid = uuid.uuid4().hex + new_action_category_scope2[new_action_category_scope2_uuid] = "dev" + action_category_scope = self.manager.set_action_category_scope_dict( + admin_user["id"], + ref["id"], + action_category, + new_action_category_scope2) + self.assertIsInstance(action_category_scope, dict) + self.assertIn("action_category_scope", action_category_scope) + self.assertIn("id", action_category_scope) + self.assertIn("intra_extension_uuid", action_category_scope) + self.assertEqual(ref["id"], action_category_scope["intra_extension_uuid"]) + self.assertIn(new_action_category_scope2[new_action_category_scope2_uuid], + action_category_scope["action_category_scope"][action_category].values()) + + self.assertRaises( + ActionCategoryAssignmentReadNotAuthorized, + self.manager.get_action_category_assignment_dict, + demo_user["id"], ref["id"], new_action["id"] + ) + + action_category_assignments = self.manager.get_action_category_assignment_dict( + admin_user["id"], + ref["id"], + new_action["id"] + ) + self.assertIsInstance(action_category_assignments, dict) + self.assertIn("action_category_assignments", action_category_assignments) + self.assertIn("id", action_category_assignments) + self.assertIn("intra_extension_uuid", action_category_assignments) + self.assertEqual(ref["id"], action_category_assignments["intra_extension_uuid"]) + self.assertEqual({}, action_category_assignments["action_category_assignments"][new_action["id"]]) + + self.assertRaises( + ActionCategoryAssignmentAddNotAuthorized, + self.manager.set_action_category_assignment_dict, + demo_user["id"], ref["id"], new_action["id"], + { + new_action_category_uuid: [new_action_category_scope_uuid, new_action_category_scope2_uuid], + } + ) + + action_category_assignments = self.manager.set_action_category_assignment_dict( + admin_user["id"], + ref["id"], + new_action["id"], + { + new_action_category_uuid: [new_action_category_scope_uuid, new_action_category_scope2_uuid], + } + ) + self.assertIsInstance(action_category_assignments, dict) + self.assertIn("action_category_assignments", action_category_assignments) + self.assertIn("id", action_category_assignments) + self.assertIn("intra_extension_uuid", action_category_assignments) + self.assertEqual(ref["id"], action_category_assignments["intra_extension_uuid"]) + self.assertEqual( + {new_action_category_uuid: [new_action_category_scope_uuid, new_action_category_scope2_uuid]}, + action_category_assignments["action_category_assignments"][new_action["id"]]) + action_category_assignments = self.manager.get_action_category_assignment_dict( + admin_user["id"], + ref["id"], + new_action["id"] + ) + self.assertIsInstance(action_category_assignments, dict) + self.assertIn("action_category_assignments", action_category_assignments) + self.assertIn("id", action_category_assignments) + self.assertIn("intra_extension_uuid", action_category_assignments) + self.assertEqual(ref["id"], action_category_assignments["intra_extension_uuid"]) + self.assertEqual( + {new_action_category_uuid: [new_action_category_scope_uuid, new_action_category_scope2_uuid]}, + action_category_assignments["action_category_assignments"][new_action["id"]]) + + self.assertRaises( + ActionCategoryAssignmentDelNotAuthorized, + self.manager.del_action_category_assignment_dict, + demo_user["id"], ref["id"], new_action["id"], + new_action_category_uuid, + new_action_category_scope_uuid + ) + + self.manager.del_action_category_assignment( + admin_user["id"], + ref["id"], + new_action["id"], + new_action_category_uuid, + new_action_category_scope_uuid + ) + action_category_assignments = self.manager.get_action_category_assignment_dict( + admin_user["id"], + ref["id"], + new_action["id"] + ) + self.assertIsInstance(action_category_assignments, dict) + self.assertIn("action_category_assignments", action_category_assignments) + self.assertIn("id", action_category_assignments) + self.assertIn("intra_extension_uuid", action_category_assignments) + self.assertEqual(ref["id"], action_category_assignments["intra_extension_uuid"]) + self.assertEqual( + {new_action_category_uuid: [new_action_category_scope2_uuid, ]}, + action_category_assignments["action_category_assignments"][new_action["id"]]) + + self.assertRaises( + ActionCategoryAssignmentAddNotAuthorized, + self.manager.add_action_category_assignment_dict, + demo_user["id"], ref["id"], new_action["id"], + new_action_category_uuid, + new_action_category_scope_uuid + ) + + self.manager.add_action_category_assignment_dict( + admin_user["id"], + ref["id"], + new_action["id"], + new_action_category_uuid, + new_action_category_scope_uuid + ) + + action_category_assignments = self.manager.get_action_category_assignment_dict( + admin_user["id"], + ref["id"], + new_action["id"] + ) + self.assertIsInstance(action_category_assignments, dict) + self.assertIn("action_category_assignments", action_category_assignments) + self.assertIn("id", action_category_assignments) + self.assertIn("intra_extension_uuid", action_category_assignments) + self.assertEqual(ref["id"], action_category_assignments["intra_extension_uuid"]) + self.assertEqual( + {new_action_category_uuid: [new_action_category_scope2_uuid, new_action_category_scope_uuid]}, + action_category_assignments["action_category_assignments"][new_action["id"]]) + + def test_sub_meta_rules(self): + admin_user = self.create_user("admin") + ref = self.create_intra_extension() + demo_user = self.create_user("demo") + + aggregation_algorithms = self.manager.get_aggregation_algorithms(admin_user["id"], ref["id"]) + self.assertIsInstance(aggregation_algorithms, dict) + self.assertIsInstance(aggregation_algorithms["aggregation_algorithms"], list) + self.assertIn("and_true_aggregation", aggregation_algorithms["aggregation_algorithms"]) + self.assertIn("test_aggregation", aggregation_algorithms["aggregation_algorithms"]) + + self.assertRaises( + MetaRuleReadNotAuthorized, + self.manager.get_aggregation_algorithm, + demo_user["id"], ref["id"] + ) + + aggregation_algorithm = self.manager.get_aggregation_algorithm(admin_user["id"], ref["id"]) + self.assertIsInstance(aggregation_algorithm, dict) + self.assertIn("aggregation", aggregation_algorithm) + self.assertIn(aggregation_algorithm["aggregation"], aggregation_algorithms["aggregation_algorithms"]) + + _aggregation_algorithm = list(aggregation_algorithms["aggregation_algorithms"]) + _aggregation_algorithm.remove(aggregation_algorithm["aggregation"]) + + self.assertRaises( + MetaRuleAddNotAuthorized, + self.manager.set_aggregation_algorithms, + demo_user["id"], ref["id"], _aggregation_algorithm[0] + ) + + aggregation_algorithm = self.manager.set_aggregation_algorithm(admin_user["id"], ref["id"], _aggregation_algorithm[0]) + self.assertIsInstance(aggregation_algorithm, dict) + self.assertIn("aggregation", aggregation_algorithm) + self.assertIn(aggregation_algorithm["aggregation"], aggregation_algorithms["aggregation_algorithms"]) + + self.assertRaises( + MetaRuleReadNotAuthorized, + self.manager.get_sub_meta_rule, + demo_user["id"], ref["id"] + ) + + sub_meta_rules = self.manager.get_sub_meta_rule(admin_user["id"], ref["id"]) + self.assertIsInstance(sub_meta_rules, dict) + self.assertIn("sub_meta_rules", sub_meta_rules) + sub_meta_rules_conf = json.load(open(os.path.join(self.policy_directory, ref["model"], "metarule.json"))) + metarule = dict() + categories = { + "subject_categories": self.manager.get_subject_category_dict(admin_user["id"], ref["id"]), + "object_categories": self.manager.get_object_category_dict(admin_user["id"], ref["id"]), + "action_categories": self.manager.get_action_category_dict(admin_user["id"], ref["id"]) + } + for relation in sub_meta_rules_conf["sub_meta_rules"]: + metarule[relation] = dict() + for item in ("subject_categories", "object_categories", "action_categories"): + metarule[relation][item] = list() + for element in sub_meta_rules_conf["sub_meta_rules"][relation][item]: + metarule[relation][item].append(self.__get_key_from_value( + element, + categories[item][item] + )) + + for relation in sub_meta_rules["sub_meta_rules"]: + self.assertIn(relation, metarule) + for item in ("subject_categories", "object_categories", "action_categories"): + self.assertEqual( + sub_meta_rules["sub_meta_rules"][relation][item], + metarule[relation][item] + ) + + new_subject_category = {"id": uuid.uuid4().hex, "name": "subject_category_test"} + # Add a particular subject_category + data = self.manager.add_subject_category_dict( + admin_user["id"], + ref["id"], + new_subject_category["name"]) + new_subject_category["id"] = data["subject_category"]["uuid"] + subject_categories = self.manager.get_subject_category_dict( + admin_user["id"], + ref["id"]) + self.assertIsInstance(subject_categories, dict) + self.assertIn("subject_categories", subject_categories) + self.assertIn("id", subject_categories) + self.assertIn("intra_extension_uuid", subject_categories) + self.assertEqual(ref["id"], subject_categories["intra_extension_uuid"]) + self.assertIn(new_subject_category["id"], subject_categories["subject_categories"]) + metarule[relation]["subject_categories"].append(new_subject_category["id"]) + + self.assertRaises( + MetaRuleAddNotAuthorized, + self.manager.set_sub_meta_rule, + demo_user["id"], ref["id"], metarule + ) + + _sub_meta_rules = self.manager.set_sub_meta_rule(admin_user["id"], ref["id"], metarule) + self.assertIn(relation, metarule) + for item in ("subject_categories", "object_categories", "action_categories"): + self.assertEqual( + _sub_meta_rules["sub_meta_rules"][relation][item], + metarule[relation][item] + ) + + def test_sub_rules(self): + admin_user = self.create_user("admin") + ref = self.create_intra_extension() + demo_user = self.create_user("demo") + + sub_meta_rules = self.manager.get_sub_meta_rule(admin_user["id"], ref["id"]) + self.assertIsInstance(sub_meta_rules, dict) + self.assertIn("sub_meta_rules", sub_meta_rules) + + self.assertRaises( + RuleReadNotAuthorized, + self.manager.get_sub_rules, + demo_user["id"], ref["id"] + ) + + sub_rules = self.manager.get_sub_rules(admin_user["id"], ref["id"]) + self.assertIsInstance(sub_rules, dict) + self.assertIn("rules", sub_rules) + rules = dict() + for relation in sub_rules["rules"]: + self.assertIn(relation, self.manager.get_sub_meta_rule_relations(admin_user["id"], ref["id"])["sub_meta_rule_relations"]) + rules[relation] = list() + for rule in sub_rules["rules"][relation]: + print(rule) + for cat, cat_func, func_name in ( + ("subject_categories", self.manager.get_subject_category_scope_dict, "subject_category_scope"), + ("action_categories", self.manager.get_action_category_scope_dict, "action_category_scope"), + ("object_categories", self.manager.get_object_category_scope_dict, "object_category_scope"), + ): + for cat_value in sub_meta_rules["sub_meta_rules"][relation][cat]: + scope = cat_func( + admin_user["id"], + ref["id"], + cat_value + ) + a_scope = rule.pop(0) + print(a_scope) + if type(a_scope) is not bool: + self.assertIn(a_scope, scope[func_name][cat_value]) + + # add a new subrule + + relation = sub_rules["rules"].keys()[0] + sub_rule = [] + for cat, cat_func, func_name in ( + ("subject_categories", self.manager.get_subject_category_scope_dict, "subject_category_scope"), + ("action_categories", self.manager.get_action_category_scope_dict, "action_category_scope"), + ("object_categories", self.manager.get_object_category_scope_dict, "object_category_scope"), + ): + for cat_value in sub_meta_rules["sub_meta_rules"][relation][cat]: + scope = cat_func( + admin_user["id"], + ref["id"], + cat_value + ) + sub_rule.append(scope[func_name][cat_value].keys()[0]) + + sub_rule.append(True) + self.assertRaises( + RuleAddNotAuthorized, + self.manager.set_sub_rules, + demo_user["id"], ref["id"], relation, sub_rule + ) + + sub_rules = self.manager.set_sub_rule(admin_user["id"], ref["id"], relation, sub_rule) + self.assertIsInstance(sub_rules, dict) + self.assertIn("rules", sub_rules) + rules = dict() + self.assertIn(sub_rule, sub_rules["rules"][relation]) + for relation in sub_rules["rules"]: + self.assertIn(relation, self.manager.get_sub_meta_rule_relations(admin_user["id"], ref["id"])["sub_meta_rule_relations"]) + rules[relation] = list() + for rule in sub_rules["rules"][relation]: + for cat, cat_func, func_name in ( + ("subject_categories", self.manager.get_subject_category_scope_dict, "subject_category_scope"), + ("action_categories", self.manager.get_action_category_scope_dict, "action_category_scope"), + ("object_categories", self.manager.get_object_category_scope_dict, "object_category_scope"), + ): + for cat_value in sub_meta_rules["sub_meta_rules"][relation][cat]: + scope = cat_func( + admin_user["id"], + ref["id"], + cat_value + ) + a_scope = rule.pop(0) + self.assertIn(a_scope, scope[func_name][cat_value]) diff --git a/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_authz.py b/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_authz.py index 6d852780..64a2d38f 100644 --- a/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_authz.py +++ b/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_authz.py @@ -19,7 +19,7 @@ from keystone.contrib.moon.core import LogManager, TenantManager CONF = cfg.CONF -USER_ADMIN = { +USER = { 'name': 'admin', 'domain_id': "default", 'password': 'admin' @@ -48,7 +48,7 @@ class TestIntraExtensionAuthzManagerAuthz(tests.TestCase): return { "moonlog_api": LogManager(), "tenant_api": TenantManager(), - # "resource_api": resource.Manager(), + "resource_api": resource.Manager(), } def config_overrides(self): @@ -61,46 +61,7 @@ class TestIntraExtensionAuthzManagerAuthz(tests.TestCase): group='moon', policy_directory=self.policy_directory) - -class TestIntraExtensionAuthzManager(tests.TestCase): - - def setUp(self): - self.useFixture(database.Database()) - super(TestIntraExtensionAuthzManager, self).setUp() - self.load_backends() - self.load_fixtures(default_fixtures) - self.manager = IntraExtensionAuthzManager() - self.admin_manager = IntraExtensionAdminManager() - - def __get_key_from_value(self, value, values_dict): - return filter(lambda v: v[1] == value, values_dict.iteritems())[0][0] - - def load_extra_backends(self): - return { - "moonlog_api": LogManager(), - "tenant_api": TenantManager(), - # "resource_api": resource.Manager(), - } - - def config_overrides(self): - super(TestIntraExtensionAuthzManager, self).config_overrides() - self.policy_directory = 'examples/moon/policies' - self.config_fixture.config( - group='moon', - intraextension_driver='keystone.contrib.moon.backends.sql.IntraExtensionConnector') - self.config_fixture.config( - group='moon', - policy_directory=self.policy_directory) - - def create_intra_extension(self, policy_model="policy_rbac_authz"): - # Create the admin user because IntraExtension needs it - self.admin = self.identity_api.create_user(USER_ADMIN) - IE["policymodel"] = policy_model - self.ref = self.admin_manager.load_intra_extension(IE) - self.assertIsInstance(self.ref, dict) - self.create_tenant(self.ref["id"]) - - def create_tenant(self, authz_uuid): + def create_tenant(self): tenant = { "id": uuid.uuid4().hex, "name": "TestIntraExtensionAuthzManager", @@ -108,266 +69,517 @@ class TestIntraExtensionAuthzManager(tests.TestCase): "description": "", "domain_id": "default" } - project = self.resource_api.create_project(tenant["id"], tenant) - mapping = self.tenant_api.set_tenant_dict(project["id"], project["name"], authz_uuid, None) + return self.resource_api.create_project(tenant["id"], tenant) + + def create_mapping(self, tenant, authz_uuid=None, admin_uuid=None): + + mapping = self.tenant_api.set_tenant_dict(tenant["id"], tenant["name"], authz_uuid, admin_uuid) self.assertIsInstance(mapping, dict) self.assertIn("authz", mapping) self.assertEqual(mapping["authz"], authz_uuid) return mapping - def create_user(self, username="TestIntraExtensionAuthzManagerUser"): - user = { - "id": uuid.uuid4().hex, - "name": username, - "enabled": True, - "description": "", - "domain_id": "default" - } - _user = self.identity_api.create_user(user) - return _user + def create_user(self, username="admin"): + + _USER = dict(USER) + _USER["name"] = username + return self.identity_api.create_user(_USER) + + def create_intra_extension(self, policy_model="policy_rbac_authz"): + + IE["policymodel"] = policy_model + ref = self.admin_manager.load_intra_extension(IE) + self.assertIsInstance(self.ref, dict) + return ref + + def test_tenant_exceptions(self): + self.assertRaises( + TenantListEmpty, + self.manager.get_tenant_dict + ) + self.assertRaises( + TenantNotFound, + self.manager.get_tenant_name, + uuid.uuid4().hex + ) + self.assertRaises( + TenantNotFound, + self.manager.set_tenant_name, + uuid.uuid4().hex, uuid.uuid4().hex + ) + self.assertRaises( + TenantNotFound, + self.manager.get_extension_uuid, + uuid.uuid4().hex, "authz" + ) + self.assertRaises( + TenantNotFound, + self.manager.get_extension_uuid, + uuid.uuid4().hex, "admin" + ) - def delete_admin_intra_extension(self): + def test_intra_extension_exceptions(self): + + tenant = self.create_tenant() + self.assertRaises( + IntraExtensionNotFound, + self.manager.get_extension_uuid, + tenant["id"], "authz" + ) + self.assertRaises( + IntraExtensionNotFound, + self.manager.get_extension_uuid, + tenant["id"], "admin" + ) + # TODO + + def test_delete_admin_intra_extension(self): self.assertRaises( AdminException, self.manager.delete_intra_extension, self.ref["id"]) + def test_authz_exceptions(self): + self.assertRaises( + IntraExtensionNotFound, + self.manager.authz, + uuid.uuid4().hex, uuid.uuid4().hex, uuid.uuid4().hex, uuid.uuid4().hex + ) + + admin_user = self.create_user() + tenant = self.create_tenant() + ie_authz = self.create_intra_extension("policy_rbac_authz") + ie_admin = self.create_intra_extension("policy_rbac_admin") + mapping = self.create_mapping(tenant, ie_authz["id"], ie_admin["id"]) + + # Test when subject is unknown + self.assertRaises( + SubjectUnknown, + self.manager.authz, + ie_authz["id"], uuid.uuid4().hex, uuid.uuid4().hex, uuid.uuid4().hex + ) + + # Test when subject is known but not the object + demo_user = self.create_user("demo") + self.manager.add_subject_dict( + admin_user['id'], + self.ref["id"], + demo_user["id"] + ) + + self.assertRaises( + ObjectUnknown, + self.manager.authz, + ie_authz["id"], demo_user["id"], uuid.uuid4().hex, uuid.uuid4().hex + ) + + # Test when subject and object are known but not the action + _tmp = self.manager.add_object_dict( + admin_user['id'], + self.ref["id"], + "my_object" + ).items()[0] + my_object = {"id": _tmp[0], "name": _tmp[1]} + + self.assertRaises( + ActionUnknown, + self.manager.authz, + ie_authz["id"], demo_user["id"], my_object["id"], uuid.uuid4().hex + ) + + # Test when subject and object and action are known + _tmp = self.manager.add_action_dict( + admin_user['id'], + self.ref["id"], + "my_action" + ).items()[0] + my_action = {"id": _tmp[0], "name": _tmp[1]} + + self.assertRaises( + SubjectCategoryAssignmentOutOfScope, + self.manager.authz, + ie_authz["id"], demo_user["id"], my_object["id"], my_action["id"] + ) + + # Add a subject scope and test ObjectCategoryAssignmentOutOfScope + _tmp = self.manager.add_subject_category_dict( + admin_user['id'], + self.ref["id"], + "my_subject_category" + ) + my_subject_category = {"id": _tmp[0], "name": _tmp[1]} + + _tmp = self.manager.add_subject_category_scope_dict( + admin_user['id'], + self.ref["id"], + my_subject_category["id"], + "my_subject_scope", + ) + my_subject_scope = {"id": _tmp[0], "name": _tmp[1]} + + self.assertRaises( + ObjectCategoryAssignmentOutOfScope, + self.manager.authz, + ie_authz["id"], demo_user["id"], my_object["id"], my_action["id"] + ) + + # Add an object scope and test ActionCategoryAssignmentOutOfScope + _tmp = self.manager.add_object_category_dict( + admin_user['id'], + self.ref["id"], + "my_object_category" + ) + my_object_category = {"id": _tmp[0], "name": _tmp[1]} + + _tmp = self.manager.add_object_category_scope_dict( + admin_user['id'], + self.ref["id"], + my_object_category["id"], + "my_object_scope", + ) + my_object_scope = {"id": _tmp[0], "name": _tmp[1]} + + self.assertRaises( + ActionCategoryAssignmentOutOfScope, + self.manager.authz, + ie_authz["id"], demo_user["id"], my_object["id"], my_action["id"] + ) + + # Add an action scope and test SubjectCategoryAssignmentUnknown + _tmp = self.manager.add_action_category_dict( + admin_user['id'], + self.ref["id"], + "my_action_category" + ) + my_action_category = {"id": _tmp[0], "name": _tmp[1]} + + _tmp = self.manager.add_action_category_scope_dict( + admin_user['id'], + self.ref["id"], + my_action_category["id"], + "my_action_scope", + ) + my_action_scope = {"id": _tmp[0], "name": _tmp[1]} + + self.assertRaises( + SubjectCategoryAssignmentUnknown, + self.manager.authz, + ie_authz["id"], demo_user["id"], my_object["id"], my_action["id"] + ) + + # Add a subject assignment and test ObjectCategoryAssignmentUnknown + self.manager.add_subject_category_assignment_dict( + admin_user['id'], + self.ref["id"], + demo_user["id"], + my_subject_category["id"], + my_subject_scope["id"] + ) + + self.assertRaises( + ObjectCategoryAssignmentUnknown, + self.manager.authz, + ie_authz["id"], demo_user["id"], my_object["id"], my_action["id"] + ) + + # Add an object assignment and test ActionCategoryAssignmentUnknown + self.manager.add_object_category_assignment_dict( + admin_user['id'], + self.ref["id"], + demo_user["id"], + my_object_category["id"], + my_object_scope["id"] + ) + + self.assertRaises( + ActionCategoryAssignmentUnknown, + self.manager.authz, + ie_authz["id"], demo_user["id"], my_object["id"], my_action["id"] + ) + + # Add an action assignment and test RuleUnknown + self.manager.add_action_category_assignment_dict( + admin_user['id'], + self.ref["id"], + demo_user["id"], + my_action_category["id"], + my_action_scope["id"] + ) + + self.assertRaises( + RuleUnknown, + self.manager.authz, + ie_authz["id"], demo_user["id"], my_object["id"], my_action["id"] + ) + + # Add the correct rule and test that no exception is raised + my_meta_rule = { + "relation_super": { + "subject_categories": [my_subject_category["id"], ], + "action_categories": [my_object_category["id"], ], + "object_categories": [my_action_category["id"], ], + "relation": "relation_super" + } + } + self.manager.set_sub_meta_rule( + admin_user['id'], + self.ref["id"], + my_meta_rule + ) + self.manager.set_sub_rule( + admin_user['id'], + self.ref["id"], + "relation_super", + [my_subject_scope, my_object_scope, my_action_scope] + ) + + result = self.manager.authz(ie_authz["id"], demo_user["id"], my_object["id"], my_action["id"]) + self.assertEqual(True, result) + def test_subjects(self): - self.create_intra_extension() + ref = self.create_intra_extension() + admin_user = self.create_user() - subjects = self.manager.get_subject_dict("admin", self.ref["id"]) + subjects = self.manager.get_subject_dict(admin_user["id"], ref["id"]) self.assertIsInstance(subjects, dict) self.assertIn("subjects", subjects) self.assertIn("id", subjects) self.assertIn("intra_extension_uuid", subjects) - self.assertEqual(self.ref["id"], subjects["intra_extension_uuid"]) + self.assertEqual(ref["id"], subjects["intra_extension_uuid"]) self.assertIsInstance(subjects["subjects"], dict) - new_subject = self.create_user() + new_subject = self.create_user("my_user") new_subjects = dict() new_subjects[new_subject["id"]] = new_subject["name"] self.assertRaises( - AdminException, + SubjectAddNotAuthorized, self.manager.set_subject_dict, - "admin", self.ref["id"], new_subjects) + admin_user["id"], ref["id"], new_subjects) # Delete the new subject self.assertRaises( - AdminException, + SubjectDelNotAuthorized, self.manager.del_subject, - "admin", self.ref["id"], new_subject["id"]) + admin_user["id"], ref["id"], new_subject["id"]) # Add a particular subject self.assertRaises( - AdminException, + SubjectAddNotAuthorized, self.manager.add_subject_dict, - "admin", self.ref["id"], new_subject["id"]) + admin_user["id"], ref["id"], new_subject["id"]) def test_objects(self): - self.create_intra_extension() + ref = self.create_intra_extension() + admin_user = self.create_user() - objects = self.manager.get_object_dict("admin", self.ref["id"]) + objects = self.manager.get_object_dict(admin_user["id"], ref["id"]) self.assertIsInstance(objects, dict) self.assertIn("objects", objects) self.assertIn("id", objects) self.assertIn("intra_extension_uuid", objects) - self.assertEqual(self.ref["id"], objects["intra_extension_uuid"]) + self.assertEqual(ref["id"], objects["intra_extension_uuid"]) self.assertIsInstance(objects["objects"], dict) - new_object = self.create_user() + new_object = {"id": uuid.uuid4().hex, "name": "my_object"} new_objects = dict() new_objects[new_object["id"]] = new_object["name"] self.assertRaises( - AdminException, + ObjectAddNotAuthorized, self.manager.set_object_dict, - "admin", self.ref["id"], new_object["id"]) + admin_user["id"], ref["id"], new_object["id"]) # Delete the new object self.assertRaises( - AdminException, + ObjectDelNotAuthorized, self.manager.del_object, - "admin", self.ref["id"], new_object["id"]) + admin_user["id"], ref["id"], new_object["id"]) # Add a particular object self.assertRaises( - AdminException, + ObjectAddNotAuthorized, self.manager.add_object_dict, - "admin", self.ref["id"], new_object["name"]) + admin_user["id"], ref["id"], new_object["name"]) def test_actions(self): - self.create_intra_extension() + ref = self.create_intra_extension() + admin_user = self.create_user() - actions = self.manager.get_action_dict("admin", self.ref["id"]) + actions = self.manager.get_action_dict(admin_user["id"], ref["id"]) self.assertIsInstance(actions, dict) self.assertIn("actions", actions) self.assertIn("id", actions) self.assertIn("intra_extension_uuid", actions) - self.assertEqual(self.ref["id"], actions["intra_extension_uuid"]) + self.assertEqual(ref["id"], actions["intra_extension_uuid"]) self.assertIsInstance(actions["actions"], dict) - new_action = self.create_user() + new_action = {"id": uuid.uuid4().hex, "name": "my_action"} new_actions = dict() new_actions[new_action["id"]] = new_action["name"] self.assertRaises( - AdminException, + ActionAddNotAuthorized, self.manager.set_action_dict, - "admin", self.ref["id"], new_actions) + admin_user["id"], ref["id"], new_actions) # Delete the new action self.assertRaises( - AdminException, + ActionDelNotAuthorized, self.manager.del_action, - "admin", self.ref["id"], new_action["id"]) + admin_user["id"], ref["id"], new_action["id"]) # Add a particular action self.assertRaises( - AdminException, + ActionAddNotAuthorized, self.manager.add_action_dict, - "admin", self.ref["id"], new_action["id"]) + admin_user["id"], ref["id"], new_action["id"]) def test_subject_categories(self): - self.create_intra_extension() + ref = self.create_intra_extension() + admin_user = self.create_user() - subject_categories = self.manager.get_subject_category_dict("admin", self.ref["id"]) + subject_categories = self.manager.get_subject_category_dict(admin_user["id"], ref["id"]) self.assertIsInstance(subject_categories, dict) self.assertIn("subject_categories", subject_categories) self.assertIn("id", subject_categories) self.assertIn("intra_extension_uuid", subject_categories) - self.assertEqual(self.ref["id"], subject_categories["intra_extension_uuid"]) + self.assertEqual(ref["id"], subject_categories["intra_extension_uuid"]) self.assertIsInstance(subject_categories["subject_categories"], dict) new_subject_category = {"id": uuid.uuid4().hex, "name": "subject_category_test"} new_subject_categories = dict() new_subject_categories[new_subject_category["id"]] = new_subject_category["name"] self.assertRaises( - AdminException, + SubjectCategoryAddNotAuthorized, self.manager.set_subject_category_dict, - "admin", self.ref["id"], new_subject_categories) + admin_user["id"], ref["id"], new_subject_categories) # Delete the new subject_category self.assertRaises( - AdminException, + SubjectCategoryDelNotAuthorized, self.manager.del_subject_category, - "admin", self.ref["id"], new_subject_category["id"]) + admin_user["id"], ref["id"], new_subject_category["id"]) # Add a particular subject_category self.assertRaises( - AdminException, + SubjectCategoryAddNotAuthorized, self.manager.add_subject_category_dict, - "admin", self.ref["id"], new_subject_category["name"]) + admin_user["id"], ref["id"], new_subject_category["name"]) def test_object_categories(self): - self.create_intra_extension() + ref = self.create_intra_extension() + admin_user = self.create_user() - object_categories = self.manager.get_object_category_dict("admin", self.ref["id"]) + object_categories = self.manager.get_object_category_dict(admin_user["id"], ref["id"]) self.assertIsInstance(object_categories, dict) self.assertIn("object_categories", object_categories) self.assertIn("id", object_categories) self.assertIn("intra_extension_uuid", object_categories) - self.assertEqual(self.ref["id"], object_categories["intra_extension_uuid"]) + self.assertEqual(ref["id"], object_categories["intra_extension_uuid"]) self.assertIsInstance(object_categories["object_categories"], dict) new_object_category = {"id": uuid.uuid4().hex, "name": "object_category_test"} new_object_categories = dict() new_object_categories[new_object_category["id"]] = new_object_category["name"] self.assertRaises( - AdminException, + ObjectCategoryAddNotAuthorized, self.manager.set_object_category_dict, - "admin", self.ref["id"], new_object_categories) + admin_user["id"], ref["id"], new_object_categories) # Delete the new object_category self.assertRaises( - AdminException, + ObjectCategoryDelNotAuthorized, self.manager.del_object_category, - "admin", self.ref["id"], new_object_category["id"]) + admin_user["id"], ref["id"], new_object_category["id"]) # Add a particular object_category self.assertRaises( - AdminException, + ObjectCategoryAddNotAuthorized, self.manager.add_object_category_dict, - "admin", self.ref["id"], new_object_category["name"]) + admin_user["id"], ref["id"], new_object_category["name"]) def test_action_categories(self): - self.create_intra_extension() + ref = self.create_intra_extension() + admin_user = self.create_user() - action_categories = self.manager.get_action_category_dict("admin", self.ref["id"]) + action_categories = self.manager.get_action_category_dict(admin_user["id"], ref["id"]) self.assertIsInstance(action_categories, dict) self.assertIn("action_categories", action_categories) self.assertIn("id", action_categories) self.assertIn("intra_extension_uuid", action_categories) - self.assertEqual(self.ref["id"], action_categories["intra_extension_uuid"]) + self.assertEqual(ref["id"], action_categories["intra_extension_uuid"]) self.assertIsInstance(action_categories["action_categories"], dict) new_action_category = {"id": uuid.uuid4().hex, "name": "action_category_test"} new_action_categories = dict() new_action_categories[new_action_category["id"]] = new_action_category["name"] self.assertRaises( - AdminException, + ActionCategoryAddNotAuthorized, self.manager.set_action_category_dict, - "admin", self.ref["id"], new_action_categories) + admin_user["id"], ref["id"], new_action_categories) # Delete the new action_category self.assertRaises( - AdminException, + ActionCategoryDelNotAuthorized, self.manager.del_action_category, - "admin", self.ref["id"], new_action_category["id"]) + admin_user["id"], ref["id"], new_action_category["id"]) # Add a particular action_category self.assertRaises( - AdminException, + ActionCategoryAddNotAuthorized, self.manager.add_action_category_dict, - "admin", self.ref["id"], new_action_category["name"]) + admin_user["id"], ref["id"], new_action_category["name"]) def test_subject_category_scope(self): - self.create_intra_extension() + ref = self.create_intra_extension() + admin_user = self.create_user() subject_categories = self.admin_manager.set_subject_category_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], { - uuid.uuid4().hex: "admin", + uuid.uuid4().hex: admin_user["id"], uuid.uuid4().hex: "dev", } ) for subject_category in subject_categories["subject_categories"]: subject_category_scope = self.manager.get_subject_category_scope_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], subject_category) self.assertIsInstance(subject_category_scope, dict) self.assertIn("subject_category_scope", subject_category_scope) self.assertIn("id", subject_category_scope) self.assertIn("intra_extension_uuid", subject_category_scope) - self.assertEqual(self.ref["id"], subject_category_scope["intra_extension_uuid"]) + self.assertEqual(ref["id"], subject_category_scope["intra_extension_uuid"]) self.assertIsInstance(subject_category_scope["subject_category_scope"], dict) new_subject_category_scope = dict() new_subject_category_scope_uuid = uuid.uuid4().hex new_subject_category_scope[new_subject_category_scope_uuid] = "new_subject_category_scope" self.assertRaises( - AdminException, + SubjectCategoryScopeAddNotAuthorized, self.manager.set_subject_category_scope_dict, - "admin", self.ref["id"], subject_category, new_subject_category_scope) + admin_user["id"], ref["id"], subject_category, new_subject_category_scope) # Delete the new subject_category_scope self.assertRaises( - AdminException, + SubjectCategoryScopeDelNotAuthorized, self.manager.del_subject_category_scope, - "admin", self.ref["id"], subject_category, new_subject_category_scope_uuid) + admin_user["id"], ref["id"], subject_category, new_subject_category_scope_uuid) # Add a particular subject_category_scope self.assertRaises( - AdminException, + SubjectCategoryScopeAddNotAuthorized, self.manager.add_subject_category_scope_dict, - "admin", self.ref["id"], subject_category, new_subject_category_scope[new_subject_category_scope_uuid]) + admin_user["id"], ref["id"], subject_category, new_subject_category_scope[new_subject_category_scope_uuid]) def test_object_category_scope(self): - self.create_intra_extension() + ref = self.create_intra_extension() + admin_user = self.create_user() object_categories = self.admin_manager.set_object_category_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], { uuid.uuid4().hex: "id", uuid.uuid4().hex: "domain", @@ -376,42 +588,43 @@ class TestIntraExtensionAuthzManager(tests.TestCase): for object_category in object_categories["object_categories"]: object_category_scope = self.manager.get_object_category_scope_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], object_category) self.assertIsInstance(object_category_scope, dict) self.assertIn("object_category_scope", object_category_scope) self.assertIn("id", object_category_scope) self.assertIn("intra_extension_uuid", object_category_scope) - self.assertEqual(self.ref["id"], object_category_scope["intra_extension_uuid"]) + self.assertEqual(ref["id"], object_category_scope["intra_extension_uuid"]) self.assertIsInstance(object_category_scope["object_category_scope"], dict) new_object_category_scope = dict() new_object_category_scope_uuid = uuid.uuid4().hex new_object_category_scope[new_object_category_scope_uuid] = "new_object_category_scope" self.assertRaises( - AdminException, + ObjectCategoryScopeAddNotAuthorized, self.manager.set_object_category_scope_dict, - "admin", self.ref["id"], object_category, new_object_category_scope) + admin_user["id"], ref["id"], object_category, new_object_category_scope) # Delete the new object_category_scope self.assertRaises( - AdminException, + ObjectCategoryScopeDelNotAuthorized, self.manager.del_object_category_scope, - "admin", self.ref["id"], object_category, new_object_category_scope_uuid) + admin_user["id"], ref["id"], object_category, new_object_category_scope_uuid) # Add a particular object_category_scope self.assertRaises( - AdminException, + ObjectCategoryScopeAddNotAuthorized, self.manager.add_object_category_scope_dict, - "admin", self.ref["id"], object_category, new_object_category_scope[new_object_category_scope_uuid]) + admin_user["id"], ref["id"], object_category, new_object_category_scope[new_object_category_scope_uuid]) def test_action_category_scope(self): - self.create_intra_extension() + ref = self.create_intra_extension() + admin_user = self.create_user() action_categories = self.admin_manager.set_action_category_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], { uuid.uuid4().hex: "compute", uuid.uuid4().hex: "identity", @@ -420,49 +633,50 @@ class TestIntraExtensionAuthzManager(tests.TestCase): for action_category in action_categories["action_categories"]: action_category_scope = self.manager.get_action_category_scope_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], action_category) self.assertIsInstance(action_category_scope, dict) self.assertIn("action_category_scope", action_category_scope) self.assertIn("id", action_category_scope) self.assertIn("intra_extension_uuid", action_category_scope) - self.assertEqual(self.ref["id"], action_category_scope["intra_extension_uuid"]) + self.assertEqual(ref["id"], action_category_scope["intra_extension_uuid"]) self.assertIsInstance(action_category_scope["action_category_scope"], dict) new_action_category_scope = dict() new_action_category_scope_uuid = uuid.uuid4().hex new_action_category_scope[new_action_category_scope_uuid] = "new_action_category_scope" self.assertRaises( - AdminException, + ActionCategoryScopeAddNotAuthorized, self.manager.set_action_category_scope_dict, - "admin", self.ref["id"], action_category, new_action_category_scope) + admin_user["id"], ref["id"], action_category, new_action_category_scope) # Delete the new action_category_scope self.assertRaises( - AdminException, + ActionCategoryScopeDelNotAuthorized, self.manager.del_action_category_scope, - "admin", self.ref["id"], action_category, new_action_category_scope_uuid) + admin_user["id"], ref["id"], action_category, new_action_category_scope_uuid) # Add a particular action_category_scope self.assertRaises( - AdminException, + ActionCategoryScopeAddNotAuthorized, self.manager.add_action_category_scope_dict, - "admin", self.ref["id"], action_category, new_action_category_scope[new_action_category_scope_uuid]) + admin_user["id"], ref["id"], action_category, new_action_category_scope[new_action_category_scope_uuid]) def test_subject_category_assignment(self): - self.create_intra_extension() + ref = self.create_intra_extension() + admin_user = self.create_user() new_subject = self.create_user() new_subjects = dict() new_subjects[new_subject["id"]] = new_subject["name"] - subjects = self.admin_manager.set_subject_dict("admin", self.ref["id"], new_subjects) + subjects = self.admin_manager.set_subject_dict(admin_user["id"], ref["id"], new_subjects) new_subject_category_uuid = uuid.uuid4().hex new_subject_category_value = "role" subject_categories = self.admin_manager.set_subject_category_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], { new_subject_category_uuid: new_subject_category_value } @@ -470,29 +684,29 @@ class TestIntraExtensionAuthzManager(tests.TestCase): for subject_category in subject_categories["subject_categories"]: subject_category_scope = self.admin_manager.get_subject_category_scope_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], subject_category) self.assertIsInstance(subject_category_scope, dict) self.assertIn("subject_category_scope", subject_category_scope) self.assertIn("id", subject_category_scope) self.assertIn("intra_extension_uuid", subject_category_scope) - self.assertEqual(self.ref["id"], subject_category_scope["intra_extension_uuid"]) + self.assertEqual(ref["id"], subject_category_scope["intra_extension_uuid"]) self.assertIsInstance(subject_category_scope["subject_category_scope"], dict) new_subject_category_scope = dict() new_subject_category_scope_uuid = uuid.uuid4().hex - new_subject_category_scope[new_subject_category_scope_uuid] = "admin" + new_subject_category_scope[new_subject_category_scope_uuid] = admin_user["id"] subject_category_scope = self.admin_manager.set_subject_category_scope_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], subject_category, new_subject_category_scope) self.assertIsInstance(subject_category_scope, dict) self.assertIn("subject_category_scope", subject_category_scope) self.assertIn("id", subject_category_scope) self.assertIn("intra_extension_uuid", subject_category_scope) - self.assertEqual(self.ref["id"], subject_category_scope["intra_extension_uuid"]) + self.assertEqual(ref["id"], subject_category_scope["intra_extension_uuid"]) self.assertIn(new_subject_category_scope[new_subject_category_scope_uuid], subject_category_scope["subject_category_scope"][subject_category].values()) @@ -500,65 +714,66 @@ class TestIntraExtensionAuthzManager(tests.TestCase): new_subject_category_scope2_uuid = uuid.uuid4().hex new_subject_category_scope2[new_subject_category_scope2_uuid] = "dev" subject_category_scope = self.admin_manager.set_subject_category_scope_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], subject_category, new_subject_category_scope2) self.assertIsInstance(subject_category_scope, dict) self.assertIn("subject_category_scope", subject_category_scope) self.assertIn("id", subject_category_scope) self.assertIn("intra_extension_uuid", subject_category_scope) - self.assertEqual(self.ref["id"], subject_category_scope["intra_extension_uuid"]) + self.assertEqual(ref["id"], subject_category_scope["intra_extension_uuid"]) self.assertIn(new_subject_category_scope2[new_subject_category_scope2_uuid], subject_category_scope["subject_category_scope"][subject_category].values()) subject_category_assignments = self.manager.get_subject_category_assignment_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], new_subject["id"] ) self.assertIsInstance(subject_category_assignments, dict) self.assertIn("subject_category_assignments", subject_category_assignments) self.assertIn("id", subject_category_assignments) self.assertIn("intra_extension_uuid", subject_category_assignments) - self.assertEqual(self.ref["id"], subject_category_assignments["intra_extension_uuid"]) + self.assertEqual(ref["id"], subject_category_assignments["intra_extension_uuid"]) self.assertEqual({}, subject_category_assignments["subject_category_assignments"][new_subject["id"]]) self.assertRaises( - AdminException, + SubjectCategoryAssignmentAddNotAuthorized, self.manager.set_subject_category_assignment_dict, - "admin", self.ref["id"], new_subject["id"], + admin_user["id"], ref["id"], new_subject["id"], { new_subject_category_uuid: [new_subject_category_scope_uuid, new_subject_category_scope2_uuid], }) self.assertRaises( - AdminException, + SubjectCategoryAssignmentDelNotAuthorized, self.manager.del_subject_category_assignment, - "admin", self.ref["id"], new_subject["id"], + admin_user["id"], ref["id"], new_subject["id"], new_subject_category_uuid, new_subject_category_scope_uuid) self.assertRaises( - AdminException, + SubjectCategoryAssignmentAddNotAuthorized, self.manager.add_subject_category_assignment_dict, - "admin", self.ref["id"], new_subject["id"], + admin_user["id"], ref["id"], new_subject["id"], new_subject_category_uuid, new_subject_category_scope_uuid) def test_object_category_assignment(self): - self.create_intra_extension() + ref = self.create_intra_extension() + admin_user = self.create_user() - new_object = self.create_user() + new_object = {"id": uuid.uuid4().hex, "name": "my_object"} new_objects = dict() new_objects[new_object["id"]] = new_object["name"] - objects = self.admin_manager.set_object_dict("admin", self.ref["id"], new_objects) + objects = self.admin_manager.set_object_dict(admin_user["id"], ref["id"], new_objects) new_object_category_uuid = uuid.uuid4().hex new_object_category_value = "role" object_categories = self.admin_manager.set_object_category_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], { new_object_category_uuid: new_object_category_value } @@ -566,29 +781,29 @@ class TestIntraExtensionAuthzManager(tests.TestCase): for object_category in object_categories["object_categories"]: object_category_scope = self.admin_manager.get_object_category_scope_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], object_category) self.assertIsInstance(object_category_scope, dict) self.assertIn("object_category_scope", object_category_scope) self.assertIn("id", object_category_scope) self.assertIn("intra_extension_uuid", object_category_scope) - self.assertEqual(self.ref["id"], object_category_scope["intra_extension_uuid"]) + self.assertEqual(ref["id"], object_category_scope["intra_extension_uuid"]) self.assertIsInstance(object_category_scope["object_category_scope"], dict) new_object_category_scope = dict() new_object_category_scope_uuid = uuid.uuid4().hex - new_object_category_scope[new_object_category_scope_uuid] = "admin" + new_object_category_scope[new_object_category_scope_uuid] = admin_user["id"] object_category_scope = self.admin_manager.set_object_category_scope_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], object_category, new_object_category_scope) self.assertIsInstance(object_category_scope, dict) self.assertIn("object_category_scope", object_category_scope) self.assertIn("id", object_category_scope) self.assertIn("intra_extension_uuid", object_category_scope) - self.assertEqual(self.ref["id"], object_category_scope["intra_extension_uuid"]) + self.assertEqual(ref["id"], object_category_scope["intra_extension_uuid"]) self.assertIn(new_object_category_scope[new_object_category_scope_uuid], object_category_scope["object_category_scope"][object_category].values()) @@ -596,65 +811,66 @@ class TestIntraExtensionAuthzManager(tests.TestCase): new_object_category_scope2_uuid = uuid.uuid4().hex new_object_category_scope2[new_object_category_scope2_uuid] = "dev" object_category_scope = self.admin_manager.set_object_category_scope_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], object_category, new_object_category_scope2) self.assertIsInstance(object_category_scope, dict) self.assertIn("object_category_scope", object_category_scope) self.assertIn("id", object_category_scope) self.assertIn("intra_extension_uuid", object_category_scope) - self.assertEqual(self.ref["id"], object_category_scope["intra_extension_uuid"]) + self.assertEqual(ref["id"], object_category_scope["intra_extension_uuid"]) self.assertIn(new_object_category_scope2[new_object_category_scope2_uuid], object_category_scope["object_category_scope"][object_category].values()) object_category_assignments = self.manager.get_object_category_assignment_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], new_object["id"] ) self.assertIsInstance(object_category_assignments, dict) self.assertIn("object_category_assignments", object_category_assignments) self.assertIn("id", object_category_assignments) self.assertIn("intra_extension_uuid", object_category_assignments) - self.assertEqual(self.ref["id"], object_category_assignments["intra_extension_uuid"]) + self.assertEqual(ref["id"], object_category_assignments["intra_extension_uuid"]) self.assertEqual({}, object_category_assignments["object_category_assignments"][new_object["id"]]) self.assertRaises( - AdminException, + ObjectCategoryAssignmentAddNotAuthorized, self.manager.set_object_category_assignment_dict, - "admin", self.ref["id"], new_object["id"], + admin_user["id"], ref["id"], new_object["id"], { new_object_category_uuid: [new_object_category_scope_uuid, new_object_category_scope2_uuid], }) self.assertRaises( - AdminException, + ObjectCategoryAssignmentDelNotAuthorized, self.manager.del_object_category_assignment, - "admin", self.ref["id"], new_object["id"], + admin_user["id"], ref["id"], new_object["id"], new_object_category_uuid, new_object_category_scope_uuid) self.assertRaises( - AdminException, + ObjectCategoryAssignmentAddNotAuthorized, self.manager.add_object_category_assignment_dict, - "admin", self.ref["id"], new_object["id"], + admin_user["id"], ref["id"], new_object["id"], new_object_category_uuid, new_object_category_scope_uuid) def test_action_category_assignment(self): - self.create_intra_extension() + ref = self.create_intra_extension() + admin_user = self.create_user() - new_action = self.create_user() + new_action = {"id": uuid.uuid4().hex, "name": "my_action"} new_actions = dict() new_actions[new_action["id"]] = new_action["name"] - actions = self.admin_manager.set_action_dict("admin", self.ref["id"], new_actions) + actions = self.admin_manager.set_action_dict(admin_user["id"], ref["id"], new_actions) new_action_category_uuid = uuid.uuid4().hex new_action_category_value = "role" action_categories = self.admin_manager.set_action_category_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], { new_action_category_uuid: new_action_category_value } @@ -662,29 +878,29 @@ class TestIntraExtensionAuthzManager(tests.TestCase): for action_category in action_categories["action_categories"]: action_category_scope = self.admin_manager.get_action_category_scope_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], action_category) self.assertIsInstance(action_category_scope, dict) self.assertIn("action_category_scope", action_category_scope) self.assertIn("id", action_category_scope) self.assertIn("intra_extension_uuid", action_category_scope) - self.assertEqual(self.ref["id"], action_category_scope["intra_extension_uuid"]) + self.assertEqual(ref["id"], action_category_scope["intra_extension_uuid"]) self.assertIsInstance(action_category_scope["action_category_scope"], dict) new_action_category_scope = dict() new_action_category_scope_uuid = uuid.uuid4().hex - new_action_category_scope[new_action_category_scope_uuid] = "admin" + new_action_category_scope[new_action_category_scope_uuid] = admin_user["id"] action_category_scope = self.admin_manager.set_action_category_scope_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], action_category, new_action_category_scope) self.assertIsInstance(action_category_scope, dict) self.assertIn("action_category_scope", action_category_scope) self.assertIn("id", action_category_scope) self.assertIn("intra_extension_uuid", action_category_scope) - self.assertEqual(self.ref["id"], action_category_scope["intra_extension_uuid"]) + self.assertEqual(ref["id"], action_category_scope["intra_extension_uuid"]) self.assertIn(new_action_category_scope[new_action_category_scope_uuid], action_category_scope["action_category_scope"][action_category].values()) @@ -692,62 +908,63 @@ class TestIntraExtensionAuthzManager(tests.TestCase): new_action_category_scope2_uuid = uuid.uuid4().hex new_action_category_scope2[new_action_category_scope2_uuid] = "dev" action_category_scope = self.admin_manager.set_action_category_scope_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], action_category, new_action_category_scope2) self.assertIsInstance(action_category_scope, dict) self.assertIn("action_category_scope", action_category_scope) self.assertIn("id", action_category_scope) self.assertIn("intra_extension_uuid", action_category_scope) - self.assertEqual(self.ref["id"], action_category_scope["intra_extension_uuid"]) + self.assertEqual(ref["id"], action_category_scope["intra_extension_uuid"]) self.assertIn(new_action_category_scope2[new_action_category_scope2_uuid], action_category_scope["action_category_scope"][action_category].values()) action_category_assignments = self.manager.get_action_category_assignment_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], new_action["id"] ) self.assertIsInstance(action_category_assignments, dict) self.assertIn("action_category_assignments", action_category_assignments) self.assertIn("id", action_category_assignments) self.assertIn("intra_extension_uuid", action_category_assignments) - self.assertEqual(self.ref["id"], action_category_assignments["intra_extension_uuid"]) + self.assertEqual(ref["id"], action_category_assignments["intra_extension_uuid"]) self.assertEqual({}, action_category_assignments["action_category_assignments"][new_action["id"]]) self.assertRaises( - AdminException, + ActionCategoryAssignmentAddNotAuthorized, self.manager.set_action_category_assignment_dict, - "admin", self.ref["id"], new_action["id"], + admin_user["id"], ref["id"], new_action["id"], { new_action_category_uuid: [new_action_category_scope_uuid, new_action_category_scope2_uuid], }) self.assertRaises( - AdminException, + ActionCategoryAssignmentDelNotAuthorized, self.manager.del_action_category_assignment, - "admin", self.ref["id"], new_action["id"], + admin_user["id"], ref["id"], new_action["id"], new_action_category_uuid, new_action_category_scope_uuid) self.assertRaises( - AdminException, + ActionCategoryAssignmentAddNotAuthorized, self.manager.add_action_category_assignment_dict, - "admin", self.ref["id"], new_action["id"], + admin_user["id"], ref["id"], new_action["id"], new_action_category_uuid, new_action_category_scope_uuid) def test_sub_meta_rules(self): - self.create_intra_extension() + ref = self.create_intra_extension() + admin_user = self.create_user() - aggregation_algorithms = self.manager.get_aggregation_algorithms("admin", self.ref["id"]) + aggregation_algorithms = self.manager.get_aggregation_algorithms(admin_user["id"], ref["id"]) self.assertIsInstance(aggregation_algorithms, dict) self.assertIsInstance(aggregation_algorithms["aggregation_algorithms"], list) self.assertIn("and_true_aggregation", aggregation_algorithms["aggregation_algorithms"]) self.assertIn("test_aggregation", aggregation_algorithms["aggregation_algorithms"]) - aggregation_algorithm = self.manager.get_aggregation_algorithm("admin", self.ref["id"]) + aggregation_algorithm = self.manager.get_aggregation_algorithm(admin_user["id"], ref["id"]) self.assertIsInstance(aggregation_algorithm, dict) self.assertIn("aggregation", aggregation_algorithm) self.assertIn(aggregation_algorithm["aggregation"], aggregation_algorithms["aggregation_algorithms"]) @@ -755,19 +972,19 @@ class TestIntraExtensionAuthzManager(tests.TestCase): _aggregation_algorithm = list(aggregation_algorithms["aggregation_algorithms"]) _aggregation_algorithm.remove(aggregation_algorithm["aggregation"]) self.assertRaises( - AdminException, + MetaRuleAddNotAuthorized, self.manager.set_aggregation_algorithm, - "admin", self.ref["id"], _aggregation_algorithm[0]) + admin_user["id"], ref["id"], _aggregation_algorithm[0]) - sub_meta_rules = self.manager.get_sub_meta_rule("admin", self.ref["id"]) + sub_meta_rules = self.manager.get_sub_meta_rule(admin_user["id"], ref["id"]) self.assertIsInstance(sub_meta_rules, dict) self.assertIn("sub_meta_rules", sub_meta_rules) - sub_meta_rules_conf = json.load(open(os.path.join(self.policy_directory, self.ref["model"], "metarule.json"))) + sub_meta_rules_conf = json.load(open(os.path.join(self.policy_directory, ref["model"], "metarule.json"))) metarule = dict() categories = { - "subject_categories": self.manager.get_subject_category_dict("admin", self.ref["id"]), - "object_categories": self.manager.get_object_category_dict("admin", self.ref["id"]), - "action_categories": self.manager.get_action_category_dict("admin", self.ref["id"]) + "subject_categories": self.manager.get_subject_category_dict(admin_user["id"], ref["id"]), + "object_categories": self.manager.get_object_category_dict(admin_user["id"], ref["id"]), + "action_categories": self.manager.get_action_category_dict(admin_user["id"], ref["id"]) } for relation in sub_meta_rules_conf["sub_meta_rules"]: metarule[relation] = dict() @@ -790,38 +1007,39 @@ class TestIntraExtensionAuthzManager(tests.TestCase): new_subject_category = {"id": uuid.uuid4().hex, "name": "subject_category_test"} # Add a particular subject_category data = self.admin_manager.add_subject_category_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], new_subject_category["name"]) new_subject_category["id"] = data["subject_category"]["uuid"] subject_categories = self.manager.get_subject_category_dict( - "admin", - self.ref["id"]) + admin_user["id"], + ref["id"]) self.assertIsInstance(subject_categories, dict) self.assertIn("subject_categories", subject_categories) self.assertIn("id", subject_categories) self.assertIn("intra_extension_uuid", subject_categories) - self.assertEqual(self.ref["id"], subject_categories["intra_extension_uuid"]) + self.assertEqual(ref["id"], subject_categories["intra_extension_uuid"]) self.assertIn(new_subject_category["id"], subject_categories["subject_categories"]) metarule[relation]["subject_categories"].append(new_subject_category["id"]) - self.assertRaises( + self.MetaRuleAddNotAuthorized( AdminException, self.manager.set_sub_meta_rule, - "admin", self.ref["id"], metarule) + admin_user["id"], ref["id"], metarule) def test_sub_rules(self): - self.create_intra_extension() + ref = self.create_intra_extension() + admin_user = self.create_user() - sub_meta_rules = self.manager.get_sub_meta_rule("admin", self.ref["id"]) + sub_meta_rules = self.manager.get_sub_meta_rule(admin_user["id"], ref["id"]) self.assertIsInstance(sub_meta_rules, dict) self.assertIn("sub_meta_rules", sub_meta_rules) - sub_rules = self.manager.get_sub_rules("admin", self.ref["id"]) + sub_rules = self.manager.get_sub_rules(admin_user["id"], ref["id"]) self.assertIsInstance(sub_rules, dict) self.assertIn("rules", sub_rules) rules = dict() for relation in sub_rules["rules"]: - self.assertIn(relation, self.manager.get_sub_meta_rule_relations("admin", self.ref["id"])["sub_meta_rule_relations"]) + self.assertIn(relation, self.manager.get_sub_meta_rule_relations(admin_user["id"], ref["id"])["sub_meta_rule_relations"]) rules[relation] = list() for rule in sub_rules["rules"][relation]: for cat, cat_func, func_name in ( @@ -831,8 +1049,8 @@ class TestIntraExtensionAuthzManager(tests.TestCase): ): for cat_value in sub_meta_rules["sub_meta_rules"][relation][cat]: scope = cat_func( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], cat_value ) a_scope = rule.pop(0) @@ -849,13 +1067,14 @@ class TestIntraExtensionAuthzManager(tests.TestCase): ): for cat_value in sub_meta_rules["sub_meta_rules"][relation][cat]: scope = cat_func( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], cat_value ) sub_rule.append(scope[func_name][cat_value].keys()[0]) self.assertRaises( - AdminException, + RuleAddNotAuthorized, self.manager.set_sub_rule, - "admin", self.ref["id"], relation, sub_rule) + admin_user["id"], ref["id"], relation, sub_rule) + |