diff options
Diffstat (limited to 'keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_authz.py')
-rw-r--r-- | keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_authz.py | 713 |
1 files changed, 466 insertions, 247 deletions
diff --git a/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_authz.py b/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_authz.py index 6d852780..64a2d38f 100644 --- a/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_authz.py +++ b/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_authz.py @@ -19,7 +19,7 @@ from keystone.contrib.moon.core import LogManager, TenantManager CONF = cfg.CONF -USER_ADMIN = { +USER = { 'name': 'admin', 'domain_id': "default", 'password': 'admin' @@ -48,7 +48,7 @@ class TestIntraExtensionAuthzManagerAuthz(tests.TestCase): return { "moonlog_api": LogManager(), "tenant_api": TenantManager(), - # "resource_api": resource.Manager(), + "resource_api": resource.Manager(), } def config_overrides(self): @@ -61,46 +61,7 @@ class TestIntraExtensionAuthzManagerAuthz(tests.TestCase): group='moon', policy_directory=self.policy_directory) - -class TestIntraExtensionAuthzManager(tests.TestCase): - - def setUp(self): - self.useFixture(database.Database()) - super(TestIntraExtensionAuthzManager, self).setUp() - self.load_backends() - self.load_fixtures(default_fixtures) - self.manager = IntraExtensionAuthzManager() - self.admin_manager = IntraExtensionAdminManager() - - def __get_key_from_value(self, value, values_dict): - return filter(lambda v: v[1] == value, values_dict.iteritems())[0][0] - - def load_extra_backends(self): - return { - "moonlog_api": LogManager(), - "tenant_api": TenantManager(), - # "resource_api": resource.Manager(), - } - - def config_overrides(self): - super(TestIntraExtensionAuthzManager, self).config_overrides() - self.policy_directory = 'examples/moon/policies' - self.config_fixture.config( - group='moon', - intraextension_driver='keystone.contrib.moon.backends.sql.IntraExtensionConnector') - self.config_fixture.config( - group='moon', - policy_directory=self.policy_directory) - - def create_intra_extension(self, policy_model="policy_rbac_authz"): - # Create the admin user because IntraExtension needs it - self.admin = self.identity_api.create_user(USER_ADMIN) - IE["policymodel"] = policy_model - self.ref = self.admin_manager.load_intra_extension(IE) - self.assertIsInstance(self.ref, dict) - self.create_tenant(self.ref["id"]) - - def create_tenant(self, authz_uuid): + def create_tenant(self): tenant = { "id": uuid.uuid4().hex, "name": "TestIntraExtensionAuthzManager", @@ -108,266 +69,517 @@ class TestIntraExtensionAuthzManager(tests.TestCase): "description": "", "domain_id": "default" } - project = self.resource_api.create_project(tenant["id"], tenant) - mapping = self.tenant_api.set_tenant_dict(project["id"], project["name"], authz_uuid, None) + return self.resource_api.create_project(tenant["id"], tenant) + + def create_mapping(self, tenant, authz_uuid=None, admin_uuid=None): + + mapping = self.tenant_api.set_tenant_dict(tenant["id"], tenant["name"], authz_uuid, admin_uuid) self.assertIsInstance(mapping, dict) self.assertIn("authz", mapping) self.assertEqual(mapping["authz"], authz_uuid) return mapping - def create_user(self, username="TestIntraExtensionAuthzManagerUser"): - user = { - "id": uuid.uuid4().hex, - "name": username, - "enabled": True, - "description": "", - "domain_id": "default" - } - _user = self.identity_api.create_user(user) - return _user + def create_user(self, username="admin"): + + _USER = dict(USER) + _USER["name"] = username + return self.identity_api.create_user(_USER) + + def create_intra_extension(self, policy_model="policy_rbac_authz"): + + IE["policymodel"] = policy_model + ref = self.admin_manager.load_intra_extension(IE) + self.assertIsInstance(self.ref, dict) + return ref + + def test_tenant_exceptions(self): + self.assertRaises( + TenantListEmpty, + self.manager.get_tenant_dict + ) + self.assertRaises( + TenantNotFound, + self.manager.get_tenant_name, + uuid.uuid4().hex + ) + self.assertRaises( + TenantNotFound, + self.manager.set_tenant_name, + uuid.uuid4().hex, uuid.uuid4().hex + ) + self.assertRaises( + TenantNotFound, + self.manager.get_extension_uuid, + uuid.uuid4().hex, "authz" + ) + self.assertRaises( + TenantNotFound, + self.manager.get_extension_uuid, + uuid.uuid4().hex, "admin" + ) - def delete_admin_intra_extension(self): + def test_intra_extension_exceptions(self): + + tenant = self.create_tenant() + self.assertRaises( + IntraExtensionNotFound, + self.manager.get_extension_uuid, + tenant["id"], "authz" + ) + self.assertRaises( + IntraExtensionNotFound, + self.manager.get_extension_uuid, + tenant["id"], "admin" + ) + # TODO + + def test_delete_admin_intra_extension(self): self.assertRaises( AdminException, self.manager.delete_intra_extension, self.ref["id"]) + def test_authz_exceptions(self): + self.assertRaises( + IntraExtensionNotFound, + self.manager.authz, + uuid.uuid4().hex, uuid.uuid4().hex, uuid.uuid4().hex, uuid.uuid4().hex + ) + + admin_user = self.create_user() + tenant = self.create_tenant() + ie_authz = self.create_intra_extension("policy_rbac_authz") + ie_admin = self.create_intra_extension("policy_rbac_admin") + mapping = self.create_mapping(tenant, ie_authz["id"], ie_admin["id"]) + + # Test when subject is unknown + self.assertRaises( + SubjectUnknown, + self.manager.authz, + ie_authz["id"], uuid.uuid4().hex, uuid.uuid4().hex, uuid.uuid4().hex + ) + + # Test when subject is known but not the object + demo_user = self.create_user("demo") + self.manager.add_subject_dict( + admin_user['id'], + self.ref["id"], + demo_user["id"] + ) + + self.assertRaises( + ObjectUnknown, + self.manager.authz, + ie_authz["id"], demo_user["id"], uuid.uuid4().hex, uuid.uuid4().hex + ) + + # Test when subject and object are known but not the action + _tmp = self.manager.add_object_dict( + admin_user['id'], + self.ref["id"], + "my_object" + ).items()[0] + my_object = {"id": _tmp[0], "name": _tmp[1]} + + self.assertRaises( + ActionUnknown, + self.manager.authz, + ie_authz["id"], demo_user["id"], my_object["id"], uuid.uuid4().hex + ) + + # Test when subject and object and action are known + _tmp = self.manager.add_action_dict( + admin_user['id'], + self.ref["id"], + "my_action" + ).items()[0] + my_action = {"id": _tmp[0], "name": _tmp[1]} + + self.assertRaises( + SubjectCategoryAssignmentOutOfScope, + self.manager.authz, + ie_authz["id"], demo_user["id"], my_object["id"], my_action["id"] + ) + + # Add a subject scope and test ObjectCategoryAssignmentOutOfScope + _tmp = self.manager.add_subject_category_dict( + admin_user['id'], + self.ref["id"], + "my_subject_category" + ) + my_subject_category = {"id": _tmp[0], "name": _tmp[1]} + + _tmp = self.manager.add_subject_category_scope_dict( + admin_user['id'], + self.ref["id"], + my_subject_category["id"], + "my_subject_scope", + ) + my_subject_scope = {"id": _tmp[0], "name": _tmp[1]} + + self.assertRaises( + ObjectCategoryAssignmentOutOfScope, + self.manager.authz, + ie_authz["id"], demo_user["id"], my_object["id"], my_action["id"] + ) + + # Add an object scope and test ActionCategoryAssignmentOutOfScope + _tmp = self.manager.add_object_category_dict( + admin_user['id'], + self.ref["id"], + "my_object_category" + ) + my_object_category = {"id": _tmp[0], "name": _tmp[1]} + + _tmp = self.manager.add_object_category_scope_dict( + admin_user['id'], + self.ref["id"], + my_object_category["id"], + "my_object_scope", + ) + my_object_scope = {"id": _tmp[0], "name": _tmp[1]} + + self.assertRaises( + ActionCategoryAssignmentOutOfScope, + self.manager.authz, + ie_authz["id"], demo_user["id"], my_object["id"], my_action["id"] + ) + + # Add an action scope and test SubjectCategoryAssignmentUnknown + _tmp = self.manager.add_action_category_dict( + admin_user['id'], + self.ref["id"], + "my_action_category" + ) + my_action_category = {"id": _tmp[0], "name": _tmp[1]} + + _tmp = self.manager.add_action_category_scope_dict( + admin_user['id'], + self.ref["id"], + my_action_category["id"], + "my_action_scope", + ) + my_action_scope = {"id": _tmp[0], "name": _tmp[1]} + + self.assertRaises( + SubjectCategoryAssignmentUnknown, + self.manager.authz, + ie_authz["id"], demo_user["id"], my_object["id"], my_action["id"] + ) + + # Add a subject assignment and test ObjectCategoryAssignmentUnknown + self.manager.add_subject_category_assignment_dict( + admin_user['id'], + self.ref["id"], + demo_user["id"], + my_subject_category["id"], + my_subject_scope["id"] + ) + + self.assertRaises( + ObjectCategoryAssignmentUnknown, + self.manager.authz, + ie_authz["id"], demo_user["id"], my_object["id"], my_action["id"] + ) + + # Add an object assignment and test ActionCategoryAssignmentUnknown + self.manager.add_object_category_assignment_dict( + admin_user['id'], + self.ref["id"], + demo_user["id"], + my_object_category["id"], + my_object_scope["id"] + ) + + self.assertRaises( + ActionCategoryAssignmentUnknown, + self.manager.authz, + ie_authz["id"], demo_user["id"], my_object["id"], my_action["id"] + ) + + # Add an action assignment and test RuleUnknown + self.manager.add_action_category_assignment_dict( + admin_user['id'], + self.ref["id"], + demo_user["id"], + my_action_category["id"], + my_action_scope["id"] + ) + + self.assertRaises( + RuleUnknown, + self.manager.authz, + ie_authz["id"], demo_user["id"], my_object["id"], my_action["id"] + ) + + # Add the correct rule and test that no exception is raised + my_meta_rule = { + "relation_super": { + "subject_categories": [my_subject_category["id"], ], + "action_categories": [my_object_category["id"], ], + "object_categories": [my_action_category["id"], ], + "relation": "relation_super" + } + } + self.manager.set_sub_meta_rule( + admin_user['id'], + self.ref["id"], + my_meta_rule + ) + self.manager.set_sub_rule( + admin_user['id'], + self.ref["id"], + "relation_super", + [my_subject_scope, my_object_scope, my_action_scope] + ) + + result = self.manager.authz(ie_authz["id"], demo_user["id"], my_object["id"], my_action["id"]) + self.assertEqual(True, result) + def test_subjects(self): - self.create_intra_extension() + ref = self.create_intra_extension() + admin_user = self.create_user() - subjects = self.manager.get_subject_dict("admin", self.ref["id"]) + subjects = self.manager.get_subject_dict(admin_user["id"], ref["id"]) self.assertIsInstance(subjects, dict) self.assertIn("subjects", subjects) self.assertIn("id", subjects) self.assertIn("intra_extension_uuid", subjects) - self.assertEqual(self.ref["id"], subjects["intra_extension_uuid"]) + self.assertEqual(ref["id"], subjects["intra_extension_uuid"]) self.assertIsInstance(subjects["subjects"], dict) - new_subject = self.create_user() + new_subject = self.create_user("my_user") new_subjects = dict() new_subjects[new_subject["id"]] = new_subject["name"] self.assertRaises( - AdminException, + SubjectAddNotAuthorized, self.manager.set_subject_dict, - "admin", self.ref["id"], new_subjects) + admin_user["id"], ref["id"], new_subjects) # Delete the new subject self.assertRaises( - AdminException, + SubjectDelNotAuthorized, self.manager.del_subject, - "admin", self.ref["id"], new_subject["id"]) + admin_user["id"], ref["id"], new_subject["id"]) # Add a particular subject self.assertRaises( - AdminException, + SubjectAddNotAuthorized, self.manager.add_subject_dict, - "admin", self.ref["id"], new_subject["id"]) + admin_user["id"], ref["id"], new_subject["id"]) def test_objects(self): - self.create_intra_extension() + ref = self.create_intra_extension() + admin_user = self.create_user() - objects = self.manager.get_object_dict("admin", self.ref["id"]) + objects = self.manager.get_object_dict(admin_user["id"], ref["id"]) self.assertIsInstance(objects, dict) self.assertIn("objects", objects) self.assertIn("id", objects) self.assertIn("intra_extension_uuid", objects) - self.assertEqual(self.ref["id"], objects["intra_extension_uuid"]) + self.assertEqual(ref["id"], objects["intra_extension_uuid"]) self.assertIsInstance(objects["objects"], dict) - new_object = self.create_user() + new_object = {"id": uuid.uuid4().hex, "name": "my_object"} new_objects = dict() new_objects[new_object["id"]] = new_object["name"] self.assertRaises( - AdminException, + ObjectAddNotAuthorized, self.manager.set_object_dict, - "admin", self.ref["id"], new_object["id"]) + admin_user["id"], ref["id"], new_object["id"]) # Delete the new object self.assertRaises( - AdminException, + ObjectDelNotAuthorized, self.manager.del_object, - "admin", self.ref["id"], new_object["id"]) + admin_user["id"], ref["id"], new_object["id"]) # Add a particular object self.assertRaises( - AdminException, + ObjectAddNotAuthorized, self.manager.add_object_dict, - "admin", self.ref["id"], new_object["name"]) + admin_user["id"], ref["id"], new_object["name"]) def test_actions(self): - self.create_intra_extension() + ref = self.create_intra_extension() + admin_user = self.create_user() - actions = self.manager.get_action_dict("admin", self.ref["id"]) + actions = self.manager.get_action_dict(admin_user["id"], ref["id"]) self.assertIsInstance(actions, dict) self.assertIn("actions", actions) self.assertIn("id", actions) self.assertIn("intra_extension_uuid", actions) - self.assertEqual(self.ref["id"], actions["intra_extension_uuid"]) + self.assertEqual(ref["id"], actions["intra_extension_uuid"]) self.assertIsInstance(actions["actions"], dict) - new_action = self.create_user() + new_action = {"id": uuid.uuid4().hex, "name": "my_action"} new_actions = dict() new_actions[new_action["id"]] = new_action["name"] self.assertRaises( - AdminException, + ActionAddNotAuthorized, self.manager.set_action_dict, - "admin", self.ref["id"], new_actions) + admin_user["id"], ref["id"], new_actions) # Delete the new action self.assertRaises( - AdminException, + ActionDelNotAuthorized, self.manager.del_action, - "admin", self.ref["id"], new_action["id"]) + admin_user["id"], ref["id"], new_action["id"]) # Add a particular action self.assertRaises( - AdminException, + ActionAddNotAuthorized, self.manager.add_action_dict, - "admin", self.ref["id"], new_action["id"]) + admin_user["id"], ref["id"], new_action["id"]) def test_subject_categories(self): - self.create_intra_extension() + ref = self.create_intra_extension() + admin_user = self.create_user() - subject_categories = self.manager.get_subject_category_dict("admin", self.ref["id"]) + subject_categories = self.manager.get_subject_category_dict(admin_user["id"], ref["id"]) self.assertIsInstance(subject_categories, dict) self.assertIn("subject_categories", subject_categories) self.assertIn("id", subject_categories) self.assertIn("intra_extension_uuid", subject_categories) - self.assertEqual(self.ref["id"], subject_categories["intra_extension_uuid"]) + self.assertEqual(ref["id"], subject_categories["intra_extension_uuid"]) self.assertIsInstance(subject_categories["subject_categories"], dict) new_subject_category = {"id": uuid.uuid4().hex, "name": "subject_category_test"} new_subject_categories = dict() new_subject_categories[new_subject_category["id"]] = new_subject_category["name"] self.assertRaises( - AdminException, + SubjectCategoryAddNotAuthorized, self.manager.set_subject_category_dict, - "admin", self.ref["id"], new_subject_categories) + admin_user["id"], ref["id"], new_subject_categories) # Delete the new subject_category self.assertRaises( - AdminException, + SubjectCategoryDelNotAuthorized, self.manager.del_subject_category, - "admin", self.ref["id"], new_subject_category["id"]) + admin_user["id"], ref["id"], new_subject_category["id"]) # Add a particular subject_category self.assertRaises( - AdminException, + SubjectCategoryAddNotAuthorized, self.manager.add_subject_category_dict, - "admin", self.ref["id"], new_subject_category["name"]) + admin_user["id"], ref["id"], new_subject_category["name"]) def test_object_categories(self): - self.create_intra_extension() + ref = self.create_intra_extension() + admin_user = self.create_user() - object_categories = self.manager.get_object_category_dict("admin", self.ref["id"]) + object_categories = self.manager.get_object_category_dict(admin_user["id"], ref["id"]) self.assertIsInstance(object_categories, dict) self.assertIn("object_categories", object_categories) self.assertIn("id", object_categories) self.assertIn("intra_extension_uuid", object_categories) - self.assertEqual(self.ref["id"], object_categories["intra_extension_uuid"]) + self.assertEqual(ref["id"], object_categories["intra_extension_uuid"]) self.assertIsInstance(object_categories["object_categories"], dict) new_object_category = {"id": uuid.uuid4().hex, "name": "object_category_test"} new_object_categories = dict() new_object_categories[new_object_category["id"]] = new_object_category["name"] self.assertRaises( - AdminException, + ObjectCategoryAddNotAuthorized, self.manager.set_object_category_dict, - "admin", self.ref["id"], new_object_categories) + admin_user["id"], ref["id"], new_object_categories) # Delete the new object_category self.assertRaises( - AdminException, + ObjectCategoryDelNotAuthorized, self.manager.del_object_category, - "admin", self.ref["id"], new_object_category["id"]) + admin_user["id"], ref["id"], new_object_category["id"]) # Add a particular object_category self.assertRaises( - AdminException, + ObjectCategoryAddNotAuthorized, self.manager.add_object_category_dict, - "admin", self.ref["id"], new_object_category["name"]) + admin_user["id"], ref["id"], new_object_category["name"]) def test_action_categories(self): - self.create_intra_extension() + ref = self.create_intra_extension() + admin_user = self.create_user() - action_categories = self.manager.get_action_category_dict("admin", self.ref["id"]) + action_categories = self.manager.get_action_category_dict(admin_user["id"], ref["id"]) self.assertIsInstance(action_categories, dict) self.assertIn("action_categories", action_categories) self.assertIn("id", action_categories) self.assertIn("intra_extension_uuid", action_categories) - self.assertEqual(self.ref["id"], action_categories["intra_extension_uuid"]) + self.assertEqual(ref["id"], action_categories["intra_extension_uuid"]) self.assertIsInstance(action_categories["action_categories"], dict) new_action_category = {"id": uuid.uuid4().hex, "name": "action_category_test"} new_action_categories = dict() new_action_categories[new_action_category["id"]] = new_action_category["name"] self.assertRaises( - AdminException, + ActionCategoryAddNotAuthorized, self.manager.set_action_category_dict, - "admin", self.ref["id"], new_action_categories) + admin_user["id"], ref["id"], new_action_categories) # Delete the new action_category self.assertRaises( - AdminException, + ActionCategoryDelNotAuthorized, self.manager.del_action_category, - "admin", self.ref["id"], new_action_category["id"]) + admin_user["id"], ref["id"], new_action_category["id"]) # Add a particular action_category self.assertRaises( - AdminException, + ActionCategoryAddNotAuthorized, self.manager.add_action_category_dict, - "admin", self.ref["id"], new_action_category["name"]) + admin_user["id"], ref["id"], new_action_category["name"]) def test_subject_category_scope(self): - self.create_intra_extension() + ref = self.create_intra_extension() + admin_user = self.create_user() subject_categories = self.admin_manager.set_subject_category_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], { - uuid.uuid4().hex: "admin", + uuid.uuid4().hex: admin_user["id"], uuid.uuid4().hex: "dev", } ) for subject_category in subject_categories["subject_categories"]: subject_category_scope = self.manager.get_subject_category_scope_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], subject_category) self.assertIsInstance(subject_category_scope, dict) self.assertIn("subject_category_scope", subject_category_scope) self.assertIn("id", subject_category_scope) self.assertIn("intra_extension_uuid", subject_category_scope) - self.assertEqual(self.ref["id"], subject_category_scope["intra_extension_uuid"]) + self.assertEqual(ref["id"], subject_category_scope["intra_extension_uuid"]) self.assertIsInstance(subject_category_scope["subject_category_scope"], dict) new_subject_category_scope = dict() new_subject_category_scope_uuid = uuid.uuid4().hex new_subject_category_scope[new_subject_category_scope_uuid] = "new_subject_category_scope" self.assertRaises( - AdminException, + SubjectCategoryScopeAddNotAuthorized, self.manager.set_subject_category_scope_dict, - "admin", self.ref["id"], subject_category, new_subject_category_scope) + admin_user["id"], ref["id"], subject_category, new_subject_category_scope) # Delete the new subject_category_scope self.assertRaises( - AdminException, + SubjectCategoryScopeDelNotAuthorized, self.manager.del_subject_category_scope, - "admin", self.ref["id"], subject_category, new_subject_category_scope_uuid) + admin_user["id"], ref["id"], subject_category, new_subject_category_scope_uuid) # Add a particular subject_category_scope self.assertRaises( - AdminException, + SubjectCategoryScopeAddNotAuthorized, self.manager.add_subject_category_scope_dict, - "admin", self.ref["id"], subject_category, new_subject_category_scope[new_subject_category_scope_uuid]) + admin_user["id"], ref["id"], subject_category, new_subject_category_scope[new_subject_category_scope_uuid]) def test_object_category_scope(self): - self.create_intra_extension() + ref = self.create_intra_extension() + admin_user = self.create_user() object_categories = self.admin_manager.set_object_category_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], { uuid.uuid4().hex: "id", uuid.uuid4().hex: "domain", @@ -376,42 +588,43 @@ class TestIntraExtensionAuthzManager(tests.TestCase): for object_category in object_categories["object_categories"]: object_category_scope = self.manager.get_object_category_scope_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], object_category) self.assertIsInstance(object_category_scope, dict) self.assertIn("object_category_scope", object_category_scope) self.assertIn("id", object_category_scope) self.assertIn("intra_extension_uuid", object_category_scope) - self.assertEqual(self.ref["id"], object_category_scope["intra_extension_uuid"]) + self.assertEqual(ref["id"], object_category_scope["intra_extension_uuid"]) self.assertIsInstance(object_category_scope["object_category_scope"], dict) new_object_category_scope = dict() new_object_category_scope_uuid = uuid.uuid4().hex new_object_category_scope[new_object_category_scope_uuid] = "new_object_category_scope" self.assertRaises( - AdminException, + ObjectCategoryScopeAddNotAuthorized, self.manager.set_object_category_scope_dict, - "admin", self.ref["id"], object_category, new_object_category_scope) + admin_user["id"], ref["id"], object_category, new_object_category_scope) # Delete the new object_category_scope self.assertRaises( - AdminException, + ObjectCategoryScopeDelNotAuthorized, self.manager.del_object_category_scope, - "admin", self.ref["id"], object_category, new_object_category_scope_uuid) + admin_user["id"], ref["id"], object_category, new_object_category_scope_uuid) # Add a particular object_category_scope self.assertRaises( - AdminException, + ObjectCategoryScopeAddNotAuthorized, self.manager.add_object_category_scope_dict, - "admin", self.ref["id"], object_category, new_object_category_scope[new_object_category_scope_uuid]) + admin_user["id"], ref["id"], object_category, new_object_category_scope[new_object_category_scope_uuid]) def test_action_category_scope(self): - self.create_intra_extension() + ref = self.create_intra_extension() + admin_user = self.create_user() action_categories = self.admin_manager.set_action_category_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], { uuid.uuid4().hex: "compute", uuid.uuid4().hex: "identity", @@ -420,49 +633,50 @@ class TestIntraExtensionAuthzManager(tests.TestCase): for action_category in action_categories["action_categories"]: action_category_scope = self.manager.get_action_category_scope_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], action_category) self.assertIsInstance(action_category_scope, dict) self.assertIn("action_category_scope", action_category_scope) self.assertIn("id", action_category_scope) self.assertIn("intra_extension_uuid", action_category_scope) - self.assertEqual(self.ref["id"], action_category_scope["intra_extension_uuid"]) + self.assertEqual(ref["id"], action_category_scope["intra_extension_uuid"]) self.assertIsInstance(action_category_scope["action_category_scope"], dict) new_action_category_scope = dict() new_action_category_scope_uuid = uuid.uuid4().hex new_action_category_scope[new_action_category_scope_uuid] = "new_action_category_scope" self.assertRaises( - AdminException, + ActionCategoryScopeAddNotAuthorized, self.manager.set_action_category_scope_dict, - "admin", self.ref["id"], action_category, new_action_category_scope) + admin_user["id"], ref["id"], action_category, new_action_category_scope) # Delete the new action_category_scope self.assertRaises( - AdminException, + ActionCategoryScopeDelNotAuthorized, self.manager.del_action_category_scope, - "admin", self.ref["id"], action_category, new_action_category_scope_uuid) + admin_user["id"], ref["id"], action_category, new_action_category_scope_uuid) # Add a particular action_category_scope self.assertRaises( - AdminException, + ActionCategoryScopeAddNotAuthorized, self.manager.add_action_category_scope_dict, - "admin", self.ref["id"], action_category, new_action_category_scope[new_action_category_scope_uuid]) + admin_user["id"], ref["id"], action_category, new_action_category_scope[new_action_category_scope_uuid]) def test_subject_category_assignment(self): - self.create_intra_extension() + ref = self.create_intra_extension() + admin_user = self.create_user() new_subject = self.create_user() new_subjects = dict() new_subjects[new_subject["id"]] = new_subject["name"] - subjects = self.admin_manager.set_subject_dict("admin", self.ref["id"], new_subjects) + subjects = self.admin_manager.set_subject_dict(admin_user["id"], ref["id"], new_subjects) new_subject_category_uuid = uuid.uuid4().hex new_subject_category_value = "role" subject_categories = self.admin_manager.set_subject_category_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], { new_subject_category_uuid: new_subject_category_value } @@ -470,29 +684,29 @@ class TestIntraExtensionAuthzManager(tests.TestCase): for subject_category in subject_categories["subject_categories"]: subject_category_scope = self.admin_manager.get_subject_category_scope_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], subject_category) self.assertIsInstance(subject_category_scope, dict) self.assertIn("subject_category_scope", subject_category_scope) self.assertIn("id", subject_category_scope) self.assertIn("intra_extension_uuid", subject_category_scope) - self.assertEqual(self.ref["id"], subject_category_scope["intra_extension_uuid"]) + self.assertEqual(ref["id"], subject_category_scope["intra_extension_uuid"]) self.assertIsInstance(subject_category_scope["subject_category_scope"], dict) new_subject_category_scope = dict() new_subject_category_scope_uuid = uuid.uuid4().hex - new_subject_category_scope[new_subject_category_scope_uuid] = "admin" + new_subject_category_scope[new_subject_category_scope_uuid] = admin_user["id"] subject_category_scope = self.admin_manager.set_subject_category_scope_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], subject_category, new_subject_category_scope) self.assertIsInstance(subject_category_scope, dict) self.assertIn("subject_category_scope", subject_category_scope) self.assertIn("id", subject_category_scope) self.assertIn("intra_extension_uuid", subject_category_scope) - self.assertEqual(self.ref["id"], subject_category_scope["intra_extension_uuid"]) + self.assertEqual(ref["id"], subject_category_scope["intra_extension_uuid"]) self.assertIn(new_subject_category_scope[new_subject_category_scope_uuid], subject_category_scope["subject_category_scope"][subject_category].values()) @@ -500,65 +714,66 @@ class TestIntraExtensionAuthzManager(tests.TestCase): new_subject_category_scope2_uuid = uuid.uuid4().hex new_subject_category_scope2[new_subject_category_scope2_uuid] = "dev" subject_category_scope = self.admin_manager.set_subject_category_scope_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], subject_category, new_subject_category_scope2) self.assertIsInstance(subject_category_scope, dict) self.assertIn("subject_category_scope", subject_category_scope) self.assertIn("id", subject_category_scope) self.assertIn("intra_extension_uuid", subject_category_scope) - self.assertEqual(self.ref["id"], subject_category_scope["intra_extension_uuid"]) + self.assertEqual(ref["id"], subject_category_scope["intra_extension_uuid"]) self.assertIn(new_subject_category_scope2[new_subject_category_scope2_uuid], subject_category_scope["subject_category_scope"][subject_category].values()) subject_category_assignments = self.manager.get_subject_category_assignment_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], new_subject["id"] ) self.assertIsInstance(subject_category_assignments, dict) self.assertIn("subject_category_assignments", subject_category_assignments) self.assertIn("id", subject_category_assignments) self.assertIn("intra_extension_uuid", subject_category_assignments) - self.assertEqual(self.ref["id"], subject_category_assignments["intra_extension_uuid"]) + self.assertEqual(ref["id"], subject_category_assignments["intra_extension_uuid"]) self.assertEqual({}, subject_category_assignments["subject_category_assignments"][new_subject["id"]]) self.assertRaises( - AdminException, + SubjectCategoryAssignmentAddNotAuthorized, self.manager.set_subject_category_assignment_dict, - "admin", self.ref["id"], new_subject["id"], + admin_user["id"], ref["id"], new_subject["id"], { new_subject_category_uuid: [new_subject_category_scope_uuid, new_subject_category_scope2_uuid], }) self.assertRaises( - AdminException, + SubjectCategoryAssignmentDelNotAuthorized, self.manager.del_subject_category_assignment, - "admin", self.ref["id"], new_subject["id"], + admin_user["id"], ref["id"], new_subject["id"], new_subject_category_uuid, new_subject_category_scope_uuid) self.assertRaises( - AdminException, + SubjectCategoryAssignmentAddNotAuthorized, self.manager.add_subject_category_assignment_dict, - "admin", self.ref["id"], new_subject["id"], + admin_user["id"], ref["id"], new_subject["id"], new_subject_category_uuid, new_subject_category_scope_uuid) def test_object_category_assignment(self): - self.create_intra_extension() + ref = self.create_intra_extension() + admin_user = self.create_user() - new_object = self.create_user() + new_object = {"id": uuid.uuid4().hex, "name": "my_object"} new_objects = dict() new_objects[new_object["id"]] = new_object["name"] - objects = self.admin_manager.set_object_dict("admin", self.ref["id"], new_objects) + objects = self.admin_manager.set_object_dict(admin_user["id"], ref["id"], new_objects) new_object_category_uuid = uuid.uuid4().hex new_object_category_value = "role" object_categories = self.admin_manager.set_object_category_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], { new_object_category_uuid: new_object_category_value } @@ -566,29 +781,29 @@ class TestIntraExtensionAuthzManager(tests.TestCase): for object_category in object_categories["object_categories"]: object_category_scope = self.admin_manager.get_object_category_scope_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], object_category) self.assertIsInstance(object_category_scope, dict) self.assertIn("object_category_scope", object_category_scope) self.assertIn("id", object_category_scope) self.assertIn("intra_extension_uuid", object_category_scope) - self.assertEqual(self.ref["id"], object_category_scope["intra_extension_uuid"]) + self.assertEqual(ref["id"], object_category_scope["intra_extension_uuid"]) self.assertIsInstance(object_category_scope["object_category_scope"], dict) new_object_category_scope = dict() new_object_category_scope_uuid = uuid.uuid4().hex - new_object_category_scope[new_object_category_scope_uuid] = "admin" + new_object_category_scope[new_object_category_scope_uuid] = admin_user["id"] object_category_scope = self.admin_manager.set_object_category_scope_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], object_category, new_object_category_scope) self.assertIsInstance(object_category_scope, dict) self.assertIn("object_category_scope", object_category_scope) self.assertIn("id", object_category_scope) self.assertIn("intra_extension_uuid", object_category_scope) - self.assertEqual(self.ref["id"], object_category_scope["intra_extension_uuid"]) + self.assertEqual(ref["id"], object_category_scope["intra_extension_uuid"]) self.assertIn(new_object_category_scope[new_object_category_scope_uuid], object_category_scope["object_category_scope"][object_category].values()) @@ -596,65 +811,66 @@ class TestIntraExtensionAuthzManager(tests.TestCase): new_object_category_scope2_uuid = uuid.uuid4().hex new_object_category_scope2[new_object_category_scope2_uuid] = "dev" object_category_scope = self.admin_manager.set_object_category_scope_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], object_category, new_object_category_scope2) self.assertIsInstance(object_category_scope, dict) self.assertIn("object_category_scope", object_category_scope) self.assertIn("id", object_category_scope) self.assertIn("intra_extension_uuid", object_category_scope) - self.assertEqual(self.ref["id"], object_category_scope["intra_extension_uuid"]) + self.assertEqual(ref["id"], object_category_scope["intra_extension_uuid"]) self.assertIn(new_object_category_scope2[new_object_category_scope2_uuid], object_category_scope["object_category_scope"][object_category].values()) object_category_assignments = self.manager.get_object_category_assignment_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], new_object["id"] ) self.assertIsInstance(object_category_assignments, dict) self.assertIn("object_category_assignments", object_category_assignments) self.assertIn("id", object_category_assignments) self.assertIn("intra_extension_uuid", object_category_assignments) - self.assertEqual(self.ref["id"], object_category_assignments["intra_extension_uuid"]) + self.assertEqual(ref["id"], object_category_assignments["intra_extension_uuid"]) self.assertEqual({}, object_category_assignments["object_category_assignments"][new_object["id"]]) self.assertRaises( - AdminException, + ObjectCategoryAssignmentAddNotAuthorized, self.manager.set_object_category_assignment_dict, - "admin", self.ref["id"], new_object["id"], + admin_user["id"], ref["id"], new_object["id"], { new_object_category_uuid: [new_object_category_scope_uuid, new_object_category_scope2_uuid], }) self.assertRaises( - AdminException, + ObjectCategoryAssignmentDelNotAuthorized, self.manager.del_object_category_assignment, - "admin", self.ref["id"], new_object["id"], + admin_user["id"], ref["id"], new_object["id"], new_object_category_uuid, new_object_category_scope_uuid) self.assertRaises( - AdminException, + ObjectCategoryAssignmentAddNotAuthorized, self.manager.add_object_category_assignment_dict, - "admin", self.ref["id"], new_object["id"], + admin_user["id"], ref["id"], new_object["id"], new_object_category_uuid, new_object_category_scope_uuid) def test_action_category_assignment(self): - self.create_intra_extension() + ref = self.create_intra_extension() + admin_user = self.create_user() - new_action = self.create_user() + new_action = {"id": uuid.uuid4().hex, "name": "my_action"} new_actions = dict() new_actions[new_action["id"]] = new_action["name"] - actions = self.admin_manager.set_action_dict("admin", self.ref["id"], new_actions) + actions = self.admin_manager.set_action_dict(admin_user["id"], ref["id"], new_actions) new_action_category_uuid = uuid.uuid4().hex new_action_category_value = "role" action_categories = self.admin_manager.set_action_category_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], { new_action_category_uuid: new_action_category_value } @@ -662,29 +878,29 @@ class TestIntraExtensionAuthzManager(tests.TestCase): for action_category in action_categories["action_categories"]: action_category_scope = self.admin_manager.get_action_category_scope_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], action_category) self.assertIsInstance(action_category_scope, dict) self.assertIn("action_category_scope", action_category_scope) self.assertIn("id", action_category_scope) self.assertIn("intra_extension_uuid", action_category_scope) - self.assertEqual(self.ref["id"], action_category_scope["intra_extension_uuid"]) + self.assertEqual(ref["id"], action_category_scope["intra_extension_uuid"]) self.assertIsInstance(action_category_scope["action_category_scope"], dict) new_action_category_scope = dict() new_action_category_scope_uuid = uuid.uuid4().hex - new_action_category_scope[new_action_category_scope_uuid] = "admin" + new_action_category_scope[new_action_category_scope_uuid] = admin_user["id"] action_category_scope = self.admin_manager.set_action_category_scope_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], action_category, new_action_category_scope) self.assertIsInstance(action_category_scope, dict) self.assertIn("action_category_scope", action_category_scope) self.assertIn("id", action_category_scope) self.assertIn("intra_extension_uuid", action_category_scope) - self.assertEqual(self.ref["id"], action_category_scope["intra_extension_uuid"]) + self.assertEqual(ref["id"], action_category_scope["intra_extension_uuid"]) self.assertIn(new_action_category_scope[new_action_category_scope_uuid], action_category_scope["action_category_scope"][action_category].values()) @@ -692,62 +908,63 @@ class TestIntraExtensionAuthzManager(tests.TestCase): new_action_category_scope2_uuid = uuid.uuid4().hex new_action_category_scope2[new_action_category_scope2_uuid] = "dev" action_category_scope = self.admin_manager.set_action_category_scope_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], action_category, new_action_category_scope2) self.assertIsInstance(action_category_scope, dict) self.assertIn("action_category_scope", action_category_scope) self.assertIn("id", action_category_scope) self.assertIn("intra_extension_uuid", action_category_scope) - self.assertEqual(self.ref["id"], action_category_scope["intra_extension_uuid"]) + self.assertEqual(ref["id"], action_category_scope["intra_extension_uuid"]) self.assertIn(new_action_category_scope2[new_action_category_scope2_uuid], action_category_scope["action_category_scope"][action_category].values()) action_category_assignments = self.manager.get_action_category_assignment_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], new_action["id"] ) self.assertIsInstance(action_category_assignments, dict) self.assertIn("action_category_assignments", action_category_assignments) self.assertIn("id", action_category_assignments) self.assertIn("intra_extension_uuid", action_category_assignments) - self.assertEqual(self.ref["id"], action_category_assignments["intra_extension_uuid"]) + self.assertEqual(ref["id"], action_category_assignments["intra_extension_uuid"]) self.assertEqual({}, action_category_assignments["action_category_assignments"][new_action["id"]]) self.assertRaises( - AdminException, + ActionCategoryAssignmentAddNotAuthorized, self.manager.set_action_category_assignment_dict, - "admin", self.ref["id"], new_action["id"], + admin_user["id"], ref["id"], new_action["id"], { new_action_category_uuid: [new_action_category_scope_uuid, new_action_category_scope2_uuid], }) self.assertRaises( - AdminException, + ActionCategoryAssignmentDelNotAuthorized, self.manager.del_action_category_assignment, - "admin", self.ref["id"], new_action["id"], + admin_user["id"], ref["id"], new_action["id"], new_action_category_uuid, new_action_category_scope_uuid) self.assertRaises( - AdminException, + ActionCategoryAssignmentAddNotAuthorized, self.manager.add_action_category_assignment_dict, - "admin", self.ref["id"], new_action["id"], + admin_user["id"], ref["id"], new_action["id"], new_action_category_uuid, new_action_category_scope_uuid) def test_sub_meta_rules(self): - self.create_intra_extension() + ref = self.create_intra_extension() + admin_user = self.create_user() - aggregation_algorithms = self.manager.get_aggregation_algorithms("admin", self.ref["id"]) + aggregation_algorithms = self.manager.get_aggregation_algorithms(admin_user["id"], ref["id"]) self.assertIsInstance(aggregation_algorithms, dict) self.assertIsInstance(aggregation_algorithms["aggregation_algorithms"], list) self.assertIn("and_true_aggregation", aggregation_algorithms["aggregation_algorithms"]) self.assertIn("test_aggregation", aggregation_algorithms["aggregation_algorithms"]) - aggregation_algorithm = self.manager.get_aggregation_algorithm("admin", self.ref["id"]) + aggregation_algorithm = self.manager.get_aggregation_algorithm(admin_user["id"], ref["id"]) self.assertIsInstance(aggregation_algorithm, dict) self.assertIn("aggregation", aggregation_algorithm) self.assertIn(aggregation_algorithm["aggregation"], aggregation_algorithms["aggregation_algorithms"]) @@ -755,19 +972,19 @@ class TestIntraExtensionAuthzManager(tests.TestCase): _aggregation_algorithm = list(aggregation_algorithms["aggregation_algorithms"]) _aggregation_algorithm.remove(aggregation_algorithm["aggregation"]) self.assertRaises( - AdminException, + MetaRuleAddNotAuthorized, self.manager.set_aggregation_algorithm, - "admin", self.ref["id"], _aggregation_algorithm[0]) + admin_user["id"], ref["id"], _aggregation_algorithm[0]) - sub_meta_rules = self.manager.get_sub_meta_rule("admin", self.ref["id"]) + sub_meta_rules = self.manager.get_sub_meta_rule(admin_user["id"], ref["id"]) self.assertIsInstance(sub_meta_rules, dict) self.assertIn("sub_meta_rules", sub_meta_rules) - sub_meta_rules_conf = json.load(open(os.path.join(self.policy_directory, self.ref["model"], "metarule.json"))) + sub_meta_rules_conf = json.load(open(os.path.join(self.policy_directory, ref["model"], "metarule.json"))) metarule = dict() categories = { - "subject_categories": self.manager.get_subject_category_dict("admin", self.ref["id"]), - "object_categories": self.manager.get_object_category_dict("admin", self.ref["id"]), - "action_categories": self.manager.get_action_category_dict("admin", self.ref["id"]) + "subject_categories": self.manager.get_subject_category_dict(admin_user["id"], ref["id"]), + "object_categories": self.manager.get_object_category_dict(admin_user["id"], ref["id"]), + "action_categories": self.manager.get_action_category_dict(admin_user["id"], ref["id"]) } for relation in sub_meta_rules_conf["sub_meta_rules"]: metarule[relation] = dict() @@ -790,38 +1007,39 @@ class TestIntraExtensionAuthzManager(tests.TestCase): new_subject_category = {"id": uuid.uuid4().hex, "name": "subject_category_test"} # Add a particular subject_category data = self.admin_manager.add_subject_category_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], new_subject_category["name"]) new_subject_category["id"] = data["subject_category"]["uuid"] subject_categories = self.manager.get_subject_category_dict( - "admin", - self.ref["id"]) + admin_user["id"], + ref["id"]) self.assertIsInstance(subject_categories, dict) self.assertIn("subject_categories", subject_categories) self.assertIn("id", subject_categories) self.assertIn("intra_extension_uuid", subject_categories) - self.assertEqual(self.ref["id"], subject_categories["intra_extension_uuid"]) + self.assertEqual(ref["id"], subject_categories["intra_extension_uuid"]) self.assertIn(new_subject_category["id"], subject_categories["subject_categories"]) metarule[relation]["subject_categories"].append(new_subject_category["id"]) - self.assertRaises( + self.MetaRuleAddNotAuthorized( AdminException, self.manager.set_sub_meta_rule, - "admin", self.ref["id"], metarule) + admin_user["id"], ref["id"], metarule) def test_sub_rules(self): - self.create_intra_extension() + ref = self.create_intra_extension() + admin_user = self.create_user() - sub_meta_rules = self.manager.get_sub_meta_rule("admin", self.ref["id"]) + sub_meta_rules = self.manager.get_sub_meta_rule(admin_user["id"], ref["id"]) self.assertIsInstance(sub_meta_rules, dict) self.assertIn("sub_meta_rules", sub_meta_rules) - sub_rules = self.manager.get_sub_rules("admin", self.ref["id"]) + sub_rules = self.manager.get_sub_rules(admin_user["id"], ref["id"]) self.assertIsInstance(sub_rules, dict) self.assertIn("rules", sub_rules) rules = dict() for relation in sub_rules["rules"]: - self.assertIn(relation, self.manager.get_sub_meta_rule_relations("admin", self.ref["id"])["sub_meta_rule_relations"]) + self.assertIn(relation, self.manager.get_sub_meta_rule_relations(admin_user["id"], ref["id"])["sub_meta_rule_relations"]) rules[relation] = list() for rule in sub_rules["rules"][relation]: for cat, cat_func, func_name in ( @@ -831,8 +1049,8 @@ class TestIntraExtensionAuthzManager(tests.TestCase): ): for cat_value in sub_meta_rules["sub_meta_rules"][relation][cat]: scope = cat_func( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], cat_value ) a_scope = rule.pop(0) @@ -849,13 +1067,14 @@ class TestIntraExtensionAuthzManager(tests.TestCase): ): for cat_value in sub_meta_rules["sub_meta_rules"][relation][cat]: scope = cat_func( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], cat_value ) sub_rule.append(scope[func_name][cat_value].keys()[0]) self.assertRaises( - AdminException, + RuleAddNotAuthorized, self.manager.set_sub_rule, - "admin", self.ref["id"], relation, sub_rule) + admin_user["id"], ref["id"], relation, sub_rule) + |