From 5cf620d23a75e064fa6662c07587a34c1b9c6234 Mon Sep 17 00:00:00 2001 From: asteroide Date: Tue, 21 Jul 2015 11:33:25 +0200 Subject: Modify __load_* functions and object and action related functions. Change-Id: I9016d1202d8fa93997f069706f1abe2e68a4c176 --- keystone-moon/keystone/contrib/moon/core.py | 451 ++++++++++++--------- .../unit/test_unit_core_intra_extension_admin.py | 166 ++++---- .../unit/test_unit_core_intra_extension_authz.py | 52 +-- 3 files changed, 359 insertions(+), 310 deletions(-) (limited to 'keystone-moon/keystone') diff --git a/keystone-moon/keystone/contrib/moon/core.py b/keystone-moon/keystone/contrib/moon/core.py index 723569cd..4134c497 100644 --- a/keystone-moon/keystone/contrib/moon/core.py +++ b/keystone-moon/keystone/contrib/moon/core.py @@ -93,9 +93,8 @@ def enforce(action_names, object_name, **extra): intra_admin_extension_id = tenants_dict[tenant_id]['intra_admin_extension_id'] tenant_name = tenants_dict[tenant_id]['name'] - # func.func_globals["_admin_extension_uuid"] = _admin_extension_uuid if not intra_admin_extension_id: - raise TenantNoIntraAdminExtension() + args[0].moonlog_api.warning("No admin IntraExtension found, authorization granted by default.") return func(*args) else: authz_result = False @@ -398,32 +397,32 @@ class IntraExtensionManager(manager.Manager): f = open(metadata_path) json_perimeter = json.load(f) - subject_categories_dict = dict() + # subject_categories_dict = dict() for _cat in json_perimeter['subject_categories']: - subject_categories_dict[uuid4().hex] = {"name": _cat} - self.driver.set_subject_category_dict(intra_extension_dict["id"], subject_categories_dict) + self.driver.set_subject_category_dict(intra_extension_dict["id"], uuid4().hex, + {"name": _cat, "description": _cat}) # Initialize scope categories - for _cat in subject_categories_dict.keys(): - self.driver.set_subject_scope_dict(intra_extension_dict["id"], _cat, {}) - intra_extension_dict['subject_categories'] = subject_categories_dict + # for _cat in subject_categories_dict.keys(): + # self.driver.set_subject_scope_dict(intra_extension_dict["id"], _cat, {}) + # intra_extension_dict['subject_categories'] = subject_categories_dict - object_categories_dict = dict() + # object_categories_dict = dict() for _cat in json_perimeter['object_categories']: - object_categories_dict[uuid4().hex] = {"name": _cat} - self.driver.set_object_category_dict(intra_extension_dict["id"], object_categories_dict) + self.driver.set_object_category_dict(intra_extension_dict["id"], uuid4().hex, + {"name": _cat, "description": _cat}) # Initialize scope categories - for _cat in object_categories_dict.keys(): - self.driver.set_object_scope_dict(intra_extension_dict["id"], _cat, {}) - intra_extension_dict['object_categories'] = object_categories_dict + # for _cat in object_categories_dict.keys(): + # self.driver.set_object_scope_dict(intra_extension_dict["id"], _cat, {}) + # intra_extension_dict['object_categories'] = object_categories_dict - action_categories_dict = dict() + # action_categories_dict = dict() for _cat in json_perimeter['action_categories']: - action_categories_dict[uuid4().hex] = {"name": _cat} - self.driver.set_action_category_dict(intra_extension_dict["id"], action_categories_dict) + self.driver.set_action_category_dict(intra_extension_dict["id"], uuid4().hex, + {"name": _cat, "description": _cat}) # Initialize scope categories - for _cat in action_categories_dict.keys(): - self.driver.set_action_scope_dict(intra_extension_dict["id"], _cat, {}) - intra_extension_dict['action_categories'] = action_categories_dict + # for _cat in action_categories_dict.keys(): + # self.driver.set_action_scope_dict(intra_extension_dict["id"], _cat, {}) + # intra_extension_dict['action_categories'] = action_categories_dict def __load_perimeter_file(self, intra_extension_dict, policy_dir): @@ -436,20 +435,23 @@ class IntraExtensionManager(manager.Manager): for _subject in json_perimeter['subjects']: user = self.identity_api.get_user_by_name(_subject, "default") subject_dict[user["id"]] = user - self.driver.set_subject_dict(intra_extension_dict["id"], subject_dict) + subject_dict[user["id"]].pop("id") + self.driver.set_subject_dict(intra_extension_dict["id"], user["id"]) intra_extension_dict["subjects"] = subject_dict - # Copy all values for objects and subjects + # Copy all values for objects and actions object_dict = dict() for _object in json_perimeter['objects']: - object_dict[uuid4().hex] = {"name": _object} - self.driver.set_object_dict(intra_extension_dict["id"], object_dict) + _id = uuid4().hex + object_dict[_id] = {"name": _object, "description": _object} + self.driver.set_object_dict(intra_extension_dict["id"], _id, object_dict[_id]) intra_extension_dict["objects"] = object_dict action_dict = dict() for _action in json_perimeter['actions']: - action_dict[uuid4().hex] = {"name": _action} - self.driver.set_action_dict(intra_extension_dict["id"], action_dict) + _id = uuid4().hex + action_dict[_id] = {"name": _action, "description": _action} + self.driver.set_action_dict(intra_extension_dict["id"], _id, action_dict[_id]) intra_extension_dict["ations"] = action_dict def __load_scope_file(self, intra_extension_dict, policy_dir): @@ -460,29 +462,32 @@ class IntraExtensionManager(manager.Manager): intra_extension_dict['subject_category_scope'] = dict() for category, scope in json_perimeter["subject_category_scope"].iteritems(): - category = self.driver.get_uuid_from_name(intra_extension_dict["id"], category, self.driver.SUBJECT_CATEGORY) + category_id = self.driver.get_uuid_from_name(intra_extension_dict["id"], category, self.driver.SUBJECT_CATEGORY) _scope_dict = dict() for _scope in scope: - _scope_dict[uuid4().hex] = {"name": _scope} - self.driver.set_subject_scope_dict(intra_extension_dict["id"], category, _scope_dict) + _id = uuid4().hex + _scope_dict[_id] = {"name": _scope, "description": _scope} + self.driver.set_subject_scope_dict(intra_extension_dict["id"], category_id, _id, _scope_dict[_id]) intra_extension_dict['subject_category_scope'][category] = _scope_dict intra_extension_dict['object_category_scope'] = dict() for category, scope in json_perimeter["object_category_scope"].iteritems(): - category = self.driver.get_uuid_from_name(intra_extension_dict["id"], category, self.driver.OBJECT_CATEGORY) + category_id = self.driver.get_uuid_from_name(intra_extension_dict["id"], category, self.driver.OBJECT_CATEGORY) _scope_dict = dict() for _scope in scope: - _scope_dict[uuid4().hex] = {"name": _scope} - self.driver.set_object_scope_dict(intra_extension_dict["id"], category, _scope_dict) + _id = uuid4().hex + _scope_dict[_id] = {"name": _scope, "description": _scope} + self.driver.set_object_scope_dict(intra_extension_dict["id"], category_id, _id, _scope_dict[_id]) intra_extension_dict['object_category_scope'][category] = _scope_dict intra_extension_dict['action_category_scope'] = dict() for category, scope in json_perimeter["action_category_scope"].iteritems(): - category = self.driver.get_uuid_from_name(intra_extension_dict["id"], category, self.driver.ACTION_CATEGORY) + category_id = self.driver.get_uuid_from_name(intra_extension_dict["id"], category, self.driver.ACTION_CATEGORY) _scope_dict = dict() for _scope in scope: - _scope_dict[uuid4().hex] = {"name": _scope} - self.driver.set_action_scope_dict(intra_extension_dict["id"], category, _scope_dict) + _id = uuid4().hex + _scope_dict[_id] = {"name": _scope, "description": _scope} + self.driver.set_action_scope_dict(intra_extension_dict["id"], category_id, _scope_dict[_id]) intra_extension_dict['action_category_scope'][category] = _scope_dict def __load_assignment_file(self, intra_extension_dict, policy_dir): @@ -492,67 +497,60 @@ class IntraExtensionManager(manager.Manager): subject_assignments = dict() for category_name, value in json_assignments['subject_assignments'].iteritems(): - category = self.driver.get_uuid_from_name(intra_extension_dict["id"], category_name, self.driver.SUBJECT_CATEGORY) + category_id = self.driver.get_uuid_from_name(intra_extension_dict["id"], category_name, self.driver.SUBJECT_CATEGORY) for user_name in value: - user_id = self.driver.get_uuid_from_name(intra_extension_dict["id"], user_name, self.driver.SUBJECT) - if user_id not in subject_assignments: - subject_assignments[user_id] = dict() - if category not in subject_assignments[user_id]: - subject_assignments[user_id][category] = \ + subject_id = self.driver.get_uuid_from_name(intra_extension_dict["id"], user_name, self.driver.SUBJECT) + if subject_id not in subject_assignments: + subject_assignments[subject_id] = dict() + if category_id not in subject_assignments[subject_id]: + subject_assignments[subject_id][category_id] = \ map(lambda x: self.driver.get_uuid_from_name(intra_extension_dict["id"], x, self.driver.SUBJECT_SCOPE, category_name), value[user_name]) else: - subject_assignments[user_id][category].extend( + subject_assignments[subject_id][category_id].extend( map(lambda x: self.driver.get_uuid_from_name(intra_extension_dict["id"], x, self.driver.SUBJECT_SCOPE, category_name), value[user_name]) ) - # Note (dthom): subject_category_assignment must be initialized because when there is no data in json - # we will not go through the for loop - self.driver.set_subject_assignment_list(intra_extension_dict["id"]) - for subject in subject_assignments: - self.driver.set_subject_assignment_list(intra_extension_dict["id"], subject, subject_assignments[subject]) + self.driver.set_subject_assignment_list(intra_extension_dict["id"], subject_id, category_id, + subject_assignments[subject_id][category_id]) object_assignments = dict() for category_name, value in json_assignments["object_assignments"].iteritems(): - category = self.driver.get_uuid_from_name(intra_extension_dict["id"], category_name, self.driver.OBJECT_CATEGORY) + category_id = self.driver.get_uuid_from_name(intra_extension_dict["id"], category_name, self.driver.OBJECT_CATEGORY) for object_name in value: + object_id = self.driver.get_uuid_from_name(intra_extension_dict["id"], object_name, self.driver.OBJECT) if object_name not in object_assignments: - object_assignments[object_name] = dict() - if category not in object_assignments[object_name]: - object_assignments[object_name][category] = \ + object_assignments[object_id] = dict() + if category_id not in object_assignments[object_name]: + object_assignments[object_id][category_id] = \ map(lambda x: self.driver.get_uuid_from_name(intra_extension_dict["id"], x, self.driver.OBJECT_SCOPE, category_name), value[object_name]) else: - object_assignments[object_name][category].extend( + object_assignments[object_id][category_id].extend( map(lambda x: self.driver.get_uuid_from_name(intra_extension_dict["id"], x, self.driver.OBJECT_SCOPE, category_name), value[object_name]) ) - # Note (dthom): object_category_assignment must be initialized because when there is no data in json - # we will not go through the for loop - self.driver.set_object_assignment_list(intra_extension_dict["id"]) - for object in object_assignments: - self.driver.set_object_assignment_list(intra_extension_dict["id"], object, object_assignments[object]) + self.driver.set_object_assignment_list(intra_extension_dict["id"], object_id, category_id, + object_assignments[object_id][category_id]) action_assignments = dict() for category_name, value in json_assignments["action_assignments"].iteritems(): - category = self.driver.get_uuid_from_name(intra_extension_dict["id"], category_name, self.driver.ACTION_CATEGORY) + category_id = self.driver.get_uuid_from_name(intra_extension_dict["id"], category_name, self.driver.ACTION_CATEGORY) for action_name in value: + action_id = self.driver.get_uuid_from_name(intra_extension_dict["id"], action_name, self.driver.ACTION) if action_name not in action_assignments: - action_assignments[action_name] = dict() - if category not in action_assignments[action_name]: - action_assignments[action_name][category] = \ + action_assignments[action_id] = dict() + if category_id not in action_assignments[action_name]: + action_assignments[action_id][category_id] = \ map(lambda x: self.driver.get_uuid_from_name(intra_extension_dict["id"], x, self.driver.ACTION_SCOPE, category_name), value[action_name]) else: - action_assignments[action_name][category].extend( + action_assignments[action_id][category_id].extend( map(lambda x: self.driver.get_uuid_from_name(intra_extension_dict["id"], x, self.driver.ACTION_SCOPE, category_name), value[action_name]) ) - # Note (dthom): action_category_assignment must be initialized because when there is no data in json - # we will not go through the for loop - self.driver.set_action_assignment_list(intra_extension_dict["id"]) - for action in action_assignments: - self.driver.set_action_assignment_list(intra_extension_dict["id"], action, action_assignments[action]) + self.driver.set_action_assignment_list(intra_extension_dict["id"], action_id, category_id, + action_assignments[action_id][category_id]) def __load_metarule_file(self, intra_extension_dict, policy_dir): @@ -567,18 +565,25 @@ class IntraExtensionManager(manager.Manager): "action_categories": self.driver.ACTION_CATEGORY } # Translate value from JSON file to UUID for Database - for relation in json_metarule["sub_meta_rules"]: - metarule[relation] = dict() + for metarule_name in json_metarule["sub_meta_rules"]: + _id = uuid4().hex + metarule[_id] = dict() + metarule[_id]["name"] = metarule_name for item in ("subject_categories", "object_categories", "action_categories"): - metarule[relation][item] = list() - for element in json_metarule["sub_meta_rules"][relation][item]: - metarule[relation][item].append(self.driver.get_uuid_from_name(intra_extension_dict["id"], element, categories[item])) - metarule[relation]["algorithm"] = json_metarule["sub_meta_rules"][relation]["algorithm"] + metarule[_id][item] = list() + for element in json_metarule["sub_meta_rules"][metarule_name][item]: + metarule[[_id]][item].append(self.driver.get_uuid_from_name(intra_extension_dict["id"], element, categories[item])) + metarule[[_id]]["algorithm"] = json_metarule["sub_meta_rules"][metarule_name]["algorithm"] + self.driver.set_sub_meta_rule_dict(intra_extension_dict["id"], _id, metarule[[_id]]) submetarules = { "aggregation": json_metarule["aggregation"], "sub_meta_rules": metarule } - self.driver.set_sub_meta_rule_dict(intra_extension_dict["id"], submetarules) + self.driver.set_aggregation_algorithm(intra_extension_dict["id"], uuid4().hex, + { + "name": json_metarule["aggregation"], + "description": json_metarule["aggregation"], + }) def __load_rule_file(self, intra_extension_dict, policy_dir): @@ -589,59 +594,47 @@ class IntraExtensionManager(manager.Manager): # Translate value from JSON file to UUID for Database rules = dict() sub_meta_rules = self.driver.get_sub_meta_rules_dict(intra_extension_dict["id"]) - for relation in json_rules: - # print(relation) + for sub_rule_name in json_rules: + sub_rule_id = self.driver.get_uuid_from_name(intra_extension_dict["id"], + sub_rule_name, + self.driver.SUB_META_RULE) + # print(sub_rule_name) # print(self.get_sub_meta_rule_relations("admin", ie["id"])) - # if relation not in self.get_sub_meta_rule_relations("admin", ie["id"])["sub_meta_rule_relations"]: - # raise IntraExtensionException("Bad relation name {} in rules".format(relation)) - rules[relation] = list() - for rule in json_rules[relation]: + # if sub_rule_name not in self.get_sub_meta_rule_relations("admin", ie["id"])["sub_meta_rule_relations"]: + # raise IntraExtensionException("Bad sub_rule_name name {} in rules".format(sub_rule_name)) + rules[sub_rule_id] = list() + for rule in json_rules[sub_rule_name]: subrule = list() _rule = list(rule) - for category_uuid in sub_meta_rules["rule"][relation]["subject_categories"]: + for category_uuid in sub_meta_rules["rule"][sub_rule_name]["subject_categories"]: scope_name = _rule.pop(0) scope_uuid = self.driver.get_uuid_from_name(intra_extension_dict["id"], scope_name, self.driver.SUBJECT_SCOPE, category_uuid=category_uuid) subrule.append(scope_uuid) - for category_uuid in sub_meta_rules["rule"][relation]["action_categories"]: + for category_uuid in sub_meta_rules["rule"][sub_rule_name]["action_categories"]: scope_name = _rule.pop(0) scope_uuid = self.driver.get_uuid_from_name(intra_extension_dict["id"], scope_name, self.driver.ACTION_SCOPE, category_uuid=category_uuid) subrule.append(scope_uuid) - for category_uuid in sub_meta_rules["rule"][relation]["object_categories"]: + for category_uuid in sub_meta_rules["rule"][sub_rule_name]["object_categories"]: scope_name = _rule.pop(0) scope_uuid = self.driver.get_uuid_from_name(intra_extension_dict["id"], scope_name, self.driver.OBJECT_SCOPE, category_uuid=category_uuid) subrule.append(scope_uuid) - # for cat, cat_func, cat_func_cat in ( - # ("subject_categories", self.driver.get_uuid_from_name, self.driver.SUBJECT_SCOPE), - # ("action_categories", self.driver.ACTION_SCOPE), - # ("object_categories", self.driver.OBJECT_SCOPE), - # ): - # for cat_value in sub_meta_rules["sub_meta_rules"][relation][cat]: - # scope = cat_func( - # ie["id"], - # cat_value, - # cat_func_cat - # )[cat_func.__name__.replace("get_", "").replace("_dict", "")] - # - # _ = rule.pop(0) - # a_scope = self.driver.get_uuid_from_name(ie["id"], _, scope[cat_value]) - # subrule.append(a_scope) # if a positive/negative value exists, all item of rule have not be consumed if len(rule) >= 1 and type(rule[0]) is bool: subrule.append(rule[0]) else: # if value doesn't exist add a default value subrule.append(True) - rules[relation].append(subrule) - self.driver.set_rule_dict(intra_extension_dict["id"], rules) + rules[sub_rule_id].append(subrule) + self.driver.set_rule_dict(intra_extension_dict["id"], sub_rule_id, uuid4().hex, rules) def load_intra_extension_dict(self, user_id, intra_extension_dict): # TODO: check will be done through super_extension later @@ -726,16 +719,19 @@ class IntraExtensionManager(manager.Manager): def del_subject_category(self, user_id, intra_extension_id, subject_category_id): if subject_category_id not in self.driver.get_subject_categories_dict(intra_extension_id): raise SubjectCategoryUnknown() - # TODO (dthom): destroy category in scope - # self.driver.destroy_subject_category_in_scope(intra_extension_id, subject_category_id) - # TODO (dthom): destroy category-related assignment in assignments - # self.driver.destroy_subject_category_in_assignement(intra_extension_id, subject_category_id) - return self.driver.del_subject_category(intra_extension_id, subject_category_id) + # Destroy scopes related to this category + for scope in self.driver.get_subject_scope_dict(intra_extension_id, subject_category_id): + self.del_subject_scope(intra_extension_id, subject_category_id, scope) + # Destroy assignments related to this category + for subject_id in self.driver.get_subjects_dict(intra_extension_id): + for assignment_id in self.driver.get_subject_assignment_list(intra_extension_id, subject_id, subject_category_id): + self.driver.del_subject_assignment(intra_extension_id, subject_id, subject_category_id, assignment_id) + self.driver.del_subject_category(intra_extension_id, subject_category_id) @filter_args @enforce(("read", "write"), "subject_categories") def set_subject_category(self, user_id, intra_extension_id, subject_category_id, subject_category_dict): - for subject_category_id not in self.driver.get_subject_categories_dict(intra_extension_id): + if subject_category_id not in self.driver.get_subject_categories_dict(intra_extension_id): raise SubjectCategoryUnknown() return self.driver.set_subject_category(intra_extension_id, subject_category_id, subject_category_dict) @@ -757,10 +753,7 @@ class IntraExtensionManager(manager.Manager): for object_category_id in object_category_dict: if object_category_dict[object_category_id]["name"] is object_category_name: raise ObjectCategoryNameExisting() - object_category_id = uuid4().hex - # TODO (dthom): create category in scope - # self.driver.create_object_category_in_scope(intra_extension_id, object_category_id) - return self.driver.add_object_category(intra_extension_id, object_category_id, object_category_name) + return self.driver.add_object_category(intra_extension_id, uuid4().hex, object_category_name) @filter_args @enforce("read", "object_categories") @@ -775,14 +768,16 @@ class IntraExtensionManager(manager.Manager): @enforce(("read", "write"), "object_scopes") @enforce(("read", "write"), "object_assignments") def del_object_category(self, user_id, intra_extension_id, object_category_id): - object_category_dict = self.driver.get_object_categories_dict(intra_extension_id) - if object_category_id not in object_category_dict: + if object_category_id not in self.driver.get_object_categories_dict(intra_extension_id): raise ObjectCategoryUnknown() - # TODO (dthom): destroy category in scope - # self.driver.destroy_object_category_in_scope(intra_extension_id, object_category_id) - # TODO (dthom): destroy category-related assignment in assignments - # self.driver.destroy_object_category_in_assignement(intra_extension_id, object_category_id) - return self.driver.del_object_category(intra_extension_id, object_category_id) + # Destroy scopes related to this category + for scope in self.driver.get_object_scopes_dict(intra_extension_id, object_category_id): + self.del_object_scope(intra_extension_id, object_category_id, scope) + # Destroy assignments related to this category + for object_id in self.driver.get_objects_dict(intra_extension_id): + for assignment_id in self.driver.get_object_assignment_list(intra_extension_id, object_id, object_category_id): + self.driver.del_object_assignment(intra_extension_id, object_id, object_category_id, assignment_id) + self.driver.del_object_category(intra_extension_id, object_category_id) @filter_args @enforce("read", "action_categories") @@ -802,10 +797,7 @@ class IntraExtensionManager(manager.Manager): for action_category_id in action_category_dict: if action_category_dict[action_category_id]['name'] is action_category_name: raise ActionCategoryNameExisting() - action_category_id = uuid4().hex - # TODO (dthom): create category in scope - # self.driver.create_action_category_in_scope(intra_extension_id, action_category_id) - return self.driver.add_action_category(intra_extension_id, action_category_id, action_category_name) + return self.driver.add_action_category(intra_extension_id, uuid4().hex, action_category_name) @filter_args @enforce("read", "action_categories") @@ -819,14 +811,16 @@ class IntraExtensionManager(manager.Manager): @enforce(("read", "write"), "action_categories") @enforce(("read", "write"), "action_category_scopes") def del_action_category(self, user_id, intra_extension_id, action_category_id): - action_category_dict = self.driver.get_action_categories_dict(intra_extension_id) - if action_category_id not in action_category_dict: + if action_category_id not in self.driver.get_action_categories_dict(intra_extension_id): raise ActionCategoryUnknown() - # TODO (dthom): destroy category in scope - # self.driver.destroy_action_category_in_scope(intra_extension_id, action_category_id) - # TODO (dthom): destroy category-related assignment in assignement - # self.driver.destroy_action_category_in_assignement(intra_extension_id, action_category_id) - return self.driver.del_action_category(intra_extension_id, action_category_id) + # Destroy scopes related to this category + for scope in self.driver.get_action_scopes_dict(intra_extension_id, action_category_id): + self.del_action_scope(intra_extension_id, action_category_id, scope) + # Destroy assignments related to this category + for action_id in self.driver.get_actions_dict(intra_extension_id): + for assignment_id in self.driver.get_action_assignment_list(intra_extension_id, action_id, action_category_id): + self.driver.del_action_assignment(intra_extension_id, action_id, action_category_id, assignment_id) + self.driver.del_action_category(intra_extension_id, action_category_id) # Perimeter functions @@ -859,7 +853,11 @@ class IntraExtensionManager(manager.Manager): def del_subject(self, user_id, intra_extension_id, subject_id): if subject_id in self.driver.get_subjects_dict(intra_extension_id): raise SubjectUnknown() - # TODO (dthom): destroy item-related assignment + # Destroy assignments related to this category + for subject_category_id in self.driver.get_subject_categories_dict(intra_extension_id): + for _subject_id in self.driver.get_subjects_dict(intra_extension_id): + for assignment_id in self.driver.get_subject_assignment_list(intra_extension_id, _subject_id, subject_category_id): + self.driver.del_subject_assignment(intra_extension_id, _subject_id, subject_category_id, assignment_id) self.driver.del_subject(intra_extension_id, subject_id) @filter_args @@ -875,22 +873,30 @@ class IntraExtensionManager(manager.Manager): @filter_args @enforce("read", "objects") - def get_object_dict(self, user_id, intra_extension_id): + def get_objects_dict(self, user_id, intra_extension_id): return self.driver.get_objects_dict(intra_extension_id) @filter_args @enforce(("read", "write"), "objects") - def add_object(self, user_id, intra_extension_id, object_name): + def add_object_dict(self, user_id, intra_extension_id, object_name): object_dict = self.driver.get_objects_dict(intra_extension_id) for object_id in object_dict: if object_dict[object_id]["name"] is object_name: raise ObjectNameExisting() - object_id = uuid4().hex - return self.driver.add_object(intra_extension_id, object_id, object_name) + return self.driver.set_object_dict(intra_extension_id, uuid4().hex, object_name) + + @filter_args + @enforce(("read", "write"), "objects") + def set_object_dict(self, user_id, intra_extension_id, object_id, object_dict): + objects_dict = self.driver.get_objects_dict(intra_extension_id) + for object_id in objects_dict: + if objects_dict[object_id]["name"] is object_dict['name']: + raise ObjectNameExisting() + return self.driver.set_object_dict(intra_extension_id, object_id, object_dict) @filter_args @enforce("read", "objects") - def get_object(self, user_id, intra_extension_id, object_id): + def get_object_dict(self, user_id, intra_extension_id, object_id): object_dict = self.driver.get_objects_dict(intra_extension_id) if object_id in object_dict: raise ObjectUnknown() @@ -901,27 +907,39 @@ class IntraExtensionManager(manager.Manager): def del_object(self, user_id, intra_extension_id, object_id): if object_id in self.driver.get_objects_dict(intra_extension_id): raise ObjectUnknown() - # TODO (dthom): destroy item-related assignment - return self.driver.del_object(intra_extension_id, object_id) + # Destroy assignments related to this category + for object_category_id in self.driver.get_object_categories_dict(intra_extension_id): + for _object_id in self.driver.get_objects_dict(intra_extension_id): + for assignment_id in self.driver.get_object_assignment_list(intra_extension_id, _object_id, object_category_id): + self.driver.del_object_assignment(intra_extension_id, _object_id, object_category_id, assignment_id) + self.driver.del_object(intra_extension_id, object_id) @filter_args @enforce("read", "actions") - def get_action_dict(self, user_id, intra_extension_id): + def get_actions_dict(self, user_id, intra_extension_id): return self.driver.get_actions_dict(intra_extension_id) @filter_args @enforce(("read", "write"), "actions") - def add_action(self, user_id, intra_extension_id, action_name): + def add_action_dict(self, user_id, intra_extension_id, action_name): action_dict = self.driver.get_actions_dict(intra_extension_id) for action_id in action_dict: if action_dict[action_id]["name"] is action_name: raise ActionNameExisting() - action_id = uuid4().hex - return self.driver.add_action(intra_extension_id, action_id, action_name) + return self.driver.add_action_dict(intra_extension_id, uuid4().hex, action_name) + + @filter_args + @enforce(("read", "write"), "actions") + def set_action_dict(self, user_id, intra_extension_id, action_id, action_dict): + actions_dict = self.driver.get_actions_dict(intra_extension_id) + for action_id in actions_dict: + if actions_dict[action_id]["name"] is action_dict['name']: + raise ActionNameExisting() + return self.driver.set_action_dict(intra_extension_id, action_id, action_dict) @filter_args @enforce("read", "actions") - def get_action(self, user_id, intra_extension_id, action_id): + def get_action_dict(self, user_id, intra_extension_id, action_id): action_dict = self.driver.get_actions_dict(intra_extension_id) if action_id in action_dict: raise ActionUnknown() @@ -932,7 +950,11 @@ class IntraExtensionManager(manager.Manager): def del_action(self, user_id, intra_extension_id, action_id): if action_id in self.driver.get_actions_dict(intra_extension_id): raise ActionUnknown() - # TODO (dthom): destroy item-related assignment + # Destroy assignments related to this category + for action_category_id in self.driver.get_action_categories_dict(intra_extension_id): + for _action_id in self.driver.get_actions_dict(intra_extension_id): + for assignment_id in self.driver.get_action_assignment_list(intra_extension_id, _action_id, action_category_id): + self.driver.del_action_assignment(intra_extension_id, _action_id, action_category_id, assignment_id) return self.driver.del_action(intra_extension_id, action_id) # Scope functions @@ -990,8 +1012,16 @@ class IntraExtensionManager(manager.Manager): raise SubjectCategoryUnknown() if subject_scope_id not in self.driver.get_subject_scopes_dict(intra_extension_id, subject_category_id): raise SubjectScopeUnknown() - # TODO (dthom): destroy scope-related assignment - # TODO (dthom): destroy scope-related rule + # Destroy scope-related assignment + for subject_id in self.driver.get_subjects_dict(intra_extension_id): + for assignment_id in self.driver.get_subject_assignment_list(intra_extension_id, subject_id, subject_category_id): + self.driver.del_subject_assignment(intra_extension_id, subject_id, subject_category_id, assignment_id) + # Destroy scope-related rule + for sub_meta_rule_id in self.driver.get_sub_meta_rules_dict(intra_extension_id): + rules_dict = self.driver.get_rules_dict(intra_extension_id, sub_meta_rule_id) + for rule_id in rules_dict: + if subject_scope_id in rules_dict[rule_id]: + self.driver.del_rule(intra_extension_id, sub_meta_rule_id, rule_id) return self.driver.del_subject_scope(intra_extension_id, subject_category_id, subject_scope_id) @filter_args @@ -1009,7 +1039,7 @@ class IntraExtensionManager(manager.Manager): @filter_args @enforce("read", "object_category_scopes") @enforce("read", "object_categories") - def get_object_scope_dict(self, user_id, intra_extension_id, object_category_id): + def get_object_scopes_dict(self, user_id, intra_extension_id, object_category_id): if object_category_id not in self.driver.get_object_categories_dict(intra_extension_id): raise ObjectCategoryUnknown() return self.driver.get_object_scopes_dict(intra_extension_id, object_category_id) @@ -1017,7 +1047,7 @@ class IntraExtensionManager(manager.Manager): @filter_args @enforce(("read", "write"), "object_scopes") @enforce("read", "object_categories") - def add_object_scope(self, user_id, intra_extension_id, object_category_id, object_scope_name): + def add_object_scope_dict(self, user_id, intra_extension_id, object_category_id, object_scope_name): if object_category_id not in self.driver.get_object_categories_dict(intra_extension_id): raise ObjectCategoryUnknown() object_scope_dict = self.driver.get_object_scopes_dict(intra_extension_id, object_category_id) @@ -1034,7 +1064,7 @@ class IntraExtensionManager(manager.Manager): @filter_args @enforce("read", "object_scopes") @enforce("read", "object_categories") - def get_object_scope(self, user_id, intra_extension_id, object_category_id, object_scope_id): + def get_object_scope_dict(self, user_id, intra_extension_id, object_category_id, object_scope_id): if object_category_id not in self.driver.get_object_categories_dict(intra_extension_id): raise ObjectCategoryUnknown() object_scopte_dict = self.driver.get_object_scopes_dict(intra_extension_id, object_category_id) @@ -1050,14 +1080,34 @@ class IntraExtensionManager(manager.Manager): raise ObjectCategoryUnknown() if object_scope_id not in self.driver.get_object_scopes_dict(intra_extension_id, object_category_id): raise ObjectScopeUnknown() - # TODO (dthom): destroy scope-related assignment - # TODO (dthom): destroy scope-related rule + # Destroy scope-related assignment + for object_id in self.driver.get_objects_dict(intra_extension_id): + for assignment_id in self.driver.get_object_assignment_list(intra_extension_id, object_id, object_category_id): + self.driver.del_object_assignment(intra_extension_id, object_id, object_category_id, assignment_id) + # Destroy scope-related rule + for sub_meta_rule_id in self.driver.get_sub_meta_rules_dict(intra_extension_id): + rules_dict = self.driver.get_rules_dict(intra_extension_id, sub_meta_rule_id) + for rule_id in rules_dict: + if object_scope_id in rules_dict[rule_id]: + self.driver.del_rule(intra_extension_id, sub_meta_rule_id, rule_id) return self.driver.del_object_scope(intra_extension_id, object_category_id, object_scope_id) + @filter_args + @enforce(("read", "write"), "object_scopes") + @enforce("read", "object_categories") + def set_object_scope_dict(self, user_id, intra_extension_id, object_category_id, object_scope_id, object_scope_name): + if object_category_id not in self.driver.get_object_categories_dict(intra_extension_id): + raise ObjectCategoryUnknown() + object_scope_dict = self.driver.get_object_scopes_dict(intra_extension_id, object_category_id) + for _object_scope_id in object_scope_dict: + if object_scope_name is object_scope_dict[_object_scope_id]['name']: + raise ObjectScopeNameExisting() + return self.driver.set_object_scope_dict(intra_extension_id, object_category_id, uuid4().hex, object_scope_dict) + @filter_args @enforce("read", "action_category_scopes") @enforce("read", "action_categories") - def get_action_scope_dict(self, user_id, intra_extension_id, action_category_id): + def get_action_scopes_dict(self, user_id, intra_extension_id, action_category_id): if action_category_id not in self.driver.get_object_categories_dict(intra_extension_id): raise ActionCategoryUnknown() return self.driver.get_action_scopes_dict(intra_extension_id, action_category_id) @@ -1065,7 +1115,7 @@ class IntraExtensionManager(manager.Manager): @filter_args @enforce(("read", "write"), "action_scopes") @enforce("read", "action_categories") - def add_action_scope(self, user_id, intra_extension_id, action_category_id, action_scope_name): + def add_action_scope_dict(self, user_id, intra_extension_id, action_category_id, action_scope_name): if action_category_id not in self.driver.get_action_categories_dict(intra_extension_id): raise ActionCategoryUnknown() action_scope_dict = self.driver.get_action_scopes_dict(intra_extension_id, action_category_id) @@ -1073,7 +1123,7 @@ class IntraExtensionManager(manager.Manager): if action_scope_name is action_scope_dict[_action_scope_id]['name']: raise ActionScopeNameExisting() action_scope_id = uuid4().hex - return self.driver.add_action_scope( + return self.driver.add_action_scope_dict( intra_extension_id, action_category_id, action_scope_id, @@ -1082,7 +1132,7 @@ class IntraExtensionManager(manager.Manager): @filter_args @enforce("read", "action_scopes") @enforce("read", "action_categories") - def get_action_scope(self, user_id, intra_extension_id, action_category_id, action_scope_id): + def get_action_scope_dict(self, user_id, intra_extension_id, action_category_id, action_scope_id): if action_category_id not in self.driver.get_action_categories_dict(intra_extension_id): raise ActionCategoryUnknown() action_scopte_dict = self.driver.get_action_scopes_dict(intra_extension_id, action_category_id) @@ -1098,18 +1148,37 @@ class IntraExtensionManager(manager.Manager): raise ActionCategoryUnknown() if action_scope_id not in self.driver.get_action_scopes_dict(intra_extension_id, action_category_id): raise ActionScopeUnknown() - # TODO (dthom): destroy scope-related assignment - # TODO (dthom): destroy scope-related rule + # Destroy scope-related assignment + for action_id in self.driver.get_actions_dict(intra_extension_id): + for assignment_id in self.driver.get_action_assignment_list(intra_extension_id, action_id, action_category_id): + self.driver.del_action_assignment(intra_extension_id, action_id, action_category_id, assignment_id) + # Destroy scope-related rule + for sub_meta_rule_id in self.driver.get_sub_meta_rules_dict(intra_extension_id): + rules_dict = self.driver.get_rules_dict(intra_extension_id, sub_meta_rule_id) + for rule_id in rules_dict: + if action_scope_id in rules_dict[rule_id]: + self.driver.del_rule(intra_extension_id, sub_meta_rule_id, rule_id) return self.driver.del_action_scope(intra_extension_id, action_category_id, action_scope_id) # Assignment functions + @filter_args + @enforce("read", "subject_assignments") + @enforce("read", "subjects") + @enforce("read", "subject_categories") + def get_subject_assignment_list(self, user_id, intra_extension_id, subject_id, subject_category_id): + if subject_id not in self.driver.get_subjects_dict(user_id, intra_extension_id): + raise SubjectUnknown() + elif subject_category_id not in self.driver.get_subject_categories_dict(intra_extension_id): + raise SubjectCategoryUnknown() + return self.driver.get_subject_assignment_list(intra_extension_id, subject_id, subject_category_id) + @filter_args @enforce(("read", "write"), "subject_assignments") @enforce("read", "subjects") @enforce("read", "subject_categories") @enforce("read", "subject_scopes") - def add_subject_assignment(self, user_id, intra_extension_id, subject_id, subject_category_id, subject_scope_id): + def add_subject_assignment_list(self, user_id, intra_extension_id, subject_id, subject_category_id, subject_scope_id): if subject_id not in self.driver.get_subjects_dict(intra_extension_id): raise SubjectUnknown() if subject_category_id not in self.driver.get_subject_categories_dict(intra_extension_id): @@ -1120,17 +1189,6 @@ class IntraExtensionManager(manager.Manager): raise SubjectAssignmentExisting() return self.driver.add_subject_assignment_list(intra_extension_id, subject_id, subject_category_id, subject_scope_id) - @filter_args - @enforce("read", "subject_assignments") - @enforce("read", "subjects") - @enforce("read", "subject_categories") - def get_subject_assignment_list(self, user_id, intra_extension_id, subject_id, subject_category_id): - if subject_id not in self.driver.get_subjects_dict(user_id, intra_extension_id): - raise SubjectUnknown() - elif subject_category_id not in self.driver.get_subject_categories_dict(intra_extension_id): - raise SubjectCategoryUnknown() - return self.driver.get_subject_assignment_list(intra_extension_id, subject_id)[subject_category_id] - @filter_args @enforce(("read", "write"), "subject_assignments") @enforce("read", "subjects") @@ -1150,16 +1208,19 @@ class IntraExtensionManager(manager.Manager): @filter_args @enforce("read", "object_assignments") @enforce("read", "objects") - def get_object_assignment_dict(self, user_id, intra_extension_id, object_id): - if object_id not in self.get_object_dict(user_id, intra_extension_id): + @enforce("read", "object_categories") + def get_object_assignment_list(self, user_id, intra_extension_id, object_id, object_category_id): + if object_id not in self.driver.get_objects_dict(user_id, intra_extension_id): raise ObjectUnknown() - return self.driver.get_object_assignment_list(intra_extension_id, object_id) + elif object_category_id not in self.driver.get_object_categories_dict(intra_extension_id): + raise ObjectCategoryUnknown() + return self.driver.get_object_assignment_list(intra_extension_id, object_id, object_category_id) @filter_args @enforce(("read", "write"), "object_assignments") @enforce("read", "objects") @enforce("read", "object_categories") - def add_object_assignment(self, user_id, intra_extension_id, object_id, object_category_id, object_scope_id): + def add_object_assignment_list(self, user_id, intra_extension_id, object_id, object_category_id, object_scope_id): if object_id not in self.driver.get_objects_dict(intra_extension_id): raise ObjectUnknown() elif object_category_id not in self.driver.get_object_categories_dict(intra_extension_id): @@ -1170,17 +1231,6 @@ class IntraExtensionManager(manager.Manager): raise ObjectAssignmentExisting() return self.driver.add_object_assignment_list(intra_extension_id, object_id, object_category_id, object_scope_id) - @filter_args - @enforce("read", "object_assignments") - @enforce("read", "objects") - @enforce("read", "object_categories") - def get_object_assignment(self, user_id, intra_extension_id, object_id, object_category_id): - if object_id not in self.driver.get_objects_dict(user_id, intra_extension_id): - raise ObjectUnknown() - elif object_category_id not in self.driver.get_object_categories_dict(intra_extension_id): - raise ObjectCategoryUnknown() - return self.driver.get_object_assignment_list(intra_extension_id, object_id)[object_category_id] - @filter_args @enforce(("read", "write"), "object_assignments") @enforce("read", "objects") @@ -1200,16 +1250,19 @@ class IntraExtensionManager(manager.Manager): @filter_args @enforce("read", "action_assignments") @enforce("read", "actions") - def get_action_assignment_dict(self, user_id, intra_extension_id, action_id): - if action_id not in self.get_action_dict(user_id, intra_extension_id): + @enforce("read", "action_categories") + def get_action_assignment_list(self, user_id, intra_extension_id, action_id, action_category_id): + if action_id not in self.driver.get_actions_dict(user_id, intra_extension_id): raise ActionUnknown() - return self.driver.get_action_assignment_list(intra_extension_id, action_id) + elif action_category_id not in self.driver.get_action_categories_dict(intra_extension_id): + raise ActionCategoryUnknown() + return self.driver.get_action_assignment_list(intra_extension_id, action_id, action_category_id) @filter_args @enforce(("read", "write"), "action_assignments") @enforce("read", "actions") @enforce("read", "action_categories") - def add_action_assignment(self, user_id, intra_extension_id, action_id, action_category_id, action_scope_id): + def add_action_assignment_list(self, user_id, intra_extension_id, action_id, action_category_id, action_scope_id): if action_id not in self.driver.get_actions_dict(intra_extension_id): raise ActionUnknown() elif action_category_id not in self.driver.get_action_categories_dict(intra_extension_id): @@ -1220,17 +1273,6 @@ class IntraExtensionManager(manager.Manager): raise ObjectAssignmentExisting() return self.driver.add_action_assignment_list(intra_extension_id, action_id, action_category_id, action_scope_id) - @filter_args - @enforce("read", "action_assignments") - @enforce("read", "actions") - @enforce("read", "action_categories") - def get_action_assignment(self, user_id, intra_extension_id, action_id, action_category_id): - if action_id not in self.driver.get_actions_dict(user_id, intra_extension_id): - raise ActionUnknown() - elif action_category_id not in self.driver.get_action_categories_dict(intra_extension_id): - raise ActionCategoryUnknown() - return self.driver.get_action_assignment_list(intra_extension_id, action_id)[action_category_id] - @filter_args @enforce(("read", "write"), "action_assignments") @enforce("read", "actions") @@ -1562,6 +1604,7 @@ class IntraExtensionDriver(object): SUBJECT_SCOPE = 'subject_scope' OBJECT_SCOPE = 'object_scope' ACTION_SCOPE = 'action_scope' + SUB_META_RULE = 'sub_meta_rule' def __get_data_from_type(self, intra_extension_uuid, @@ -1614,8 +1657,8 @@ class IntraExtensionDriver(object): elif data_name == self.SUBJECT_SCOPE: if not category_uuid: category_uuid = self.get_uuid_from_name(intra_extension_uuid, category_name, self.SUBJECT_CATEGORY) - data_values = self.get_subject_category_scope_dict(intra_extension_uuid, - category_uuid)["subject_category_scope"] + data_values = self.get_subject_scopes_dict(intra_extension_uuid, + category_uuid)["subject_category_scope"] if (name and name not in extract_name(data_values)) or \ (uuid and uuid not in data_values.keys()): raise SubjectScopeUnknown() @@ -1623,7 +1666,7 @@ class IntraExtensionDriver(object): if not category_uuid: category_uuid = self.get_uuid_from_name(intra_extension_uuid, category_name, self.OBJECT_CATEGORY) data_values = self.get_object_scopes_dict(intra_extension_uuid, - category_uuid)["object_category_scope"] + category_uuid)["object_category_scope"] if (name and name not in extract_name(data_values)) or \ (uuid and uuid not in data_values.keys()): raise ObjectScopeUnknown() @@ -1631,10 +1674,15 @@ class IntraExtensionDriver(object): if not category_uuid: category_uuid = self.get_uuid_from_name(intra_extension_uuid, category_name, self.ACTION_CATEGORY) data_values = self.get_action_scopes_dict(intra_extension_uuid, - category_uuid)["action_category_scope"] + category_uuid)["action_category_scope"] if (name and name not in extract_name(data_values)) or \ (uuid and uuid not in data_values.keys()): raise ActionScopeUnknown() + elif data_name == self.SUB_META_RULE: + data_values = self.get_sub_meta_rules_dict(intra_extension_uuid)["sub_meta_rule"] + if (name and name not in extract_name(data_values)) or \ + (uuid and uuid not in data_values.keys()): + raise SubMetaRuleUnknown() if category_uuid: return data_values[category_uuid] return data_values @@ -1833,6 +1881,7 @@ class IntraExtensionDriver(object): def del_rule(self, intra_extension_id, sub_meta_rule_id, rule_id): raise exception.NotImplemented() # pragma: no cover + class LogDriver(object): def authz(self, message): diff --git a/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_admin.py b/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_admin.py index cfa2eb4f..2039c348 100644 --- a/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_admin.py +++ b/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_admin.py @@ -152,7 +152,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase): self.create_user("admin") self.create_intra_extension() - objects = self.manager.get_object_dict("admin", self.ref["id"]) + objects = self.manager.get_objects_dict("admin", self.ref["id"]) self.assertIsInstance(objects, dict) self.assertIn("objects", objects) self.assertIn("id", objects) @@ -174,7 +174,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase): # Delete the new object self.manager.del_object("admin", self.ref["id"], new_object["id"]) - objects = self.manager.get_object_dict("admin", self.ref["id"]) + objects = self.manager.get_objects_dict("admin", self.ref["id"]) self.assertIsInstance(objects, dict) self.assertIn("objects", objects) self.assertIn("id", objects) @@ -183,13 +183,13 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase): self.assertNotIn(new_object["id"], objects["objects"]) # Add a particular object - objects = self.manager.add_object("admin", self.ref["id"], new_object["name"]) + objects = self.manager.add_object_dict("admin", self.ref["id"], new_object["name"]) self.assertIsInstance(objects, dict) self.assertIn("object", objects) self.assertIn("uuid", objects["object"]) self.assertEqual(new_object["name"], objects["object"]["name"]) new_object["id"] = objects["object"]["uuid"] - objects = self.manager.get_object_dict("admin", self.ref["id"]) + objects = self.manager.get_objects_dict("admin", self.ref["id"]) self.assertIsInstance(objects, dict) self.assertIn("objects", objects) self.assertIn("id", objects) @@ -202,7 +202,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase): self.create_user("admin") self.create_intra_extension() - actions = self.manager.get_action_dict("admin", self.ref["id"]) + actions = self.manager.get_actions_dict("admin", self.ref["id"]) self.assertIsInstance(actions, dict) self.assertIn("actions", actions) self.assertIn("id", actions) @@ -224,7 +224,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase): # Delete the new action self.manager.del_action("admin", self.ref["id"], new_action["id"]) - actions = self.manager.get_action_dict("admin", self.ref["id"]) + actions = self.manager.get_actions_dict("admin", self.ref["id"]) self.assertIsInstance(actions, dict) self.assertIn("actions", actions) self.assertIn("id", actions) @@ -233,13 +233,13 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase): self.assertNotIn(new_action["id"], actions["actions"]) # Add a particular action - actions = self.manager.add_action("admin", self.ref["id"], new_action["name"]) + actions = self.manager.add_action_dict("admin", self.ref["id"], new_action["name"]) self.assertIsInstance(actions, dict) self.assertIn("action", actions) self.assertIn("uuid", actions["action"]) self.assertEqual(new_action["name"], actions["action"]["name"]) new_action["id"] = actions["action"]["uuid"] - actions = self.manager.get_action_dict("admin", self.ref["id"]) + actions = self.manager.get_actions_dict("admin", self.ref["id"]) self.assertIsInstance(actions, dict) self.assertIn("actions", actions) self.assertIn("id", actions) @@ -508,7 +508,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase): ) for object_category in object_categories["object_categories"]: - object_category_scope = self.manager.get_object_scope_dict( + object_category_scope = self.manager.get_object_scopes_dict( "admin", self.ref["id"], object_category) @@ -541,7 +541,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase): self.ref["id"], object_category, new_object_category_scope_uuid) - object_category_scope = self.manager.get_object_scope_dict( + object_category_scope = self.manager.get_object_scopes_dict( "admin", self.ref["id"], object_category) @@ -553,7 +553,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase): self.assertNotIn(new_object_category_scope_uuid, object_category_scope["object_category_scope"]) # Add a particular object_category_scope - object_category_scope = self.manager.add_object_scope( + object_category_scope = self.manager.add_object_scope_dict( "admin", self.ref["id"], object_category, @@ -563,7 +563,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase): self.assertIn("uuid", object_category_scope["object_category_scope"]) self.assertEqual(new_object_category_scope[new_object_category_scope_uuid], object_category_scope["object_category_scope"]["name"]) - object_category_scope = self.manager.get_object_scope_dict( + object_category_scope = self.manager.get_object_scopes_dict( "admin", self.ref["id"], object_category) @@ -589,7 +589,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase): ) for action_category in action_categories["action_categories"]: - action_category_scope = self.manager.get_action_scope_dict( + action_category_scope = self.manager.get_action_scopes_dict( "admin", self.ref["id"], action_category) @@ -622,7 +622,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase): self.ref["id"], action_category, new_action_category_scope_uuid) - action_category_scope = self.manager.get_action_scope_dict( + action_category_scope = self.manager.get_action_scopes_dict( "admin", self.ref["id"], action_category) @@ -634,7 +634,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase): self.assertNotIn(new_action_category_scope_uuid, action_category_scope["action_category_scope"]) # Add a particular action_category_scope - action_category_scope = self.manager.add_action_scope( + action_category_scope = self.manager.add_action_scope_dict( "admin", self.ref["id"], action_category, @@ -644,7 +644,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase): self.assertIn("uuid", action_category_scope["action_category_scope"]) self.assertEqual(new_action_category_scope[new_action_category_scope_uuid], action_category_scope["action_category_scope"]["name"]) - action_category_scope = self.manager.get_action_scope_dict( + action_category_scope = self.manager.get_action_scopes_dict( "admin", self.ref["id"], action_category) @@ -782,7 +782,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase): {new_subject_category_uuid: [new_subject_category_scope2_uuid, ]}, subject_category_assignments["subject_category_assignments"][new_subject["id"]]) - data = self.manager.add_subject_assignment( + data = self.manager.add_subject_assignment_list( "admin", self.ref["id"], new_subject["id"], @@ -825,7 +825,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase): ) for object_category in object_categories["object_categories"]: - object_category_scope = self.manager.get_object_scope_dict( + object_category_scope = self.manager.get_object_scopes_dict( "admin", self.ref["id"], object_category) @@ -868,7 +868,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase): self.assertIn(new_object_category_scope2[new_object_category_scope2_uuid], object_category_scope["object_category_scope"][object_category].values()) - object_category_assignments = self.manager.get_object_assignment_dict( + object_category_assignments = self.manager.get_object_assignment_list( "admin", self.ref["id"], new_object["id"] @@ -896,7 +896,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase): self.assertEqual( {new_object_category_uuid: [new_object_category_scope_uuid, new_object_category_scope2_uuid]}, object_category_assignments["object_category_assignments"][new_object["id"]]) - object_category_assignments = self.manager.get_object_assignment_dict( + object_category_assignments = self.manager.get_object_assignment_list( "admin", self.ref["id"], new_object["id"] @@ -917,7 +917,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase): new_object_category_uuid, new_object_category_scope_uuid ) - object_category_assignments = self.manager.get_object_assignment_dict( + object_category_assignments = self.manager.get_object_assignment_list( "admin", self.ref["id"], new_object["id"] @@ -931,7 +931,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase): {new_object_category_uuid: [new_object_category_scope2_uuid, ]}, object_category_assignments["object_category_assignments"][new_object["id"]]) - self.manager.add_object_assignment( + self.manager.add_object_assignment_list( "admin", self.ref["id"], new_object["id"], @@ -939,7 +939,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase): new_object_category_scope_uuid ) - object_category_assignments = self.manager.get_object_assignment_dict( + object_category_assignments = self.manager.get_object_assignment_list( "admin", self.ref["id"], new_object["id"] @@ -974,7 +974,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase): ) for action_category in action_categories["action_categories"]: - action_category_scope = self.manager.get_action_scope_dict( + action_category_scope = self.manager.get_action_scopes_dict( "admin", self.ref["id"], action_category) @@ -1017,7 +1017,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase): self.assertIn(new_action_category_scope2[new_action_category_scope2_uuid], action_category_scope["action_category_scope"][action_category].values()) - action_category_assignments = self.manager.get_action_assignment_dict( + action_category_assignments = self.manager.get_action_assignment_list( "admin", self.ref["id"], new_action["id"] @@ -1045,7 +1045,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase): self.assertEqual( {new_action_category_uuid: [new_action_category_scope_uuid, new_action_category_scope2_uuid]}, action_category_assignments["action_category_assignments"][new_action["id"]]) - action_category_assignments = self.manager.get_action_assignment_dict( + action_category_assignments = self.manager.get_action_assignment_list( "admin", self.ref["id"], new_action["id"] @@ -1066,7 +1066,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase): new_action_category_uuid, new_action_category_scope_uuid ) - action_category_assignments = self.manager.get_action_assignment_dict( + action_category_assignments = self.manager.get_action_assignment_list( "admin", self.ref["id"], new_action["id"] @@ -1080,7 +1080,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase): {new_action_category_uuid: [new_action_category_scope2_uuid, ]}, action_category_assignments["action_category_assignments"][new_action["id"]]) - self.manager.add_action_assignment( + self.manager.add_action_assignment_list( "admin", self.ref["id"], new_action["id"], @@ -1088,7 +1088,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase): new_action_category_scope_uuid ) - action_category_assignments = self.manager.get_action_assignment_dict( + action_category_assignments = self.manager.get_action_assignment_list( "admin", self.ref["id"], new_action["id"] @@ -1197,8 +1197,8 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase): for rule in sub_rules["rules"][relation]: for cat, cat_func, func_name in ( ("subject_categories", self.manager.get_subject_scopes_dict, "subject_category_scope"), - ("action_categories", self.manager.get_action_scope_dict, "action_category_scope"), - ("object_categories", self.manager.get_object_scope_dict, "object_category_scope"), + ("action_categories", self.manager.get_action_scopes_dict, "action_category_scope"), + ("object_categories", self.manager.get_object_scopes_dict, "object_category_scope"), ): for cat_value in sub_meta_rules["sub_meta_rules"][relation][cat]: scope = cat_func( @@ -1216,8 +1216,8 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase): sub_rule = [] for cat, cat_func, func_name in ( ("subject_categories", self.manager.get_subject_scopes_dict, "subject_category_scope"), - ("action_categories", self.manager.get_action_scope_dict, "action_category_scope"), - ("object_categories", self.manager.get_object_scope_dict, "object_category_scope"), + ("action_categories", self.manager.get_action_scopes_dict, "action_category_scope"), + ("object_categories", self.manager.get_object_scopes_dict, "object_category_scope"), ): for cat_value in sub_meta_rules["sub_meta_rules"][relation][cat]: scope = cat_func( @@ -1239,8 +1239,8 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase): for rule in sub_rules["rules"][relation]: for cat, cat_func, func_name in ( ("subject_categories", self.manager.get_subject_scopes_dict, "subject_category_scope"), - ("action_categories", self.manager.get_action_scope_dict, "action_category_scope"), - ("object_categories", self.manager.get_object_scope_dict, "object_category_scope"), + ("action_categories", self.manager.get_action_scopes_dict, "action_category_scope"), + ("object_categories", self.manager.get_object_scopes_dict, "object_category_scope"), ): for cat_value in sub_meta_rules["sub_meta_rules"][relation][cat]: scope = cat_func( @@ -1391,10 +1391,10 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase): self.assertRaises( ObjectReadNotAuthorized, - self.manager.get_object_dict, + self.manager.get_objects_dict, demo_user["id"], ref["id"]) - objects = self.manager.get_object_dict(admin_user["id"], ref["id"]) + objects = self.manager.get_objects_dict(admin_user["id"], ref["id"]) self.assertIsInstance(objects, dict) self.assertIn("objects", objects) self.assertIn("id", objects) @@ -1427,7 +1427,7 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase): demo_user["id"], ref["id"], new_object["id"]) self.manager.del_object(admin_user["id"], ref["id"], new_object["id"]) - objects = self.manager.get_object_dict(admin_user["id"], ref["id"]) + objects = self.manager.get_objects_dict(admin_user["id"], ref["id"]) self.assertIsInstance(objects, dict) self.assertIn("objects", objects) self.assertIn("id", objects) @@ -1438,16 +1438,16 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase): # Add a particular object self.assertRaises( ObjectAddNotAuthorized, - self.manager.add_object, + self.manager.add_object_dict, demo_user["id"], ref["id"], new_object["name"]) - objects = self.manager.add_object(admin_user["id"], ref["id"], new_object["name"]) + objects = self.manager.add_object_dict(admin_user["id"], ref["id"], new_object["name"]) self.assertIsInstance(objects, dict) self.assertIn("object", objects) self.assertIn("uuid", objects["object"]) self.assertEqual(new_object["name"], objects["object"]["name"]) new_object["id"] = objects["object"]["uuid"] - objects = self.manager.get_object_dict(admin_user["id"], ref["id"]) + objects = self.manager.get_objects_dict(admin_user["id"], ref["id"]) self.assertIsInstance(objects, dict) self.assertIn("objects", objects) self.assertIn("id", objects) @@ -1462,10 +1462,10 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase): self.assertRaises( ActionReadNotAuthorized, - self.manager.get_action_dict, + self.manager.get_actions_dict, demo_user["id"], ref["id"]) - actions = self.manager.get_action_dict(admin_user["id"], ref["id"]) + actions = self.manager.get_actions_dict(admin_user["id"], ref["id"]) self.assertIsInstance(actions, dict) self.assertIn("actions", actions) self.assertIn("id", actions) @@ -1498,7 +1498,7 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase): demo_user["id"], ref["id"], new_action["id"]) self.manager.del_action(admin_user["id"], ref["id"], new_action["id"]) - actions = self.manager.get_action_dict(admin_user["id"], ref["id"]) + actions = self.manager.get_actions_dict(admin_user["id"], ref["id"]) self.assertIsInstance(actions, dict) self.assertIn("actions", actions) self.assertIn("id", actions) @@ -1509,16 +1509,16 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase): # Add a particular action self.assertRaises( ActionAddNotAuthorized, - self.manager.add_action, + self.manager.add_action_dict, demo_user["id"], ref["id"], new_action["name"]) - actions = self.manager.add_action(admin_user["id"], ref["id"], new_action["name"]) + actions = self.manager.add_action_dict(admin_user["id"], ref["id"], new_action["name"]) self.assertIsInstance(actions, dict) self.assertIn("action", actions) self.assertIn("uuid", actions["action"]) self.assertEqual(new_action["name"], actions["action"]["name"]) new_action["id"] = actions["action"]["uuid"] - actions = self.manager.get_action_dict(admin_user["id"], ref["id"]) + actions = self.manager.get_actions_dict(admin_user["id"], ref["id"]) self.assertIsInstance(actions, dict) self.assertIn("actions", actions) self.assertIn("id", actions) @@ -1879,10 +1879,10 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase): for object_category in object_categories["object_categories"]: self.assertRaises( ObjectCategoryScopeReadNotAuthorized, - self.manager.get_object_scope_dict, + self.manager.get_object_scopes_dict, demo_user["id"], ref["id"], object_category) - object_category_scope = self.manager.get_object_scope_dict( + object_category_scope = self.manager.get_object_scopes_dict( admin_user["id"], ref["id"], object_category) @@ -1926,7 +1926,7 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase): ref["id"], object_category, new_object_category_scope_uuid) - object_category_scope = self.manager.get_object_scope_dict( + object_category_scope = self.manager.get_object_scopes_dict( admin_user["id"], ref["id"], object_category) @@ -1940,12 +1940,12 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase): # Add a particular object_category_scope self.assertRaises( ObjectCategoryScopeAddNotAuthorized, - self.manager.add_object_scope, + self.manager.add_object_scope_dict, demo_user["id"], ref["id"], object_category, new_object_category_scope[new_object_category_scope_uuid] ) - object_category_scope = self.manager.add_object_scope( + object_category_scope = self.manager.add_object_scope_dict( admin_user["id"], ref["id"], object_category, @@ -1955,7 +1955,7 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase): self.assertIn("uuid", object_category_scope["object_category_scope"]) self.assertEqual(new_object_category_scope[new_object_category_scope_uuid], object_category_scope["object_category_scope"]["name"]) - object_category_scope = self.manager.get_object_scope_dict( + object_category_scope = self.manager.get_object_scopes_dict( admin_user["id"], ref["id"], object_category) @@ -1983,10 +1983,10 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase): for action_category in action_categories["action_categories"]: self.assertRaises( ActionCategoryScopeReadNotAuthorized, - self.manager.get_object_scope_dict, + self.manager.get_object_scopes_dict, demo_user["id"], ref["id"], action_category) - action_category_scope = self.manager.get_action_scope_dict( + action_category_scope = self.manager.get_action_scopes_dict( admin_user["id"], ref["id"], action_category) @@ -2032,7 +2032,7 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase): ref["id"], action_category, new_action_category_scope_uuid) - action_category_scope = self.manager.get_action_scope_dict( + action_category_scope = self.manager.get_action_scopes_dict( admin_user["id"], ref["id"], action_category) @@ -2046,12 +2046,12 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase): # Add a particular action_category_scope self.assertRaises( ActionCategoryScopeAddNotAuthorized, - self.manager.add_action_scope, + self.manager.add_action_scope_dict, demo_user["id"], ref["id"], action_category, new_action_category_scope[new_action_category_scope_uuid] ) - action_category_scope = self.manager.add_action_scope( + action_category_scope = self.manager.add_action_scope_dict( admin_user["id"], ref["id"], action_category, @@ -2061,7 +2061,7 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase): self.assertIn("uuid", action_category_scope["action_category_scope"]) self.assertEqual(new_action_category_scope[new_action_category_scope_uuid], action_category_scope["action_category_scope"]["name"]) - action_category_scope = self.manager.get_action_scope_dict( + action_category_scope = self.manager.get_action_scopes_dict( admin_user["id"], ref["id"], action_category) @@ -2223,13 +2223,13 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase): self.assertRaises( SubjectCategoryAssignmentAddNotAuthorized, - self.manager.add_subject_assignment, + self.manager.add_subject_assignment_list, demo_user["id"], ref["id"], new_subject["id"], new_subject_category_uuid, new_subject_category_scope_uuid ) - data = self.manager.add_subject_assignment( + data = self.manager.add_subject_assignment_list( admin_user["id"], ref["id"], new_subject["id"], @@ -2272,7 +2272,7 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase): ) for object_category in object_categories["object_categories"]: - object_category_scope = self.manager.get_object_scope_dict( + object_category_scope = self.manager.get_object_scopes_dict( admin_user["id"], ref["id"], object_category) @@ -2317,11 +2317,11 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase): self.assertRaises( ObjectCategoryAssignmentReadNotAuthorized, - self.manager.get_object_assignment_dict, + self.manager.get_object_assignment_list, demo_user["id"], ref["id"], new_object["id"] ) - object_category_assignments = self.manager.get_object_assignment_dict( + object_category_assignments = self.manager.get_object_assignment_list( admin_user["id"], ref["id"], new_object["id"] @@ -2358,7 +2358,7 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase): self.assertEqual( {new_object_category_uuid: [new_object_category_scope_uuid, new_object_category_scope2_uuid]}, object_category_assignments["object_category_assignments"][new_object["id"]]) - object_category_assignments = self.manager.get_object_assignment_dict( + object_category_assignments = self.manager.get_object_assignment_list( admin_user["id"], ref["id"], new_object["id"] @@ -2387,7 +2387,7 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase): new_object_category_uuid, new_object_category_scope_uuid ) - object_category_assignments = self.manager.get_object_assignment_dict( + object_category_assignments = self.manager.get_object_assignment_list( admin_user["id"], ref["id"], new_object["id"] @@ -2403,13 +2403,13 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase): self.assertRaises( ObjectCategoryAssignmentAddNotAuthorized, - self.manager.add_object_assignment, + self.manager.add_object_assignment_list, demo_user["id"], ref["id"], new_object["id"], new_object_category_uuid, new_object_category_scope_uuid ) - self.manager.add_object_assignment( + self.manager.add_object_assignment_list( admin_user["id"], ref["id"], new_object["id"], @@ -2417,7 +2417,7 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase): new_object_category_scope_uuid ) - object_category_assignments = self.manager.get_object_assignment_dict( + object_category_assignments = self.manager.get_object_assignment_list( admin_user["id"], ref["id"], new_object["id"] @@ -2452,7 +2452,7 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase): ) for action_category in action_categories["action_categories"]: - action_category_scope = self.manager.get_action_scope_dict( + action_category_scope = self.manager.get_action_scopes_dict( admin_user["id"], ref["id"], action_category) @@ -2497,11 +2497,11 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase): self.assertRaises( ActionCategoryAssignmentReadNotAuthorized, - self.manager.get_action_assignment_dict, + self.manager.get_action_assignment_list, demo_user["id"], ref["id"], new_action["id"] ) - action_category_assignments = self.manager.get_action_assignment_dict( + action_category_assignments = self.manager.get_action_assignment_list( admin_user["id"], ref["id"], new_action["id"] @@ -2538,7 +2538,7 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase): self.assertEqual( {new_action_category_uuid: [new_action_category_scope_uuid, new_action_category_scope2_uuid]}, action_category_assignments["action_category_assignments"][new_action["id"]]) - action_category_assignments = self.manager.get_action_assignment_dict( + action_category_assignments = self.manager.get_action_assignment_list( admin_user["id"], ref["id"], new_action["id"] @@ -2567,7 +2567,7 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase): new_action_category_uuid, new_action_category_scope_uuid ) - action_category_assignments = self.manager.get_action_assignment_dict( + action_category_assignments = self.manager.get_action_assignment_list( admin_user["id"], ref["id"], new_action["id"] @@ -2583,13 +2583,13 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase): self.assertRaises( ActionCategoryAssignmentAddNotAuthorized, - self.manager.add_action_assignment, + self.manager.add_action_assignment_list, demo_user["id"], ref["id"], new_action["id"], new_action_category_uuid, new_action_category_scope_uuid ) - self.manager.add_action_assignment( + self.manager.add_action_assignment_list( admin_user["id"], ref["id"], new_action["id"], @@ -2597,7 +2597,7 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase): new_action_category_scope_uuid ) - action_category_assignments = self.manager.get_action_assignment_dict( + action_category_assignments = self.manager.get_action_assignment_list( admin_user["id"], ref["id"], new_action["id"] @@ -2738,8 +2738,8 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase): for rule in sub_rules["rules"][relation]: for cat, cat_func, func_name in ( ("subject_categories", self.manager.get_subject_scopes_dict, "subject_category_scope"), - ("action_categories", self.manager.get_action_scope_dict, "action_category_scope"), - ("object_categories", self.manager.get_object_scope_dict, "object_category_scope"), + ("action_categories", self.manager.get_action_scopes_dict, "action_category_scope"), + ("object_categories", self.manager.get_object_scopes_dict, "object_category_scope"), ): for cat_value in sub_meta_rules["sub_meta_rules"][relation][cat]: scope = cat_func( @@ -2757,8 +2757,8 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase): sub_rule = [] for cat, cat_func, func_name in ( ("subject_categories", self.manager.get_subject_scopes_dict, "subject_category_scope"), - ("action_categories", self.manager.get_action_scope_dict, "action_category_scope"), - ("object_categories", self.manager.get_object_scope_dict, "object_category_scope"), + ("action_categories", self.manager.get_action_scopes_dict, "action_category_scope"), + ("object_categories", self.manager.get_object_scopes_dict, "object_category_scope"), ): for cat_value in sub_meta_rules["sub_meta_rules"][relation][cat]: scope = cat_func( @@ -2786,8 +2786,8 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase): for rule in sub_rules["rules"][relation]: for cat, cat_func, func_name in ( ("subject_categories", self.manager.get_subject_scopes_dict, "subject_category_scope"), - ("action_categories", self.manager.get_action_scope_dict, "action_category_scope"), - ("object_categories", self.manager.get_object_scope_dict, "object_category_scope"), + ("action_categories", self.manager.get_action_scopes_dict, "action_category_scope"), + ("object_categories", self.manager.get_object_scopes_dict, "object_category_scope"), ): for cat_value in sub_meta_rules["sub_meta_rules"][relation][cat]: scope = cat_func( diff --git a/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_authz.py b/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_authz.py index 5eeed82a..80c0598e 100644 --- a/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_authz.py +++ b/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_authz.py @@ -175,7 +175,7 @@ class TestIntraExtensionAuthzManagerAuthz(tests.TestCase): ) # Test when subject and object are known but not the action - _tmp = self.manager.add_object( + _tmp = self.manager.add_object_dict( admin_user['id'], self.ref["id"], "my_object" @@ -189,7 +189,7 @@ class TestIntraExtensionAuthzManagerAuthz(tests.TestCase): ) # Test when subject and object and action are known - _tmp = self.manager.add_action( + _tmp = self.manager.add_action_dict( admin_user['id'], self.ref["id"], "my_action" @@ -232,7 +232,7 @@ class TestIntraExtensionAuthzManagerAuthz(tests.TestCase): ) my_object_category = {"id": _tmp[0], "name": _tmp[1]} - _tmp = self.manager.add_object_scope( + _tmp = self.manager.add_object_scope_dict( admin_user['id'], self.ref["id"], my_object_category["id"], @@ -254,7 +254,7 @@ class TestIntraExtensionAuthzManagerAuthz(tests.TestCase): ) my_action_category = {"id": _tmp[0], "name": _tmp[1]} - _tmp = self.manager.add_action_scope( + _tmp = self.manager.add_action_scope_dict( admin_user['id'], self.ref["id"], my_action_category["id"], @@ -269,7 +269,7 @@ class TestIntraExtensionAuthzManagerAuthz(tests.TestCase): ) # Add a subject assignment and test ObjectCategoryAssignmentUnknown - self.manager.add_subject_assignment( + self.manager.add_subject_assignment_list( admin_user['id'], self.ref["id"], demo_user["id"], @@ -284,7 +284,7 @@ class TestIntraExtensionAuthzManagerAuthz(tests.TestCase): ) # Add an object assignment and test ActionCategoryAssignmentUnknown - self.manager.add_object_assignment( + self.manager.add_object_assignment_list( admin_user['id'], self.ref["id"], demo_user["id"], @@ -299,7 +299,7 @@ class TestIntraExtensionAuthzManagerAuthz(tests.TestCase): ) # Add an action assignment and test RuleUnknown - self.manager.add_action_assignment( + self.manager.add_action_assignment_list( admin_user['id'], self.ref["id"], demo_user["id"], @@ -381,7 +381,7 @@ class TestIntraExtensionAuthzManagerAuthz(tests.TestCase): ref_admin = self.create_intra_extension("policy_admin") self.create_mapping(tenant, ref["id"], ref_admin["id"]) - objects = self.manager.get_object_dict(admin_user["id"], tenant["id"]) + objects = self.manager.get_objects_dict(admin_user["id"], tenant["id"]) self.assertIsInstance(objects, dict) self.assertIn("objects", objects) self.assertIn("id", objects) @@ -406,7 +406,7 @@ class TestIntraExtensionAuthzManagerAuthz(tests.TestCase): # Add a particular object self.assertRaises( ObjectAddNotAuthorized, - self.manager.add_object, + self.manager.add_object_dict, admin_user["id"], ref["id"], new_object["name"]) def test_actions(self): @@ -417,7 +417,7 @@ class TestIntraExtensionAuthzManagerAuthz(tests.TestCase): ref_admin = self.create_intra_extension("policy_admin") self.create_mapping(tenant, ref["id"], ref_admin["id"]) - actions = self.manager.get_action_dict(admin_user["id"], tenant["id"]) + actions = self.manager.get_actions_dict(admin_user["id"], tenant["id"]) self.assertIsInstance(actions, dict) self.assertIn("actions", actions) self.assertIn("id", actions) @@ -442,7 +442,7 @@ class TestIntraExtensionAuthzManagerAuthz(tests.TestCase): # Add a particular action self.assertRaises( ActionAddNotAuthorized, - self.manager.add_action, + self.manager.add_action_dict, admin_user["id"], ref["id"], new_action["id"]) def test_subject_categories(self): @@ -620,7 +620,7 @@ class TestIntraExtensionAuthzManagerAuthz(tests.TestCase): ) for object_category in object_categories["object_categories"]: - object_category_scope = self.manager.get_object_scope_dict( + object_category_scope = self.manager.get_object_scopes_dict( admin_user["id"], ref["id"], object_category) @@ -648,7 +648,7 @@ class TestIntraExtensionAuthzManagerAuthz(tests.TestCase): # Add a particular object_category_scope self.assertRaises( ObjectCategoryScopeAddNotAuthorized, - self.manager.add_object_scope, + self.manager.add_object_scope_dict, admin_user["id"], ref["id"], object_category, new_object_category_scope[new_object_category_scope_uuid]) def test_action_category_scope(self): @@ -669,7 +669,7 @@ class TestIntraExtensionAuthzManagerAuthz(tests.TestCase): ) for action_category in action_categories["action_categories"]: - action_category_scope = self.manager.get_action_scope_dict( + action_category_scope = self.manager.get_action_scopes_dict( admin_user["id"], ref["id"], action_category) @@ -697,7 +697,7 @@ class TestIntraExtensionAuthzManagerAuthz(tests.TestCase): # Add a particular action_category_scope self.assertRaises( ActionCategoryScopeAddNotAuthorized, - self.manager.add_action_scope, + self.manager.add_action_scope_dict, admin_user["id"], ref["id"], action_category, new_action_category_scope[new_action_category_scope_uuid]) def test_subject_category_assignment(self): @@ -796,7 +796,7 @@ class TestIntraExtensionAuthzManagerAuthz(tests.TestCase): self.assertRaises( SubjectCategoryAssignmentAddNotAuthorized, - self.manager.add_subject_assignment, + self.manager.add_subject_assignment_list, admin_user["id"], ref["id"], new_subject["id"], new_subject_category_uuid, new_subject_category_scope_uuid) @@ -825,7 +825,7 @@ class TestIntraExtensionAuthzManagerAuthz(tests.TestCase): ) for object_category in object_categories["object_categories"]: - object_category_scope = self.admin_manager.get_object_scope_dict( + object_category_scope = self.admin_manager.get_object_scopes_dict( admin_user["id"], ref["id"], object_category) @@ -868,7 +868,7 @@ class TestIntraExtensionAuthzManagerAuthz(tests.TestCase): self.assertIn(new_object_category_scope2[new_object_category_scope2_uuid], object_category_scope["object_category_scope"][object_category].values()) - object_category_assignments = self.manager.get_object_assignment_dict( + object_category_assignments = self.manager.get_object_assignment_list( admin_user["id"], ref["id"], new_object["id"] @@ -897,7 +897,7 @@ class TestIntraExtensionAuthzManagerAuthz(tests.TestCase): self.assertRaises( ObjectCategoryAssignmentAddNotAuthorized, - self.manager.add_object_assignment, + self.manager.add_object_assignment_list, admin_user["id"], ref["id"], new_object["id"], new_object_category_uuid, new_object_category_scope_uuid) @@ -926,7 +926,7 @@ class TestIntraExtensionAuthzManagerAuthz(tests.TestCase): ) for action_category in action_categories["action_categories"]: - action_category_scope = self.admin_manager.get_action_scope_dict( + action_category_scope = self.admin_manager.get_action_scopes_dict( admin_user["id"], ref["id"], action_category) @@ -969,7 +969,7 @@ class TestIntraExtensionAuthzManagerAuthz(tests.TestCase): self.assertIn(new_action_category_scope2[new_action_category_scope2_uuid], action_category_scope["action_category_scope"][action_category].values()) - action_category_assignments = self.manager.get_action_assignment_dict( + action_category_assignments = self.manager.get_action_assignment_list( admin_user["id"], ref["id"], new_action["id"] @@ -998,7 +998,7 @@ class TestIntraExtensionAuthzManagerAuthz(tests.TestCase): self.assertRaises( ActionCategoryAssignmentAddNotAuthorized, - self.manager.add_action_assignment, + self.manager.add_action_assignment_list, admin_user["id"], ref["id"], new_action["id"], new_action_category_uuid, new_action_category_scope_uuid) @@ -1101,8 +1101,8 @@ class TestIntraExtensionAuthzManagerAuthz(tests.TestCase): for rule in sub_rules["rules"][relation]: for cat, cat_func, func_name in ( ("subject_categories", self.manager.get_subject_scopes_dict, "subject_category_scope"), - ("action_categories", self.manager.get_action_scope_dict, "action_category_scope"), - ("object_categories", self.manager.get_object_scope_dict, "object_category_scope"), + ("action_categories", self.manager.get_action_scopes_dict, "action_category_scope"), + ("object_categories", self.manager.get_object_scopes_dict, "object_category_scope"), ): for cat_value in sub_meta_rules["sub_meta_rules"][relation][cat]: scope = cat_func( @@ -1119,8 +1119,8 @@ class TestIntraExtensionAuthzManagerAuthz(tests.TestCase): sub_rule = [] for cat, cat_func, func_name in ( ("subject_categories", self.manager.get_subject_scopes_dict, "subject_category_scope"), - ("action_categories", self.manager.get_action_scope_dict, "action_category_scope"), - ("object_categories", self.manager.get_object_scope_dict, "object_category_scope"), + ("action_categories", self.manager.get_action_scopes_dict, "action_category_scope"), + ("object_categories", self.manager.get_object_scopes_dict, "object_category_scope"), ): for cat_value in sub_meta_rules["sub_meta_rules"][relation][cat]: scope = cat_func( -- cgit 1.2.3-korg