diff options
4 files changed, 236 insertions, 95 deletions
diff --git a/keystone-moon/keystone/contrib/moon/backends/sql.py b/keystone-moon/keystone/contrib/moon/backends/sql.py index c2f384bd..ceb057bd 100644 --- a/keystone-moon/keystone/contrib/moon/backends/sql.py +++ b/keystone-moon/keystone/contrib/moon/backends/sql.py @@ -306,7 +306,6 @@ class Rule(sql.ModelBase, sql.DictBase): __all_objects__ = ( - Tenant, Subject, Object, Action, @@ -901,6 +900,13 @@ class IntraExtensionConnector(IntraExtensionDriver): ref = query.first() return {ref.id: ref.aggregation_algorithm} + def del_aggregation_algorithm(self, intra_extension_id, aggregation_algorithm_id): + with sql.transaction() as session: + query = session.query(AggregationAlgorithm) + query = query.filter_by(intra_extension_id=intra_extension_id, id=aggregation_algorithm_id) + ref = query.first() + session.delete(ref) + # Getter and Setter for sub_meta_rule def get_sub_meta_rules_dict(self, intra_extension_id): diff --git a/keystone-moon/keystone/contrib/moon/core.py b/keystone-moon/keystone/contrib/moon/core.py index a42a7912..d9a59ff3 100644 --- a/keystone-moon/keystone/contrib/moon/core.py +++ b/keystone-moon/keystone/contrib/moon/core.py @@ -25,9 +25,9 @@ from keystone.contrib.moon.algorithms import * CONF = config.CONF LOG = log.getLogger(__name__) -# TODO: call functions to get these 2 variables -ADMIN_ID = uuid4().hex # default user_id for internal invocation -ROOT_EXTENSION_ID = uuid4().hex +ADMIN_ID = None # default user_id for internal invocation +ROOT_EXTENSION_ID = None +ROOT_EXTENSION_MODEL = "policy_root" _OPTS = [ @@ -107,9 +107,31 @@ def enforce(action_names, object_name, **extra): _action_name_list = action_names _object_name = object_name + def get_root_extension(self, args, kwargs): + if not ROOT_EXTENSION_ID: + global ROOT_EXTENSION_MODEL, ROOT_EXTENSION_ID, ADMIN_ID + try: + # if it is the first time we passed here, the root extension may be not initialized + # specially during unittest. So we raise RootExtensionNotInitialized to authorize the + # current creation process + if 'intra_extension_dict' in kwargs: + intra_extension_dict = kwargs['intra_extension_dict'] + else: + intra_extension_dict = args[2] + print(intra_extension_dict) + if isinstance(intra_extension_dict, dict) and \ + "model" in intra_extension_dict and \ + intra_extension_dict["model"] == "policy_root": + raise RootExtensionNotInitialized() + except KeyError: + pass + return ROOT_EXTENSION_ID + def wrap(func): def wrapped(*args, **kwargs): + global ADMIN_ID, ROOT_EXTENSION_ID + returned_value_for_func = None self = args[0] try: user_id = args[1] @@ -118,64 +140,85 @@ def enforce(action_names, object_name, **extra): intra_extension_id = None intra_admin_extension_id = None - if user_id == ADMIN_ID: - # TODO: check if there is no security hole here - return func(*args, **kwargs) - + try: + intra_admin_extension_id = get_root_extension(self, args, kwargs) + except RootExtensionNotInitialized: + returned_value_for_func = func(*args, **kwargs) + intra_extensions_dict = self.admin_api.driver.get_intra_extensions_dict() + for ext in intra_extensions_dict: + if intra_extensions_dict[ext]["model"] == ROOT_EXTENSION_MODEL: + ROOT_EXTENSION_ID = ext + break + if not ROOT_EXTENSION_ID: + raise RootExtensionUnknown() + print(returned_value_for_func) + subjects_dict = self.admin_api.driver.get_subjects_dict(returned_value_for_func['id']) + for subject_id in subjects_dict: + if subjects_dict[subject_id]["name"] == "admin": + ADMIN_ID = subject_id + break + if not ADMIN_ID: + raise RootExtensionUnknown() + return returned_value_for_func try: intra_extension_id = args[2] except IndexError: if 'intra_extension_id' in kwargs: intra_extension_id = kwargs['intra_extension_id'] - else: - intra_admin_extension_id = ROOT_EXTENSION_ID - - intra_extensions_dict = self.admin_api.driver.get_intra_extensions_dict() - if intra_extension_id not in intra_extensions_dict: - raise IntraExtensionUnknown() - tenants_dict = self.tenant_api.driver.get_tenants_dict(ADMIN_ID) - for _tenant_id in tenants_dict: - if tenants_dict[_tenant_id]['intra_authz_extension_id'] is intra_extension_id or \ - tenants_dict[_tenant_id]['intra_admin_extension_id'] is intra_extension_id: - intra_admin_extension_id = tenants_dict[_tenant_id]['intra_admin_extension_id'] - if not intra_admin_extension_id: - self.moonlog_api.driver.warning("No Intra_Admin_Extension found, authorization granted by default.") - return func(*args, **kwargs) + # else: + # intra_admin_extension_id = get_root_extension(self) + + if ADMIN_ID and user_id == ADMIN_ID: + # TODO: check if there is no security hole here + returned_value_for_func = func(*args, **kwargs) else: - objects_dict = self.admin_api.driver.get_objects_dict(ADMIN_ID, intra_admin_extension_id) - object_name = intra_extensions_dict[intra_extension_id]['genre'] + '.' + _object_name - object_id = None - for _object_id in objects_dict: - if objects_dict[_object_id]['name'] is object_name: - object_id = _object_id - break - if type(_action_name_list) in (str, unicode): - action_name_list = (_action_name_list, ) + intra_extensions_dict = self.admin_api.driver.get_intra_extensions_dict() + if intra_extension_id not in intra_extensions_dict: + raise IntraExtensionUnknown() + tenants_dict = self.tenant_api.driver.get_tenants_dict(ADMIN_ID) + for _tenant_id in tenants_dict: + if tenants_dict[_tenant_id]['intra_authz_extension_id'] is intra_extension_id or \ + tenants_dict[_tenant_id]['intra_admin_extension_id'] is intra_extension_id: + intra_admin_extension_id = tenants_dict[_tenant_id]['intra_admin_extension_id'] + if not intra_admin_extension_id: + self.moonlog_api.driver.warning("No Intra_Admin_Extension found, authorization granted by default.") + returned_value_for_func = func(*args, **kwargs) else: - action_name_list = _action_name_list - actions_dict = self.admin_api.driver.get_actions_dict(ADMIN_ID, intra_admin_extension_id) - action_id_list = list() - for _action_name in action_name_list: - for _action_id in actions_dict: - if actions_dict[_action_id]['name'] is _action_name: - action_id_list.append(_action_id) + objects_dict = self.admin_api.driver.get_objects_dict(ADMIN_ID, intra_admin_extension_id) + object_name = intra_extensions_dict[intra_extension_id]['genre'] + '.' + _object_name + object_id = None + for _object_id in objects_dict: + if objects_dict[_object_id]['name'] is object_name: + object_id = _object_id break - - authz_result = False - for action_id in action_id_list: - if self.driver.authz(intra_admin_extension_id, user_id, object_id, action_id): - authz_result = True + if type(_action_name_list) in (str, unicode): + action_name_list = (_action_name_list, ) else: - authz_result = False - break - if authz_result: - return func(*args, **kwargs) + action_name_list = _action_name_list + actions_dict = self.admin_api.driver.get_actions_dict(ADMIN_ID, intra_admin_extension_id) + action_id_list = list() + for _action_name in action_name_list: + for _action_id in actions_dict: + if actions_dict[_action_id]['name'] is _action_name: + action_id_list.append(_action_id) + break + + authz_result = False + for action_id in action_id_list: + if self.driver.authz(intra_admin_extension_id, user_id, object_id, action_id): + authz_result = True + else: + authz_result = False + break + if authz_result: + returned_value_for_func = func(*args, **kwargs) + return returned_value_for_func return wrapped return wrap @dependency.provider('configuration_api') -@dependency.requires('moonlog_api') +@dependency.requires('moonlog_api', 'admin_api') class ConfigurationManager(manager.Manager): def __init__(self): @@ -230,7 +273,7 @@ class ConfigurationManager(manager.Manager): return None @dependency.provider('tenant_api') -@dependency.requires('moonlog_api', 'admin_api') +@dependency.requires('moonlog_api', 'admin_api', 'configuration_api') class TenantManager(manager.Manager): def __init__(self): @@ -259,8 +302,6 @@ class TenantManager(manager.Manager): def add_tenant_dict(self, user_id, tenant_dict): tenants_dict = self.driver.get_tenants_dict() for tenant_id in tenants_dict: - print(tenants_dict[tenant_id]) - print(tenant_dict) if tenants_dict[tenant_id]['name'] == tenant_dict['name']: raise TenantAddedNameExisting() @@ -396,6 +437,7 @@ class IntraExtensionManager(manager.Manager): """ authz_buffer = self.__get_authz_buffer(intra_extension_id, subject_id, object_id, action_id) decision_buffer = dict() + decision = False meta_rule_dict = self.driver.get_sub_meta_rules_dict(intra_extension_id) @@ -412,9 +454,10 @@ class IntraExtensionManager(manager.Manager): self.driver.get_rules_dict(intra_extension_id, sub_meta_rule_id).values()) if meta_rule_dict['aggregation'] == 'all_true': - return all_true(decision_buffer) - - return False + decision = all_true(decision_buffer) + if not decision: + raise AuthzException() + return decision @enforce("read", "intra_extensions") def get_intra_extensions_dict(self, user_id): @@ -715,6 +758,33 @@ class IntraExtensionManager(manager.Manager): def del_intra_extension(self, user_id, intra_extension_id): if intra_extension_id not in self.driver.get_intra_extensions_dict(): raise IntraExtensionUnknown() + for sub_meta_rule_id in self.driver.get_sub_meta_rules_dict(intra_extension_id): + for rule_id in self.driver.get_rules_dict(intra_extension_id, sub_meta_rule_id): + self.driver.del_rule(intra_extension_id, sub_meta_rule_id, rule_id) + self.driver.del_sub_meta_rule(intra_extension_id, sub_meta_rule_id) + for aggregation_algorithm_id in self.driver.get_aggregation_algorithms_dict(intra_extension_id): + self.driver.del_aggregation_algorithm(intra_extension_id, aggregation_algorithm_id) + for subject_id in self.driver.get_subjects_dict(intra_extension_id): + self.driver.del_subject(intra_extension_id, subject_id) + for object_id in self.driver.get_objects_dict(intra_extension_id): + self.driver.del_object(intra_extension_id, object_id) + for action_id in self.driver.get_actions_dict(intra_extension_id): + self.driver.del_action(intra_extension_id, action_id) + for subject_category_id in self.driver.get_subject_categories_dict(intra_extension_id): + for subject_scope_id in self.driver.get_subject_assignment_list(intra_extension_id, subject_id, subject_category_id): + self.driver.del_subject_assignment(intra_extension_id, subject_id, subject_category_id, subject_scope_id) + self.driver.del_subject_scope(intra_extension_id, subject_category_id, subject_scope_id) + self.driver.del_subject_category(intra_extension_id, subject_category_id) + for object_category_id in self.driver.get_object_categories_dict(intra_extension_id): + for object_scope_id in self.driver.get_object_assignment_list(intra_extension_id, object_id, object_category_id): + self.driver.del_object_assignment(intra_extension_id, object_id, object_category_id, object_scope_id) + self.driver.del_object_scope(intra_extension_id, object_category_id, object_scope_id) + self.driver.del_object_category(intra_extension_id, object_category_id) + for action_category_id in self.driver.get_action_categories_dict(intra_extension_id): + for action_scope_id in self.driver.get_action_assignment_list(intra_extension_id, action_id, action_category_id): + self.driver.del_action_assignment(intra_extension_id, action_id, action_category_id, action_scope_id) + self.driver.del_action_scope(intra_extension_id, action_category_id, action_scope_id) + self.driver.del_action_category(intra_extension_id, action_category_id) return self.driver.del_intra_extension(intra_extension_id) @enforce(("read", "write"), "intra_extensions") @@ -1418,8 +1488,8 @@ class IntraExtensionManager(manager.Manager): def del_sub_meta_rule(self, user_id, intra_extension_id, sub_meta_rule_id): if sub_meta_rule_id not in self.driver.get_sub_meta_rules_dict(intra_extension_id): raise SubMetaRuleUnknown() - # TODO (dthom): destroy sub-meta-rule-related rules - # self.driver.del_rule(intra_extension_id, sub_meta_rule_id, "*") + for rule_id in self.driver.get_rules_dict(intra_extension_id, sub_meta_rule_id): + self.del_rule(intra_extension_id, sub_meta_rule_id, rule_id) self.driver.del_sub_meta_rule(intra_extension_id, sub_meta_rule_id) @filter_input @@ -1499,7 +1569,6 @@ class IntraExtensionAuthzManager(IntraExtensionManager): super(IntraExtensionAuthzManager, self).__init__() def authz(self, tenant_name, subject_name, object_name, action_name, genre="authz"): - # TODO (dthom) add moon log """Check authorization for a particular action. :return: True or False or raise an exception """ @@ -1538,16 +1607,39 @@ class IntraExtensionAuthzManager(IntraExtensionManager): return super(IntraExtensionAuthzManager, self).authz(intra_extension_id, subject_id, object_id, action_id) def add_subject_dict(self, user_id, intra_extension_id, subject_dict): - # TODO: sync with intra_admin_extension subjects table, need double check in both authz and admin - return + subject = super(IntraExtensionAuthzManager, self).set_subject_dict(user_id, intra_extension_id, subject_dict) + tenants_dict = self.tenant_api.get_tenants_dict(ADMIN_ID) + for tenant_id in tenants_dict: + if tenants_dict[tenant_id]["intra_authz_extension_id"] == intra_extension_id: + self.driver.set_subject_dict(tenants_dict[tenant_id]["intra_admin_extension_id"], subject['id'], subject_dict) + break + if tenants_dict[tenant_id]["intra_admin_extension_id"] == intra_extension_id: + self.driver.set_subject_dict(tenants_dict[tenant_id]["intra_authz_extension_id"], subject['id'], subject_dict) + break + return subject def del_subject(self, user_id, intra_extension_id, subject_id): - # TODO: sync with intra_admin_extension subjects table, need double check in both authz and admin - pass + super(IntraExtensionAuthzManager, self).del_subject(user_id, intra_extension_id, subject_id) + tenants_dict = self.tenant_api.get_tenants_dict(ADMIN_ID) + for tenant_id in tenants_dict: + if tenants_dict[tenant_id]["intra_authz_extension_id"] == intra_extension_id: + self.driver.del_subject(tenants_dict[tenant_id]["intra_admin_extension_id"], subject_id) + break + if tenants_dict[tenant_id]["intra_admin_extension_id"] == intra_extension_id: + self.driver.del_subject(tenants_dict[tenant_id]["intra_authz_extension_id"], subject_id) + break def set_subject_dict(self, user_id, intra_extension_id, subject_id, subject_dict): - # TODO: sync with intra_admin_extension subjects table, need double check in both authz and admin - return + subject = super(IntraExtensionAuthzManager, self).set_subject_dict(user_id, intra_extension_id, subject_dict) + tenants_dict = self.tenant_api.get_tenants_dict(ADMIN_ID) + for tenant_id in tenants_dict: + if tenants_dict[tenant_id]["intra_authz_extension_id"] == intra_extension_id: + self.driver.set_subject_dict(tenants_dict[tenant_id]["intra_admin_extension_id"], subject['id'], subject_dict) + break + if tenants_dict[tenant_id]["intra_admin_extension_id"] == intra_extension_id: + self.driver.set_subject_dict(tenants_dict[tenant_id]["intra_authz_extension_id"], subject['id'], subject_dict) + break + return subject def get_subject_categories_dict(self, user_id, intra_extension_id): raise AuthzException() @@ -1629,22 +1721,46 @@ class IntraExtensionAuthzManager(IntraExtensionManager): @dependency.provider('admin_api') +# @dependency.requires('configuration_api') class IntraExtensionAdminManager(IntraExtensionManager): def __init__(self): super(IntraExtensionAdminManager, self).__init__() def add_subject_dict(self, user_id, intra_extension_id, subject_dict): - # TODO: sync with intra_authz_extension subjects table, need double check in both authz and admin - return + subject = super(IntraExtensionAdminManager, self).set_subject_dict(user_id, intra_extension_id, subject_dict) + tenants_dict = self.tenant_api.get_tenants_dict(ADMIN_ID) + for tenant_id in tenants_dict: + if tenants_dict[tenant_id]["intra_authz_extension_id"] == intra_extension_id: + self.driver.set_subject_dict(tenants_dict[tenant_id]["intra_admin_extension_id"], subject['id'], subject_dict) + break + if tenants_dict[tenant_id]["intra_admin_extension_id"] == intra_extension_id: + self.driver.set_subject_dict(tenants_dict[tenant_id]["intra_authz_extension_id"], subject['id'], subject_dict) + break + return subject def del_subject(self, user_id, intra_extension_id, subject_id): - # TODO: sync with intra_authz_extension subjects table, need double check in both authz and admin - pass + super(IntraExtensionAdminManager, self).del_subject(user_id, intra_extension_id, subject_id) + tenants_dict = self.tenant_api.get_tenants_dict(ADMIN_ID) + for tenant_id in tenants_dict: + if tenants_dict[tenant_id]["intra_authz_extension_id"] == intra_extension_id: + self.driver.del_subject(tenants_dict[tenant_id]["intra_admin_extension_id"], subject_id) + break + if tenants_dict[tenant_id]["intra_admin_extension_id"] == intra_extension_id: + self.driver.del_subject(tenants_dict[tenant_id]["intra_authz_extension_id"], subject_id) + break def set_subject_dict(self, user_id, intra_extension_id, subject_id, subject_dict): - # TODO: sync with intra_authz_extension subjects table, need double check in both authz and admin - return + subject = super(IntraExtensionAdminManager, self).set_subject_dict(user_id, intra_extension_id, subject_dict) + tenants_dict = self.tenant_api.get_tenants_dict(ADMIN_ID) + for tenant_id in tenants_dict: + if tenants_dict[tenant_id]["intra_authz_extension_id"] == intra_extension_id: + self.driver.set_subject_dict(tenants_dict[tenant_id]["intra_admin_extension_id"], subject['id'], subject_dict) + break + if tenants_dict[tenant_id]["intra_admin_extension_id"] == intra_extension_id: + self.driver.set_subject_dict(tenants_dict[tenant_id]["intra_authz_extension_id"], subject['id'], subject_dict) + break + return subject def add_object_dict(self, user_id, intra_extension_id, object_name): raise ObjectsWriteNoAuthorized() @@ -2039,6 +2155,9 @@ class IntraExtensionDriver(object): def get_aggregation_algorithm(self, intra_extension_id): raise exception.NotImplemented() # pragma: no cover + def del_aggregation_algorithm(self, intra_extension_id, aggregation_algorithm_id): + raise exception.NotImplemented() # pragma: no cover + def get_sub_meta_rules_dict(self, intra_extension_id): raise exception.NotImplemented() # pragma: no cover diff --git a/keystone-moon/keystone/contrib/moon/exception.py b/keystone-moon/keystone/contrib/moon/exception.py index a53a3397..75ccd187 100644 --- a/keystone-moon/keystone/contrib/moon/exception.py +++ b/keystone-moon/keystone/contrib/moon/exception.py @@ -97,6 +97,12 @@ class RootExtensionUnknown(IntraExtensionUnknown): title = 'Root Extension Unknown' logger = "Error" +class RootExtensionNotInitialized(IntraExtensionException): + message_format = _("The root_extension is not initialized.") + code = 400 + title = 'Root Extension Not Initialized' + logger = "Error" + class IntraExtensionCreationError(IntraExtensionException): message_format = _("The arguments for the creation of this Extension were malformed.") diff --git a/keystone-moon/keystone/tests/moon/unit/test_unit_core_tenant.py b/keystone-moon/keystone/tests/moon/unit/test_unit_core_tenant.py index d122b25b..6d656488 100644 --- a/keystone-moon/keystone/tests/moon/unit/test_unit_core_tenant.py +++ b/keystone-moon/keystone/tests/moon/unit/test_unit_core_tenant.py @@ -13,6 +13,7 @@ from keystone.tests.unit.ksfixtures import database from keystone.contrib.moon.exception import * from keystone.tests.unit import default_fixtures from keystone.contrib.moon.core import LogManager +from keystone.contrib.moon.core import ConfigurationManager from keystone.contrib.moon.core import ADMIN_ID from keystone.common import dependency @@ -31,6 +32,7 @@ IE = { @dependency.requires('admin_api') class TestTenantManager(tests.TestCase): + ADMIN_ID = None def setUp(self): self.useFixture(database.Database()) @@ -39,8 +41,13 @@ class TestTenantManager(tests.TestCase): self.load_fixtures(default_fixtures) self.admin = self.create_user(username="admin") self.demo = self.create_user(username="demo") - self.manager = TenantManager() self.root_intra_extension = self.create_intra_extension(policy_model="policy_root") + # force re-initialization of the ADMIN_ID variable + from keystone.contrib.moon.core import ADMIN_ID + self.ADMIN_ID = ADMIN_ID + self.manager = TenantManager() + self.configuration_api = ConfigurationManager() + # self.configuration_api.init_default_variables() def load_extra_backends(self): return { @@ -74,7 +81,10 @@ class TestTenantManager(tests.TestCase): if "authz" in policy_model: genre = "authz" IE["genre"] = genre - ref = self.admin_api.load_intra_extension_dict(ADMIN_ID, intra_extension_dict=IE) + # force re-initialization of the ADMIN_ID variable + from keystone.contrib.moon.core import ADMIN_ID + self.ADMIN_ID = ADMIN_ID + ref = self.admin_api.load_intra_extension_dict(self.ADMIN_ID, intra_extension_dict=IE) self.assertIsInstance(ref, dict) return ref @@ -88,16 +98,16 @@ class TestTenantManager(tests.TestCase): "intra_authz_extension": authz_intra_extension['id'], "intra_admin_extension": admin_intra_extension['id'], } - data = self.manager.add_tenant_dict(user_id=ADMIN_ID, tenant_dict=new_tenant) + data = self.manager.add_tenant_dict(user_id=self.ADMIN_ID, tenant_dict=new_tenant) self.assertEquals(new_tenant["id"], data["id"]) self.assertEquals(new_tenant["name"], data['tenant']["name"]) self.assertEquals(new_tenant["intra_authz_extension"], data['tenant']["intra_authz_extension"]) self.assertEquals(new_tenant["intra_admin_extension"], data['tenant']["intra_admin_extension"]) - data = self.manager.get_tenants_dict(ADMIN_ID) + data = self.manager.get_tenants_dict(self.ADMIN_ID) self.assertNotEqual(data, {}) - data = self.admin_api.get_intra_extension_dict(ADMIN_ID, new_tenant["intra_authz_extension"]) + data = self.admin_api.get_intra_extension_dict(self.ADMIN_ID, new_tenant["intra_authz_extension"]) self.assertEquals(new_tenant["intra_authz_extension"], data["id"]) - data = self.admin_api.get_intra_extension_dict(ADMIN_ID, new_tenant["intra_admin_extension"]) + data = self.admin_api.get_intra_extension_dict(self.ADMIN_ID, new_tenant["intra_admin_extension"]) self.assertEquals(new_tenant["intra_admin_extension"], data["id"]) def test_del_tenant(self): @@ -110,15 +120,15 @@ class TestTenantManager(tests.TestCase): "intra_authz_extension": authz_intra_extension['id'], "intra_admin_extension": admin_intra_extension['id'], } - data = self.manager.add_tenant_dict(user_id=ADMIN_ID, tenant_dict=new_tenant) + data = self.manager.add_tenant_dict(user_id=self.ADMIN_ID, tenant_dict=new_tenant) self.assertEquals(new_tenant["id"], data["id"]) self.assertEquals(new_tenant["name"], data['tenant']["name"]) self.assertEquals(new_tenant["intra_authz_extension"], data['tenant']["intra_authz_extension"]) self.assertEquals(new_tenant["intra_admin_extension"], data['tenant']["intra_admin_extension"]) - data = self.manager.get_tenants_dict(ADMIN_ID) + data = self.manager.get_tenants_dict(self.ADMIN_ID) self.assertNotEqual(data, {}) - self.manager.del_tenant(ADMIN_ID, new_tenant["id"]) - data = self.manager.get_tenants_dict(ADMIN_ID) + self.manager.del_tenant(self.ADMIN_ID, new_tenant["id"]) + data = self.manager.get_tenants_dict(self.ADMIN_ID) self.assertEqual(data, {}) def test_set_tenant(self): @@ -131,25 +141,25 @@ class TestTenantManager(tests.TestCase): "intra_authz_extension": authz_intra_extension['id'], "intra_admin_extension": admin_intra_extension['id'], } - data = self.manager.add_tenant_dict(user_id=ADMIN_ID, tenant_dict=new_tenant) + data = self.manager.add_tenant_dict(user_id=self.ADMIN_ID, tenant_dict=new_tenant) self.assertEquals(new_tenant["id"], data["id"]) self.assertEquals(new_tenant["name"], data['tenant']["name"]) self.assertEquals(new_tenant["intra_authz_extension"], data['tenant']["intra_authz_extension"]) self.assertEquals(new_tenant["intra_admin_extension"], data['tenant']["intra_admin_extension"]) - data = self.manager.get_tenants_dict(ADMIN_ID) + data = self.manager.get_tenants_dict(self.ADMIN_ID) self.assertNotEqual(data, {}) new_tenant["name"] = "demo2" - data = self.manager.set_tenant_dict(user_id=ADMIN_ID, tenant_id=new_tenant["id"], tenant_dict=new_tenant) + data = self.manager.set_tenant_dict(user_id=self.ADMIN_ID, tenant_id=new_tenant["id"], tenant_dict=new_tenant) self.assertEquals(new_tenant["id"], data["id"]) self.assertEquals(new_tenant["name"], data['tenant']["name"]) self.assertEquals(new_tenant["intra_authz_extension"], data['tenant']["intra_authz_extension"]) self.assertEquals(new_tenant["intra_admin_extension"], data['tenant']["intra_admin_extension"]) def test_exception_tenant_unknown(self): - self.assertRaises(TenantUnknown, self.manager.get_tenant_dict, ADMIN_ID, uuid.uuid4().hex) - self.assertRaises(TenantUnknown, self.manager.del_tenant, ADMIN_ID, uuid.uuid4().hex) - self.assertRaises(TenantUnknown, self.manager.set_tenant_dict, ADMIN_ID, uuid.uuid4().hex, {}) + self.assertRaises(TenantUnknown, self.manager.get_tenant_dict, self.ADMIN_ID, uuid.uuid4().hex) + self.assertRaises(TenantUnknown, self.manager.del_tenant, self.ADMIN_ID, uuid.uuid4().hex) + self.assertRaises(TenantUnknown, self.manager.set_tenant_dict, self.ADMIN_ID, uuid.uuid4().hex, {}) authz_intra_extension = self.create_intra_extension(policy_model="policy_authz") admin_intra_extension = self.create_intra_extension(policy_model="policy_admin") @@ -160,15 +170,15 @@ class TestTenantManager(tests.TestCase): "intra_authz_extension": authz_intra_extension['id'], "intra_admin_extension": admin_intra_extension['id'], } - data = self.manager.add_tenant_dict(user_id=ADMIN_ID, tenant_dict=new_tenant) + data = self.manager.add_tenant_dict(user_id=self.ADMIN_ID, tenant_dict=new_tenant) self.assertEquals(new_tenant["id"], data["id"]) self.assertEquals(new_tenant["name"], data['tenant']["name"]) self.assertEquals(new_tenant["intra_authz_extension"], data['tenant']["intra_authz_extension"]) self.assertEquals(new_tenant["intra_admin_extension"], data['tenant']["intra_admin_extension"]) - data = self.manager.get_tenants_dict(ADMIN_ID) + data = self.manager.get_tenants_dict(self.ADMIN_ID) self.assertNotEqual(data, {}) - self.assertRaises(TenantUnknown, self.manager.get_tenant_dict, ADMIN_ID, uuid.uuid4().hex) + self.assertRaises(TenantUnknown, self.manager.get_tenant_dict, self.ADMIN_ID, uuid.uuid4().hex) def test_exception_tenant_added_name_existing(self): authz_intra_extension = self.create_intra_extension(policy_model="policy_authz") @@ -180,15 +190,15 @@ class TestTenantManager(tests.TestCase): "intra_authz_extension": authz_intra_extension['id'], "intra_admin_extension": admin_intra_extension['id'], } - data = self.manager.add_tenant_dict(user_id=ADMIN_ID, tenant_dict=new_tenant) + data = self.manager.add_tenant_dict(user_id=self.ADMIN_ID, tenant_dict=new_tenant) self.assertEquals(new_tenant["id"], data["id"]) self.assertEquals(new_tenant["name"], data['tenant']["name"]) self.assertEquals(new_tenant["intra_authz_extension"], data['tenant']["intra_authz_extension"]) self.assertEquals(new_tenant["intra_admin_extension"], data['tenant']["intra_admin_extension"]) - data = self.manager.get_tenants_dict(ADMIN_ID) + data = self.manager.get_tenants_dict(self.ADMIN_ID) self.assertNotEqual(data, {}) - self.assertRaises(TenantAddedNameExisting, self.manager.add_tenant_dict, ADMIN_ID, new_tenant) + self.assertRaises(TenantAddedNameExisting, self.manager.add_tenant_dict, self.ADMIN_ID, new_tenant) def test_exception_tenant_no_intra_extension(self): authz_intra_extension = self.create_intra_extension(policy_model="policy_authz") @@ -201,16 +211,16 @@ class TestTenantManager(tests.TestCase): "intra_admin_extension": admin_intra_extension['id'], } new_tenant['intra_authz_extension'] = None - self.assertRaises(TenantNoIntraAuthzExtension, self.manager.add_tenant_dict, ADMIN_ID, new_tenant) + self.assertRaises(TenantNoIntraAuthzExtension, self.manager.add_tenant_dict, self.ADMIN_ID, new_tenant) new_tenant['intra_authz_extension'] = authz_intra_extension['id'] - data = self.manager.add_tenant_dict(user_id=ADMIN_ID, tenant_dict=new_tenant) + data = self.manager.add_tenant_dict(user_id=self.ADMIN_ID, tenant_dict=new_tenant) self.assertEquals(new_tenant["id"], data["id"]) self.assertEquals(new_tenant["name"], data['tenant']["name"]) self.assertEquals(new_tenant["intra_authz_extension"], data['tenant']["intra_authz_extension"]) self.assertEquals(new_tenant["intra_admin_extension"], data['tenant']["intra_admin_extension"]) - data = self.manager.get_tenants_dict(ADMIN_ID) + data = self.manager.get_tenants_dict(self.ADMIN_ID) self.assertNotEqual(data, {}) new_tenant['intra_authz_extension'] = None new_tenant['name'] = "demo2" - self.assertRaises(TenantNoIntraAuthzExtension, self.manager.set_tenant_dict, ADMIN_ID, new_tenant["id"], new_tenant) + self.assertRaises(TenantNoIntraAuthzExtension, self.manager.set_tenant_dict, self.ADMIN_ID, new_tenant["id"], new_tenant) |