From 51f7872f4902093cb0f0d445737c5892dd784191 Mon Sep 17 00:00:00 2001 From: asteroide Date: Fri, 31 Jul 2015 16:49:31 +0200 Subject: Play unit tests and modify code accordingly. (some tests are still not OK). Change-Id: I021081710411be12ffbbb0a3c8192626dbf9f8ce --- .../unit/test_unit_core_intra_extension_admin.py | 3984 ++++++++------------ 1 file changed, 1655 insertions(+), 2329 deletions(-) (limited to 'keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_admin.py') diff --git a/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_admin.py b/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_admin.py index 97442228..68e4a79a 100644 --- a/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_admin.py +++ b/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_admin.py @@ -16,7 +16,8 @@ from keystone import resource from keystone.contrib.moon.exception import * from keystone.tests.unit import default_fixtures from keystone.contrib.moon.core import LogManager, TenantManager -from keystone.contrib.moon.core import DEFAULT_USER_ID +from keystone.contrib.moon.core import ADMIN_ID +from keystone.tests.moon.unit import * CONF = cfg.CONF @@ -32,14 +33,23 @@ IE = { "description": "a simple description." } +@dependency.requires('admin_api', 'authz_api', 'tenant_api', 'configuration_api', 'moonlog_api') class TestIntraExtensionAdminManagerOK(tests.TestCase): + # TODO: must be reviewed because some tests are on the authz interface def setUp(self): self.useFixture(database.Database()) super(TestIntraExtensionAdminManagerOK, self).setUp() self.load_backends() self.load_fixtures(default_fixtures) - self.manager = IntraExtensionAdminManager() + self.admin = create_user(self, username="admin") + self.demo = create_user(self, username="demo") + self.root_intra_extension = create_intra_extension(self, policy_model="policy_root") + # force re-initialization of the ADMIN_ID variable + from keystone.contrib.moon.core import ADMIN_ID + self.ADMIN_ID = ADMIN_ID + self.manager = self.authz_api + self.admin_manager = self.admin_api def __get_key_from_value(self, value, values_dict): return filter(lambda v: v[1] == value, values_dict.iteritems())[0][0] @@ -48,6 +58,8 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase): return { "moonlog_api": LogManager(), "tenant_api": TenantManager(), + "admin_api": IntraExtensionAdminManager(), + "authz_api": IntraExtensionAuthzManager(), # "resource_api": resource.Manager(), } @@ -61,1207 +73,921 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase): group='moon', policy_directory=self.policy_directory) - def create_intra_extension(self, policy_model="policy_admin"): - # Create the admin user because IntraExtension needs it - #self.admin = self.identity_api.create_user(USER) - IE["policymodel"] = policy_model - IE["name"] = uuid.uuid4().hex - self.ref = self.manager.load_intra_extension_dict(DEFAULT_USER_ID, intra_extension_dict=IE) - self.assertIsInstance(self.ref, dict) - self.create_tenant(self.ref["id"]) - - def create_tenant(self, authz_uuid): - tenant = { - "id": uuid.uuid4().hex, - "name": "TestAuthzIntraExtensionManager", - "enabled": True, - "description": "", - "domain_id": "default" - } - project = self.resource_api.create_project(tenant["id"], tenant) - mapping = self.tenant_api.set_tenant_dict(project["id"], project["name"], authz_uuid, None) - self.assertIsInstance(mapping, dict) - self.assertIn("authz", mapping) - self.assertEqual(mapping["authz"], authz_uuid) - return mapping - - def create_user(self, username="TestAdminIntraExtensionManagerUser"): - user = { - "id": uuid.uuid4().hex, - "name": username, - "enabled": True, - "description": "", - "domain_id": "default" - } - _user = self.identity_api.create_user(user) - return _user - def delete_admin_intra_extension(self): self.manager.del_intra_extension(self.ref["id"]) def test_subjects(self): - self.create_user("demo") - self.create_user("admin") - self.create_intra_extension() - - subjects = self.manager.get_subjects_dict("admin", self.ref["id"]) - self.assertIsInstance(subjects, dict) - self.assertIn("subjects", subjects) - self.assertIn("id", subjects) - self.assertIn("intra_extension_uuid", subjects) - self.assertEqual(self.ref["id"], subjects["intra_extension_uuid"]) - self.assertIsInstance(subjects["subjects"], dict) - - new_subject = self.create_user() - new_subjects = dict() - new_subjects[new_subject["id"]] = new_subject["name"] - subjects = self.manager.set_subject_dict("admin", self.ref["id"], new_subjects) + authz_ie_dict = create_intra_extension(self, "policy_authz") + admin_ie_dict = create_intra_extension(self, "policy_admin") + tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id']) + + admin_subject_id, admin_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next() + demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"], + {"name": "demo", "description": "demo"}) + demo_subject_id, demo_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next() + subjects = self.manager.get_subjects_dict(admin_subject_id, authz_ie_dict["id"]) self.assertIsInstance(subjects, dict) - self.assertIn("subjects", subjects) - self.assertIn("id", subjects) - self.assertIn("intra_extension_uuid", subjects) - self.assertEqual(self.ref["id"], subjects["intra_extension_uuid"]) - self.assertEqual(subjects["subjects"], new_subjects) - self.assertIn(new_subject["id"], subjects["subjects"]) - + for key, value in subjects.iteritems(): + self.assertIsInstance(value, dict) + self.assertIn("name", value) + self.assertIn("description", value) + self.assertIn("keystone_name", value) + self.assertIn("keystone_id", value) + + create_user(self, "subject_test") + new_subject = {"name": "subject_test", "description": "subject_test"} + + subjects = self.admin_manager.add_subject_dict(admin_subject_id, authz_ie_dict["id"], new_subject) + _subjects = dict(subjects) + self.assertEqual(len(_subjects.keys()), 1) + new_subject["id"] = _subjects.keys()[0] + value = subjects[new_subject["id"]] + self.assertIsInstance(value, dict) + self.assertIn("name", value) + self.assertEqual(value["name"], new_subject["name"]) + self.assertIn("description", value) + self.assertEqual(value["description"], new_subject["description"]) + # Delete the new subject - self.manager.del_subject("admin", self.ref["id"], new_subject["id"]) - subjects = self.manager.get_subjects_dict("admin", self.ref["id"]) - self.assertIsInstance(subjects, dict) - self.assertIn("subjects", subjects) - self.assertIn("id", subjects) - self.assertIn("intra_extension_uuid", subjects) - self.assertEqual(self.ref["id"], subjects["intra_extension_uuid"]) - self.assertNotIn(new_subject["id"], subjects["subjects"]) - - # Add a particular subject - subjects = self.manager.add_subject_dict("admin", self.ref["id"], new_subject["id"]) - self.assertIsInstance(subjects, dict) - self.assertIn("subject", subjects) - self.assertIn("uuid", subjects["subject"]) - self.assertEqual(new_subject["name"], subjects["subject"]["name"]) - subjects = self.manager.get_subjects_dict("admin", self.ref["id"]) - self.assertIsInstance(subjects, dict) - self.assertIn("subjects", subjects) - self.assertIn("id", subjects) - self.assertIn("intra_extension_uuid", subjects) - self.assertEqual(self.ref["id"], subjects["intra_extension_uuid"]) - self.assertIn(new_subject["id"], subjects["subjects"]) + self.admin_manager.del_subject(admin_subject_id, authz_ie_dict["id"], new_subject["id"]) + subjects = self.manager.get_subjects_dict(admin_subject_id, authz_ie_dict["id"]) + for key, value in subjects.iteritems(): + self.assertIsInstance(value, dict) + self.assertIn("name", value) + self.assertIsNot(new_subject["name"], value["name"]) + self.assertIn("description", value) def test_objects(self): - self.create_user("demo") - self.create_user("admin") - self.create_intra_extension() - - objects = self.manager.get_objects_dict("admin", self.ref["id"]) - self.assertIsInstance(objects, dict) - self.assertIn("objects", objects) - self.assertIn("id", objects) - self.assertIn("intra_extension_uuid", objects) - self.assertEqual(self.ref["id"], objects["intra_extension_uuid"]) - self.assertIsInstance(objects["objects"], dict) - - new_object = {"id": uuid.uuid4().hex, "name": "my_object"} - new_objects = dict() - new_objects[new_object["id"]] = new_object["name"] - objects = self.manager.set_object_dict("admin", self.ref["id"], new_objects) - self.assertIsInstance(objects, dict) - self.assertIn("objects", objects) - self.assertIn("id", objects) - self.assertIn("intra_extension_uuid", objects) - self.assertEqual(self.ref["id"], objects["intra_extension_uuid"]) - self.assertEqual(objects["objects"], new_objects) - self.assertIn(new_object["id"], objects["objects"]) - - # Delete the new object - self.manager.del_object("admin", self.ref["id"], new_object["id"]) - objects = self.manager.get_objects_dict("admin", self.ref["id"]) - self.assertIsInstance(objects, dict) - self.assertIn("objects", objects) - self.assertIn("id", objects) - self.assertIn("intra_extension_uuid", objects) - self.assertEqual(self.ref["id"], objects["intra_extension_uuid"]) - self.assertNotIn(new_object["id"], objects["objects"]) - - # Add a particular object - objects = self.manager.add_object_dict("admin", self.ref["id"], new_object["name"]) - self.assertIsInstance(objects, dict) - self.assertIn("object", objects) - self.assertIn("uuid", objects["object"]) - self.assertEqual(new_object["name"], objects["object"]["name"]) - new_object["id"] = objects["object"]["uuid"] - objects = self.manager.get_objects_dict("admin", self.ref["id"]) + authz_ie_dict = create_intra_extension(self, "policy_authz") + admin_ie_dict = create_intra_extension(self, "policy_admin") + tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id']) + + admin_subject_id, admin_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next() + demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"], + {"name": "demo", "description": "demo"}) + demo_subject_id, demo_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next() + objects = self.manager.get_objects_dict(admin_subject_id, authz_ie_dict["id"]) + objects_id_list = [] self.assertIsInstance(objects, dict) - self.assertIn("objects", objects) - self.assertIn("id", objects) - self.assertIn("intra_extension_uuid", objects) - self.assertEqual(self.ref["id"], objects["intra_extension_uuid"]) - self.assertIn(new_object["id"], objects["objects"]) + for key, value in objects.iteritems(): + objects_id_list.append(key) + self.assertIsInstance(value, dict) + self.assertIn("name", value) + self.assertIn("description", value) def test_actions(self): - self.create_user("demo") - self.create_user("admin") - self.create_intra_extension() - - actions = self.manager.get_actions_dict("admin", self.ref["id"]) - self.assertIsInstance(actions, dict) - self.assertIn("actions", actions) - self.assertIn("id", actions) - self.assertIn("intra_extension_uuid", actions) - self.assertEqual(self.ref["id"], actions["intra_extension_uuid"]) - self.assertIsInstance(actions["actions"], dict) - - new_action = {"id": uuid.uuid4().hex, "name": "my_action"} - new_actions = dict() - new_actions[new_action["id"]] = new_action["name"] - actions = self.manager.set_action_dict("admin", self.ref["id"], new_actions) - self.assertIsInstance(actions, dict) - self.assertIn("actions", actions) - self.assertIn("id", actions) - self.assertIn("intra_extension_uuid", actions) - self.assertEqual(self.ref["id"], actions["intra_extension_uuid"]) - self.assertEqual(actions["actions"], new_actions) - self.assertIn(new_action["id"], actions["actions"]) - - # Delete the new action - self.manager.del_action("admin", self.ref["id"], new_action["id"]) - actions = self.manager.get_actions_dict("admin", self.ref["id"]) + authz_ie_dict = create_intra_extension(self, "policy_authz") + admin_ie_dict = create_intra_extension(self, "policy_admin") + tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id']) + + admin_subject_id, admin_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next() + demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"], + {"name": "demo", "description": "demo"}) + demo_subject_id, demo_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next() + actions = self.manager.get_actions_dict(admin_subject_id, authz_ie_dict["id"]) + actions_id_list = [] self.assertIsInstance(actions, dict) - self.assertIn("actions", actions) - self.assertIn("id", actions) - self.assertIn("intra_extension_uuid", actions) - self.assertEqual(self.ref["id"], actions["intra_extension_uuid"]) - self.assertNotIn(new_action["id"], actions["actions"]) - - # Add a particular action - actions = self.manager.add_action_dict("admin", self.ref["id"], new_action["name"]) - self.assertIsInstance(actions, dict) - self.assertIn("action", actions) - self.assertIn("uuid", actions["action"]) - self.assertEqual(new_action["name"], actions["action"]["name"]) - new_action["id"] = actions["action"]["uuid"] - actions = self.manager.get_actions_dict("admin", self.ref["id"]) - self.assertIsInstance(actions, dict) - self.assertIn("actions", actions) - self.assertIn("id", actions) - self.assertIn("intra_extension_uuid", actions) - self.assertEqual(self.ref["id"], actions["intra_extension_uuid"]) - self.assertIn(new_action["id"], actions["actions"]) + for key, value in actions.iteritems(): + actions_id_list.append(key) + self.assertIsInstance(value, dict) + self.assertIn("name", value) + self.assertIn("description", value) def test_subject_categories(self): - self.create_user("demo") - self.create_user("admin") - self.create_intra_extension() - - subject_categories = self.manager.get_subject_categories_dict("admin", self.ref["id"]) + authz_ie_dict = create_intra_extension(self, "policy_authz") + admin_ie_dict = create_intra_extension(self, "policy_admin") + tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id']) + + admin_subject_id, admin_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next() + demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"], + {"name": "demo", "description": "demo"}) + demo_subject_id, demo_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next() + subject_categories = self.manager.get_subject_categories_dict(admin_subject_id, authz_ie_dict["id"]) self.assertIsInstance(subject_categories, dict) - self.assertIn("subject_categories", subject_categories) - self.assertIn("id", subject_categories) - self.assertIn("intra_extension_uuid", subject_categories) - self.assertEqual(self.ref["id"], subject_categories["intra_extension_uuid"]) - self.assertIsInstance(subject_categories["subject_categories"], dict) - - new_subject_category = {"id": uuid.uuid4().hex, "name": "subject_category_test"} - new_subject_categories = dict() - new_subject_categories[new_subject_category["id"]] = new_subject_category["name"] - subject_categories = self.manager.set_subject_category_dict("admin", self.ref["id"], new_subject_categories) - self.assertIsInstance(subject_categories, dict) - self.assertIn("subject_categories", subject_categories) - self.assertIn("id", subject_categories) - self.assertIn("intra_extension_uuid", subject_categories) - self.assertEqual(self.ref["id"], subject_categories["intra_extension_uuid"]) - self.assertEqual(subject_categories["subject_categories"], new_subject_categories) - self.assertIn(new_subject_category["id"], subject_categories["subject_categories"]) + for key, value in subject_categories.iteritems(): + self.assertIsInstance(value, dict) + self.assertIn("name", value) + self.assertIn("description", value) + + new_subject_category = {"name": "subject_category_test", "description": "subject_category_test"} + + subject_categories = self.admin_manager.add_subject_category(admin_subject_id, authz_ie_dict["id"], new_subject_category) + _subject_categories = dict(subject_categories) + self.assertEqual(len(_subject_categories.keys()), 1) + new_subject_category["id"] = _subject_categories.keys()[0] + value = subject_categories[new_subject_category["id"]] + self.assertIsInstance(value, dict) + self.assertIn("name", value) + self.assertEqual(value["name"], new_subject_category["name"]) + self.assertIn("description", value) + self.assertEqual(value["description"], new_subject_category["description"]) # Delete the new subject_category - self.manager.del_subject_category("admin", self.ref["id"], new_subject_category["id"]) - subject_categories = self.manager.get_subject_categories_dict("admin", self.ref["id"]) - self.assertIsInstance(subject_categories, dict) - self.assertIn("subject_categories", subject_categories) - self.assertIn("id", subject_categories) - self.assertIn("intra_extension_uuid", subject_categories) - self.assertEqual(self.ref["id"], subject_categories["intra_extension_uuid"]) - self.assertNotIn(new_subject_category["id"], subject_categories["subject_categories"]) - - # Add a particular subject_category - subject_categories = self.manager.add_subject_category( - "admin", - self.ref["id"], - new_subject_category["name"]) - self.assertIsInstance(subject_categories, dict) - self.assertIn("subject_category", subject_categories) - self.assertIn("uuid", subject_categories["subject_category"]) - self.assertEqual(new_subject_category["name"], subject_categories["subject_category"]["name"]) - new_subject_category["id"] = subject_categories["subject_category"]["uuid"] - subject_categories = self.manager.get_subject_categories_dict( - "admin", - self.ref["id"]) - self.assertIsInstance(subject_categories, dict) - self.assertIn("subject_categories", subject_categories) - self.assertIn("id", subject_categories) - self.assertIn("intra_extension_uuid", subject_categories) - self.assertEqual(self.ref["id"], subject_categories["intra_extension_uuid"]) - self.assertIn(new_subject_category["id"], subject_categories["subject_categories"]) + self.admin_manager.del_subject_category(admin_subject_id, authz_ie_dict["id"], new_subject_category["id"]) + subject_categories = self.manager.get_subject_categories_dict(admin_subject_id, authz_ie_dict["id"]) + for key, value in subject_categories.iteritems(): + self.assertIsInstance(value, dict) + self.assertIn("name", value) + self.assertIsNot(new_subject_category["name"], value["name"]) + self.assertIn("description", value) def test_object_categories(self): - self.create_user("demo") - self.create_user("admin") - self.create_intra_extension() - - object_categories = self.manager.get_object_category_dict("admin", self.ref["id"]) - self.assertIsInstance(object_categories, dict) - self.assertIn("object_categories", object_categories) - self.assertIn("id", object_categories) - self.assertIn("intra_extension_uuid", object_categories) - self.assertEqual(self.ref["id"], object_categories["intra_extension_uuid"]) - self.assertIsInstance(object_categories["object_categories"], dict) - - new_object_category = {"id": uuid.uuid4().hex, "name": "object_category_test"} - new_object_categories = dict() - new_object_categories[new_object_category["id"]] = new_object_category["name"] - object_categories = self.manager.set_object_category_dict("admin", self.ref["id"], new_object_categories) + authz_ie_dict = create_intra_extension(self, "policy_authz") + admin_ie_dict = create_intra_extension(self, "policy_admin") + tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id']) + + admin_subject_id, admin_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next() + demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"], + {"name": "demo", "description": "demo"}) + demo_subject_id, demo_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next() + object_categories = self.manager.get_object_category_dict(admin_subject_id, authz_ie_dict["id"]) self.assertIsInstance(object_categories, dict) - self.assertIn("object_categories", object_categories) - self.assertIn("id", object_categories) - self.assertIn("intra_extension_uuid", object_categories) - self.assertEqual(self.ref["id"], object_categories["intra_extension_uuid"]) - self.assertEqual(object_categories["object_categories"], new_object_categories) - self.assertIn(new_object_category["id"], object_categories["object_categories"]) + for key, value in object_categories.iteritems(): + self.assertIsInstance(value, dict) + self.assertIn("name", value) + self.assertIn("description", value) + + new_object_category = {"name": "object_category_test", "description": "object_category_test"} + + object_categories = self.admin_manager.add_object_category(admin_subject_id, authz_ie_dict["id"], new_object_category) + _object_categories = dict(object_categories) + self.assertEqual(len(_object_categories.keys()), 1) + new_object_category["id"] = _object_categories.keys()[0] + value = object_categories[new_object_category["id"]] + self.assertIsInstance(value, dict) + self.assertIn("name", value) + self.assertEqual(value["name"], new_object_category["name"]) + self.assertIn("description", value) + self.assertEqual(value["description"], new_object_category["description"]) # Delete the new object_category - self.manager.del_object_category("admin", self.ref["id"], new_object_category["id"]) - object_categories = self.manager.get_object_category_dict("admin", self.ref["id"]) - self.assertIsInstance(object_categories, dict) - self.assertIn("object_categories", object_categories) - self.assertIn("id", object_categories) - self.assertIn("intra_extension_uuid", object_categories) - self.assertEqual(self.ref["id"], object_categories["intra_extension_uuid"]) - self.assertNotIn(new_object_category["id"], object_categories["object_categories"]) - - # Add a particular object_category - object_categories = self.manager.add_object_category( - "admin", - self.ref["id"], - new_object_category["name"]) - self.assertIsInstance(object_categories, dict) - self.assertIn("object_category", object_categories) - self.assertIn("uuid", object_categories["object_category"]) - self.assertEqual(new_object_category["name"], object_categories["object_category"]["name"]) - new_object_category["id"] = object_categories["object_category"]["uuid"] - object_categories = self.manager.get_object_category_dict( - "admin", - self.ref["id"]) - self.assertIsInstance(object_categories, dict) - self.assertIn("object_categories", object_categories) - self.assertIn("id", object_categories) - self.assertIn("intra_extension_uuid", object_categories) - self.assertEqual(self.ref["id"], object_categories["intra_extension_uuid"]) - self.assertIn(new_object_category["id"], object_categories["object_categories"]) - def test_action_categories(self): - self.create_user("demo") - self.create_user("admin") - self.create_intra_extension() + self.admin_manager.del_object_category(admin_subject_id, authz_ie_dict["id"], new_object_category["id"]) + object_categories = self.manager.get_object_category_dict(admin_subject_id, authz_ie_dict["id"]) + for key, value in object_categories.iteritems(): + self.assertIsInstance(value, dict) + self.assertIn("name", value) + self.assertIsNot(new_object_category["name"], value["name"]) + self.assertIn("description", value) - action_categories = self.manager.get_action_category_dict("admin", self.ref["id"]) - self.assertIsInstance(action_categories, dict) - self.assertIn("action_categories", action_categories) - self.assertIn("id", action_categories) - self.assertIn("intra_extension_uuid", action_categories) - self.assertEqual(self.ref["id"], action_categories["intra_extension_uuid"]) - self.assertIsInstance(action_categories["action_categories"], dict) - - new_action_category = {"id": uuid.uuid4().hex, "name": "action_category_test"} - new_action_categories = dict() - new_action_categories[new_action_category["id"]] = new_action_category["name"] - action_categories = self.manager.set_action_category_dict("admin", self.ref["id"], new_action_categories) + def test_action_categories(self): + authz_ie_dict = create_intra_extension(self, "policy_authz") + admin_ie_dict = create_intra_extension(self, "policy_admin") + tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id']) + + admin_subject_id, admin_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next() + demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"], + {"name": "demo", "description": "demo"}) + demo_subject_id, demo_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next() + action_categories = self.manager.get_action_category_dict(admin_subject_id, authz_ie_dict["id"]) self.assertIsInstance(action_categories, dict) - self.assertIn("action_categories", action_categories) - self.assertIn("id", action_categories) - self.assertIn("intra_extension_uuid", action_categories) - self.assertEqual(self.ref["id"], action_categories["intra_extension_uuid"]) - self.assertEqual(action_categories["action_categories"], new_action_categories) - self.assertIn(new_action_category["id"], action_categories["action_categories"]) + for key, value in action_categories.iteritems(): + self.assertIsInstance(value, dict) + self.assertIn("name", value) + self.assertIn("description", value) + + new_action_category = {"name": "action_category_test", "description": "action_category_test"} + + action_categories = self.admin_manager.add_action_category(admin_subject_id, authz_ie_dict["id"], new_action_category) + _action_categories = dict(action_categories) + self.assertEqual(len(_action_categories.keys()), 1) + new_action_category["id"] = _action_categories.keys()[0] + value = action_categories[new_action_category["id"]] + self.assertIsInstance(value, dict) + self.assertIn("name", value) + self.assertEqual(value["name"], new_action_category["name"]) + self.assertIn("description", value) + self.assertEqual(value["description"], new_action_category["description"]) # Delete the new action_category - self.manager.del_action_category("admin", self.ref["id"], new_action_category["id"]) - action_categories = self.manager.get_action_category_dict("admin", self.ref["id"]) - self.assertIsInstance(action_categories, dict) - self.assertIn("action_categories", action_categories) - self.assertIn("id", action_categories) - self.assertIn("intra_extension_uuid", action_categories) - self.assertEqual(self.ref["id"], action_categories["intra_extension_uuid"]) - self.assertNotIn(new_action_category["id"], action_categories["action_categories"]) - - # Add a particular action_category - action_categories = self.manager.add_action_category( - "admin", - self.ref["id"], - new_action_category["name"]) - self.assertIsInstance(action_categories, dict) - self.assertIn("action_category", action_categories) - self.assertIn("uuid", action_categories["action_category"]) - self.assertEqual(new_action_category["name"], action_categories["action_category"]["name"]) - new_action_category["id"] = action_categories["action_category"]["uuid"] - action_categories = self.manager.get_action_category_dict( - "admin", - self.ref["id"]) - self.assertIsInstance(action_categories, dict) - self.assertIn("action_categories", action_categories) - self.assertIn("id", action_categories) - self.assertIn("intra_extension_uuid", action_categories) - self.assertEqual(self.ref["id"], action_categories["intra_extension_uuid"]) - self.assertIn(new_action_category["id"], action_categories["action_categories"]) - def test_subject_category_scope(self): - self.create_user("demo") - self.create_user("admin") - self.create_intra_extension() + self.admin_manager.del_action_category(admin_subject_id, authz_ie_dict["id"], new_action_category["id"]) + action_categories = self.manager.get_action_category_dict(admin_subject_id, authz_ie_dict["id"]) + for key, value in action_categories.iteritems(): + self.assertIsInstance(value, dict) + self.assertIn("name", value) + self.assertIsNot(new_action_category["name"], value["name"]) + self.assertIn("description", value) - subject_categories = self.manager.set_subject_category_dict( - "admin", - self.ref["id"], + def test_subject_category_scope(self): + authz_ie_dict = create_intra_extension(self, "policy_authz") + admin_ie_dict = create_intra_extension(self, "policy_admin") + tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id']) + + admin_subject_id, admin_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next() + demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"], + {"name": "demo", "description": "demo"}) + demo_subject_id, demo_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next() + + subject_categories = self.admin_manager.add_subject_category( + admin_subject_id, + authz_ie_dict["id"], { - uuid.uuid4().hex: "admin", - uuid.uuid4().hex: "dev", + "name": "country", + "description": "country", } ) - for subject_category in subject_categories["subject_categories"]: + for subject_category_id in subject_categories: + subject_category_scope = self.manager.get_subject_scopes_dict( - "admin", - self.ref["id"], - subject_category) + admin_subject_id, + authz_ie_dict["id"], + subject_category_id) self.assertIsInstance(subject_category_scope, dict) - self.assertIn("subject_category_scope", subject_category_scope) - self.assertIn("id", subject_category_scope) - self.assertIn("intra_extension_uuid", subject_category_scope) - self.assertEqual(self.ref["id"], subject_category_scope["intra_extension_uuid"]) - self.assertIsInstance(subject_category_scope["subject_category_scope"], dict) - - new_subject_category_scope = dict() - new_subject_category_scope_uuid = uuid.uuid4().hex - new_subject_category_scope[new_subject_category_scope_uuid] = "new_subject_category_scope" - subject_category_scope = self.manager.set_subject_scope_dict( - "admin", - self.ref["id"], - subject_category, + self.assertEqual({}, subject_category_scope) + + new_subject_category_scope = { + "name": "france", + "description": "france", + } + + subject_category_scope = self.admin_manager.add_subject_scope_dict( + admin_subject_id, + authz_ie_dict["id"], + subject_category_id, new_subject_category_scope) self.assertIsInstance(subject_category_scope, dict) - self.assertIn("subject_category_scope", subject_category_scope) - self.assertIn("id", subject_category_scope) - self.assertIn("intra_extension_uuid", subject_category_scope) - self.assertEqual(self.ref["id"], subject_category_scope["intra_extension_uuid"]) - self.assertIn(new_subject_category_scope[new_subject_category_scope_uuid], - subject_category_scope["subject_category_scope"][subject_category].values()) + self.assertEqual(len(subject_category_scope.keys()), 1) + subject_category_scope_id = subject_category_scope.keys()[0] + subject_category_scope_value = subject_category_scope[subject_category_scope_id] + self.assertIn("name", subject_category_scope_value) + self.assertEqual(new_subject_category_scope["name"], subject_category_scope_value["name"]) + self.assertIn("description", subject_category_scope_value) + self.assertEqual(new_subject_category_scope["description"], subject_category_scope_value["description"]) # Delete the new subject_category_scope - self.manager.del_subject_scope( - "admin", - self.ref["id"], - subject_category, - new_subject_category_scope_uuid) - subject_category_scope = self.manager.get_subject_scopes_dict( - "admin", - self.ref["id"], - subject_category) - self.assertIsInstance(subject_category_scope, dict) - self.assertIn("subject_category_scope", subject_category_scope) - self.assertIn("id", subject_category_scope) - self.assertIn("intra_extension_uuid", subject_category_scope) - self.assertEqual(self.ref["id"], subject_category_scope["intra_extension_uuid"]) - self.assertNotIn(new_subject_category_scope_uuid, subject_category_scope["subject_category_scope"]) - - # Add a particular subject_category_scope - subject_category_scope = self.manager.add_subject_scope_dict( - "admin", - self.ref["id"], - subject_category, - new_subject_category_scope[new_subject_category_scope_uuid]) - self.assertIsInstance(subject_category_scope, dict) - self.assertIn("subject_category_scope", subject_category_scope) - self.assertIn("uuid", subject_category_scope["subject_category_scope"]) - self.assertEqual(new_subject_category_scope[new_subject_category_scope_uuid], - subject_category_scope["subject_category_scope"]["name"]) - subject_category_scope = self.manager.get_subject_scopes_dict( - "admin", - self.ref["id"], - subject_category) + + self.admin_manager.del_subject_scope( + admin_subject_id, + authz_ie_dict["id"], + subject_category_id, + subject_category_scope_id) + subject_category_scope = self.admin_manager.get_subject_scopes_dict( + admin_subject_id, + authz_ie_dict["id"], + subject_category_id) self.assertIsInstance(subject_category_scope, dict) - self.assertIn("subject_category_scope", subject_category_scope) - self.assertIn("id", subject_category_scope) - self.assertIn("intra_extension_uuid", subject_category_scope) - self.assertEqual(self.ref["id"], subject_category_scope["intra_extension_uuid"]) - self.assertNotIn(new_subject_category_scope_uuid, subject_category_scope["subject_category_scope"]) + self.assertNotIn(subject_category_scope_id, subject_category_scope.keys()) def test_object_category_scope(self): - self.create_user("demo") - self.create_user("admin") - self.create_intra_extension() - - object_categories = self.manager.set_object_category_dict( - "admin", - self.ref["id"], + authz_ie_dict = create_intra_extension(self, "policy_authz") + admin_ie_dict = create_intra_extension(self, "policy_admin") + tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id']) + + admin_subject_id, admin_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next() + demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"], + {"name": "demo", "description": "demo"}) + demo_subject_id, demo_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next() + + object_categories = self.admin_manager.add_object_category( + admin_subject_id, + authz_ie_dict["id"], { - uuid.uuid4().hex: "id", - uuid.uuid4().hex: "domain", + "name": "country", + "description": "country", } ) - for object_category in object_categories["object_categories"]: + for object_category_id in object_categories: + object_category_scope = self.manager.get_object_scopes_dict( - "admin", - self.ref["id"], - object_category) + admin_subject_id, + authz_ie_dict["id"], + object_category_id) self.assertIsInstance(object_category_scope, dict) - self.assertIn("object_category_scope", object_category_scope) - self.assertIn("id", object_category_scope) - self.assertIn("intra_extension_uuid", object_category_scope) - self.assertEqual(self.ref["id"], object_category_scope["intra_extension_uuid"]) - self.assertIsInstance(object_category_scope["object_category_scope"], dict) - - new_object_category_scope = dict() - new_object_category_scope_uuid = uuid.uuid4().hex - new_object_category_scope[new_object_category_scope_uuid] = "new_object_category_scope" - object_category_scope = self.manager.set_object_scope_dict( - "admin", - self.ref["id"], - object_category, + self.assertEqual({}, object_category_scope) + + new_object_category_scope = { + "name": "france", + "description": "france", + } + + object_category_scope = self.admin_manager.add_object_scope_dict( + admin_subject_id, + authz_ie_dict["id"], + object_category_id, new_object_category_scope) self.assertIsInstance(object_category_scope, dict) - self.assertIn("object_category_scope", object_category_scope) - self.assertIn("id", object_category_scope) - self.assertIn("intra_extension_uuid", object_category_scope) - self.assertEqual(self.ref["id"], object_category_scope["intra_extension_uuid"]) - self.assertIn(new_object_category_scope[new_object_category_scope_uuid], - object_category_scope["object_category_scope"][object_category].values()) + self.assertEqual(len(object_category_scope.keys()), 1) + object_category_scope_id = object_category_scope.keys()[0] + object_category_scope_value = object_category_scope[object_category_scope_id] + self.assertIn("name", object_category_scope_value) + self.assertEqual(new_object_category_scope["name"], object_category_scope_value["name"]) + self.assertIn("description", object_category_scope_value) + self.assertEqual(new_object_category_scope["description"], object_category_scope_value["description"]) # Delete the new object_category_scope - self.manager.del_object_scope( - "admin", - self.ref["id"], - object_category, - new_object_category_scope_uuid) - object_category_scope = self.manager.get_object_scopes_dict( - "admin", - self.ref["id"], - object_category) - self.assertIsInstance(object_category_scope, dict) - self.assertIn("object_category_scope", object_category_scope) - self.assertIn("id", object_category_scope) - self.assertIn("intra_extension_uuid", object_category_scope) - self.assertEqual(self.ref["id"], object_category_scope["intra_extension_uuid"]) - self.assertNotIn(new_object_category_scope_uuid, object_category_scope["object_category_scope"]) - - # Add a particular object_category_scope - object_category_scope = self.manager.add_object_scope_dict( - "admin", - self.ref["id"], - object_category, - new_object_category_scope[new_object_category_scope_uuid]) - self.assertIsInstance(object_category_scope, dict) - self.assertIn("object_category_scope", object_category_scope) - self.assertIn("uuid", object_category_scope["object_category_scope"]) - self.assertEqual(new_object_category_scope[new_object_category_scope_uuid], - object_category_scope["object_category_scope"]["name"]) - object_category_scope = self.manager.get_object_scopes_dict( - "admin", - self.ref["id"], - object_category) + + self.admin_manager.del_object_scope( + admin_subject_id, + authz_ie_dict["id"], + object_category_id, + object_category_scope_id) + object_category_scope = self.admin_manager.get_object_scopes_dict( + admin_subject_id, + authz_ie_dict["id"], + object_category_id) self.assertIsInstance(object_category_scope, dict) - self.assertIn("object_category_scope", object_category_scope) - self.assertIn("id", object_category_scope) - self.assertIn("intra_extension_uuid", object_category_scope) - self.assertEqual(self.ref["id"], object_category_scope["intra_extension_uuid"]) - self.assertNotIn(new_object_category_scope_uuid, object_category_scope["object_category_scope"]) + self.assertNotIn(object_category_scope_id, object_category_scope.keys()) def test_action_category_scope(self): - self.create_user("demo") - self.create_user("admin") - self.create_intra_extension() - - action_categories = self.manager.set_action_category_dict( - "admin", - self.ref["id"], + authz_ie_dict = create_intra_extension(self, "policy_authz") + admin_ie_dict = create_intra_extension(self, "policy_admin") + tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id']) + + admin_subject_id, admin_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next() + demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"], + {"name": "demo", "description": "demo"}) + demo_subject_id, demo_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next() + + action_categories = self.admin_manager.add_action_category( + admin_subject_id, + authz_ie_dict["id"], { - uuid.uuid4().hex: "compute", - uuid.uuid4().hex: "identity", + "name": "swift", + "description": "swift actions", } ) - for action_category in action_categories["action_categories"]: + for action_category_id in action_categories: + action_category_scope = self.manager.get_action_scopes_dict( - "admin", - self.ref["id"], - action_category) + admin_subject_id, + authz_ie_dict["id"], + action_category_id) self.assertIsInstance(action_category_scope, dict) - self.assertIn("action_category_scope", action_category_scope) - self.assertIn("id", action_category_scope) - self.assertIn("intra_extension_uuid", action_category_scope) - self.assertEqual(self.ref["id"], action_category_scope["intra_extension_uuid"]) - self.assertIsInstance(action_category_scope["action_category_scope"], dict) - - new_action_category_scope = dict() - new_action_category_scope_uuid = uuid.uuid4().hex - new_action_category_scope[new_action_category_scope_uuid] = "new_action_category_scope" - action_category_scope = self.manager.set_action_scope_dict( - "admin", - self.ref["id"], - action_category, + self.assertEqual({}, action_category_scope) + + new_action_category_scope = { + "name": "get", + "description": "get swift files", + } + + action_category_scope = self.admin_manager.add_action_scope_dict( + admin_subject_id, + authz_ie_dict["id"], + action_category_id, new_action_category_scope) self.assertIsInstance(action_category_scope, dict) - self.assertIn("action_category_scope", action_category_scope) - self.assertIn("id", action_category_scope) - self.assertIn("intra_extension_uuid", action_category_scope) - self.assertEqual(self.ref["id"], action_category_scope["intra_extension_uuid"]) - self.assertIn(new_action_category_scope[new_action_category_scope_uuid], - action_category_scope["action_category_scope"][action_category].values()) + self.assertEqual(len(action_category_scope.keys()), 1) + action_category_scope_id = action_category_scope.keys()[0] + action_category_scope_value = action_category_scope[action_category_scope_id] + self.assertIn("name", action_category_scope_value) + self.assertEqual(new_action_category_scope["name"], action_category_scope_value["name"]) + self.assertIn("description", action_category_scope_value) + self.assertEqual(new_action_category_scope["description"], action_category_scope_value["description"]) # Delete the new action_category_scope - self.manager.del_action_scope( - "admin", - self.ref["id"], - action_category, - new_action_category_scope_uuid) - action_category_scope = self.manager.get_action_scopes_dict( - "admin", - self.ref["id"], - action_category) - self.assertIsInstance(action_category_scope, dict) - self.assertIn("action_category_scope", action_category_scope) - self.assertIn("id", action_category_scope) - self.assertIn("intra_extension_uuid", action_category_scope) - self.assertEqual(self.ref["id"], action_category_scope["intra_extension_uuid"]) - self.assertNotIn(new_action_category_scope_uuid, action_category_scope["action_category_scope"]) - - # Add a particular action_category_scope - action_category_scope = self.manager.add_action_scope_dict( - "admin", - self.ref["id"], - action_category, - new_action_category_scope[new_action_category_scope_uuid]) - self.assertIsInstance(action_category_scope, dict) - self.assertIn("action_category_scope", action_category_scope) - self.assertIn("uuid", action_category_scope["action_category_scope"]) - self.assertEqual(new_action_category_scope[new_action_category_scope_uuid], - action_category_scope["action_category_scope"]["name"]) - action_category_scope = self.manager.get_action_scopes_dict( - "admin", - self.ref["id"], - action_category) + + self.admin_manager.del_action_scope( + admin_subject_id, + authz_ie_dict["id"], + action_category_id, + action_category_scope_id) + action_category_scope = self.admin_manager.get_action_scopes_dict( + admin_subject_id, + authz_ie_dict["id"], + action_category_id) self.assertIsInstance(action_category_scope, dict) - self.assertIn("action_category_scope", action_category_scope) - self.assertIn("id", action_category_scope) - self.assertIn("intra_extension_uuid", action_category_scope) - self.assertEqual(self.ref["id"], action_category_scope["intra_extension_uuid"]) - self.assertNotIn(new_action_category_scope_uuid, action_category_scope["action_category_scope"]) + self.assertNotIn(action_category_scope_id, action_category_scope.keys()) def test_subject_category_assignment(self): - self.create_user("demo") - self.create_user("admin") - self.create_intra_extension() - - new_subject = self.create_user() - new_subjects = dict() - new_subjects[new_subject["id"]] = new_subject["name"] - subjects = self.manager.set_subject_dict("admin", self.ref["id"], new_subjects) - - new_subject_category_uuid = uuid.uuid4().hex - new_subject_category_value = "role" - subject_categories = self.manager.set_subject_category_dict( - "admin", - self.ref["id"], + authz_ie_dict = create_intra_extension(self, "policy_authz") + admin_ie_dict = create_intra_extension(self, "policy_admin") + tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id']) + + admin_subject_id, admin_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next() + admin_authz_subject_id, admin_authz_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], authz_ie_dict['id'], 'admin').iteritems().next() + demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"], + {"name": "demo", "description": "demo"}) + demo_subject_id, demo_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next() + demo_authz_subject_id, demo_authz_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], authz_ie_dict['id'], 'demo').iteritems().next() + + subjects_dict = self.manager.get_subjects_dict(admin_subject_id, authz_ie_dict["id"]) + + subject_categories = self.admin_manager.add_subject_category( + admin_subject_id, + authz_ie_dict["id"], { - new_subject_category_uuid: new_subject_category_value + "name": "country", + "description": "country", } ) - for subject_category in subject_categories["subject_categories"]: + for subject_category_id in subject_categories: subject_category_scope = self.manager.get_subject_scopes_dict( - "admin", - self.ref["id"], - subject_category) - self.assertIsInstance(subject_category_scope, dict) - self.assertIn("subject_category_scope", subject_category_scope) - self.assertIn("id", subject_category_scope) - self.assertIn("intra_extension_uuid", subject_category_scope) - self.assertEqual(self.ref["id"], subject_category_scope["intra_extension_uuid"]) - self.assertIsInstance(subject_category_scope["subject_category_scope"], dict) - - new_subject_category_scope = dict() - new_subject_category_scope_uuid = uuid.uuid4().hex - new_subject_category_scope[new_subject_category_scope_uuid] = "admin" - subject_category_scope = self.manager.set_subject_scope_dict( - "admin", - self.ref["id"], - subject_category, - new_subject_category_scope) - self.assertIsInstance(subject_category_scope, dict) - self.assertIn("subject_category_scope", subject_category_scope) - self.assertIn("id", subject_category_scope) - self.assertIn("intra_extension_uuid", subject_category_scope) - self.assertEqual(self.ref["id"], subject_category_scope["intra_extension_uuid"]) - self.assertIn(new_subject_category_scope[new_subject_category_scope_uuid], - subject_category_scope["subject_category_scope"][subject_category].values()) - - new_subject_category_scope2 = dict() - new_subject_category_scope2_uuid = uuid.uuid4().hex - new_subject_category_scope2[new_subject_category_scope2_uuid] = "dev" - subject_category_scope = self.manager.set_subject_scope_dict( - "admin", - self.ref["id"], - subject_category, - new_subject_category_scope2) + admin_subject_id, + authz_ie_dict["id"], + subject_category_id) self.assertIsInstance(subject_category_scope, dict) - self.assertIn("subject_category_scope", subject_category_scope) - self.assertIn("id", subject_category_scope) - self.assertIn("intra_extension_uuid", subject_category_scope) - self.assertEqual(self.ref["id"], subject_category_scope["intra_extension_uuid"]) - self.assertIn(new_subject_category_scope2[new_subject_category_scope2_uuid], - subject_category_scope["subject_category_scope"][subject_category].values()) - - subject_category_assignments = self.manager.get_subject_assignment_dict( - "admin", - self.ref["id"], - new_subject["id"] + self.assertEqual({}, subject_category_scope) + + new_subject_category_scope_1 = { + "name": "france", + "description": "france", + } + + subject_category_scope_1 = self.admin_manager.add_subject_scope_dict( + admin_subject_id, + authz_ie_dict["id"], + subject_category_id, + new_subject_category_scope_1) + subject_category_scope_1_id = subject_category_scope_1.keys()[0] + + new_subject_category_scope_2 = { + "name": "china", + "description": "china", + } + + subject_category_scope_2 = self.admin_manager.add_subject_scope_dict( + admin_subject_id, + authz_ie_dict["id"], + subject_category_id, + new_subject_category_scope_2) + subject_category_scope_2_id = subject_category_scope_2.keys()[0] + + subject_category_assignments = self.manager.get_subject_assignment_list( + admin_subject_id, + authz_ie_dict["id"], + admin_authz_subject_id, + subject_category_id ) - self.assertIsInstance(subject_category_assignments, dict) - self.assertIn("subject_category_assignments", subject_category_assignments) - self.assertIn("id", subject_category_assignments) - self.assertIn("intra_extension_uuid", subject_category_assignments) - self.assertEqual(self.ref["id"], subject_category_assignments["intra_extension_uuid"]) - self.assertEqual({}, subject_category_assignments["subject_category_assignments"][new_subject["id"]]) - - subject_category_assignments = self.manager.set_subject_assignment_dict( - "admin", - self.ref["id"], - new_subject["id"], - { - new_subject_category_uuid: [new_subject_category_scope_uuid, new_subject_category_scope2_uuid], - } + self.assertIsInstance(subject_category_assignments, list) + self.assertEqual([], subject_category_assignments) + + subject_category_assignments = self.manager.get_subject_assignment_list( + admin_subject_id, + authz_ie_dict["id"], + demo_authz_subject_id, + subject_category_id ) - self.assertIsInstance(subject_category_assignments, dict) - self.assertIn("subject_category_assignments", subject_category_assignments) - self.assertIn("id", subject_category_assignments) - self.assertIn("intra_extension_uuid", subject_category_assignments) - self.assertEqual(self.ref["id"], subject_category_assignments["intra_extension_uuid"]) - self.assertEqual( - {new_subject_category_uuid: [new_subject_category_scope_uuid, new_subject_category_scope2_uuid]}, - subject_category_assignments["subject_category_assignments"][new_subject["id"]]) - subject_category_assignments = self.manager.get_subject_assignment_dict( - "admin", - self.ref["id"], - new_subject["id"] + self.assertIsInstance(subject_category_assignments, list) + self.assertEqual([], subject_category_assignments) + + subject_category_assignments = self.admin_manager.add_subject_assignment_list( + admin_subject_id, + authz_ie_dict["id"], + admin_authz_subject_id, subject_category_id, subject_category_scope_1_id ) - self.assertIsInstance(subject_category_assignments, dict) - self.assertIn("subject_category_assignments", subject_category_assignments) - self.assertIn("id", subject_category_assignments) - self.assertIn("intra_extension_uuid", subject_category_assignments) - self.assertEqual(self.ref["id"], subject_category_assignments["intra_extension_uuid"]) - self.assertEqual( - {new_subject_category_uuid: [new_subject_category_scope_uuid, new_subject_category_scope2_uuid]}, - subject_category_assignments["subject_category_assignments"][new_subject["id"]]) - - self.manager.del_subject_assignment( - "admin", - self.ref["id"], - new_subject["id"], - new_subject_category_uuid, - new_subject_category_scope_uuid + self.assertIsInstance(subject_category_assignments, list) + + self.assertEqual(len(subject_category_assignments), 1) + + subject_category_assignments = self.admin_manager.add_subject_assignment_list( + admin_subject_id, + authz_ie_dict["id"], + admin_authz_subject_id, subject_category_id, subject_category_scope_2_id ) - subject_category_assignments = self.manager.get_subject_assignment_dict( - "admin", - self.ref["id"], - new_subject["id"] + self.assertIsInstance(subject_category_assignments, list) + self.assertEqual(len(subject_category_assignments), 2) + + subject_category_assignments = self.admin_manager.add_subject_assignment_list( + admin_subject_id, + authz_ie_dict["id"], + demo_authz_subject_id, subject_category_id, subject_category_scope_2_id ) - self.assertIsInstance(subject_category_assignments, dict) - self.assertIn("subject_category_assignments", subject_category_assignments) - self.assertIn("id", subject_category_assignments) - self.assertIn("intra_extension_uuid", subject_category_assignments) - self.assertEqual(self.ref["id"], subject_category_assignments["intra_extension_uuid"]) - self.assertEqual( - {new_subject_category_uuid: [new_subject_category_scope2_uuid, ]}, - subject_category_assignments["subject_category_assignments"][new_subject["id"]]) - - data = self.manager.add_subject_assignment_list( - "admin", - self.ref["id"], - new_subject["id"], - new_subject_category_uuid, - new_subject_category_scope_uuid + self.assertIsInstance(subject_category_assignments, list) + self.assertEqual(len(subject_category_assignments), 1) + + subject_category_assignments = self.admin_manager.get_subject_assignment_list( + admin_subject_id, + authz_ie_dict["id"], + admin_authz_subject_id, subject_category_id ) + self.assertIsInstance(subject_category_assignments, list) + self.assertEqual(len(subject_category_assignments), 2) - subject_category_assignments = self.manager.get_subject_assignment_dict( - "admin", - self.ref["id"], - new_subject["id"] + self.admin_manager.del_subject_assignment( + admin_subject_id, + authz_ie_dict["id"], + admin_authz_subject_id, subject_category_id, subject_category_scope_2_id + ) + subject_category_assignments = self.admin_manager.get_subject_assignment_list( + admin_subject_id, + authz_ie_dict["id"], + admin_authz_subject_id, subject_category_id ) - self.assertIsInstance(subject_category_assignments, dict) - self.assertIn("subject_category_assignments", subject_category_assignments) - self.assertIn("id", subject_category_assignments) - self.assertIn("intra_extension_uuid", subject_category_assignments) - self.assertEqual(self.ref["id"], subject_category_assignments["intra_extension_uuid"]) - self.assertEqual( - {new_subject_category_uuid: [new_subject_category_scope2_uuid, new_subject_category_scope_uuid]}, - subject_category_assignments["subject_category_assignments"][new_subject["id"]]) + self.assertIsInstance(subject_category_assignments, list) + self.assertEqual(len(subject_category_assignments), 1) def test_object_category_assignment(self): - self.create_user("demo") - self.create_user("admin") - self.create_intra_extension() - - new_object = {"id": uuid.uuid4().hex, "name": "my_object"} - new_objects = dict() - new_objects[new_object["id"]] = new_object["name"] - objects = self.manager.set_object_dict("admin", self.ref["id"], new_objects) - - new_object_category_uuid = uuid.uuid4().hex - new_object_category_value = "role" - object_categories = self.manager.set_object_category_dict( - "admin", - self.ref["id"], + authz_ie_dict = create_intra_extension(self, "policy_authz") + admin_ie_dict = create_intra_extension(self, "policy_admin") + tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id']) + + admin_subject_id, admin_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next() + demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"], + {"name": "demo", "description": "demo"}) + demo_subject_id, demo_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next() + + objects_dict = self.manager.get_objects_dict(admin_subject_id, authz_ie_dict["id"]) + + object_vm1_id = None + object_vm2_id = None + for _object_id in objects_dict: + if objects_dict[_object_id]['name'] == 'vm1': + object_vm1_id = _object_id + if objects_dict[_object_id]['name'] == 'vm2': + object_vm2_id = _object_id + if not object_vm1_id or not object_vm2_id: + raise Exception("Cannot run tests, database is corrupted ? (need upload and list in objects)") + + object_categories = self.admin_manager.add_object_category( + admin_subject_id, + authz_ie_dict["id"], { - new_object_category_uuid: new_object_category_value + "name": "location", + "description": "location", } ) - for object_category in object_categories["object_categories"]: + for object_category_id in object_categories: object_category_scope = self.manager.get_object_scopes_dict( - "admin", - self.ref["id"], - object_category) - self.assertIsInstance(object_category_scope, dict) - self.assertIn("object_category_scope", object_category_scope) - self.assertIn("id", object_category_scope) - self.assertIn("intra_extension_uuid", object_category_scope) - self.assertEqual(self.ref["id"], object_category_scope["intra_extension_uuid"]) - self.assertIsInstance(object_category_scope["object_category_scope"], dict) - - new_object_category_scope = dict() - new_object_category_scope_uuid = uuid.uuid4().hex - new_object_category_scope[new_object_category_scope_uuid] = "admin" - object_category_scope = self.manager.set_object_scope_dict( - "admin", - self.ref["id"], - object_category, - new_object_category_scope) - self.assertIsInstance(object_category_scope, dict) - self.assertIn("object_category_scope", object_category_scope) - self.assertIn("id", object_category_scope) - self.assertIn("intra_extension_uuid", object_category_scope) - self.assertEqual(self.ref["id"], object_category_scope["intra_extension_uuid"]) - self.assertIn(new_object_category_scope[new_object_category_scope_uuid], - object_category_scope["object_category_scope"][object_category].values()) - - new_object_category_scope2 = dict() - new_object_category_scope2_uuid = uuid.uuid4().hex - new_object_category_scope2[new_object_category_scope2_uuid] = "dev" - object_category_scope = self.manager.set_object_scope_dict( - "admin", - self.ref["id"], - object_category, - new_object_category_scope2) + admin_subject_id, + authz_ie_dict["id"], + object_category_id) self.assertIsInstance(object_category_scope, dict) - self.assertIn("object_category_scope", object_category_scope) - self.assertIn("id", object_category_scope) - self.assertIn("intra_extension_uuid", object_category_scope) - self.assertEqual(self.ref["id"], object_category_scope["intra_extension_uuid"]) - self.assertIn(new_object_category_scope2[new_object_category_scope2_uuid], - object_category_scope["object_category_scope"][object_category].values()) + self.assertEqual({}, object_category_scope) + + new_object_category_scope_1 = { + "name": "france", + "description": "france", + } + + object_category_scope_1 = self.admin_manager.add_object_scope_dict( + admin_subject_id, + authz_ie_dict["id"], + object_category_id, + new_object_category_scope_1) + object_category_scope_1_id = object_category_scope_1.keys()[0] + + new_object_category_scope_2 = { + "name": "china", + "description": "china", + } + + object_category_scope_2 = self.admin_manager.add_object_scope_dict( + admin_subject_id, + authz_ie_dict["id"], + object_category_id, + new_object_category_scope_2) + object_category_scope_2_id = object_category_scope_2.keys()[0] object_category_assignments = self.manager.get_object_assignment_list( - "admin", - self.ref["id"], - new_object["id"] - ) - self.assertIsInstance(object_category_assignments, dict) - self.assertIn("object_category_assignments", object_category_assignments) - self.assertIn("id", object_category_assignments) - self.assertIn("intra_extension_uuid", object_category_assignments) - self.assertEqual(self.ref["id"], object_category_assignments["intra_extension_uuid"]) - self.assertEqual({}, object_category_assignments["object_category_assignments"][new_object["id"]]) - - object_category_assignments = self.manager.set_object_category_assignment_dict( - "admin", - self.ref["id"], - new_object["id"], - { - new_object_category_uuid: [new_object_category_scope_uuid, new_object_category_scope2_uuid], - } + admin_subject_id, + authz_ie_dict["id"], + object_vm1_id, + object_category_id ) - self.assertIsInstance(object_category_assignments, dict) - self.assertIn("object_category_assignments", object_category_assignments) - self.assertIn("id", object_category_assignments) - self.assertIn("intra_extension_uuid", object_category_assignments) - self.assertEqual(self.ref["id"], object_category_assignments["intra_extension_uuid"]) - self.assertEqual( - {new_object_category_uuid: [new_object_category_scope_uuid, new_object_category_scope2_uuid]}, - object_category_assignments["object_category_assignments"][new_object["id"]]) + self.assertIsInstance(object_category_assignments, list) + self.assertEqual([], object_category_assignments) + object_category_assignments = self.manager.get_object_assignment_list( - "admin", - self.ref["id"], - new_object["id"] + admin_subject_id, + authz_ie_dict["id"], + object_vm2_id, + object_category_id ) - self.assertIsInstance(object_category_assignments, dict) - self.assertIn("object_category_assignments", object_category_assignments) - self.assertIn("id", object_category_assignments) - self.assertIn("intra_extension_uuid", object_category_assignments) - self.assertEqual(self.ref["id"], object_category_assignments["intra_extension_uuid"]) - self.assertEqual( - {new_object_category_uuid: [new_object_category_scope_uuid, new_object_category_scope2_uuid]}, - object_category_assignments["object_category_assignments"][new_object["id"]]) - - self.manager.del_object_assignment( - "admin", - self.ref["id"], - new_object["id"], - new_object_category_uuid, - new_object_category_scope_uuid + self.assertIsInstance(object_category_assignments, list) + self.assertEqual([], object_category_assignments) + + object_category_assignments = self.admin_manager.add_object_assignment_list( + admin_subject_id, + authz_ie_dict["id"], + object_vm1_id, object_category_id, object_category_scope_1_id ) - object_category_assignments = self.manager.get_object_assignment_list( - "admin", - self.ref["id"], - new_object["id"] + self.assertIsInstance(object_category_assignments, list) + + self.assertEqual(len(object_category_assignments), 1) + + object_category_assignments = self.admin_manager.add_object_assignment_list( + admin_subject_id, + authz_ie_dict["id"], + object_vm1_id, object_category_id, object_category_scope_2_id + ) + self.assertIsInstance(object_category_assignments, list) + self.assertEqual(len(object_category_assignments), 2) + + object_category_assignments = self.admin_manager.add_object_assignment_list( + admin_subject_id, + authz_ie_dict["id"], + object_vm2_id, object_category_id, object_category_scope_2_id ) - self.assertIsInstance(object_category_assignments, dict) - self.assertIn("object_category_assignments", object_category_assignments) - self.assertIn("id", object_category_assignments) - self.assertIn("intra_extension_uuid", object_category_assignments) - self.assertEqual(self.ref["id"], object_category_assignments["intra_extension_uuid"]) - self.assertEqual( - {new_object_category_uuid: [new_object_category_scope2_uuid, ]}, - object_category_assignments["object_category_assignments"][new_object["id"]]) - - self.manager.add_object_assignment_list( - "admin", - self.ref["id"], - new_object["id"], - new_object_category_uuid, - new_object_category_scope_uuid + self.assertIsInstance(object_category_assignments, list) + self.assertEqual(len(object_category_assignments), 1) + + object_category_assignments = self.admin_manager.get_object_assignment_list( + admin_subject_id, + authz_ie_dict["id"], + object_vm1_id, object_category_id ) + self.assertIsInstance(object_category_assignments, list) + self.assertEqual(len(object_category_assignments), 2) - object_category_assignments = self.manager.get_object_assignment_list( - "admin", - self.ref["id"], - new_object["id"] + self.admin_manager.del_object_assignment( + admin_subject_id, + authz_ie_dict["id"], + object_vm1_id, object_category_id, object_category_scope_2_id ) - self.assertIsInstance(object_category_assignments, dict) - self.assertIn("object_category_assignments", object_category_assignments) - self.assertIn("id", object_category_assignments) - self.assertIn("intra_extension_uuid", object_category_assignments) - self.assertEqual(self.ref["id"], object_category_assignments["intra_extension_uuid"]) - self.assertEqual( - {new_object_category_uuid: [new_object_category_scope2_uuid, new_object_category_scope_uuid]}, - object_category_assignments["object_category_assignments"][new_object["id"]]) + object_category_assignments = self.admin_manager.get_object_assignment_list( + admin_subject_id, + authz_ie_dict["id"], + object_vm1_id, object_category_id + ) + self.assertIsInstance(object_category_assignments, list) + self.assertEqual(len(object_category_assignments), 1) def test_action_category_assignment(self): - self.create_user("demo") - self.create_user("admin") - self.create_intra_extension() - - new_action = {"id": uuid.uuid4().hex, "name": "my_action"} - new_actions = dict() - new_actions[new_action["id"]] = new_action["name"] - actions = self.manager.set_action_dict("admin", self.ref["id"], new_actions) - - new_action_category_uuid = uuid.uuid4().hex - new_action_category_value = "role" - action_categories = self.manager.set_action_category_dict( - "admin", - self.ref["id"], + authz_ie_dict = create_intra_extension(self, "policy_authz") + admin_ie_dict = create_intra_extension(self, "policy_admin") + tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id']) + + admin_subject_id, admin_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next() + demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"], + {"name": "demo", "description": "demo"}) + demo_subject_id, demo_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next() + + actions_dict = self.manager.get_actions_dict(admin_subject_id, authz_ie_dict["id"]) + + action_upload_id = None + action_list_id = None + for _action_id in actions_dict: + if actions_dict[_action_id]['name'] == 'upload': + action_upload_id = _action_id + if actions_dict[_action_id]['name'] == 'list': + action_list_id = _action_id + if not action_upload_id or not action_list_id: + raise Exception("Cannot run tests, database is corrupted ? (need upload and list in actions)") + + action_categories = self.admin_manager.add_action_category( + admin_subject_id, + authz_ie_dict["id"], { - new_action_category_uuid: new_action_category_value + "name": "swift", + "description": "swift actions", } ) - for action_category in action_categories["action_categories"]: + for action_category_id in action_categories: action_category_scope = self.manager.get_action_scopes_dict( - "admin", - self.ref["id"], - action_category) - self.assertIsInstance(action_category_scope, dict) - self.assertIn("action_category_scope", action_category_scope) - self.assertIn("id", action_category_scope) - self.assertIn("intra_extension_uuid", action_category_scope) - self.assertEqual(self.ref["id"], action_category_scope["intra_extension_uuid"]) - self.assertIsInstance(action_category_scope["action_category_scope"], dict) - - new_action_category_scope = dict() - new_action_category_scope_uuid = uuid.uuid4().hex - new_action_category_scope[new_action_category_scope_uuid] = "admin" - action_category_scope = self.manager.set_action_scope_dict( - "admin", - self.ref["id"], - action_category, - new_action_category_scope) - self.assertIsInstance(action_category_scope, dict) - self.assertIn("action_category_scope", action_category_scope) - self.assertIn("id", action_category_scope) - self.assertIn("intra_extension_uuid", action_category_scope) - self.assertEqual(self.ref["id"], action_category_scope["intra_extension_uuid"]) - self.assertIn(new_action_category_scope[new_action_category_scope_uuid], - action_category_scope["action_category_scope"][action_category].values()) - - new_action_category_scope2 = dict() - new_action_category_scope2_uuid = uuid.uuid4().hex - new_action_category_scope2[new_action_category_scope2_uuid] = "dev" - action_category_scope = self.manager.set_action_scope_dict( - "admin", - self.ref["id"], - action_category, - new_action_category_scope2) + admin_subject_id, + authz_ie_dict["id"], + action_category_id) self.assertIsInstance(action_category_scope, dict) - self.assertIn("action_category_scope", action_category_scope) - self.assertIn("id", action_category_scope) - self.assertIn("intra_extension_uuid", action_category_scope) - self.assertEqual(self.ref["id"], action_category_scope["intra_extension_uuid"]) - self.assertIn(new_action_category_scope2[new_action_category_scope2_uuid], - action_category_scope["action_category_scope"][action_category].values()) + self.assertEqual({}, action_category_scope) + + new_action_category_scope_1 = { + "name": "swift_admin", + "description": "action require admin rights", + } + + action_category_scope_1 = self.admin_manager.add_action_scope_dict( + admin_subject_id, + authz_ie_dict["id"], + action_category_id, + new_action_category_scope_1) + action_category_scope_1_id = action_category_scope_1.keys()[0] + + new_action_category_scope_2 = { + "name": "swift_anonymous", + "description": "action require no right", + } + + action_category_scope_2 = self.admin_manager.add_action_scope_dict( + admin_subject_id, + authz_ie_dict["id"], + action_category_id, + new_action_category_scope_2) + action_category_scope_2_id = action_category_scope_2.keys()[0] action_category_assignments = self.manager.get_action_assignment_list( - "admin", - self.ref["id"], - new_action["id"] + admin_subject_id, + authz_ie_dict["id"], + action_upload_id, + action_category_id ) - self.assertIsInstance(action_category_assignments, dict) - self.assertIn("action_category_assignments", action_category_assignments) - self.assertIn("id", action_category_assignments) - self.assertIn("intra_extension_uuid", action_category_assignments) - self.assertEqual(self.ref["id"], action_category_assignments["intra_extension_uuid"]) - self.assertEqual({}, action_category_assignments["action_category_assignments"][new_action["id"]]) - - action_category_assignments = self.manager.set_action_assignment_dict( - "admin", - self.ref["id"], - new_action["id"], - { - new_action_category_uuid: [new_action_category_scope_uuid, new_action_category_scope2_uuid], - } - ) - self.assertIsInstance(action_category_assignments, dict) - self.assertIn("action_category_assignments", action_category_assignments) - self.assertIn("id", action_category_assignments) - self.assertIn("intra_extension_uuid", action_category_assignments) - self.assertEqual(self.ref["id"], action_category_assignments["intra_extension_uuid"]) - self.assertEqual( - {new_action_category_uuid: [new_action_category_scope_uuid, new_action_category_scope2_uuid]}, - action_category_assignments["action_category_assignments"][new_action["id"]]) + self.assertIsInstance(action_category_assignments, list) + self.assertEqual([], action_category_assignments) + action_category_assignments = self.manager.get_action_assignment_list( - "admin", - self.ref["id"], - new_action["id"] + admin_subject_id, + authz_ie_dict["id"], + action_list_id, + action_category_id ) - self.assertIsInstance(action_category_assignments, dict) - self.assertIn("action_category_assignments", action_category_assignments) - self.assertIn("id", action_category_assignments) - self.assertIn("intra_extension_uuid", action_category_assignments) - self.assertEqual(self.ref["id"], action_category_assignments["intra_extension_uuid"]) - self.assertEqual( - {new_action_category_uuid: [new_action_category_scope_uuid, new_action_category_scope2_uuid]}, - action_category_assignments["action_category_assignments"][new_action["id"]]) - - self.manager.del_action_assignment( - "admin", - self.ref["id"], - new_action["id"], - new_action_category_uuid, - new_action_category_scope_uuid + self.assertIsInstance(action_category_assignments, list) + self.assertEqual([], action_category_assignments) + + action_category_assignments = self.admin_manager.add_action_assignment_list( + admin_subject_id, + authz_ie_dict["id"], + action_upload_id, action_category_id, action_category_scope_1_id ) - action_category_assignments = self.manager.get_action_assignment_list( - "admin", - self.ref["id"], - new_action["id"] + self.assertIsInstance(action_category_assignments, list) + + self.assertEqual(len(action_category_assignments), 1) + + action_category_assignments = self.admin_manager.add_action_assignment_list( + admin_subject_id, + authz_ie_dict["id"], + action_upload_id, action_category_id, action_category_scope_2_id ) - self.assertIsInstance(action_category_assignments, dict) - self.assertIn("action_category_assignments", action_category_assignments) - self.assertIn("id", action_category_assignments) - self.assertIn("intra_extension_uuid", action_category_assignments) - self.assertEqual(self.ref["id"], action_category_assignments["intra_extension_uuid"]) - self.assertEqual( - {new_action_category_uuid: [new_action_category_scope2_uuid, ]}, - action_category_assignments["action_category_assignments"][new_action["id"]]) - - self.manager.add_action_assignment_list( - "admin", - self.ref["id"], - new_action["id"], - new_action_category_uuid, - new_action_category_scope_uuid + self.assertIsInstance(action_category_assignments, list) + self.assertEqual(len(action_category_assignments), 2) + + action_category_assignments = self.admin_manager.add_action_assignment_list( + admin_subject_id, + authz_ie_dict["id"], + action_list_id, action_category_id, action_category_scope_2_id ) + self.assertIsInstance(action_category_assignments, list) + self.assertEqual(len(action_category_assignments), 1) - action_category_assignments = self.manager.get_action_assignment_list( - "admin", - self.ref["id"], - new_action["id"] + action_category_assignments = self.admin_manager.get_action_assignment_list( + admin_subject_id, + authz_ie_dict["id"], + action_upload_id, action_category_id + ) + self.assertIsInstance(action_category_assignments, list) + self.assertEqual(len(action_category_assignments), 2) + + self.admin_manager.del_action_assignment( + admin_subject_id, + authz_ie_dict["id"], + action_upload_id, action_category_id, action_category_scope_2_id + ) + action_category_assignments = self.admin_manager.get_action_assignment_list( + admin_subject_id, + authz_ie_dict["id"], + action_upload_id, action_category_id ) - self.assertIsInstance(action_category_assignments, dict) - self.assertIn("action_category_assignments", action_category_assignments) - self.assertIn("id", action_category_assignments) - self.assertIn("intra_extension_uuid", action_category_assignments) - self.assertEqual(self.ref["id"], action_category_assignments["intra_extension_uuid"]) - self.assertEqual( - {new_action_category_uuid: [new_action_category_scope2_uuid, new_action_category_scope_uuid]}, - action_category_assignments["action_category_assignments"][new_action["id"]]) + self.assertIsInstance(action_category_assignments, list) + self.assertEqual(len(action_category_assignments), 1) def test_sub_meta_rules(self): - self.create_user("demo") - self.create_user("admin") - self.create_intra_extension() - - aggregation_algorithms = self.manager.get_aggregation_algorithms("admin", self.ref["id"]) - self.assertIsInstance(aggregation_algorithms, dict) - self.assertIsInstance(aggregation_algorithms["aggregation_algorithms"], list) - self.assertIn("and_true_aggregation", aggregation_algorithms["aggregation_algorithms"]) - self.assertIn("test_aggregation", aggregation_algorithms["aggregation_algorithms"]) - - aggregation_algorithm = self.manager.get_aggregation_algorithm_dict("admin", self.ref["id"]) - self.assertIsInstance(aggregation_algorithm, dict) - self.assertIn("aggregation", aggregation_algorithm) - self.assertIn(aggregation_algorithm["aggregation"], aggregation_algorithms["aggregation_algorithms"]) - - _aggregation_algorithm = list(aggregation_algorithms["aggregation_algorithms"]) - _aggregation_algorithm.remove(aggregation_algorithm["aggregation"]) - aggregation_algorithm = self.manager.set_aggregation_algorithm_dict("admin", self.ref["id"], _aggregation_algorithm[0]) - self.assertIsInstance(aggregation_algorithm, dict) - self.assertIn("aggregation", aggregation_algorithm) - self.assertIn(aggregation_algorithm["aggregation"], aggregation_algorithms["aggregation_algorithms"]) - - sub_meta_rules = self.manager.get_sub_meta_rules_dict("admin", self.ref["id"]) + authz_ie_dict = create_intra_extension(self, "policy_authz") + admin_ie_dict = create_intra_extension(self, "policy_admin") + tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id']) + + admin_subject_id, admin_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next() + demo_subject_dict = self.admin_manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"], + {"name": "demo", "description": "demo"}) + demo_subject_id, demo_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next() + + aggregation_algorithms = self.admin_manager.get_aggregation_algorithm_dict(admin_subject_id, authz_ie_dict["id"]) + for key, value in aggregation_algorithms.iteritems(): + self.assertIsInstance(value, dict) + self.assertIn("name", value) + self.assertIn("description", value) + + # TODO: need more tests on aggregation_algorithms (set and del) + + sub_meta_rules = self.admin_manager.get_sub_meta_rules_dict(admin_subject_id, authz_ie_dict["id"]) self.assertIsInstance(sub_meta_rules, dict) - self.assertIn("sub_meta_rules", sub_meta_rules) - sub_meta_rules_conf = json.load(open(os.path.join(self.policy_directory, self.ref["model"], "metarule.json"))) - metarule = dict() categories = { - "subject_categories": self.manager.get_subject_categories_dict("admin", self.ref["id"]), - "object_categories": self.manager.get_object_category_dict("admin", self.ref["id"]), - "action_categories": self.manager.get_action_category_dict("admin", self.ref["id"]) + "subject_categories": self.admin_manager.get_subject_categories_dict(admin_subject_id, authz_ie_dict["id"]), + "object_categories": self.admin_manager.get_object_category_dict(admin_subject_id, authz_ie_dict["id"]), + "action_categories": self.admin_manager.get_action_category_dict(admin_subject_id, authz_ie_dict["id"]) } - for relation in sub_meta_rules_conf["sub_meta_rules"]: - metarule[relation] = dict() - for item in ("subject_categories", "object_categories", "action_categories"): - metarule[relation][item] = list() - for element in sub_meta_rules_conf["sub_meta_rules"][relation][item]: - metarule[relation][item].append(self.__get_key_from_value( - element, - categories[item][item] - )) - - for relation in sub_meta_rules["sub_meta_rules"]: - self.assertIn(relation, metarule) - for item in ("subject_categories", "object_categories", "action_categories"): - self.assertEqual( - sub_meta_rules["sub_meta_rules"][relation][item], - metarule[relation][item] - ) - - new_subject_category = {"id": uuid.uuid4().hex, "name": "subject_category_test"} - # Add a particular subject_category - data = self.manager.add_subject_category( - "admin", - self.ref["id"], - new_subject_category["name"]) - new_subject_category["id"] = data["subject_category"]["uuid"] - subject_categories = self.manager.get_subject_categories_dict( - "admin", - self.ref["id"]) - self.assertIsInstance(subject_categories, dict) - self.assertIn("subject_categories", subject_categories) - self.assertIn("id", subject_categories) - self.assertIn("intra_extension_uuid", subject_categories) - self.assertEqual(self.ref["id"], subject_categories["intra_extension_uuid"]) - self.assertIn(new_subject_category["id"], subject_categories["subject_categories"]) - metarule[relation]["subject_categories"].append(new_subject_category["id"]) - _sub_meta_rules = self.manager.del_sub_meta_rule("admin", self.ref["id"], metarule) - self.assertIn(relation, metarule) - for item in ("subject_categories", "object_categories", "action_categories"): - self.assertEqual( - _sub_meta_rules["sub_meta_rules"][relation][item], - metarule[relation][item] - ) + for key, value in sub_meta_rules.iteritems(): + self.assertIsInstance(value, dict) + self.assertIn("action_categories", value) + self.assertIn("object_categories", value) + self.assertIn("subject_categories", value) + self.assertIn("algorithm", value) + self.assertIn("name", value) + for action_category_id in value["action_categories"]: + self.assertIn(action_category_id, categories["action_categories"]) + for object_category_id in value["object_categories"]: + self.assertIn(object_category_id, categories["object_categories"]) + for subject_category_id in value["subject_categories"]: + self.assertIn(subject_category_id, categories["subject_categories"]) + # TODO: need more tests (set and del) def test_sub_rules(self): - self.create_user("demo") - self.create_user("admin") - self.create_intra_extension() - - sub_meta_rules = self.manager.get_sub_meta_rules_dict("admin", self.ref["id"]) + authz_ie_dict = create_intra_extension(self, "policy_authz") + admin_ie_dict = create_intra_extension(self, "policy_admin") + tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id']) + + admin_subject_id, admin_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next() + demo_subject_dict = self.admin_manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"], + {"name": "demo", "description": "demo"}) + demo_subject_id, demo_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next() + + sub_meta_rules = self.admin_manager.get_sub_meta_rules_dict(admin_subject_id, authz_ie_dict["id"]) self.assertIsInstance(sub_meta_rules, dict) - self.assertIn("sub_meta_rules", sub_meta_rules) - - sub_rules = self.manager.get_sub_rules("admin", self.ref["id"]) - self.assertIsInstance(sub_rules, dict) - self.assertIn("rules", sub_rules) - rules = dict() - for relation in sub_rules["rules"]: - self.assertIn(relation, self.manager.get_sub_meta_rule_relations("admin", self.ref["id"])["sub_meta_rule_relations"]) - rules[relation] = list() - for rule in sub_rules["rules"][relation]: + + for relation_id in sub_meta_rules: + rules = self.admin_manager.get_rules_dict(admin_subject_id, authz_ie_dict["id"], relation_id) + rule_length = len(sub_meta_rules[relation_id]["subject_categories"]) + \ + len(sub_meta_rules[relation_id]["object_categories"]) + \ + len(sub_meta_rules[relation_id]["action_categories"]) + 1 + for rule_id in rules: + self.assertEqual(rule_length, len(rules[rule_id])) + rule = list(rules[rule_id]) for cat, cat_func, func_name in ( - ("subject_categories", self.manager.get_subject_scopes_dict, "subject_category_scope"), - ("action_categories", self.manager.get_action_scopes_dict, "action_category_scope"), - ("object_categories", self.manager.get_object_scopes_dict, "object_category_scope"), + ("subject_categories", self.admin_manager.get_subject_scopes_dict, "subject_scope"), + ("action_categories", self.admin_manager.get_action_scopes_dict, "action_scope"), + ("object_categories", self.admin_manager.get_object_scopes_dict, "object_scope"), ): - for cat_value in sub_meta_rules["sub_meta_rules"][relation][cat]: + for cat_value in sub_meta_rules[relation_id][cat]: scope = cat_func( - "admin", - self.ref["id"], + admin_subject_id, + authz_ie_dict["id"], cat_value ) a_scope = rule.pop(0) if type(a_scope) is not bool: - self.assertIn(a_scope, scope[func_name][cat_value]) - - # add a new subrule - - relation = sub_rules["rules"].keys()[0] - sub_rule = [] - for cat, cat_func, func_name in ( - ("subject_categories", self.manager.get_subject_scopes_dict, "subject_category_scope"), - ("action_categories", self.manager.get_action_scopes_dict, "action_category_scope"), - ("object_categories", self.manager.get_object_scopes_dict, "object_category_scope"), - ): - for cat_value in sub_meta_rules["sub_meta_rules"][relation][cat]: - scope = cat_func( - "admin", - self.ref["id"], - cat_value - ) - sub_rule.append(scope[func_name][cat_value].keys()[0]) - - sub_rule.append(True) - sub_rules = self.manager.set_sub_rule("admin", self.ref["id"], relation, sub_rule) - self.assertIsInstance(sub_rules, dict) - self.assertIn("rules", sub_rules) - rules = dict() - self.assertIn(sub_rule, sub_rules["rules"][relation]) - for relation in sub_rules["rules"]: - self.assertIn(relation, self.manager.get_sub_meta_rule_relations("admin", self.ref["id"])["sub_meta_rule_relations"]) - rules[relation] = list() - for rule in sub_rules["rules"][relation]: + self.assertIn(a_scope, scope.keys()) + + # add a new subrule + + sub_rule = [] + for cat, cat_func, func_name in ( + ("subject_categories", self.admin_manager.get_subject_scopes_dict, "subject_scope"), + ("action_categories", self.admin_manager.get_action_scopes_dict, "action_scope"), + ("object_categories", self.admin_manager.get_object_scopes_dict, "object_scope"), + ): + for cat_value in sub_meta_rules[relation_id][cat]: + scope = cat_func( + admin_subject_id, + authz_ie_dict["id"], + cat_value + ) + sub_rule.append(scope.keys()[0]) + + sub_rule.append(False) + + sub_rules = self.admin_manager.add_rule_dict(admin_subject_id, authz_ie_dict["id"], relation_id, sub_rule) + self.assertIsInstance(sub_rules, dict) + self.assertIn(sub_rule, sub_rules.values()) + + for rule_id, rule_value in sub_rules.iteritems(): for cat, cat_func, func_name in ( - ("subject_categories", self.manager.get_subject_scopes_dict, "subject_category_scope"), - ("action_categories", self.manager.get_action_scopes_dict, "action_category_scope"), - ("object_categories", self.manager.get_object_scopes_dict, "object_category_scope"), + ("subject_categories", self.admin_manager.get_subject_scopes_dict, "subject_category_scope"), + ("action_categories", self.admin_manager.get_action_scopes_dict, "action_category_scope"), + ("object_categories", self.admin_manager.get_object_scopes_dict, "object_category_scope"), ): - for cat_value in sub_meta_rules["sub_meta_rules"][relation][cat]: + for cat_value in sub_meta_rules[relation_id][cat]: scope = cat_func( - "admin", - self.ref["id"], + admin_subject_id, + authz_ie_dict["id"], cat_value ) - a_scope = rule.pop(0) - self.assertIn(a_scope, scope[func_name][cat_value]) + a_scope = rule_value.pop(0) + self.assertIn(a_scope, scope.keys()) + + # TODO: add test for the delete function +@dependency.requires('admin_api', 'authz_api', 'tenant_api', 'configuration_api', 'moonlog_api') class TestIntraExtensionAdminManagerKO(tests.TestCase): + # TODO: must be reviewed because some tests are on the authz interface def setUp(self): self.useFixture(database.Database()) super(TestIntraExtensionAdminManagerKO, self).setUp() self.load_backends() self.load_fixtures(default_fixtures) - self.manager = IntraExtensionAuthzManager() - self.admin_manager = IntraExtensionAdminManager() + self.admin = create_user(self, username="admin") + self.demo = create_user(self, username="demo") + self.root_intra_extension = create_intra_extension(self, policy_model="policy_root") + # force re-initialization of the ADMIN_ID variable + from keystone.contrib.moon.core import ADMIN_ID + self.ADMIN_ID = ADMIN_ID + self.manager = self.authz_api + self.admin_manager = self.admin_api def __get_key_from_value(self, value, values_dict): return filter(lambda v: v[1] == value, values_dict.iteritems())[0][0] @@ -1270,6 +996,8 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase): return { "moonlog_api": LogManager(), "tenant_api": TenantManager(), + "admin_api": IntraExtensionAdminManager(), + "authz_api": IntraExtensionAuthzManager(), # "resource_api": resource.Manager(), } @@ -1283,1519 +1011,1117 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase): group='moon', policy_directory=self.policy_directory) - def create_tenant(self): - tenant = { - "id": uuid.uuid4().hex, - "name": "TestIntraExtensionAuthzManager", - "enabled": True, - "description": "", - "domain_id": "default" - } - return self.resource_api.create_project(tenant["id"], tenant) - - def create_mapping(self, tenant, authz_uuid=None, admin_uuid=None): - - mapping = self.tenant_api.set_tenant_dict(tenant["id"], tenant["name"], authz_uuid, admin_uuid) - self.assertIsInstance(mapping, dict) - self.assertIn("authz", mapping) - self.assertEqual(mapping["authz"], authz_uuid) - return mapping - - def create_user(self, username="admin"): - - _USER = dict(USER) - _USER["name"] = username - return self.identity_api.create_user(_USER) - - def create_intra_extension(self, policy_model="policy_authz"): - - IE["policymodel"] = policy_model - IE["name"] = uuid.uuid4().hex - ref = self.admin_manager.load_intra_extension_dict(DEFAULT_USER_ID, intra_extension_dict=IE) - self.assertIsInstance(ref, dict) - return ref - def test_subjects(self): - admin_user = self.create_user("admin") - demo_user = self.create_user("demo") - ref = self.create_intra_extension() - - self.assertRaises( - AuthzException, - self.manager.get_subjects_dict, - demo_user["id"], ref["id"]) - - subjects = self.manager.get_subjects_dict(admin_user["id"], ref["id"]) + authz_ie_dict = create_intra_extension(self, "policy_authz") + admin_ie_dict = create_intra_extension(self, "policy_admin") + tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id']) + + admin_subject_id, admin_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next() + demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"], + {"name": "demo", "description": "demo"}) + demo_subject_id, demo_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next() + subjects = self.manager.get_subjects_dict(admin_subject_id, authz_ie_dict["id"]) self.assertIsInstance(subjects, dict) - self.assertIn("subjects", subjects) - self.assertIn("id", subjects) - self.assertIn("intra_extension_uuid", subjects) - self.assertEqual(ref["id"], subjects["intra_extension_uuid"]) - self.assertIsInstance(subjects["subjects"], dict) - - new_subject = self.create_user("new_user") - new_subjects = dict() - new_subjects[new_subject["id"]] = new_subject["name"] - + for key, value in subjects.iteritems(): + self.assertIsInstance(value, dict) + self.assertIn("name", value) + self.assertIn("description", value) + self.assertIn("keystone_name", value) + self.assertIn("keystone_id", value) + + create_user(self, "subject_test") + new_subject = {"name": "subject_test", "description": "subject_test"} self.assertRaises( AuthzException, - self.manager.set_subject_dict, - demo_user["id"], ref["id"], new_subjects) - - subjects = self.manager.set_subject_dict(admin_user["id"], ref["id"], new_subjects) - self.assertIsInstance(subjects, dict) - self.assertIn("subjects", subjects) - self.assertIn("id", subjects) - self.assertIn("intra_extension_uuid", subjects) - self.assertEqual(ref["id"], subjects["intra_extension_uuid"]) - self.assertEqual(subjects["subjects"], new_subjects) - self.assertIn(new_subject["id"], subjects["subjects"]) + self.manager.add_subject_dict, + demo_subject_id, admin_ie_dict["id"], new_subject) + + subjects = self.admin_manager.add_subject_dict(admin_subject_id, authz_ie_dict["id"], new_subject) + _subjects = dict(subjects) + self.assertEqual(len(_subjects.keys()), 1) + new_subject["id"] = _subjects.keys()[0] + value = subjects[new_subject["id"]] + self.assertIsInstance(value, dict) + self.assertIn("name", value) + self.assertEqual(value["name"], new_subject["name"]) + self.assertIn("description", value) + self.assertEqual(value["description"], new_subject["description"]) # Delete the new subject self.assertRaises( AuthzException, - self.manager.del_subject_dict, - demo_user["id"], ref["id"], new_subject["id"]) - - self.manager.del_subject(admin_user["id"], ref["id"], new_subject["id"]) - subjects = self.manager.get_subjects_dict(admin_user["id"], ref["id"]) - self.assertIsInstance(subjects, dict) - self.assertIn("subjects", subjects) - self.assertIn("id", subjects) - self.assertIn("intra_extension_uuid", subjects) - self.assertEqual(ref["id"], subjects["intra_extension_uuid"]) - self.assertNotIn(new_subject["id"], subjects["subjects"]) - - # Add a particular subject - self.assertRaises( - AuthzException, - self.manager.add_subject_dict, - demo_user["id"], ref["id"], new_subject["id"]) + self.manager.del_subject, + demo_subject_id, authz_ie_dict["id"], new_subject["id"]) - subjects = self.manager.add_subject_dict(admin_user["id"], ref["id"], new_subject["id"]) - self.assertIsInstance(subjects, dict) - self.assertIn("subject", subjects) - self.assertIn("uuid", subjects["subject"]) - self.assertEqual(new_subject["name"], subjects["subject"]["name"]) - subjects = self.manager.get_subjects_dict(admin_user["id"], ref["id"]) - self.assertIsInstance(subjects, dict) - self.assertIn("subjects", subjects) - self.assertIn("id", subjects) - self.assertIn("intra_extension_uuid", subjects) - self.assertEqual(ref["id"], subjects["intra_extension_uuid"]) - self.assertIn(new_subject["id"], subjects["subjects"]) + self.admin_manager.del_subject(admin_subject_id, authz_ie_dict["id"], new_subject["id"]) + subjects = self.manager.get_subjects_dict(admin_subject_id, authz_ie_dict["id"]) + for key, value in subjects.iteritems(): + self.assertIsInstance(value, dict) + self.assertIn("name", value) + self.assertIsNot(new_subject["name"], value["name"]) + self.assertIn("description", value) def test_objects(self): - admin_user = self.create_user("admin") - demo_user = self.create_user("demo") - ref = self.create_intra_extension() - - self.assertRaises( - AuthzException, - self.manager.get_objects_dict, - demo_user["id"], ref["id"]) - - objects = self.manager.get_objects_dict(admin_user["id"], ref["id"]) + authz_ie_dict = create_intra_extension(self, "policy_authz") + admin_ie_dict = create_intra_extension(self, "policy_admin") + tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id']) + + admin_subject_id, admin_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next() + demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"], + {"name": "demo", "description": "demo"}) + demo_subject_id, demo_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next() + objects = self.manager.get_objects_dict(admin_subject_id, authz_ie_dict["id"]) + objects_id_list = [] self.assertIsInstance(objects, dict) - self.assertIn("objects", objects) - self.assertIn("id", objects) - self.assertIn("intra_extension_uuid", objects) - self.assertEqual(ref["id"], objects["intra_extension_uuid"]) - self.assertIsInstance(objects["objects"], dict) - - new_object = {"id": uuid.uuid4().hex, "name": "my_object"} - new_objects = dict() - new_objects[new_object["id"]] = new_object["name"] - + for key, value in objects.iteritems(): + objects_id_list.append(key) + self.assertIsInstance(value, dict) + self.assertIn("name", value) + self.assertIn("description", value) + + create_user(self, "subject_test") + new_object = {"name": "object_test", "description": "object_test"} self.assertRaises( AuthzException, - self.manager.set_object_dict, - demo_user["id"], ref["id"], new_objects) - - objects = self.manager.set_object_dict(admin_user["id"], ref["id"], new_objects) - self.assertIsInstance(objects, dict) - self.assertIn("objects", objects) - self.assertIn("id", objects) - self.assertIn("intra_extension_uuid", objects) - self.assertEqual(ref["id"], objects["intra_extension_uuid"]) - self.assertEqual(objects["objects"], new_objects) - self.assertIn(new_object["id"], objects["objects"]) + self.manager.add_object_dict, + demo_subject_id, admin_ie_dict["id"], new_object) - # Delete the new object self.assertRaises( - AuthzException, - self.manager.del_object_dict, - demo_user["id"], ref["id"], new_object["id"]) - - self.manager.del_object(admin_user["id"], ref["id"], new_object["id"]) - objects = self.manager.get_objects_dict(admin_user["id"], ref["id"]) - self.assertIsInstance(objects, dict) - self.assertIn("objects", objects) - self.assertIn("id", objects) - self.assertIn("intra_extension_uuid", objects) - self.assertEqual(ref["id"], objects["intra_extension_uuid"]) - self.assertNotIn(new_object["id"], objects["objects"]) + ObjectsWriteNoAuthorized, + self.admin_manager.add_object_dict, + admin_subject_id, authz_ie_dict["id"], new_object + ) - # Add a particular object - self.assertRaises( - AuthzException, - self.manager.add_object_dict, - demo_user["id"], ref["id"], new_object["name"]) + # Delete the new object + for key in objects_id_list: + self.assertRaises( + AuthzException, + self.manager.del_object, + demo_subject_id, authz_ie_dict["id"], key) + self.assertRaises( + AuthzException, + self.manager.del_object, + admin_subject_id, authz_ie_dict["id"], key) - objects = self.manager.add_object_dict(admin_user["id"], ref["id"], new_object["name"]) - self.assertIsInstance(objects, dict) - self.assertIn("object", objects) - self.assertIn("uuid", objects["object"]) - self.assertEqual(new_object["name"], objects["object"]["name"]) - new_object["id"] = objects["object"]["uuid"] - objects = self.manager.get_objects_dict(admin_user["id"], ref["id"]) - self.assertIsInstance(objects, dict) - self.assertIn("objects", objects) - self.assertIn("id", objects) - self.assertIn("intra_extension_uuid", objects) - self.assertEqual(ref["id"], objects["intra_extension_uuid"]) - self.assertIn(new_object["id"], objects["objects"]) + for key in objects_id_list: + self.assertRaises( + ObjectsWriteNoAuthorized, + self.admin_manager.del_object, + demo_subject_id, authz_ie_dict["id"], key) + self.assertRaises( + ObjectsWriteNoAuthorized, + self.admin_manager.del_object, + admin_subject_id, authz_ie_dict["id"], key) def test_actions(self): - admin_user = self.create_user("admin") - demo_user = self.create_user("demo") - ref = self.create_intra_extension() - - self.assertRaises( - AuthzException, - self.manager.get_actions_dict, - demo_user["id"], ref["id"]) - - actions = self.manager.get_actions_dict(admin_user["id"], ref["id"]) + authz_ie_dict = create_intra_extension(self, "policy_authz") + admin_ie_dict = create_intra_extension(self, "policy_admin") + tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id']) + + admin_subject_id, admin_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next() + demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"], + {"name": "demo", "description": "demo"}) + demo_subject_id, demo_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next() + actions = self.manager.get_actions_dict(admin_subject_id, authz_ie_dict["id"]) + actions_id_list = [] self.assertIsInstance(actions, dict) - self.assertIn("actions", actions) - self.assertIn("id", actions) - self.assertIn("intra_extension_uuid", actions) - self.assertEqual(ref["id"], actions["intra_extension_uuid"]) - self.assertIsInstance(actions["actions"], dict) - - new_action = {"id": uuid.uuid4().hex, "name": "my_action"} - new_actions = dict() - new_actions[new_action["id"]] = new_action["name"] - + for key, value in actions.iteritems(): + actions_id_list.append(key) + self.assertIsInstance(value, dict) + self.assertIn("name", value) + self.assertIn("description", value) + + create_user(self, "subject_test") + new_action = {"name": "action_test", "description": "action_test"} self.assertRaises( AuthzException, - self.manager.set_action_dict, - demo_user["id"], ref["id"], new_actions) + self.manager.add_action_dict, + demo_subject_id, admin_ie_dict["id"], new_action) - actions = self.manager.set_action_dict(admin_user["id"], ref["id"], new_actions) - self.assertIsInstance(actions, dict) - self.assertIn("actions", actions) - self.assertIn("id", actions) - self.assertIn("intra_extension_uuid", actions) - self.assertEqual(ref["id"], actions["intra_extension_uuid"]) - self.assertEqual(actions["actions"], new_actions) - self.assertIn(new_action["id"], actions["actions"]) - - # Delete the new action self.assertRaises( - AuthzException, - self.manager.del_action_dict, - demo_user["id"], ref["id"], new_action["id"]) - - self.manager.del_action(admin_user["id"], ref["id"], new_action["id"]) - actions = self.manager.get_actions_dict(admin_user["id"], ref["id"]) - self.assertIsInstance(actions, dict) - self.assertIn("actions", actions) - self.assertIn("id", actions) - self.assertIn("intra_extension_uuid", actions) - self.assertEqual(ref["id"], actions["intra_extension_uuid"]) - self.assertNotIn(new_action["id"], actions["actions"]) + ActionsWriteNoAuthorized, + self.admin_manager.add_action_dict, + admin_subject_id, authz_ie_dict["id"], new_action + ) - # Add a particular action - self.assertRaises( - AuthzException, - self.manager.add_action_dict, - demo_user["id"], ref["id"], new_action["name"]) + # Delete all actions + for key in actions_id_list: + self.assertRaises( + AuthzException, + self.manager.del_action, + demo_subject_id, authz_ie_dict["id"], key) + self.assertRaises( + AuthzException, + self.manager.del_action, + admin_subject_id, authz_ie_dict["id"], key) - actions = self.manager.add_action_dict(admin_user["id"], ref["id"], new_action["name"]) - self.assertIsInstance(actions, dict) - self.assertIn("action", actions) - self.assertIn("uuid", actions["action"]) - self.assertEqual(new_action["name"], actions["action"]["name"]) - new_action["id"] = actions["action"]["uuid"] - actions = self.manager.get_actions_dict(admin_user["id"], ref["id"]) - self.assertIsInstance(actions, dict) - self.assertIn("actions", actions) - self.assertIn("id", actions) - self.assertIn("intra_extension_uuid", actions) - self.assertEqual(ref["id"], actions["intra_extension_uuid"]) - self.assertIn(new_action["id"], actions["actions"]) + for key in actions_id_list: + self.assertRaises( + ActionsWriteNoAuthorized, + self.admin_manager.del_action, + demo_subject_id, authz_ie_dict["id"], key) + self.assertRaises( + ActionsWriteNoAuthorized, + self.admin_manager.del_action, + admin_subject_id, authz_ie_dict["id"], key) def test_subject_categories(self): - demo_user = self.create_user("demo") - admin_user = self.create_user("admin") - ref = self.create_intra_extension() - - self.assertRaises( - AuthzException, - self.manager.get_subject_categories_dict, - demo_user["id"], ref["id"]) - - subject_categories = self.manager.get_subject_categories_dict(admin_user["id"], ref["id"]) + authz_ie_dict = create_intra_extension(self, "policy_authz") + admin_ie_dict = create_intra_extension(self, "policy_admin") + tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id']) + + admin_subject_id, admin_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next() + demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"], + {"name": "demo", "description": "demo"}) + demo_subject_id, demo_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next() + subject_categories = self.manager.get_subject_categories_dict(admin_subject_id, authz_ie_dict["id"]) self.assertIsInstance(subject_categories, dict) - self.assertIn("subject_categories", subject_categories) - self.assertIn("id", subject_categories) - self.assertIn("intra_extension_uuid", subject_categories) - self.assertEqual(ref["id"], subject_categories["intra_extension_uuid"]) - self.assertIsInstance(subject_categories["subject_categories"], dict) - - new_subject_category = {"id": uuid.uuid4().hex, "name": "subject_category_test"} - new_subject_categories = dict() - new_subject_categories[new_subject_category["id"]] = new_subject_category["name"] + for key, value in subject_categories.iteritems(): + self.assertIsInstance(value, dict) + self.assertIn("name", value) + self.assertIn("description", value) + new_subject_category = {"name": "subject_category_test", "description": "subject_category_test"} self.assertRaises( AuthzException, - self.manager.set_subject_category_dict, - demo_user["id"], ref["id"], new_subject_categories) - - subject_categories = self.manager.set_subject_category_dict(admin_user["id"], ref["id"], new_subject_categories) - self.assertIsInstance(subject_categories, dict) - self.assertIn("subject_categories", subject_categories) - self.assertIn("id", subject_categories) - self.assertIn("intra_extension_uuid", subject_categories) - self.assertEqual(ref["id"], subject_categories["intra_extension_uuid"]) - self.assertEqual(subject_categories["subject_categories"], new_subject_categories) - self.assertIn(new_subject_category["id"], subject_categories["subject_categories"]) + self.manager.add_subject_category, + demo_subject_id, admin_ie_dict["id"], new_subject_category) + + subject_categories = self.admin_manager.add_subject_category(admin_subject_id, authz_ie_dict["id"], new_subject_category) + _subject_categories = dict(subject_categories) + self.assertEqual(len(_subject_categories.keys()), 1) + new_subject_category["id"] = _subject_categories.keys()[0] + value = subject_categories[new_subject_category["id"]] + self.assertIsInstance(value, dict) + self.assertIn("name", value) + self.assertEqual(value["name"], new_subject_category["name"]) + self.assertIn("description", value) + self.assertEqual(value["description"], new_subject_category["description"]) # Delete the new subject_category self.assertRaises( AuthzException, - self.manager.del_subject_category_dict, - demo_user["id"], ref["id"], new_subject_category["id"]) - - self.manager.del_subject_category(admin_user["id"], ref["id"], new_subject_category["id"]) - subject_categories = self.manager.get_subject_categories_dict(admin_user["id"], ref["id"]) - self.assertIsInstance(subject_categories, dict) - self.assertIn("subject_categories", subject_categories) - self.assertIn("id", subject_categories) - self.assertIn("intra_extension_uuid", subject_categories) - self.assertEqual(ref["id"], subject_categories["intra_extension_uuid"]) - self.assertNotIn(new_subject_category["id"], subject_categories["subject_categories"]) + self.manager.del_subject_category, + demo_subject_id, authz_ie_dict["id"], new_subject_category["id"]) - # Add a particular subject_category - self.assertRaises( - AuthzException, - self.manager.add_subject_category, - demo_user["id"], ref["id"], new_subject_category["name"]) - - subject_categories = self.manager.add_subject_category( - admin_user["id"], - ref["id"], - new_subject_category["name"]) - self.assertIsInstance(subject_categories, dict) - self.assertIn("subject_category", subject_categories) - self.assertIn("uuid", subject_categories["subject_category"]) - self.assertEqual(new_subject_category["name"], subject_categories["subject_category"]["name"]) - new_subject_category["id"] = subject_categories["subject_category"]["uuid"] - subject_categories = self.manager.get_subject_categories_dict( - admin_user["id"], - ref["id"]) - self.assertIsInstance(subject_categories, dict) - self.assertIn("subject_categories", subject_categories) - self.assertIn("id", subject_categories) - self.assertIn("intra_extension_uuid", subject_categories) - self.assertEqual(ref["id"], subject_categories["intra_extension_uuid"]) - self.assertIn(new_subject_category["id"], subject_categories["subject_categories"]) + self.admin_manager.del_subject_category(admin_subject_id, authz_ie_dict["id"], new_subject_category["id"]) + subject_categories = self.manager.get_subject_categories_dict(admin_subject_id, authz_ie_dict["id"]) + for key, value in subject_categories.iteritems(): + self.assertIsInstance(value, dict) + self.assertIn("name", value) + self.assertIsNot(new_subject_category["name"], value["name"]) + self.assertIn("description", value) def test_object_categories(self): - demo_user = self.create_user("demo") - admin_user = self.create_user("admin") - ref = self.create_intra_extension() - - self.assertRaises( - AuthzException, - self.manager.get_object_category_dict, - demo_user["id"], ref["id"]) - - object_categories = self.manager.get_object_category_dict(admin_user["id"], ref["id"]) + authz_ie_dict = create_intra_extension(self, "policy_authz") + admin_ie_dict = create_intra_extension(self, "policy_admin") + tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id']) + + admin_subject_id, admin_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next() + demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"], + {"name": "demo", "description": "demo"}) + demo_subject_id, demo_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next() + object_categories = self.manager.get_object_category_dict(admin_subject_id, authz_ie_dict["id"]) self.assertIsInstance(object_categories, dict) - self.assertIn("object_categories", object_categories) - self.assertIn("id", object_categories) - self.assertIn("intra_extension_uuid", object_categories) - self.assertEqual(ref["id"], object_categories["intra_extension_uuid"]) - self.assertIsInstance(object_categories["object_categories"], dict) - - new_object_category = {"id": uuid.uuid4().hex, "name": "object_category_test"} - new_object_categories = dict() - new_object_categories[new_object_category["id"]] = new_object_category["name"] + for key, value in object_categories.iteritems(): + self.assertIsInstance(value, dict) + self.assertIn("name", value) + self.assertIn("description", value) + new_object_category = {"name": "object_category_test", "description": "object_category_test"} self.assertRaises( AuthzException, - self.manager.set_object_category_dict, - demo_user["id"], ref["id"], new_object_categories) - - object_categories = self.manager.set_object_category_dict(admin_user["id"], ref["id"], new_object_categories) - self.assertIsInstance(object_categories, dict) - self.assertIn("object_categories", object_categories) - self.assertIn("id", object_categories) - self.assertIn("intra_extension_uuid", object_categories) - self.assertEqual(ref["id"], object_categories["intra_extension_uuid"]) - self.assertEqual(object_categories["object_categories"], new_object_categories) - self.assertIn(new_object_category["id"], object_categories["object_categories"]) + self.manager.add_object_category, + demo_subject_id, admin_ie_dict["id"], new_object_category) + + object_categories = self.admin_manager.add_object_category(admin_subject_id, authz_ie_dict["id"], new_object_category) + _object_categories = dict(object_categories) + self.assertEqual(len(_object_categories.keys()), 1) + new_object_category["id"] = _object_categories.keys()[0] + value = object_categories[new_object_category["id"]] + self.assertIsInstance(value, dict) + self.assertIn("name", value) + self.assertEqual(value["name"], new_object_category["name"]) + self.assertIn("description", value) + self.assertEqual(value["description"], new_object_category["description"]) # Delete the new object_category self.assertRaises( AuthzException, self.manager.del_object_category, - demo_user["id"], ref["id"], new_object_category["id"]) - - self.manager.del_object_category(admin_user["id"], ref["id"], new_object_category["id"]) - object_categories = self.manager.get_object_category_dict(admin_user["id"], ref["id"]) - self.assertIsInstance(object_categories, dict) - self.assertIn("object_categories", object_categories) - self.assertIn("id", object_categories) - self.assertIn("intra_extension_uuid", object_categories) - self.assertEqual(ref["id"], object_categories["intra_extension_uuid"]) - self.assertNotIn(new_object_category["id"], object_categories["object_categories"]) + demo_subject_id, authz_ie_dict["id"], new_object_category["id"]) - # Add a particular object_category - self.assertRaises( - AuthzException, - self.manager.add_object_category, - demo_user["id"], ref["id"], new_object_category["name"]) - - object_categories = self.manager.add_object_category( - admin_user["id"], - ref["id"], - new_object_category["name"]) - self.assertIsInstance(object_categories, dict) - self.assertIn("object_category", object_categories) - self.assertIn("uuid", object_categories["object_category"]) - self.assertEqual(new_object_category["name"], object_categories["object_category"]["name"]) - new_object_category["id"] = object_categories["object_category"]["uuid"] - object_categories = self.manager.get_object_category_dict( - admin_user["id"], - ref["id"]) - self.assertIsInstance(object_categories, dict) - self.assertIn("object_categories", object_categories) - self.assertIn("id", object_categories) - self.assertIn("intra_extension_uuid", object_categories) - self.assertEqual(ref["id"], object_categories["intra_extension_uuid"]) - self.assertIn(new_object_category["id"], object_categories["object_categories"]) + self.admin_manager.del_object_category(admin_subject_id, authz_ie_dict["id"], new_object_category["id"]) + object_categories = self.manager.get_object_category_dict(admin_subject_id, authz_ie_dict["id"]) + for key, value in object_categories.iteritems(): + self.assertIsInstance(value, dict) + self.assertIn("name", value) + self.assertIsNot(new_object_category["name"], value["name"]) + self.assertIn("description", value) def test_action_categories(self): - admin_user = self.create_user() - demo_user = self.create_user("demo") - tenant = self.create_tenant() - ie_authz = self.create_intra_extension("policy_authz") - ie_admin = self.create_intra_extension("policy_admin") - mapping = self.create_mapping(tenant, ie_authz["id"], ie_admin["id"]) - ref = ie_authz - # admin_user = self.create_user("admin") - # ref = self.create_intra_extension() - - self.assertRaises( - AuthzException, - self.manager.get_action_category_dict, - demo_user["id"], ref["id"]) - - action_categories = self.manager.get_action_category_dict(admin_user["id"], ref["id"]) + authz_ie_dict = create_intra_extension(self, "policy_authz") + admin_ie_dict = create_intra_extension(self, "policy_admin") + tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id']) + + admin_subject_id, admin_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next() + demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"], + {"name": "demo", "description": "demo"}) + demo_subject_id, demo_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next() + action_categories = self.manager.get_action_category_dict(admin_subject_id, authz_ie_dict["id"]) self.assertIsInstance(action_categories, dict) - self.assertIn("action_categories", action_categories) - self.assertIn("id", action_categories) - self.assertIn("intra_extension_uuid", action_categories) - self.assertEqual(ref["id"], action_categories["intra_extension_uuid"]) - self.assertIsInstance(action_categories["action_categories"], dict) - - new_action_category = {"id": uuid.uuid4().hex, "name": "action_category_test"} - new_action_categories = dict() - new_action_categories[new_action_category["id"]] = new_action_category["name"] - self.assertRaises( - AuthzException, - self.manager.set_action_category_dict, - demo_user["id"], ref["id"], new_action_categories) + for key, value in action_categories.iteritems(): + self.assertIsInstance(value, dict) + self.assertIn("name", value) + self.assertIn("description", value) - action_categories = self.manager.set_action_category_dict(admin_user["id"], ref["id"], new_action_categories) - self.assertIsInstance(action_categories, dict) - self.assertIn("action_categories", action_categories) - self.assertIn("id", action_categories) - self.assertIn("intra_extension_uuid", action_categories) - self.assertEqual(ref["id"], action_categories["intra_extension_uuid"]) - self.assertEqual(action_categories["action_categories"], new_action_categories) - self.assertIn(new_action_category["id"], action_categories["action_categories"]) - - # Delete the new action_category + new_action_category = {"name": "action_category_test", "description": "action_category_test"} self.assertRaises( AuthzException, - self.manager.del_action_category_dict, - demo_user["id"], ref["id"], new_action_category["id"]) - - self.manager.del_action_category(admin_user["id"], ref["id"], new_action_category["id"]) - action_categories = self.manager.get_action_category_dict(admin_user["id"], ref["id"]) - self.assertIsInstance(action_categories, dict) - self.assertIn("action_categories", action_categories) - self.assertIn("id", action_categories) - self.assertIn("intra_extension_uuid", action_categories) - self.assertEqual(ref["id"], action_categories["intra_extension_uuid"]) - self.assertNotIn(new_action_category["id"], action_categories["action_categories"]) + self.manager.add_action_category, + demo_subject_id, admin_ie_dict["id"], new_action_category) + + action_categories = self.admin_manager.add_action_category(admin_subject_id, authz_ie_dict["id"], new_action_category) + _action_categories = dict(action_categories) + self.assertEqual(len(_action_categories.keys()), 1) + new_action_category["id"] = _action_categories.keys()[0] + value = action_categories[new_action_category["id"]] + self.assertIsInstance(value, dict) + self.assertIn("name", value) + self.assertEqual(value["name"], new_action_category["name"]) + self.assertIn("description", value) + self.assertEqual(value["description"], new_action_category["description"]) - # Add a particular action_category + # Delete the new action_category self.assertRaises( AuthzException, - self.manager.add_action_category, - demo_user["id"], ref["id"], new_action_category["name"]) + self.manager.del_action_category, + demo_subject_id, authz_ie_dict["id"], new_action_category["id"]) - action_categories = self.manager.add_action_category( - admin_user["id"], - ref["id"], - new_action_category["name"]) - self.assertIsInstance(action_categories, dict) - self.assertIn("action_category", action_categories) - self.assertIn("uuid", action_categories["action_category"]) - self.assertEqual(new_action_category["name"], action_categories["action_category"]["name"]) - new_action_category["id"] = action_categories["action_category"]["uuid"] - action_categories = self.manager.get_action_category_dict( - admin_user["id"], - ref["id"]) - self.assertIsInstance(action_categories, dict) - self.assertIn("action_categories", action_categories) - self.assertIn("id", action_categories) - self.assertIn("intra_extension_uuid", action_categories) - self.assertEqual(ref["id"], action_categories["intra_extension_uuid"]) - self.assertIn(new_action_category["id"], action_categories["action_categories"]) + self.admin_manager.del_action_category(admin_subject_id, authz_ie_dict["id"], new_action_category["id"]) + action_categories = self.manager.get_action_category_dict(admin_subject_id, authz_ie_dict["id"]) + for key, value in action_categories.iteritems(): + self.assertIsInstance(value, dict) + self.assertIn("name", value) + self.assertIsNot(new_action_category["name"], value["name"]) + self.assertIn("description", value) def test_subject_category_scope(self): - demo_user = self.create_user("demo") - admin_user = self.create_user("admin") - ref = self.create_intra_extension() - - subject_categories = self.manager.set_subject_category_dict( - admin_user["id"], - ref["id"], + authz_ie_dict = create_intra_extension(self, "policy_authz") + admin_ie_dict = create_intra_extension(self, "policy_admin") + tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id']) + + admin_subject_id, admin_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next() + demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"], + {"name": "demo", "description": "demo"}) + demo_subject_id, demo_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next() + + subject_categories = self.admin_manager.add_subject_category( + admin_subject_id, + authz_ie_dict["id"], { - uuid.uuid4().hex: admin_user["id"], - uuid.uuid4().hex: "dev", + "name": "country", + "description": "country", } ) - for subject_category in subject_categories["subject_categories"]: - self.assertRaises( - AuthzException, - self.manager.get_subject_scopes_dict, - demo_user["id"], ref["id"], subject_category) + for subject_category_id in subject_categories: subject_category_scope = self.manager.get_subject_scopes_dict( - admin_user["id"], - ref["id"], - subject_category) + admin_subject_id, + authz_ie_dict["id"], + subject_category_id) self.assertIsInstance(subject_category_scope, dict) - self.assertIn("subject_category_scope", subject_category_scope) - self.assertIn("id", subject_category_scope) - self.assertIn("intra_extension_uuid", subject_category_scope) - self.assertEqual(ref["id"], subject_category_scope["intra_extension_uuid"]) - self.assertIsInstance(subject_category_scope["subject_category_scope"], dict) + self.assertEqual({}, subject_category_scope) - new_subject_category_scope = dict() - new_subject_category_scope_uuid = uuid.uuid4().hex - new_subject_category_scope[new_subject_category_scope_uuid] = "new_subject_category_scope" + new_subject_category_scope = { + "name": "france", + "description": "france", + } self.assertRaises( AuthzException, - self.manager.set_subject_scope_dict, - demo_user["id"], ref["id"], subject_category, new_subject_category_scope) + self.admin_manager.add_subject_scope_dict, + demo_subject_id, authz_ie_dict["id"], subject_category_id, new_subject_category_scope) - subject_category_scope = self.manager.set_subject_scope_dict( - admin_user["id"], - ref["id"], - subject_category, + subject_category_scope = self.admin_manager.add_subject_scope_dict( + admin_subject_id, + authz_ie_dict["id"], + subject_category_id, new_subject_category_scope) self.assertIsInstance(subject_category_scope, dict) - self.assertIn("subject_category_scope", subject_category_scope) - self.assertIn("id", subject_category_scope) - self.assertIn("intra_extension_uuid", subject_category_scope) - self.assertEqual(ref["id"], subject_category_scope["intra_extension_uuid"]) - self.assertIn(new_subject_category_scope[new_subject_category_scope_uuid], - subject_category_scope["subject_category_scope"][subject_category].values()) + self.assertEqual(len(subject_category_scope.keys()), 1) + subject_category_scope_id = subject_category_scope.keys()[0] + subject_category_scope_value = subject_category_scope[subject_category_scope_id] + self.assertIn("name", subject_category_scope_value) + self.assertEqual(new_subject_category_scope["name"], subject_category_scope_value["name"]) + self.assertIn("description", subject_category_scope_value) + self.assertEqual(new_subject_category_scope["description"], subject_category_scope_value["description"]) # Delete the new subject_category_scope self.assertRaises( AuthzException, - self.manager.del_subject_category_scope_dict, - demo_user["id"], ref["id"], subject_category, new_subject_category_scope_uuid) - - self.manager.del_subject_scope( - admin_user["id"], - ref["id"], - subject_category, - new_subject_category_scope_uuid) - subject_category_scope = self.manager.get_subject_scopes_dict( - admin_user["id"], - ref["id"], - subject_category) + self.admin_manager.del_subject_scope, + demo_subject_id, authz_ie_dict["id"], subject_category_id, subject_category_scope_id) + + self.admin_manager.del_subject_scope( + admin_subject_id, + authz_ie_dict["id"], + subject_category_id, + subject_category_scope_id) + subject_category_scope = self.admin_manager.get_subject_scopes_dict( + admin_subject_id, + authz_ie_dict["id"], + subject_category_id) self.assertIsInstance(subject_category_scope, dict) - self.assertIn("subject_category_scope", subject_category_scope) - self.assertIn("id", subject_category_scope) - self.assertIn("intra_extension_uuid", subject_category_scope) - self.assertEqual(ref["id"], subject_category_scope["intra_extension_uuid"]) - self.assertNotIn(new_subject_category_scope_uuid, subject_category_scope["subject_category_scope"]) - - # Add a particular subject_category_scope - self.assertRaises( - AuthzException, - self.manager.add_subject_scope_dict, - demo_user["id"], ref["id"], subject_category, - new_subject_category_scope[new_subject_category_scope_uuid]) - - subject_category_scope = self.manager.add_subject_scope_dict( - admin_user["id"], - ref["id"], - subject_category, - new_subject_category_scope[new_subject_category_scope_uuid]) - self.assertIsInstance(subject_category_scope, dict) - self.assertIn("subject_category_scope", subject_category_scope) - self.assertIn("uuid", subject_category_scope["subject_category_scope"]) - self.assertEqual(new_subject_category_scope[new_subject_category_scope_uuid], - subject_category_scope["subject_category_scope"]["name"]) - subject_category_scope = self.manager.get_subject_scopes_dict( - admin_user["id"], - ref["id"], - subject_category) - self.assertIsInstance(subject_category_scope, dict) - self.assertIn("subject_category_scope", subject_category_scope) - self.assertIn("id", subject_category_scope) - self.assertIn("intra_extension_uuid", subject_category_scope) - self.assertEqual(ref["id"], subject_category_scope["intra_extension_uuid"]) - self.assertNotIn(new_subject_category_scope_uuid, subject_category_scope["subject_category_scope"]) + self.assertNotIn(subject_category_scope_id, subject_category_scope.keys()) def test_object_category_scope(self): - demo_user = self.create_user("demo") - admin_user = self.create_user("admin") - ref = self.create_intra_extension() - - object_categories = self.manager.set_object_category_dict( - admin_user["id"], - ref["id"], + authz_ie_dict = create_intra_extension(self, "policy_authz") + admin_ie_dict = create_intra_extension(self, "policy_admin") + tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id']) + + admin_subject_id, admin_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next() + demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"], + {"name": "demo", "description": "demo"}) + demo_subject_id, demo_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next() + + object_categories = self.admin_manager.add_object_category( + admin_subject_id, + authz_ie_dict["id"], { - uuid.uuid4().hex: "id", - uuid.uuid4().hex: "domain", + "name": "country", + "description": "country", } ) - for object_category in object_categories["object_categories"]: - self.assertRaises( - AuthzException, - self.manager.get_object_scopes_dict, - demo_user["id"], ref["id"], object_category) + for object_category_id in object_categories: object_category_scope = self.manager.get_object_scopes_dict( - admin_user["id"], - ref["id"], - object_category) + admin_subject_id, + authz_ie_dict["id"], + object_category_id) self.assertIsInstance(object_category_scope, dict) - self.assertIn("object_category_scope", object_category_scope) - self.assertIn("id", object_category_scope) - self.assertIn("intra_extension_uuid", object_category_scope) - self.assertEqual(ref["id"], object_category_scope["intra_extension_uuid"]) - self.assertIsInstance(object_category_scope["object_category_scope"], dict) + self.assertEqual({}, object_category_scope) - new_object_category_scope = dict() - new_object_category_scope_uuid = uuid.uuid4().hex - new_object_category_scope[new_object_category_scope_uuid] = "new_object_category_scope" + new_object_category_scope = { + "name": "france", + "description": "france", + } self.assertRaises( AuthzException, - self.manager.set_object_scope_dict, - demo_user["id"], ref["id"], object_category, new_object_category_scope) + self.admin_manager.add_object_scope_dict, + demo_subject_id, authz_ie_dict["id"], object_category_id, new_object_category_scope) - object_category_scope = self.manager.set_object_scope_dict( - admin_user["id"], - ref["id"], - object_category, + object_category_scope = self.admin_manager.add_object_scope_dict( + admin_subject_id, + authz_ie_dict["id"], + object_category_id, new_object_category_scope) self.assertIsInstance(object_category_scope, dict) - self.assertIn("object_category_scope", object_category_scope) - self.assertIn("id", object_category_scope) - self.assertIn("intra_extension_uuid", object_category_scope) - self.assertEqual(ref["id"], object_category_scope["intra_extension_uuid"]) - self.assertIn(new_object_category_scope[new_object_category_scope_uuid], - object_category_scope["object_category_scope"][object_category].values()) + self.assertEqual(len(object_category_scope.keys()), 1) + object_category_scope_id = object_category_scope.keys()[0] + object_category_scope_value = object_category_scope[object_category_scope_id] + self.assertIn("name", object_category_scope_value) + self.assertEqual(new_object_category_scope["name"], object_category_scope_value["name"]) + self.assertIn("description", object_category_scope_value) + self.assertEqual(new_object_category_scope["description"], object_category_scope_value["description"]) # Delete the new object_category_scope self.assertRaises( AuthzException, - self.manager.del_object_category_scope_dict, - demo_user["id"], ref["id"], object_category, new_object_category_scope) - - self.manager.del_object_scope( - admin_user["id"], - ref["id"], - object_category, - new_object_category_scope_uuid) - object_category_scope = self.manager.get_object_scopes_dict( - admin_user["id"], - ref["id"], - object_category) - self.assertIsInstance(object_category_scope, dict) - self.assertIn("object_category_scope", object_category_scope) - self.assertIn("id", object_category_scope) - self.assertIn("intra_extension_uuid", object_category_scope) - self.assertEqual(ref["id"], object_category_scope["intra_extension_uuid"]) - self.assertNotIn(new_object_category_scope_uuid, object_category_scope["object_category_scope"]) - - # Add a particular object_category_scope - self.assertRaises( - AuthzException, - self.manager.add_object_scope_dict, - demo_user["id"], ref["id"], object_category, - new_object_category_scope[new_object_category_scope_uuid] - ) - - object_category_scope = self.manager.add_object_scope_dict( - admin_user["id"], - ref["id"], - object_category, - new_object_category_scope[new_object_category_scope_uuid]) - self.assertIsInstance(object_category_scope, dict) - self.assertIn("object_category_scope", object_category_scope) - self.assertIn("uuid", object_category_scope["object_category_scope"]) - self.assertEqual(new_object_category_scope[new_object_category_scope_uuid], - object_category_scope["object_category_scope"]["name"]) - object_category_scope = self.manager.get_object_scopes_dict( - admin_user["id"], - ref["id"], - object_category) + self.admin_manager.del_object_scope, + demo_subject_id, authz_ie_dict["id"], object_category_id, object_category_scope_id) + + self.admin_manager.del_object_scope( + admin_subject_id, + authz_ie_dict["id"], + object_category_id, + object_category_scope_id) + object_category_scope = self.admin_manager.get_object_scopes_dict( + admin_subject_id, + authz_ie_dict["id"], + object_category_id) self.assertIsInstance(object_category_scope, dict) - self.assertIn("object_category_scope", object_category_scope) - self.assertIn("id", object_category_scope) - self.assertIn("intra_extension_uuid", object_category_scope) - self.assertEqual(ref["id"], object_category_scope["intra_extension_uuid"]) - self.assertNotIn(new_object_category_scope_uuid, object_category_scope["object_category_scope"]) + self.assertNotIn(object_category_scope_id, object_category_scope.keys()) def test_action_category_scope(self): - demo_user = self.create_user("demo") - admin_user = self.create_user("admin") - ref = self.create_intra_extension() - - action_categories = self.manager.set_action_category_dict( - admin_user["id"], - ref["id"], + authz_ie_dict = create_intra_extension(self, "policy_authz") + admin_ie_dict = create_intra_extension(self, "policy_admin") + tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id']) + + admin_subject_id, admin_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next() + demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"], + {"name": "demo", "description": "demo"}) + demo_subject_id, demo_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next() + + action_categories = self.admin_manager.add_action_category( + admin_subject_id, + authz_ie_dict["id"], { - uuid.uuid4().hex: "compute", - uuid.uuid4().hex: "identity", + "name": "swift", + "description": "swift actions", } ) - for action_category in action_categories["action_categories"]: - self.assertRaises( - AuthzException, - self.manager.get_object_scopes_dict, - demo_user["id"], ref["id"], action_category) + for action_category_id in action_categories: action_category_scope = self.manager.get_action_scopes_dict( - admin_user["id"], - ref["id"], - action_category) + admin_subject_id, + authz_ie_dict["id"], + action_category_id) self.assertIsInstance(action_category_scope, dict) - self.assertIn("action_category_scope", action_category_scope) - self.assertIn("id", action_category_scope) - self.assertIn("intra_extension_uuid", action_category_scope) - self.assertEqual(ref["id"], action_category_scope["intra_extension_uuid"]) - self.assertIsInstance(action_category_scope["action_category_scope"], dict) + self.assertEqual({}, action_category_scope) - new_action_category_scope = dict() - new_action_category_scope_uuid = uuid.uuid4().hex - new_action_category_scope[new_action_category_scope_uuid] = "new_action_category_scope" + new_action_category_scope = { + "name": "get", + "description": "get swift files", + } self.assertRaises( AuthzException, - self.manager.set_action_scope_dict, - demo_user["id"], ref["id"], action_category, new_action_category_scope) + self.admin_manager.add_action_scope_dict, + demo_subject_id, authz_ie_dict["id"], action_category_id, new_action_category_scope) - action_category_scope = self.manager.set_action_scope_dict( - admin_user["id"], - ref["id"], - action_category, + action_category_scope = self.admin_manager.add_action_scope_dict( + admin_subject_id, + authz_ie_dict["id"], + action_category_id, new_action_category_scope) self.assertIsInstance(action_category_scope, dict) - self.assertIn("action_category_scope", action_category_scope) - self.assertIn("id", action_category_scope) - self.assertIn("intra_extension_uuid", action_category_scope) - self.assertEqual(ref["id"], action_category_scope["intra_extension_uuid"]) - self.assertIn(new_action_category_scope[new_action_category_scope_uuid], - action_category_scope["action_category_scope"][action_category].values()) + self.assertEqual(len(action_category_scope.keys()), 1) + action_category_scope_id = action_category_scope.keys()[0] + action_category_scope_value = action_category_scope[action_category_scope_id] + self.assertIn("name", action_category_scope_value) + self.assertEqual(new_action_category_scope["name"], action_category_scope_value["name"]) + self.assertIn("description", action_category_scope_value) + self.assertEqual(new_action_category_scope["description"], action_category_scope_value["description"]) # Delete the new action_category_scope self.assertRaises( AuthzException, - self.manager.del_action_category_scope_dict, - demo_user["id"], ref["id"], action_category, - new_action_category_scope_uuid - ) - - self.manager.del_action_scope( - admin_user["id"], - ref["id"], - action_category, - new_action_category_scope_uuid) - action_category_scope = self.manager.get_action_scopes_dict( - admin_user["id"], - ref["id"], - action_category) - self.assertIsInstance(action_category_scope, dict) - self.assertIn("action_category_scope", action_category_scope) - self.assertIn("id", action_category_scope) - self.assertIn("intra_extension_uuid", action_category_scope) - self.assertEqual(ref["id"], action_category_scope["intra_extension_uuid"]) - self.assertNotIn(new_action_category_scope_uuid, action_category_scope["action_category_scope"]) - - # Add a particular action_category_scope - self.assertRaises( - AuthzException, - self.manager.add_action_scope_dict, - demo_user["id"], ref["id"], action_category, - new_action_category_scope[new_action_category_scope_uuid] - ) - - action_category_scope = self.manager.add_action_scope_dict( - admin_user["id"], - ref["id"], - action_category, - new_action_category_scope[new_action_category_scope_uuid]) - self.assertIsInstance(action_category_scope, dict) - self.assertIn("action_category_scope", action_category_scope) - self.assertIn("uuid", action_category_scope["action_category_scope"]) - self.assertEqual(new_action_category_scope[new_action_category_scope_uuid], - action_category_scope["action_category_scope"]["name"]) - action_category_scope = self.manager.get_action_scopes_dict( - admin_user["id"], - ref["id"], - action_category) + self.admin_manager.del_action_scope, + demo_subject_id, authz_ie_dict["id"], action_category_id, action_category_scope_id) + + self.admin_manager.del_action_scope( + admin_subject_id, + authz_ie_dict["id"], + action_category_id, + action_category_scope_id) + action_category_scope = self.admin_manager.get_action_scopes_dict( + admin_subject_id, + authz_ie_dict["id"], + action_category_id) self.assertIsInstance(action_category_scope, dict) - self.assertIn("action_category_scope", action_category_scope) - self.assertIn("id", action_category_scope) - self.assertIn("intra_extension_uuid", action_category_scope) - self.assertEqual(ref["id"], action_category_scope["intra_extension_uuid"]) - self.assertNotIn(new_action_category_scope_uuid, action_category_scope["action_category_scope"]) + self.assertNotIn(action_category_scope_id, action_category_scope.keys()) def test_subject_category_assignment(self): - demo_user = self.create_user("demo") - admin_user = self.create_user("admin") - ref = self.create_intra_extension() - - new_subject = self.create_user() - new_subjects = dict() - new_subjects[new_subject["id"]] = new_subject["name"] - subjects = self.manager.set_subject_dict(admin_user["id"], ref["id"], new_subjects) - - new_subject_category_uuid = uuid.uuid4().hex - new_subject_category_value = "role" - subject_categories = self.manager.set_subject_category_dict( - admin_user["id"], - ref["id"], + authz_ie_dict = create_intra_extension(self, "policy_authz") + admin_ie_dict = create_intra_extension(self, "policy_admin") + tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id']) + + admin_subject_id, admin_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next() + admin_authz_subject_id, admin_authz_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], authz_ie_dict['id'], 'admin').iteritems().next() + demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"], + {"name": "demo", "description": "demo"}) + demo_subject_id, demo_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next() + demo_authz_subject_id, demo_authz_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], authz_ie_dict['id'], 'demo').iteritems().next() + + subjects_dict = self.manager.get_subjects_dict(admin_subject_id, authz_ie_dict["id"]) + + subject_categories = self.admin_manager.add_subject_category( + admin_subject_id, + authz_ie_dict["id"], { - new_subject_category_uuid: new_subject_category_value + "name": "country", + "description": "country", } ) - for subject_category in subject_categories["subject_categories"]: + for subject_category_id in subject_categories: subject_category_scope = self.manager.get_subject_scopes_dict( - admin_user["id"], - ref["id"], - subject_category) + admin_subject_id, + authz_ie_dict["id"], + subject_category_id) self.assertIsInstance(subject_category_scope, dict) - self.assertIn("subject_category_scope", subject_category_scope) - self.assertIn("id", subject_category_scope) - self.assertIn("intra_extension_uuid", subject_category_scope) - self.assertEqual(ref["id"], subject_category_scope["intra_extension_uuid"]) - self.assertIsInstance(subject_category_scope["subject_category_scope"], dict) - - new_subject_category_scope = dict() - new_subject_category_scope_uuid = uuid.uuid4().hex - new_subject_category_scope[new_subject_category_scope_uuid] = admin_user["id"] - subject_category_scope = self.manager.set_subject_scope_dict( - admin_user["id"], - ref["id"], - subject_category, - new_subject_category_scope) - self.assertIsInstance(subject_category_scope, dict) - self.assertIn("subject_category_scope", subject_category_scope) - self.assertIn("id", subject_category_scope) - self.assertIn("intra_extension_uuid", subject_category_scope) - self.assertEqual(ref["id"], subject_category_scope["intra_extension_uuid"]) - self.assertIn(new_subject_category_scope[new_subject_category_scope_uuid], - subject_category_scope["subject_category_scope"][subject_category].values()) - - new_subject_category_scope2 = dict() - new_subject_category_scope2_uuid = uuid.uuid4().hex - new_subject_category_scope2[new_subject_category_scope2_uuid] = "dev" - subject_category_scope = self.manager.set_subject_scope_dict( - admin_user["id"], - ref["id"], - subject_category, - new_subject_category_scope2) - self.assertIsInstance(subject_category_scope, dict) - self.assertIn("subject_category_scope", subject_category_scope) - self.assertIn("id", subject_category_scope) - self.assertIn("intra_extension_uuid", subject_category_scope) - self.assertEqual(ref["id"], subject_category_scope["intra_extension_uuid"]) - self.assertIn(new_subject_category_scope2[new_subject_category_scope2_uuid], - subject_category_scope["subject_category_scope"][subject_category].values()) + self.assertEqual({}, subject_category_scope) - self.assertRaises( - AuthzException, - self.manager.get_subject_assignment_dict, - demo_user["id"], ref["id"], new_subject["id"]) + new_subject_category_scope_1 = { + "name": "france", + "description": "france", + } - subject_category_assignments = self.manager.get_subject_assignment_dict( - admin_user["id"], - ref["id"], - new_subject["id"] + subject_category_scope_1 = self.admin_manager.add_subject_scope_dict( + admin_subject_id, + authz_ie_dict["id"], + subject_category_id, + new_subject_category_scope_1) + subject_category_scope_1_id = subject_category_scope_1.keys()[0] + + new_subject_category_scope_2 = { + "name": "china", + "description": "china", + } + + subject_category_scope_2 = self.admin_manager.add_subject_scope_dict( + admin_subject_id, + authz_ie_dict["id"], + subject_category_id, + new_subject_category_scope_2) + subject_category_scope_2_id = subject_category_scope_2.keys()[0] + + subject_category_assignments = self.manager.get_subject_assignment_list( + admin_subject_id, + authz_ie_dict["id"], + admin_authz_subject_id, + subject_category_id ) - self.assertIsInstance(subject_category_assignments, dict) - self.assertIn("subject_category_assignments", subject_category_assignments) - self.assertIn("id", subject_category_assignments) - self.assertIn("intra_extension_uuid", subject_category_assignments) - self.assertEqual(ref["id"], subject_category_assignments["intra_extension_uuid"]) - self.assertEqual({}, subject_category_assignments["subject_category_assignments"][new_subject["id"]]) + self.assertIsInstance(subject_category_assignments, list) + self.assertEqual([], subject_category_assignments) + + subject_category_assignments = self.manager.get_subject_assignment_list( + admin_subject_id, + authz_ie_dict["id"], + demo_authz_subject_id, + subject_category_id + ) + self.assertIsInstance(subject_category_assignments, list) + self.assertEqual([], subject_category_assignments) self.assertRaises( AuthzException, - self.manager.set_subject_assignment_dict, - demo_user["id"], ref["id"], new_subject["id"], - { - new_subject_category_uuid: [new_subject_category_scope_uuid, new_subject_category_scope2_uuid], - } + self.manager.add_subject_assignment_list, + demo_subject_id, authz_ie_dict["id"], + admin_authz_subject_id, subject_category_id, subject_category_scope_1_id ) - subject_category_assignments = self.manager.set_subject_assignment_dict( - admin_user["id"], - ref["id"], - new_subject["id"], - { - new_subject_category_uuid: [new_subject_category_scope_uuid, new_subject_category_scope2_uuid], - } + self.assertRaises( + AuthzException, + self.manager.add_subject_assignment_list, + demo_subject_id, authz_ie_dict["id"], + demo_authz_subject_id, subject_category_id, subject_category_scope_2_id ) - self.assertIsInstance(subject_category_assignments, dict) - self.assertIn("subject_category_assignments", subject_category_assignments) - self.assertIn("id", subject_category_assignments) - self.assertIn("intra_extension_uuid", subject_category_assignments) - self.assertEqual(ref["id"], subject_category_assignments["intra_extension_uuid"]) - self.assertEqual( - {new_subject_category_uuid: [new_subject_category_scope_uuid, new_subject_category_scope2_uuid]}, - subject_category_assignments["subject_category_assignments"][new_subject["id"]]) - subject_category_assignments = self.manager.get_subject_assignment_dict( - admin_user["id"], - ref["id"], - new_subject["id"] + + subject_category_assignments = self.admin_manager.add_subject_assignment_list( + admin_subject_id, + authz_ie_dict["id"], + admin_authz_subject_id, subject_category_id, subject_category_scope_1_id ) - self.assertIsInstance(subject_category_assignments, dict) - self.assertIn("subject_category_assignments", subject_category_assignments) - self.assertIn("id", subject_category_assignments) - self.assertIn("intra_extension_uuid", subject_category_assignments) - self.assertEqual(ref["id"], subject_category_assignments["intra_extension_uuid"]) - self.assertEqual( - {new_subject_category_uuid: [new_subject_category_scope_uuid, new_subject_category_scope2_uuid]}, - subject_category_assignments["subject_category_assignments"][new_subject["id"]]) + self.assertIsInstance(subject_category_assignments, list) - self.assertRaises( - AuthzException, - self.manager.del_subject_category_assignment_dict, - demo_user["id"], ref["id"], new_subject["id"], - new_subject_category_uuid, - new_subject_category_scope_uuid + self.assertEqual(len(subject_category_assignments), 1) + + subject_category_assignments = self.admin_manager.add_subject_assignment_list( + admin_subject_id, + authz_ie_dict["id"], + admin_authz_subject_id, subject_category_id, subject_category_scope_2_id ) + self.assertIsInstance(subject_category_assignments, list) + self.assertEqual(len(subject_category_assignments), 2) - self.manager.del_subject_assignment( - admin_user["id"], - ref["id"], - new_subject["id"], - new_subject_category_uuid, - new_subject_category_scope_uuid + subject_category_assignments = self.admin_manager.add_subject_assignment_list( + admin_subject_id, + authz_ie_dict["id"], + demo_authz_subject_id, subject_category_id, subject_category_scope_2_id ) - subject_category_assignments = self.manager.get_subject_assignment_dict( - admin_user["id"], - ref["id"], - new_subject["id"] + self.assertIsInstance(subject_category_assignments, list) + self.assertEqual(len(subject_category_assignments), 1) + + subject_category_assignments = self.admin_manager.get_subject_assignment_list( + admin_subject_id, + authz_ie_dict["id"], + admin_authz_subject_id, subject_category_id ) - self.assertIsInstance(subject_category_assignments, dict) - self.assertIn("subject_category_assignments", subject_category_assignments) - self.assertIn("id", subject_category_assignments) - self.assertIn("intra_extension_uuid", subject_category_assignments) - self.assertEqual(ref["id"], subject_category_assignments["intra_extension_uuid"]) - self.assertEqual( - {new_subject_category_uuid: [new_subject_category_scope2_uuid, ]}, - subject_category_assignments["subject_category_assignments"][new_subject["id"]]) + self.assertIsInstance(subject_category_assignments, list) + self.assertEqual(len(subject_category_assignments), 2) self.assertRaises( AuthzException, - self.manager.add_subject_assignment_list, - demo_user["id"], ref["id"], new_subject["id"], - new_subject_category_uuid, - new_subject_category_scope_uuid + self.admin_manager.del_subject_assignment, + demo_subject_id, authz_ie_dict["id"], + demo_authz_subject_id, subject_category_id, subject_category_scope_2_id ) - data = self.manager.add_subject_assignment_list( - admin_user["id"], - ref["id"], - new_subject["id"], - new_subject_category_uuid, - new_subject_category_scope_uuid + self.admin_manager.del_subject_assignment( + admin_subject_id, + authz_ie_dict["id"], + admin_authz_subject_id, subject_category_id, subject_category_scope_2_id + ) + subject_category_assignments = self.admin_manager.get_subject_assignment_list( + admin_subject_id, + authz_ie_dict["id"], + admin_authz_subject_id, subject_category_id ) + self.assertIsInstance(subject_category_assignments, list) + self.assertEqual(len(subject_category_assignments), 1) - subject_category_assignments = self.manager.get_subject_assignment_dict( - admin_user["id"], - ref["id"], - new_subject["id"] + self.assertRaises( + SubjectAssignmentUnknown, + self.admin_manager.del_subject_assignment, + admin_subject_id, + authz_ie_dict["id"], + admin_authz_subject_id, subject_category_id, subject_category_scope_2_id ) - self.assertIsInstance(subject_category_assignments, dict) - self.assertIn("subject_category_assignments", subject_category_assignments) - self.assertIn("id", subject_category_assignments) - self.assertIn("intra_extension_uuid", subject_category_assignments) - self.assertEqual(ref["id"], subject_category_assignments["intra_extension_uuid"]) - self.assertEqual( - {new_subject_category_uuid: [new_subject_category_scope2_uuid, new_subject_category_scope_uuid]}, - subject_category_assignments["subject_category_assignments"][new_subject["id"]]) def test_object_category_assignment(self): - demo_user = self.create_user("demo") - admin_user = self.create_user("admin") - ref = self.create_intra_extension() - - new_object = {"id": uuid.uuid4().hex, "name": "my_object"} - new_objects = dict() - new_objects[new_object["id"]] = new_object["name"] - objects = self.manager.set_object_dict(admin_user["id"], ref["id"], new_objects) - - new_object_category_uuid = uuid.uuid4().hex - new_object_category_value = "role" - object_categories = self.manager.set_object_category_dict( - admin_user["id"], - ref["id"], + authz_ie_dict = create_intra_extension(self, "policy_authz") + admin_ie_dict = create_intra_extension(self, "policy_admin") + tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id']) + + admin_subject_id, admin_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next() + demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"], + {"name": "demo", "description": "demo"}) + demo_subject_id, demo_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next() + + objects_dict = self.manager.get_objects_dict(admin_subject_id, authz_ie_dict["id"]) + + object_vm1_id = None + object_vm2_id = None + for _object_id in objects_dict: + if objects_dict[_object_id]['name'] == 'vm1': + object_vm1_id = _object_id + if objects_dict[_object_id]['name'] == 'vm2': + object_vm2_id = _object_id + if not object_vm1_id or not object_vm2_id: + raise Exception("Cannot run tests, database is corrupted ? (need upload and list in objects)") + + object_categories = self.admin_manager.add_object_category( + admin_subject_id, + authz_ie_dict["id"], { - new_object_category_uuid: new_object_category_value + "name": "location", + "description": "location", } ) - for object_category in object_categories["object_categories"]: + for object_category_id in object_categories: object_category_scope = self.manager.get_object_scopes_dict( - admin_user["id"], - ref["id"], - object_category) - self.assertIsInstance(object_category_scope, dict) - self.assertIn("object_category_scope", object_category_scope) - self.assertIn("id", object_category_scope) - self.assertIn("intra_extension_uuid", object_category_scope) - self.assertEqual(ref["id"], object_category_scope["intra_extension_uuid"]) - self.assertIsInstance(object_category_scope["object_category_scope"], dict) - - new_object_category_scope = dict() - new_object_category_scope_uuid = uuid.uuid4().hex - new_object_category_scope[new_object_category_scope_uuid] = admin_user["id"] - object_category_scope = self.manager.set_object_scope_dict( - admin_user["id"], - ref["id"], - object_category, - new_object_category_scope) - self.assertIsInstance(object_category_scope, dict) - self.assertIn("object_category_scope", object_category_scope) - self.assertIn("id", object_category_scope) - self.assertIn("intra_extension_uuid", object_category_scope) - self.assertEqual(ref["id"], object_category_scope["intra_extension_uuid"]) - self.assertIn(new_object_category_scope[new_object_category_scope_uuid], - object_category_scope["object_category_scope"][object_category].values()) - - new_object_category_scope2 = dict() - new_object_category_scope2_uuid = uuid.uuid4().hex - new_object_category_scope2[new_object_category_scope2_uuid] = "dev" - object_category_scope = self.manager.set_object_scope_dict( - admin_user["id"], - ref["id"], - object_category, - new_object_category_scope2) + admin_subject_id, + authz_ie_dict["id"], + object_category_id) self.assertIsInstance(object_category_scope, dict) - self.assertIn("object_category_scope", object_category_scope) - self.assertIn("id", object_category_scope) - self.assertIn("intra_extension_uuid", object_category_scope) - self.assertEqual(ref["id"], object_category_scope["intra_extension_uuid"]) - self.assertIn(new_object_category_scope2[new_object_category_scope2_uuid], - object_category_scope["object_category_scope"][object_category].values()) + self.assertEqual({}, object_category_scope) - self.assertRaises( - AuthzException, - self.manager.get_object_assignment_list, - demo_user["id"], ref["id"], new_object["id"] + new_object_category_scope_1 = { + "name": "france", + "description": "france", + } + + object_category_scope_1 = self.admin_manager.add_object_scope_dict( + admin_subject_id, + authz_ie_dict["id"], + object_category_id, + new_object_category_scope_1) + object_category_scope_1_id = object_category_scope_1.keys()[0] + + new_object_category_scope_2 = { + "name": "china", + "description": "china", + } + + object_category_scope_2 = self.admin_manager.add_object_scope_dict( + admin_subject_id, + authz_ie_dict["id"], + object_category_id, + new_object_category_scope_2) + object_category_scope_2_id = object_category_scope_2.keys()[0] + + object_category_assignments = self.manager.get_object_assignment_list( + admin_subject_id, + authz_ie_dict["id"], + object_vm1_id, + object_category_id ) + self.assertIsInstance(object_category_assignments, list) + self.assertEqual([], object_category_assignments) object_category_assignments = self.manager.get_object_assignment_list( - admin_user["id"], - ref["id"], - new_object["id"] + admin_subject_id, + authz_ie_dict["id"], + object_vm2_id, + object_category_id ) - self.assertIsInstance(object_category_assignments, dict) - self.assertIn("object_category_assignments", object_category_assignments) - self.assertIn("id", object_category_assignments) - self.assertIn("intra_extension_uuid", object_category_assignments) - self.assertEqual(ref["id"], object_category_assignments["intra_extension_uuid"]) - self.assertEqual({}, object_category_assignments["object_category_assignments"][new_object["id"]]) + self.assertIsInstance(object_category_assignments, list) + self.assertEqual([], object_category_assignments) self.assertRaises( AuthzException, - self.manager.set_object_category_assignment_dict, - demo_user["id"], ref["id"], new_object["id"], - { - new_object_category_uuid: [new_object_category_scope_uuid, new_object_category_scope2_uuid], - } + self.manager.add_object_assignment_list, + demo_subject_id, authz_ie_dict["id"], + object_vm1_id, object_category_id, object_category_scope_1_id ) - object_category_assignments = self.manager.set_object_category_assignment_dict( - admin_user["id"], - ref["id"], - new_object["id"], - { - new_object_category_uuid: [new_object_category_scope_uuid, new_object_category_scope2_uuid], - } + self.assertRaises( + AuthzException, + self.manager.add_object_assignment_list, + demo_subject_id, authz_ie_dict["id"], + object_vm2_id, object_category_id, object_category_scope_2_id ) - self.assertIsInstance(object_category_assignments, dict) - self.assertIn("object_category_assignments", object_category_assignments) - self.assertIn("id", object_category_assignments) - self.assertIn("intra_extension_uuid", object_category_assignments) - self.assertEqual(ref["id"], object_category_assignments["intra_extension_uuid"]) - self.assertEqual( - {new_object_category_uuid: [new_object_category_scope_uuid, new_object_category_scope2_uuid]}, - object_category_assignments["object_category_assignments"][new_object["id"]]) - object_category_assignments = self.manager.get_object_assignment_list( - admin_user["id"], - ref["id"], - new_object["id"] + + object_category_assignments = self.admin_manager.add_object_assignment_list( + admin_subject_id, + authz_ie_dict["id"], + object_vm1_id, object_category_id, object_category_scope_1_id ) - self.assertIsInstance(object_category_assignments, dict) - self.assertIn("object_category_assignments", object_category_assignments) - self.assertIn("id", object_category_assignments) - self.assertIn("intra_extension_uuid", object_category_assignments) - self.assertEqual(ref["id"], object_category_assignments["intra_extension_uuid"]) - self.assertEqual( - {new_object_category_uuid: [new_object_category_scope_uuid, new_object_category_scope2_uuid]}, - object_category_assignments["object_category_assignments"][new_object["id"]]) + self.assertIsInstance(object_category_assignments, list) - self.assertRaises( - AuthzException, - self.manager.del_object_category_assignment_dict, - demo_user["id"], ref["id"], new_object["id"], - new_object_category_uuid, - new_object_category_scope_uuid + self.assertEqual(len(object_category_assignments), 1) + + object_category_assignments = self.admin_manager.add_object_assignment_list( + admin_subject_id, + authz_ie_dict["id"], + object_vm1_id, object_category_id, object_category_scope_2_id ) + self.assertIsInstance(object_category_assignments, list) + self.assertEqual(len(object_category_assignments), 2) - self.manager.del_object_assignment( - admin_user["id"], - ref["id"], - new_object["id"], - new_object_category_uuid, - new_object_category_scope_uuid + object_category_assignments = self.admin_manager.add_object_assignment_list( + admin_subject_id, + authz_ie_dict["id"], + object_vm2_id, object_category_id, object_category_scope_2_id ) - object_category_assignments = self.manager.get_object_assignment_list( - admin_user["id"], - ref["id"], - new_object["id"] + self.assertIsInstance(object_category_assignments, list) + self.assertEqual(len(object_category_assignments), 1) + + object_category_assignments = self.admin_manager.get_object_assignment_list( + admin_subject_id, + authz_ie_dict["id"], + object_vm1_id, object_category_id ) - self.assertIsInstance(object_category_assignments, dict) - self.assertIn("object_category_assignments", object_category_assignments) - self.assertIn("id", object_category_assignments) - self.assertIn("intra_extension_uuid", object_category_assignments) - self.assertEqual(ref["id"], object_category_assignments["intra_extension_uuid"]) - self.assertEqual( - {new_object_category_uuid: [new_object_category_scope2_uuid, ]}, - object_category_assignments["object_category_assignments"][new_object["id"]]) + self.assertIsInstance(object_category_assignments, list) + self.assertEqual(len(object_category_assignments), 2) self.assertRaises( AuthzException, - self.manager.add_object_assignment_list, - demo_user["id"], ref["id"], new_object["id"], - new_object_category_uuid, - new_object_category_scope_uuid + self.admin_manager.del_object_assignment, + demo_subject_id, authz_ie_dict["id"], + object_vm2_id, object_category_id, object_category_scope_2_id ) - self.manager.add_object_assignment_list( - admin_user["id"], - ref["id"], - new_object["id"], - new_object_category_uuid, - new_object_category_scope_uuid + self.admin_manager.del_object_assignment( + admin_subject_id, + authz_ie_dict["id"], + object_vm1_id, object_category_id, object_category_scope_2_id + ) + object_category_assignments = self.admin_manager.get_object_assignment_list( + admin_subject_id, + authz_ie_dict["id"], + object_vm1_id, object_category_id ) + self.assertIsInstance(object_category_assignments, list) + self.assertEqual(len(object_category_assignments), 1) - object_category_assignments = self.manager.get_object_assignment_list( - admin_user["id"], - ref["id"], - new_object["id"] + self.assertRaises( + ObjectAssignmentUnknown, + self.admin_manager.del_object_assignment, + admin_subject_id, + authz_ie_dict["id"], + object_vm1_id, object_category_id, object_category_scope_2_id ) - self.assertIsInstance(object_category_assignments, dict) - self.assertIn("object_category_assignments", object_category_assignments) - self.assertIn("id", object_category_assignments) - self.assertIn("intra_extension_uuid", object_category_assignments) - self.assertEqual(ref["id"], object_category_assignments["intra_extension_uuid"]) - self.assertEqual( - {new_object_category_uuid: [new_object_category_scope2_uuid, new_object_category_scope_uuid]}, - object_category_assignments["object_category_assignments"][new_object["id"]]) def test_action_category_assignment(self): - demo_user = self.create_user("demo") - admin_user = self.create_user("admin") - ref = self.create_intra_extension() - - new_action = {"id": uuid.uuid4().hex, "name": "my_action"} - new_actions = dict() - new_actions[new_action["id"]] = new_action["name"] - actions = self.manager.set_action_dict(admin_user["id"], ref["id"], new_actions) - - new_action_category_uuid = uuid.uuid4().hex - new_action_category_value = "role" - action_categories = self.manager.set_action_category_dict( - admin_user["id"], - ref["id"], + authz_ie_dict = create_intra_extension(self, "policy_authz") + admin_ie_dict = create_intra_extension(self, "policy_admin") + tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id']) + + admin_subject_id, admin_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next() + demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"], + {"name": "demo", "description": "demo"}) + demo_subject_id, demo_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next() + + actions_dict = self.manager.get_actions_dict(admin_subject_id, authz_ie_dict["id"]) + + action_upload_id = None + action_list_id = None + for _action_id in actions_dict: + if actions_dict[_action_id]['name'] == 'upload': + action_upload_id = _action_id + if actions_dict[_action_id]['name'] == 'list': + action_list_id = _action_id + if not action_upload_id or not action_list_id: + raise Exception("Cannot run tests, database is corrupted ? (need upload and list in actions)") + + action_categories = self.admin_manager.add_action_category( + admin_subject_id, + authz_ie_dict["id"], { - new_action_category_uuid: new_action_category_value + "name": "swift", + "description": "swift actions", } ) - for action_category in action_categories["action_categories"]: + for action_category_id in action_categories: action_category_scope = self.manager.get_action_scopes_dict( - admin_user["id"], - ref["id"], - action_category) + admin_subject_id, + authz_ie_dict["id"], + action_category_id) self.assertIsInstance(action_category_scope, dict) - self.assertIn("action_category_scope", action_category_scope) - self.assertIn("id", action_category_scope) - self.assertIn("intra_extension_uuid", action_category_scope) - self.assertEqual(ref["id"], action_category_scope["intra_extension_uuid"]) - self.assertIsInstance(action_category_scope["action_category_scope"], dict) - - new_action_category_scope = dict() - new_action_category_scope_uuid = uuid.uuid4().hex - new_action_category_scope[new_action_category_scope_uuid] = admin_user["id"] - action_category_scope = self.manager.set_action_scope_dict( - admin_user["id"], - ref["id"], - action_category, - new_action_category_scope) - self.assertIsInstance(action_category_scope, dict) - self.assertIn("action_category_scope", action_category_scope) - self.assertIn("id", action_category_scope) - self.assertIn("intra_extension_uuid", action_category_scope) - self.assertEqual(ref["id"], action_category_scope["intra_extension_uuid"]) - self.assertIn(new_action_category_scope[new_action_category_scope_uuid], - action_category_scope["action_category_scope"][action_category].values()) - - new_action_category_scope2 = dict() - new_action_category_scope2_uuid = uuid.uuid4().hex - new_action_category_scope2[new_action_category_scope2_uuid] = "dev" - action_category_scope = self.manager.set_action_scope_dict( - admin_user["id"], - ref["id"], - action_category, - new_action_category_scope2) - self.assertIsInstance(action_category_scope, dict) - self.assertIn("action_category_scope", action_category_scope) - self.assertIn("id", action_category_scope) - self.assertIn("intra_extension_uuid", action_category_scope) - self.assertEqual(ref["id"], action_category_scope["intra_extension_uuid"]) - self.assertIn(new_action_category_scope2[new_action_category_scope2_uuid], - action_category_scope["action_category_scope"][action_category].values()) + self.assertEqual({}, action_category_scope) - self.assertRaises( - AuthzException, - self.manager.get_action_assignment_list, - demo_user["id"], ref["id"], new_action["id"] + new_action_category_scope_1 = { + "name": "swift_admin", + "description": "action require admin rights", + } + + action_category_scope_1 = self.admin_manager.add_action_scope_dict( + admin_subject_id, + authz_ie_dict["id"], + action_category_id, + new_action_category_scope_1) + action_category_scope_1_id = action_category_scope_1.keys()[0] + + new_action_category_scope_2 = { + "name": "swift_anonymous", + "description": "action require no right", + } + + action_category_scope_2 = self.admin_manager.add_action_scope_dict( + admin_subject_id, + authz_ie_dict["id"], + action_category_id, + new_action_category_scope_2) + action_category_scope_2_id = action_category_scope_2.keys()[0] + + action_category_assignments = self.manager.get_action_assignment_list( + admin_subject_id, + authz_ie_dict["id"], + action_upload_id, + action_category_id ) + self.assertIsInstance(action_category_assignments, list) + self.assertEqual([], action_category_assignments) action_category_assignments = self.manager.get_action_assignment_list( - admin_user["id"], - ref["id"], - new_action["id"] + admin_subject_id, + authz_ie_dict["id"], + action_list_id, + action_category_id ) - self.assertIsInstance(action_category_assignments, dict) - self.assertIn("action_category_assignments", action_category_assignments) - self.assertIn("id", action_category_assignments) - self.assertIn("intra_extension_uuid", action_category_assignments) - self.assertEqual(ref["id"], action_category_assignments["intra_extension_uuid"]) - self.assertEqual({}, action_category_assignments["action_category_assignments"][new_action["id"]]) + self.assertIsInstance(action_category_assignments, list) + self.assertEqual([], action_category_assignments) self.assertRaises( AuthzException, - self.manager.set_action_assignment_dict, - demo_user["id"], ref["id"], new_action["id"], - { - new_action_category_uuid: [new_action_category_scope_uuid, new_action_category_scope2_uuid], - } + self.manager.add_action_assignment_list, + demo_subject_id, authz_ie_dict["id"], + action_upload_id, action_category_id, action_category_scope_1_id ) - action_category_assignments = self.manager.set_action_assignment_dict( - admin_user["id"], - ref["id"], - new_action["id"], - { - new_action_category_uuid: [new_action_category_scope_uuid, new_action_category_scope2_uuid], - } + self.assertRaises( + AuthzException, + self.manager.add_action_assignment_list, + demo_subject_id, authz_ie_dict["id"], + action_list_id, action_category_id, action_category_scope_2_id ) - self.assertIsInstance(action_category_assignments, dict) - self.assertIn("action_category_assignments", action_category_assignments) - self.assertIn("id", action_category_assignments) - self.assertIn("intra_extension_uuid", action_category_assignments) - self.assertEqual(ref["id"], action_category_assignments["intra_extension_uuid"]) - self.assertEqual( - {new_action_category_uuid: [new_action_category_scope_uuid, new_action_category_scope2_uuid]}, - action_category_assignments["action_category_assignments"][new_action["id"]]) - action_category_assignments = self.manager.get_action_assignment_list( - admin_user["id"], - ref["id"], - new_action["id"] + + action_category_assignments = self.admin_manager.add_action_assignment_list( + admin_subject_id, + authz_ie_dict["id"], + action_upload_id, action_category_id, action_category_scope_1_id ) - self.assertIsInstance(action_category_assignments, dict) - self.assertIn("action_category_assignments", action_category_assignments) - self.assertIn("id", action_category_assignments) - self.assertIn("intra_extension_uuid", action_category_assignments) - self.assertEqual(ref["id"], action_category_assignments["intra_extension_uuid"]) - self.assertEqual( - {new_action_category_uuid: [new_action_category_scope_uuid, new_action_category_scope2_uuid]}, - action_category_assignments["action_category_assignments"][new_action["id"]]) + self.assertIsInstance(action_category_assignments, list) - self.assertRaises( - AuthzException, - self.manager.del_action_category_assignment_dict, - demo_user["id"], ref["id"], new_action["id"], - new_action_category_uuid, - new_action_category_scope_uuid + self.assertEqual(len(action_category_assignments), 1) + + action_category_assignments = self.admin_manager.add_action_assignment_list( + admin_subject_id, + authz_ie_dict["id"], + action_upload_id, action_category_id, action_category_scope_2_id ) + self.assertIsInstance(action_category_assignments, list) + self.assertEqual(len(action_category_assignments), 2) - self.manager.del_action_assignment( - admin_user["id"], - ref["id"], - new_action["id"], - new_action_category_uuid, - new_action_category_scope_uuid + action_category_assignments = self.admin_manager.add_action_assignment_list( + admin_subject_id, + authz_ie_dict["id"], + action_list_id, action_category_id, action_category_scope_2_id ) - action_category_assignments = self.manager.get_action_assignment_list( - admin_user["id"], - ref["id"], - new_action["id"] + self.assertIsInstance(action_category_assignments, list) + self.assertEqual(len(action_category_assignments), 1) + + action_category_assignments = self.admin_manager.get_action_assignment_list( + admin_subject_id, + authz_ie_dict["id"], + action_upload_id, action_category_id ) - self.assertIsInstance(action_category_assignments, dict) - self.assertIn("action_category_assignments", action_category_assignments) - self.assertIn("id", action_category_assignments) - self.assertIn("intra_extension_uuid", action_category_assignments) - self.assertEqual(ref["id"], action_category_assignments["intra_extension_uuid"]) - self.assertEqual( - {new_action_category_uuid: [new_action_category_scope2_uuid, ]}, - action_category_assignments["action_category_assignments"][new_action["id"]]) + self.assertIsInstance(action_category_assignments, list) + self.assertEqual(len(action_category_assignments), 2) self.assertRaises( AuthzException, - self.manager.add_action_assignment_list, - demo_user["id"], ref["id"], new_action["id"], - new_action_category_uuid, - new_action_category_scope_uuid + self.admin_manager.del_action_assignment, + demo_subject_id, authz_ie_dict["id"], + action_list_id, action_category_id, action_category_scope_2_id ) - self.manager.add_action_assignment_list( - admin_user["id"], - ref["id"], - new_action["id"], - new_action_category_uuid, - new_action_category_scope_uuid + self.admin_manager.del_action_assignment( + admin_subject_id, + authz_ie_dict["id"], + action_upload_id, action_category_id, action_category_scope_2_id + ) + action_category_assignments = self.admin_manager.get_action_assignment_list( + admin_subject_id, + authz_ie_dict["id"], + action_upload_id, action_category_id ) + self.assertIsInstance(action_category_assignments, list) + self.assertEqual(len(action_category_assignments), 1) - action_category_assignments = self.manager.get_action_assignment_list( - admin_user["id"], - ref["id"], - new_action["id"] + self.assertRaises( + ActionAssignmentUnknown, + self.admin_manager.del_action_assignment, + admin_subject_id, + authz_ie_dict["id"], + action_upload_id, action_category_id, action_category_scope_2_id ) - self.assertIsInstance(action_category_assignments, dict) - self.assertIn("action_category_assignments", action_category_assignments) - self.assertIn("id", action_category_assignments) - self.assertIn("intra_extension_uuid", action_category_assignments) - self.assertEqual(ref["id"], action_category_assignments["intra_extension_uuid"]) - self.assertEqual( - {new_action_category_uuid: [new_action_category_scope2_uuid, new_action_category_scope_uuid]}, - action_category_assignments["action_category_assignments"][new_action["id"]]) def test_sub_meta_rules(self): - demo_user = self.create_user("demo") - admin_user = self.create_user("admin") - ref = self.create_intra_extension() - - aggregation_algorithms = self.manager.get_aggregation_algorithms(admin_user["id"], ref["id"]) - self.assertIsInstance(aggregation_algorithms, dict) - self.assertIsInstance(aggregation_algorithms["aggregation_algorithms"], list) - self.assertIn("and_true_aggregation", aggregation_algorithms["aggregation_algorithms"]) - self.assertIn("test_aggregation", aggregation_algorithms["aggregation_algorithms"]) - - self.assertRaises( - AuthzException, - self.manager.get_aggregation_algorithm_dict, - demo_user["id"], ref["id"] - ) - - aggregation_algorithm = self.manager.get_aggregation_algorithm_dict(admin_user["id"], ref["id"]) - self.assertIsInstance(aggregation_algorithm, dict) - self.assertIn("aggregation", aggregation_algorithm) - self.assertIn(aggregation_algorithm["aggregation"], aggregation_algorithms["aggregation_algorithms"]) - - _aggregation_algorithm = list(aggregation_algorithms["aggregation_algorithms"]) - _aggregation_algorithm.remove(aggregation_algorithm["aggregation"]) - - self.assertRaises( - AuthzException, - self.manager.set_aggregation_algorithms, - demo_user["id"], ref["id"], _aggregation_algorithm[0] - ) - - aggregation_algorithm = self.manager.set_aggregation_algorithm_dict(admin_user["id"], ref["id"], _aggregation_algorithm[0]) - self.assertIsInstance(aggregation_algorithm, dict) - self.assertIn("aggregation", aggregation_algorithm) - self.assertIn(aggregation_algorithm["aggregation"], aggregation_algorithms["aggregation_algorithms"]) - - self.assertRaises( - AuthzException, - self.manager.get_sub_meta_rules_dict, - demo_user["id"], ref["id"] - ) - - sub_meta_rules = self.manager.get_sub_meta_rules_dict(admin_user["id"], ref["id"]) + authz_ie_dict = create_intra_extension(self, "policy_authz") + admin_ie_dict = create_intra_extension(self, "policy_admin") + tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id']) + + admin_subject_id, admin_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next() + demo_subject_dict = self.admin_manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"], + {"name": "demo", "description": "demo"}) + demo_subject_id, demo_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next() + + aggregation_algorithms = self.admin_manager.get_aggregation_algorithm_dict(admin_subject_id, authz_ie_dict["id"]) + for key, value in aggregation_algorithms.iteritems(): + self.assertIsInstance(value, dict) + self.assertIn("name", value) + self.assertIn("description", value) + + # TODO: need more tests on aggregation_algorithms (set and del) + + sub_meta_rules = self.admin_manager.get_sub_meta_rules_dict(admin_subject_id, authz_ie_dict["id"]) self.assertIsInstance(sub_meta_rules, dict) - self.assertIn("sub_meta_rules", sub_meta_rules) - sub_meta_rules_conf = json.load(open(os.path.join(self.policy_directory, ref["model"], "metarule.json"))) - metarule = dict() categories = { - "subject_categories": self.manager.get_subject_categories_dict(admin_user["id"], ref["id"]), - "object_categories": self.manager.get_object_category_dict(admin_user["id"], ref["id"]), - "action_categories": self.manager.get_action_category_dict(admin_user["id"], ref["id"]) + "subject_categories": self.admin_manager.get_subject_categories_dict(admin_subject_id, authz_ie_dict["id"]), + "object_categories": self.admin_manager.get_object_category_dict(admin_subject_id, authz_ie_dict["id"]), + "action_categories": self.admin_manager.get_action_category_dict(admin_subject_id, authz_ie_dict["id"]) } - for relation in sub_meta_rules_conf["sub_meta_rules"]: - metarule[relation] = dict() - for item in ("subject_categories", "object_categories", "action_categories"): - metarule[relation][item] = list() - for element in sub_meta_rules_conf["sub_meta_rules"][relation][item]: - metarule[relation][item].append(self.__get_key_from_value( - element, - categories[item][item] - )) - - for relation in sub_meta_rules["sub_meta_rules"]: - self.assertIn(relation, metarule) - for item in ("subject_categories", "object_categories", "action_categories"): - self.assertEqual( - sub_meta_rules["sub_meta_rules"][relation][item], - metarule[relation][item] - ) - - new_subject_category = {"id": uuid.uuid4().hex, "name": "subject_category_test"} - # Add a particular subject_category - data = self.manager.add_subject_category( - admin_user["id"], - ref["id"], - new_subject_category["name"]) - new_subject_category["id"] = data["subject_category"]["uuid"] - subject_categories = self.manager.get_subject_categories_dict( - admin_user["id"], - ref["id"]) - self.assertIsInstance(subject_categories, dict) - self.assertIn("subject_categories", subject_categories) - self.assertIn("id", subject_categories) - self.assertIn("intra_extension_uuid", subject_categories) - self.assertEqual(ref["id"], subject_categories["intra_extension_uuid"]) - self.assertIn(new_subject_category["id"], subject_categories["subject_categories"]) - metarule[relation]["subject_categories"].append(new_subject_category["id"]) - - self.assertRaises( - AuthzException, - self.manager.get_sub_meta_rule_dict, - demo_user["id"], ref["id"], metarule - ) - - _sub_meta_rules = self.manager.get_sub_meta_rule_dict(admin_user["id"], ref["id"], metarule) - self.assertIn(relation, metarule) - for item in ("subject_categories", "object_categories", "action_categories"): - self.assertEqual( - _sub_meta_rules["sub_meta_rules"][relation][item], - metarule[relation][item] - ) + for key, value in sub_meta_rules.iteritems(): + self.assertIsInstance(value, dict) + self.assertIn("action_categories", value) + self.assertIn("object_categories", value) + self.assertIn("subject_categories", value) + self.assertIn("algorithm", value) + self.assertIn("name", value) + for action_category_id in value["action_categories"]: + self.assertIn(action_category_id, categories["action_categories"]) + for object_category_id in value["object_categories"]: + self.assertIn(object_category_id, categories["object_categories"]) + for subject_category_id in value["subject_categories"]: + self.assertIn(subject_category_id, categories["subject_categories"]) + # TODO: need more tests (set and del) def test_sub_rules(self): - demo_user = self.create_user("demo") - admin_user = self.create_user("admin") - ref = self.create_intra_extension() - - sub_meta_rules = self.manager.get_sub_meta_rules_dict(admin_user["id"], ref["id"]) + authz_ie_dict = create_intra_extension(self, "policy_authz") + admin_ie_dict = create_intra_extension(self, "policy_admin") + tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id']) + + admin_subject_id, admin_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next() + demo_subject_dict = self.admin_manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"], + {"name": "demo", "description": "demo"}) + demo_subject_id, demo_subject_dict = \ + self.tenant_api.get_subject_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next() + + sub_meta_rules = self.admin_manager.get_sub_meta_rules_dict(admin_subject_id, authz_ie_dict["id"]) self.assertIsInstance(sub_meta_rules, dict) - self.assertIn("sub_meta_rules", sub_meta_rules) - self.assertRaises( - AuthzException, - self.manager.get_sub_rules, - demo_user["id"], ref["id"] - ) - - sub_rules = self.manager.get_sub_rules(admin_user["id"], ref["id"]) - self.assertIsInstance(sub_rules, dict) - self.assertIn("rules", sub_rules) - rules = dict() - for relation in sub_rules["rules"]: - self.assertIn(relation, self.manager.get_sub_meta_rule_relations(admin_user["id"], ref["id"])["sub_meta_rule_relations"]) - rules[relation] = list() - for rule in sub_rules["rules"][relation]: + for relation_id in sub_meta_rules: + rules = self.admin_manager.get_rules_dict(admin_subject_id, authz_ie_dict["id"], relation_id) + rule_length = len(sub_meta_rules[relation_id]["subject_categories"]) + \ + len(sub_meta_rules[relation_id]["object_categories"]) + \ + len(sub_meta_rules[relation_id]["action_categories"]) + 1 + for rule_id in rules: + self.assertEqual(rule_length, len(rules[rule_id])) + rule = list(rules[rule_id]) for cat, cat_func, func_name in ( - ("subject_categories", self.manager.get_subject_scopes_dict, "subject_category_scope"), - ("action_categories", self.manager.get_action_scopes_dict, "action_category_scope"), - ("object_categories", self.manager.get_object_scopes_dict, "object_category_scope"), + ("subject_categories", self.admin_manager.get_subject_scopes_dict, "subject_scope"), + ("action_categories", self.admin_manager.get_action_scopes_dict, "action_scope"), + ("object_categories", self.admin_manager.get_object_scopes_dict, "object_scope"), ): - for cat_value in sub_meta_rules["sub_meta_rules"][relation][cat]: + for cat_value in sub_meta_rules[relation_id][cat]: scope = cat_func( - admin_user["id"], - ref["id"], + admin_subject_id, + authz_ie_dict["id"], cat_value ) a_scope = rule.pop(0) if type(a_scope) is not bool: - self.assertIn(a_scope, scope[func_name][cat_value]) - - # add a new subrule - - relation = sub_rules["rules"].keys()[0] - sub_rule = [] - for cat, cat_func, func_name in ( - ("subject_categories", self.manager.get_subject_scopes_dict, "subject_category_scope"), - ("action_categories", self.manager.get_action_scopes_dict, "action_category_scope"), - ("object_categories", self.manager.get_object_scopes_dict, "object_category_scope"), - ): - for cat_value in sub_meta_rules["sub_meta_rules"][relation][cat]: - scope = cat_func( - admin_user["id"], - ref["id"], - cat_value - ) - sub_rule.append(scope[func_name][cat_value].keys()[0]) - - sub_rule.append(True) - self.assertRaises( - AuthzException, - self.manager.set_sub_rules, - demo_user["id"], ref["id"], relation, sub_rule - ) + self.assertIn(a_scope, scope.keys()) + + # add a new subrule + + sub_rule = [] + for cat, cat_func, func_name in ( + ("subject_categories", self.admin_manager.get_subject_scopes_dict, "subject_scope"), + ("action_categories", self.admin_manager.get_action_scopes_dict, "action_scope"), + ("object_categories", self.admin_manager.get_object_scopes_dict, "object_scope"), + ): + for cat_value in sub_meta_rules[relation_id][cat]: + scope = cat_func( + admin_subject_id, + authz_ie_dict["id"], + cat_value + ) + sub_rule.append(scope.keys()[0]) + + sub_rule.append(False) + self.assertRaises( + AuthzException, + self.admin_manager.add_rule_dict, + demo_subject_id, authz_ie_dict["id"], relation_id, sub_rule + ) + + sub_rules = self.admin_manager.add_rule_dict(admin_subject_id, authz_ie_dict["id"], relation_id, sub_rule) + self.assertIsInstance(sub_rules, dict) + self.assertIn(sub_rule, sub_rules.values()) - sub_rules = self.manager.set_sub_rule(admin_user["id"], ref["id"], relation, sub_rule) - self.assertIsInstance(sub_rules, dict) - self.assertIn("rules", sub_rules) - rules = dict() - self.assertIn(sub_rule, sub_rules["rules"][relation]) - for relation in sub_rules["rules"]: - self.assertIn(relation, self.manager.get_sub_meta_rule_relations(admin_user["id"], ref["id"])["sub_meta_rule_relations"]) - rules[relation] = list() - for rule in sub_rules["rules"][relation]: + for rule_id, rule_value in sub_rules.iteritems(): for cat, cat_func, func_name in ( - ("subject_categories", self.manager.get_subject_scopes_dict, "subject_category_scope"), - ("action_categories", self.manager.get_action_scopes_dict, "action_category_scope"), - ("object_categories", self.manager.get_object_scopes_dict, "object_category_scope"), + ("subject_categories", self.admin_manager.get_subject_scopes_dict, "subject_category_scope"), + ("action_categories", self.admin_manager.get_action_scopes_dict, "action_category_scope"), + ("object_categories", self.admin_manager.get_object_scopes_dict, "object_category_scope"), ): - for cat_value in sub_meta_rules["sub_meta_rules"][relation][cat]: + for cat_value in sub_meta_rules[relation_id][cat]: scope = cat_func( - admin_user["id"], - ref["id"], + admin_subject_id, + authz_ie_dict["id"], cat_value ) - a_scope = rule.pop(0) - self.assertIn(a_scope, scope[func_name][cat_value]) + a_scope = rule_value.pop(0) + self.assertIn(a_scope, scope.keys()) + # TODO: add test for the delete function -- cgit 1.2.3-korg