aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRuan HE <ruan.he@orange.com>2015-07-08 09:17:35 +0000
committerGerrit Code Review <gerrit@172.30.200.206>2015-07-08 09:17:35 +0000
commitcbda180fc4370b8eda4b886f79bfbc0fb739608e (patch)
treefbda1b8dd5d778ad3ddee1022e21f7f77d6f302a
parent175e4b7ca2d7287f8a2404fb2a183e193ad55e15 (diff)
parent778fa0bc8228c523560674e509aecc01ba49a38e (diff)
Merge "Fix some bugs on the enforce function."
-rw-r--r--keystone-moon/keystone/contrib/moon/core.py443
-rw-r--r--keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_admin.py38
-rw-r--r--keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_authz.py85
3 files changed, 314 insertions, 252 deletions
diff --git a/keystone-moon/keystone/contrib/moon/core.py b/keystone-moon/keystone/contrib/moon/core.py
index 4d19e7cc..db31f4b9 100644
--- a/keystone-moon/keystone/contrib/moon/core.py
+++ b/keystone-moon/keystone/contrib/moon/core.py
@@ -68,9 +68,11 @@ def filter_args(func):
def enforce(actions, object, **extra):
+ _actions = actions
+
def wrap(func):
def wrapped(*args):
- global actions
+ # global actions
self = args[0]
user_name = args[1]
intra_extension_uuid = args[2]
@@ -81,10 +83,12 @@ def enforce(actions, object, **extra):
return func(*args)
else:
_authz = False
- if type(actions) in (str, unicode):
- actions = (actions, )
+ if type(_actions) in (str, unicode):
+ actions = (_actions, )
+ else:
+ actions = _actions
for action in actions:
- if self.authz_api.authz(
+ if self.admin_api.authz(
intra_extension_uuid,
user_name,
object,
@@ -275,7 +279,7 @@ class TenantDriver:
raise exception.NotImplemented() # pragma: no cover
-@dependency.requires('identity_api', 'moonlog_api', 'tenant_api', 'authz_api')
+@dependency.requires('identity_api', 'moonlog_api', 'tenant_api', 'authz_api', 'admin_api')
class IntraExtensionManager(manager.Manager):
__genre__ = None
@@ -307,7 +311,10 @@ class IntraExtensionManager(manager.Manager):
raise IntraExtensionNotFound()
# self.moonlog_api.authz("Unknown: Authorization framework disabled ({} {} {} {})".format(uuid, sub, obj, act))
# self.moonlog_api.warning("Unknown: Authorization framework disabled ({} {} {} {})".format(uuid, sub, obj, act))
- # return True
+ users = self.driver.get_subject_dict(uuid)
+ if sub in users and users[sub] == "admin":
+ return True
+ return False
# #TODO (dthom): must raise IntraExtensionNotAuthorized
# try:
# _subject_category_dict = self.driver.get_subject_category_dict(extension_uuid)
@@ -322,82 +329,82 @@ class IntraExtensionManager(manager.Manager):
# return True
# except exception: # TODO: exception.IntraExtension.NotAuthorized
# pass
- sub_meta_rule = self.driver.get_meta_rule(uuid)
- subject_assignments = self.driver.get_subject_category_assignment_dict(uuid)
- action_assignments = self.driver.get_action_category_assignment_dict(uuid)
- object_assignments = self.driver.get_object_category_assignment_dict(uuid)
- # check if subject exists
- if sub not in self.driver.get_subject_dict(uuid):
- self.moonlog_api.authz("KO: Subject {} unknown".format(sub))
- return False
- # check if object exists
- if obj not in self.driver.get_object_dict(uuid):
- self.moonlog_api.authz("KO: Object {} unknown".format(obj))
- return False
- # check if action exists
- if act not in self.driver.get_action_dict(uuid):
- self.moonlog_api.authz("KO: Action {} unknown".format(act))
- return False
- # check if subject is in subject_assignment
- for cat in subject_assignments.keys():
- if sub in subject_assignments[cat]:
- break
- else:
- self.moonlog_api.authz("KO: Subject no found in categories {}".format(
- subject_assignments.keys()))
- return False
- # check if object is in object_assignment
- for cat in object_assignments.keys():
- if obj in object_assignments[cat]:
- break
- else:
- self.moonlog_api.authz("KO: Object no found in categories {}".format(
- object_assignments))
- return False
- # check if action is in action_assignment
- for cat in action_assignments.keys():
- if act in action_assignments[cat]:
- break
- else:
- self.moonlog_api.authz("KO: Action no found in categories {}".format(
- action_assignments.keys()))
- return False
- # get all rules for intra_extension
- rules = self.driver.get_rules(uuid)
- # check if relation exists in rules
- relation_to_check = None
- relations = self.driver.get_sub_meta_rule_relations(uuid)
- for relation in rules:
- if relation in relations:
- # hypothesis: only one relation to check
- relation_to_check = relation
- break
- else:
- self.moonlog_api.authz("KO: No relation can be used {}".format(rules.keys()))
- return False
- for sub_rule in rules[relation_to_check]:
- for cat in sub_meta_rule[relation_to_check]["subject_categories"]:
- rule_scope = sub_rule.pop(0)
- if rule_scope in subject_assignments[cat][sub]:
- break
- else:
- continue
- for cat in sub_meta_rule[relation_to_check]["action_categories"]:
- rule_scope = sub_rule.pop(0)
- if rule_scope in action_assignments[cat][act]:
- break
- else:
- continue
- for cat in sub_meta_rule[relation_to_check]["object_categories"]:
- rule_scope = sub_rule.pop(0)
- if rule_scope in object_assignments[cat][obj]:
- break
- else:
- continue
- self.moonlog_api.authz("OK ({} {},{},{})".format(uuid, sub, act, obj))
- return True
- self.moonlog_api.authz("KO ({} {},{},{})".format(uuid, sub, act, obj))
- return False
+ # sub_meta_rule = self.driver.get_meta_rule(uuid)
+ # subject_assignments = self.driver.get_subject_category_assignment_dict(uuid)
+ # action_assignments = self.driver.get_action_category_assignment_dict(uuid)
+ # object_assignments = self.driver.get_object_category_assignment_dict(uuid)
+ # # check if subject exists
+ # if sub not in self.driver.get_subject_dict(uuid):
+ # self.moonlog_api.authz("KO: Subject {} unknown".format(sub))
+ # return False
+ # # check if object exists
+ # if obj not in self.driver.get_object_dict(uuid):
+ # self.moonlog_api.authz("KO: Object {} unknown".format(obj))
+ # return False
+ # # check if action exists
+ # if act not in self.driver.get_action_dict(uuid):
+ # self.moonlog_api.authz("KO: Action {} unknown".format(act))
+ # return False
+ # # check if subject is in subject_assignment
+ # for cat in subject_assignments.keys():
+ # if sub in subject_assignments[cat]:
+ # break
+ # else:
+ # self.moonlog_api.authz("KO: Subject no found in categories {}".format(
+ # subject_assignments.keys()))
+ # return False
+ # # check if object is in object_assignment
+ # for cat in object_assignments.keys():
+ # if obj in object_assignments[cat]:
+ # break
+ # else:
+ # self.moonlog_api.authz("KO: Object no found in categories {}".format(
+ # object_assignments))
+ # return False
+ # # check if action is in action_assignment
+ # for cat in action_assignments.keys():
+ # if act in action_assignments[cat]:
+ # break
+ # else:
+ # self.moonlog_api.authz("KO: Action no found in categories {}".format(
+ # action_assignments.keys()))
+ # return False
+ # # get all rules for intra_extension
+ # rules = self.driver.get_rules(uuid)
+ # # check if relation exists in rules
+ # relation_to_check = None
+ # relations = self.driver.get_sub_meta_rule_relations(uuid)
+ # for relation in rules:
+ # if relation in relations:
+ # # hypothesis: only one relation to check
+ # relation_to_check = relation
+ # break
+ # else:
+ # self.moonlog_api.authz("KO: No relation can be used {}".format(rules.keys()))
+ # return False
+ # for sub_rule in rules[relation_to_check]:
+ # for cat in sub_meta_rule[relation_to_check]["subject_categories"]:
+ # rule_scope = sub_rule.pop(0)
+ # if rule_scope in subject_assignments[cat][sub]:
+ # break
+ # else:
+ # continue
+ # for cat in sub_meta_rule[relation_to_check]["action_categories"]:
+ # rule_scope = sub_rule.pop(0)
+ # if rule_scope in action_assignments[cat][act]:
+ # break
+ # else:
+ # continue
+ # for cat in sub_meta_rule[relation_to_check]["object_categories"]:
+ # rule_scope = sub_rule.pop(0)
+ # if rule_scope in object_assignments[cat][obj]:
+ # break
+ # else:
+ # continue
+ # self.moonlog_api.authz("OK ({} {},{},{})".format(uuid, sub, act, obj))
+ # return True
+ # self.moonlog_api.authz("KO ({} {},{},{})".format(uuid, sub, act, obj))
+ # return False
def __get_key_from_value(self, value, values_dict):
return filter(lambda v: v[1] == value, values_dict.iteritems())[0][0]
@@ -677,12 +684,12 @@ class IntraExtensionManager(manager.Manager):
@filter_args
@enforce("read", "subjects")
- def get_subject_dict(self, user_name, intra_extension_uuid):
+ def get_subject_dict(self, user_uuid, intra_extension_uuid):
return self.driver.get_subject_dict(intra_extension_uuid)
@filter_args
@enforce(("read", "write"), "subjects")
- def set_subject_dict(self, user_name, intra_extension_uuid, subject_dict):
+ def set_subject_dict(self, user_uuid, intra_extension_uuid, subject_dict):
for uuid in subject_dict:
# Next line will raise an error if user is not present in Keystone database
self.identity_api.get_user(uuid)
@@ -690,70 +697,70 @@ class IntraExtensionManager(manager.Manager):
@filter_args
@enforce(("read", "write"), "subjects")
- def add_subject_dict(self, user_name, intra_extension_uuid, subject_uuid):
+ def add_subject_dict(self, user_uuid, intra_extension_uuid, subject_uuid):
# Next line will raise an error if user is not present in Keystone database
user = self.identity_api.get_user(subject_uuid)
return self.driver.add_subject(intra_extension_uuid, subject_uuid, user["name"])
@filter_args
@enforce("write", "subjects")
- def del_subject(self, user_name, intra_extension_uuid, subject_uuid):
+ def del_subject(self, user_uuid, intra_extension_uuid, subject_uuid):
self.driver.remove_subject(intra_extension_uuid, subject_uuid)
@filter_args
@enforce("read", "objects")
- def get_object_dict(self, user_name, intra_extension_uuid):
+ def get_object_dict(self, user_uuid, intra_extension_uuid):
return self.driver.get_object_dict(intra_extension_uuid)
@filter_args
@enforce(("read", "write"), "objects")
- def set_object_dict(self, user_name, intra_extension_uuid, object_dict):
+ def set_object_dict(self, user_uuid, intra_extension_uuid, object_dict):
return self.driver.set_object_dict(intra_extension_uuid, object_dict)
@filter_args
@enforce(("read", "write"), "objects")
- def add_object_dict(self, user_name, intra_extension_uuid, object_name):
+ def add_object_dict(self, user_uuid, intra_extension_uuid, object_name):
object_uuid = uuid4().hex
return self.driver.add_object(intra_extension_uuid, object_uuid, object_name)
@filter_args
@enforce("write", "objects")
- def del_object(self, user_name, intra_extension_uuid, object_uuid):
+ def del_object(self, user_uuid, intra_extension_uuid, object_uuid):
self.driver.remove_object(intra_extension_uuid, object_uuid)
@filter_args
@enforce("read", "actions")
- def get_action_dict(self, user_name, intra_extension_uuid):
+ def get_action_dict(self, user_uuid, intra_extension_uuid):
return self.driver.get_action_dict(intra_extension_uuid)
@filter_args
@enforce(("read", "write"), "actions")
- def set_action_dict(self, user_name, intra_extension_uuid, action_dict):
+ def set_action_dict(self, user_uuid, intra_extension_uuid, action_dict):
return self.driver.set_action_dict(intra_extension_uuid, action_dict)
@filter_args
@enforce(("read", "write"), "actions")
- def add_action_dict(self, user_name, intra_extension_uuid, action_name):
+ def add_action_dict(self, user_uuid, intra_extension_uuid, action_name):
action_uuid = uuid4().hex
return self.driver.add_action(intra_extension_uuid, action_uuid, action_name)
@filter_args
@enforce("write", "actions")
- def del_action(self, user_name, intra_extension_uuid, action_uuid):
+ def del_action(self, user_uuid, intra_extension_uuid, action_uuid):
self.driver.remove_action(intra_extension_uuid, action_uuid)
# Metadata functions
@filter_args
@enforce("read", "subject_categories")
- def get_subject_category_dict(self, user_name, intra_extension_uuid):
+ def get_subject_category_dict(self, user_uuid, intra_extension_uuid):
return self.driver.get_subject_category_dict(intra_extension_uuid)
@filter_args
@enforce("read", "subject_categories")
@enforce("read", "subject_category_scope")
@enforce("write", "subject_category_scope")
- def set_subject_category_dict(self, user_name, intra_extension_uuid, subject_category):
+ def set_subject_category_dict(self, user_uuid, intra_extension_uuid, subject_category):
subject_category_dict = self.driver.set_subject_category_dict(intra_extension_uuid, subject_category)
# if we add a new category, we must add it to the subject_category_scope
for _cat in subject_category.keys():
@@ -766,25 +773,25 @@ class IntraExtensionManager(manager.Manager):
@filter_args
@enforce("read", "subject_categories")
@enforce("write", "subject_categories")
- def add_subject_category_dict(self, user_name, intra_extension_uuid, subject_category_name):
+ def add_subject_category_dict(self, user_uuid, intra_extension_uuid, subject_category_name):
subject_category_uuid = uuid4().hex
return self.driver.add_subject_category_dict(intra_extension_uuid, subject_category_uuid, subject_category_name)
@filter_args
@enforce("write", "subject_categories")
- def del_subject_category(self, user_name, intra_extension_uuid, subject_uuid):
+ def del_subject_category(self, user_uuid, intra_extension_uuid, subject_uuid):
return self.driver.remove_subject_category(intra_extension_uuid, subject_uuid)
@filter_args
@enforce("read", "object_categories")
- def get_object_category_dict(self, user_name, intra_extension_uuid):
+ def get_object_category_dict(self, user_uuid, intra_extension_uuid):
return self.driver.get_object_category_dict(intra_extension_uuid)
@filter_args
@enforce("read", "object_categories")
@enforce("read", "object_category_scope")
@enforce("write", "object_category_scope")
- def set_object_category_dict(self, user_name, intra_extension_uuid, object_category):
+ def set_object_category_dict(self, user_uuid, intra_extension_uuid, object_category):
object_category_dict = self.driver.set_object_category_dict(intra_extension_uuid, object_category)
# if we add a new category, we must add it to the object_category_scope
for _cat in object_category.keys():
@@ -797,25 +804,25 @@ class IntraExtensionManager(manager.Manager):
@filter_args
@enforce("read", "object_categories")
@enforce("write", "object_categories")
- def add_object_category_dict(self, user_name, intra_extension_uuid, object_category_name):
+ def add_object_category_dict(self, user_uuid, intra_extension_uuid, object_category_name):
object_category_uuid = uuid4().hex
return self.driver.add_object_category_dict(intra_extension_uuid, object_category_uuid, object_category_name)
@filter_args
@enforce("write", "object_categories")
- def del_object_category(self, user_name, intra_extension_uuid, object_uuid):
+ def del_object_category(self, user_uuid, intra_extension_uuid, object_uuid):
return self.driver.remove_object_category(intra_extension_uuid, object_uuid)
@filter_args
@enforce("read", "action_categories")
- def get_action_category_dict(self, user_name, intra_extension_uuid):
+ def get_action_category_dict(self, user_uuid, intra_extension_uuid):
return self.driver.get_action_category_dict(intra_extension_uuid)
@filter_args
@enforce("read", "action_categories")
@enforce("read", "action_category_scope")
@enforce("write", "action_category_scope")
- def set_action_category_dict(self, user_name, intra_extension_uuid, action_category):
+ def set_action_category_dict(self, user_uuid, intra_extension_uuid, action_category):
action_category_dict = self.driver.set_action_category_dict(intra_extension_uuid, action_category)
# if we add a new category, we must add it to the action_category_scope
for _cat in action_category.keys():
@@ -828,37 +835,37 @@ class IntraExtensionManager(manager.Manager):
@filter_args
@enforce("read", "action_categories")
@enforce("write", "action_categories")
- def add_action_category_dict(self, user_name, intra_extension_uuid, action_category_name):
+ def add_action_category_dict(self, user_uuid, intra_extension_uuid, action_category_name):
action_category_uuid = uuid4().hex
return self.driver.add_action_category_dict(intra_extension_uuid, action_category_uuid, action_category_name)
@filter_args
@enforce("write", "action_categories")
- def del_action_category(self, user_name, intra_extension_uuid, action_uuid):
+ def del_action_category(self, user_uuid, intra_extension_uuid, action_uuid):
return self.driver.remove_action_category(intra_extension_uuid, action_uuid)
# Scope functions
@filter_args
@enforce("read", "subject_category_scope")
@enforce("read", "subject_category")
- def get_subject_category_scope_dict(self, user_name, intra_extension_uuid, category):
- if category not in self.get_subject_category_dict(user_name, intra_extension_uuid)["subject_categories"]:
+ def get_subject_category_scope_dict(self, user_uuid, intra_extension_uuid, category):
+ if category not in self.get_subject_category_dict(user_uuid, intra_extension_uuid)["subject_categories"]:
raise IntraExtensionError("Subject category {} is unknown.".format(category))
return self.driver.get_subject_category_scope_dict(intra_extension_uuid, category)
@filter_args
@enforce("read", "subject_category_scope")
@enforce("read", "subject_category")
- def set_subject_category_scope_dict(self, user_name, intra_extension_uuid, category, scope):
- if category not in self.get_subject_category_dict(user_name, intra_extension_uuid)["subject_categories"]:
+ def set_subject_category_scope_dict(self, user_uuid, intra_extension_uuid, category, scope):
+ if category not in self.get_subject_category_dict(user_uuid, intra_extension_uuid)["subject_categories"]:
raise IntraExtensionError("Subject category {} is unknown.".format(category))
return self.driver.set_subject_category_scope_dict(intra_extension_uuid, category, scope)
@filter_args
@enforce(("read", "write"), "subject_category_scope")
@enforce("read", "subject_category")
- def add_subject_category_scope_dict(self, user_name, intra_extension_uuid, subject_category, scope_name):
- subject_categories = self.get_subject_category_dict(user_name, intra_extension_uuid)
+ def add_subject_category_scope_dict(self, user_uuid, intra_extension_uuid, subject_category, scope_name):
+ subject_categories = self.get_subject_category_dict(user_uuid, intra_extension_uuid)
# check if subject_category exists in database
if subject_category not in subject_categories["subject_categories"]:
raise IntraExtensionError("Subject category {} is unknown.".format(subject_category))
@@ -872,8 +879,8 @@ class IntraExtensionManager(manager.Manager):
@filter_args
@enforce("write", "subject_category_scope")
@enforce("read", "subject_category")
- def del_subject_category_scope(self, user_name, intra_extension_uuid, subject_category, subject_category_scope):
- subject_categories = self.get_subject_category_dict(user_name, intra_extension_uuid)
+ def del_subject_category_scope(self, user_uuid, intra_extension_uuid, subject_category, subject_category_scope):
+ subject_categories = self.get_subject_category_dict(user_uuid, intra_extension_uuid)
# check if subject_category exists in database
if subject_category not in subject_categories["subject_categories"]:
raise IntraExtensionError("Subject category {} is unknown.".format(subject_category))
@@ -885,24 +892,24 @@ class IntraExtensionManager(manager.Manager):
@filter_args
@enforce("read", "object_category_scope")
@enforce("read", "object_category")
- def get_object_category_scope_dict(self, user_name, intra_extension_uuid, category):
- if category not in self.get_object_category_dict(user_name, intra_extension_uuid)["object_categories"]:
+ def get_object_category_scope_dict(self, user_uuid, intra_extension_uuid, category):
+ if category not in self.get_object_category_dict(user_uuid, intra_extension_uuid)["object_categories"]:
raise IntraExtensionError("Object category {} is unknown.".format(category))
return self.driver.get_object_category_scope_dict(intra_extension_uuid, category)
@filter_args
@enforce("read", "object_category_scope")
@enforce("read", "object_category")
- def set_object_category_scope_dict(self, user_name, intra_extension_uuid, category, scope):
- if category not in self.get_object_category_dict(user_name, intra_extension_uuid)["object_categories"]:
+ def set_object_category_scope_dict(self, user_uuid, intra_extension_uuid, category, scope):
+ if category not in self.get_object_category_dict(user_uuid, intra_extension_uuid)["object_categories"]:
raise IntraExtensionError("Object category {} is unknown.".format(category))
return self.driver.set_object_category_scope_dict(intra_extension_uuid, category, scope)
@filter_args
@enforce(("read", "write"), "object_category_scope")
@enforce("read", "object_category")
- def add_object_category_scope_dict(self, user_name, intra_extension_uuid, object_category, scope_name):
- object_categories = self.get_object_category_dict(user_name, intra_extension_uuid)
+ def add_object_category_scope_dict(self, user_uuid, intra_extension_uuid, object_category, scope_name):
+ object_categories = self.get_object_category_dict(user_uuid, intra_extension_uuid)
# check if object_category exists in database
if object_category not in object_categories["object_categories"]:
raise IntraExtensionError("Object category {} is unknown.".format(object_category))
@@ -916,8 +923,8 @@ class IntraExtensionManager(manager.Manager):
@filter_args
@enforce("write", "object_category_scope")
@enforce("read", "object_category")
- def del_object_category_scope(self, user_name, intra_extension_uuid, object_category, object_category_scope):
- object_categories = self.get_object_category_dict(user_name, intra_extension_uuid)
+ def del_object_category_scope(self, user_uuid, intra_extension_uuid, object_category, object_category_scope):
+ object_categories = self.get_object_category_dict(user_uuid, intra_extension_uuid)
# check if object_category exists in database
if object_category not in object_categories["object_categories"]:
raise IntraExtensionError("Object category {} is unknown.".format(object_category))
@@ -929,24 +936,24 @@ class IntraExtensionManager(manager.Manager):
@filter_args
@enforce("read", "action_category_scope")
@enforce("read", "action_category")
- def get_action_category_scope_dict(self, user_name, intra_extension_uuid, category):
- if category not in self.get_action_category_dict(user_name, intra_extension_uuid)["action_categories"]:
+ def get_action_category_scope_dict(self, user_uuid, intra_extension_uuid, category):
+ if category not in self.get_action_category_dict(user_uuid, intra_extension_uuid)["action_categories"]:
raise IntraExtensionError("Action category {} is unknown.".format(category))
return self.driver.get_action_category_scope_dict(intra_extension_uuid, category)
@filter_args
@enforce(("read", "write"), "action_category_scope")
@enforce("read", "action_category")
- def set_action_category_scope_dict(self, user_name, intra_extension_uuid, category, scope):
- if category not in self.get_action_category_dict(user_name, intra_extension_uuid)["action_categories"]:
+ def set_action_category_scope_dict(self, user_uuid, intra_extension_uuid, category, scope):
+ if category not in self.get_action_category_dict(user_uuid, intra_extension_uuid)["action_categories"]:
raise IntraExtensionError("Action category {} is unknown.".format(category))
return self.driver.set_action_category_scope_dict(intra_extension_uuid, category, scope)
@filter_args
@enforce(("read", "write"), "action_category_scope")
@enforce("read", "action_category")
- def add_action_category_scope_dict(self, user_name, intra_extension_uuid, action_category, scope_name):
- action_categories = self.get_action_category_dict(user_name, intra_extension_uuid)
+ def add_action_category_scope_dict(self, user_uuid, intra_extension_uuid, action_category, scope_name):
+ action_categories = self.get_action_category_dict(user_uuid, intra_extension_uuid)
# check if action_category exists in database
if action_category not in action_categories["action_categories"]:
raise IntraExtensionError("Action category {} is unknown.".format(action_category))
@@ -960,8 +967,8 @@ class IntraExtensionManager(manager.Manager):
@filter_args
@enforce("write", "action_category_scope")
@enforce("read", "action_category")
- def del_action_category_scope(self, user_name, intra_extension_uuid, action_category, action_category_scope):
- action_categories = self.get_action_category_dict(user_name, intra_extension_uuid)
+ def del_action_category_scope(self, user_uuid, intra_extension_uuid, action_category, action_category_scope):
+ action_categories = self.get_action_category_dict(user_uuid, intra_extension_uuid)
# check if action_category exists in database
if action_category not in action_categories["action_categories"]:
raise IntraExtensionError("Action category {} is unknown.".format(action_category))
@@ -975,9 +982,9 @@ class IntraExtensionManager(manager.Manager):
@filter_args
@enforce("read", "subject_category_assignment")
@enforce("read", "subjects")
- def get_subject_category_assignment_dict(self, user_name, intra_extension_uuid, subject_uuid):
+ def get_subject_category_assignment_dict(self, user_uuid, intra_extension_uuid, subject_uuid):
# check if subject exists in database
- if subject_uuid not in self.get_subject_dict(user_name, intra_extension_uuid)["subjects"]:
+ if subject_uuid not in self.get_subject_dict(user_uuid, intra_extension_uuid)["subjects"]:
LOG.error("add_subject_assignment: unknown subject_id {}".format(subject_uuid))
raise IntraExtensionError("Bad input data")
return self.driver.get_subject_category_assignment_dict(intra_extension_uuid, subject_uuid)
@@ -986,9 +993,9 @@ class IntraExtensionManager(manager.Manager):
@enforce("read", "subject_category_assignment")
@enforce("write", "subject_category_assignment")
@enforce("read", "subjects")
- def set_subject_category_assignment_dict(self, user_name, intra_extension_uuid, subject_uuid, assignment_dict):
+ def set_subject_category_assignment_dict(self, user_uuid, intra_extension_uuid, subject_uuid, assignment_dict):
# check if subject exists in database
- if subject_uuid not in self.get_subject_dict(user_name, intra_extension_uuid)["subjects"]:
+ if subject_uuid not in self.get_subject_dict(user_uuid, intra_extension_uuid)["subjects"]:
LOG.error("add_subject_assignment: unknown subject_id {}".format(subject_uuid))
raise IntraExtensionError("Bad input data")
return self.driver.set_subject_category_assignment_dict(intra_extension_uuid, subject_uuid, assignment_dict)
@@ -998,13 +1005,13 @@ class IntraExtensionManager(manager.Manager):
@enforce("write", "subject_category_assignment")
@enforce("read", "subjects")
@enforce("read", "subject_category")
- def del_subject_category_assignment(self, user_name, intra_extension_uuid, subject_uuid, category_uuid, scope_uuid):
+ def del_subject_category_assignment(self, user_uuid, intra_extension_uuid, subject_uuid, category_uuid, scope_uuid):
# check if category exists in database
- if category_uuid not in self.get_subject_category_dict(user_name, intra_extension_uuid)["subject_categories"]:
+ if category_uuid not in self.get_subject_category_dict(user_uuid, intra_extension_uuid)["subject_categories"]:
LOG.error("add_subject_category_scope: unknown subject_category {}".format(category_uuid))
raise IntraExtensionError("Bad input data")
# check if subject exists in database
- if subject_uuid not in self.get_subject_dict(user_name, intra_extension_uuid)["subjects"]:
+ if subject_uuid not in self.get_subject_dict(user_uuid, intra_extension_uuid)["subjects"]:
LOG.error("add_subject_assignment: unknown subject_id {}".format(subject_uuid))
raise IntraExtensionError("Bad input data")
self.driver.remove_subject_category_assignment(intra_extension_uuid, subject_uuid, category_uuid, scope_uuid)
@@ -1013,13 +1020,13 @@ class IntraExtensionManager(manager.Manager):
@enforce("write", "subject_category_assignment")
@enforce("read", "subjects")
@enforce("read", "subject_category")
- def add_subject_category_assignment_dict(self, user_name, intra_extension_uuid, subject_uuid, category_uuid, scope_uuid):
+ def add_subject_category_assignment_dict(self, user_uuid, intra_extension_uuid, subject_uuid, category_uuid, scope_uuid):
# check if category exists in database
- if category_uuid not in self.get_subject_category_dict(user_name, intra_extension_uuid)["subject_categories"]:
+ if category_uuid not in self.get_subject_category_dict(user_uuid, intra_extension_uuid)["subject_categories"]:
LOG.error("add_subject_category_scope: unknown subject_category {}".format(category_uuid))
raise IntraExtensionError("Bad input data")
# check if subject exists in database
- if subject_uuid not in self.get_subject_dict(user_name, intra_extension_uuid)["subjects"]:
+ if subject_uuid not in self.get_subject_dict(user_uuid, intra_extension_uuid)["subjects"]:
LOG.error("add_subject_assignment: unknown subject_id {}".format(subject_uuid))
raise IntraExtensionError("Bad input data")
return self.driver.add_subject_category_assignment_dict(intra_extension_uuid, subject_uuid, category_uuid, scope_uuid)
@@ -1027,9 +1034,9 @@ class IntraExtensionManager(manager.Manager):
@filter_args
@enforce("read", "object_category_assignment")
@enforce("read", "objects")
- def get_object_category_assignment_dict(self, user_name, intra_extension_uuid, object_uuid):
+ def get_object_category_assignment_dict(self, user_uuid, intra_extension_uuid, object_uuid):
# check if object exists in database
- if object_uuid not in self.get_object_dict(user_name, intra_extension_uuid)["objects"]:
+ if object_uuid not in self.get_object_dict(user_uuid, intra_extension_uuid)["objects"]:
LOG.error("add_object_assignment: unknown object_id {}".format(object_uuid))
raise IntraExtensionError("Bad input data")
return self.driver.get_object_category_assignment_dict(intra_extension_uuid, object_uuid)
@@ -1038,9 +1045,9 @@ class IntraExtensionManager(manager.Manager):
@enforce("read", "object_category_assignment")
@enforce("write", "object_category_assignment")
@enforce("read", "objects")
- def set_object_category_assignment_dict(self, user_name, intra_extension_uuid, object_uuid, assignment_dict):
+ def set_object_category_assignment_dict(self, user_uuid, intra_extension_uuid, object_uuid, assignment_dict):
# check if object exists in database
- if object_uuid not in self.get_object_dict(user_name, intra_extension_uuid)["objects"]:
+ if object_uuid not in self.get_object_dict(user_uuid, intra_extension_uuid)["objects"]:
LOG.error("add_object_assignment: unknown object_id {}".format(object_uuid))
raise IntraExtensionError("Bad input data")
return self.driver.set_object_category_assignment_dict(intra_extension_uuid, object_uuid, assignment_dict)
@@ -1050,13 +1057,13 @@ class IntraExtensionManager(manager.Manager):
@enforce("write", "object_category_assignment")
@enforce("read", "objects")
@enforce("read", "object_category")
- def del_object_category_assignment(self, user_name, intra_extension_uuid, object_uuid, category_uuid, scope_uuid):
+ def del_object_category_assignment(self, user_uuid, intra_extension_uuid, object_uuid, category_uuid, scope_uuid):
# check if category exists in database
- if category_uuid not in self.get_object_category_dict(user_name, intra_extension_uuid)["object_categories"]:
+ if category_uuid not in self.get_object_category_dict(user_uuid, intra_extension_uuid)["object_categories"]:
LOG.error("add_object_category_scope: unknown object_category {}".format(category_uuid))
raise IntraExtensionError("Bad input data")
# check if object exists in database
- if object_uuid not in self.get_object_dict(user_name, intra_extension_uuid)["objects"]:
+ if object_uuid not in self.get_object_dict(user_uuid, intra_extension_uuid)["objects"]:
LOG.error("add_object_assignment: unknown object_id {}".format(object_uuid))
raise IntraExtensionError("Bad input data")
self.driver.remove_object_category_assignment(intra_extension_uuid, object_uuid, category_uuid, scope_uuid)
@@ -1065,13 +1072,13 @@ class IntraExtensionManager(manager.Manager):
@enforce("write", "object_category_assignment")
@enforce("read", "objects")
@enforce("read", "object_category")
- def add_object_category_assignment_dict(self, user_name, intra_extension_uuid, object_uuid, category_uuid, scope_uuid):
+ def add_object_category_assignment_dict(self, user_uuid, intra_extension_uuid, object_uuid, category_uuid, scope_uuid):
# check if category exists in database
- if category_uuid not in self.get_object_category_dict(user_name, intra_extension_uuid)["object_categories"]:
+ if category_uuid not in self.get_object_category_dict(user_uuid, intra_extension_uuid)["object_categories"]:
LOG.error("add_object_category_scope: unknown object_category {}".format(category_uuid))
raise IntraExtensionError("Bad input data")
# check if object exists in database
- if object_uuid not in self.get_object_dict(user_name, intra_extension_uuid)["objects"]:
+ if object_uuid not in self.get_object_dict(user_uuid, intra_extension_uuid)["objects"]:
LOG.error("add_object_assignment: unknown object_id {}".format(object_uuid))
raise IntraExtensionError("Bad input data")
return self.driver.add_object_category_assignment_dict(intra_extension_uuid, object_uuid, category_uuid, scope_uuid)
@@ -1079,9 +1086,9 @@ class IntraExtensionManager(manager.Manager):
@filter_args
@enforce("read", "action_category_assignment")
@enforce("read", "actions")
- def get_action_category_assignment_dict(self, user_name, intra_extension_uuid, action_uuid):
+ def get_action_category_assignment_dict(self, user_uuid, intra_extension_uuid, action_uuid):
# check if action exists in database
- if action_uuid not in self.get_action_dict(user_name, intra_extension_uuid)["actions"]:
+ if action_uuid not in self.get_action_dict(user_uuid, intra_extension_uuid)["actions"]:
LOG.error("add_action_assignment: unknown action_id {}".format(action_uuid))
raise IntraExtensionError("Bad input data")
return self.driver.get_action_category_assignment_dict(intra_extension_uuid, action_uuid)
@@ -1090,9 +1097,9 @@ class IntraExtensionManager(manager.Manager):
@enforce("read", "action_category_assignment")
@enforce("write", "action_category_assignment")
@enforce("read", "actions")
- def set_action_category_assignment_dict(self, user_name, intra_extension_uuid, action_uuid, assignment_dict):
+ def set_action_category_assignment_dict(self, user_uuid, intra_extension_uuid, action_uuid, assignment_dict):
# check if action exists in database
- if action_uuid not in self.get_action_dict(user_name, intra_extension_uuid)["actions"]:
+ if action_uuid not in self.get_action_dict(user_uuid, intra_extension_uuid)["actions"]:
LOG.error("add_action_assignment: unknown action_id {}".format(action_uuid))
raise IntraExtensionError("Bad input data")
return self.driver.set_action_category_assignment_dict(intra_extension_uuid, action_uuid, assignment_dict)
@@ -1102,13 +1109,13 @@ class IntraExtensionManager(manager.Manager):
@enforce("write", "action_category_assignment")
@enforce("read", "actions")
@enforce("read", "action_category")
- def del_action_category_assignment(self, user_name, intra_extension_uuid, action_uuid, category_uuid, scope_uuid):
+ def del_action_category_assignment(self, user_uuid, intra_extension_uuid, action_uuid, category_uuid, scope_uuid):
# check if category exists in database
- if category_uuid not in self.get_action_category_dict(user_name, intra_extension_uuid)["action_categories"]:
+ if category_uuid not in self.get_action_category_dict(user_uuid, intra_extension_uuid)["action_categories"]:
LOG.error("add_action_category_scope: unknown action_category {}".format(category_uuid))
raise IntraExtensionError("Bad input data")
# check if action exists in database
- if action_uuid not in self.get_action_dict(user_name, intra_extension_uuid)["actions"]:
+ if action_uuid not in self.get_action_dict(user_uuid, intra_extension_uuid)["actions"]:
LOG.error("add_action_assignment: unknown action_id {}".format(action_uuid))
raise IntraExtensionError("Bad input data")
self.driver.remove_action_category_assignment(intra_extension_uuid, action_uuid, category_uuid, scope_uuid)
@@ -1117,13 +1124,13 @@ class IntraExtensionManager(manager.Manager):
@enforce("write", "action_category_assignment")
@enforce("read", "actions")
@enforce("read", "action_category")
- def add_action_category_assignment_dict(self, user_name, intra_extension_uuid, action_uuid, category_uuid, scope_uuid):
+ def add_action_category_assignment_dict(self, user_uuid, intra_extension_uuid, action_uuid, category_uuid, scope_uuid):
# check if category exists in database
- if category_uuid not in self.get_action_category_dict(user_name, intra_extension_uuid)["action_categories"]:
+ if category_uuid not in self.get_action_category_dict(user_uuid, intra_extension_uuid)["action_categories"]:
LOG.error("add_action_category_scope: unknown action_category {}".format(category_uuid))
raise IntraExtensionError("Bad input data")
# check if action exists in database
- if action_uuid not in self.get_action_dict(user_name, intra_extension_uuid)["actions"]:
+ if action_uuid not in self.get_action_dict(user_uuid, intra_extension_uuid)["actions"]:
LOG.error("add_action_assignment: unknown action_id {}".format(action_uuid))
raise IntraExtensionError("Bad input data")
return self.driver.add_action_category_assignment_dict(
@@ -1135,21 +1142,21 @@ class IntraExtensionManager(manager.Manager):
# Metarule functions
@filter_args
- def get_aggregation_algorithms(self, user_name, intra_extension_uuid):
+ def get_aggregation_algorithms(self, user_uuid, intra_extension_uuid):
# TODO: check which algorithms are really usable
return {"aggregation_algorithms": ["and_true_aggregation", "test_aggregation"]}
@filter_args
@enforce("read", "aggregation_algorithms")
- def get_aggregation_algorithm(self, user_name, intra_extension_uuid):
+ def get_aggregation_algorithm(self, user_uuid, intra_extension_uuid):
return self.driver.get_meta_rule_dict(intra_extension_uuid)
@filter_args
@enforce("read", "aggregation_algorithms")
@enforce("write", "aggregation_algorithms")
- def set_aggregation_algorithm(self, user_name, intra_extension_uuid, aggregation_algorithm):
+ def set_aggregation_algorithm(self, user_uuid, intra_extension_uuid, aggregation_algorithm):
if aggregation_algorithm not in self.get_aggregation_algorithms(
- user_name, intra_extension_uuid)["aggregation_algorithms"]:
+ user_uuid, intra_extension_uuid)["aggregation_algorithms"]:
raise IntraExtensionError("Unknown aggregation_algorithm: {}".format(aggregation_algorithm))
meta_rule = self.driver.get_meta_rule_dict(intra_extension_uuid)
meta_rule["aggregation"] = aggregation_algorithm
@@ -1157,17 +1164,17 @@ class IntraExtensionManager(manager.Manager):
@filter_args
@enforce("read", "sub_meta_rule")
- def get_sub_meta_rule(self, user_name, intra_extension_uuid):
+ def get_sub_meta_rule(self, user_uuid, intra_extension_uuid):
return self.driver.get_meta_rule_dict(intra_extension_uuid)
@filter_args
@enforce("read", "sub_meta_rule")
@enforce("write", "sub_meta_rule")
- def set_sub_meta_rule(self, user_name, intra_extension_uuid, sub_meta_rules):
+ def set_sub_meta_rule(self, user_uuid, intra_extension_uuid, sub_meta_rules):
# TODO (dthom): When sub_meta_rule is set, all rules must be dropped
# because the previous rules cannot be mapped to the new sub_meta_rule.
for relation in sub_meta_rules.keys():
- if relation not in self.get_sub_meta_rule_relations(user_name, intra_extension_uuid)["sub_meta_rule_relations"]:
+ if relation not in self.get_sub_meta_rule_relations(user_uuid, intra_extension_uuid)["sub_meta_rule_relations"]:
LOG.error("set_sub_meta_rule unknown MetaRule relation {}".format(relation))
raise IntraExtensionError("Bad input data.")
for cat in ("subject_categories", "object_categories", "action_categories"):
@@ -1177,19 +1184,19 @@ class IntraExtensionManager(manager.Manager):
if type(sub_meta_rules[relation][cat]) is not list:
LOG.error("set_sub_meta_rule category {} is not a list".format(cat))
raise IntraExtensionError("Bad input data.")
- subject_categories = self.get_subject_category_dict(user_name, intra_extension_uuid)
+ subject_categories = self.get_subject_category_dict(user_uuid, intra_extension_uuid)
for data in sub_meta_rules[relation]["subject_categories"]:
if data not in subject_categories["subject_categories"]:
LOG.error("set_sub_meta_rule category {} is not part of subject_categories {}".format(
data, subject_categories))
raise IntraExtensionError("Bad input data.")
- object_categories = self.get_object_category_dict(user_name, intra_extension_uuid)
+ object_categories = self.get_object_category_dict(user_uuid, intra_extension_uuid)
for data in sub_meta_rules[relation]["object_categories"]:
if data not in object_categories["object_categories"]:
LOG.error("set_sub_meta_rule category {} is not part of object_categories {}".format(
data, object_categories))
raise IntraExtensionError("Bad input data.")
- action_categories = self.get_action_category_dict(user_name, intra_extension_uuid)
+ action_categories = self.get_action_category_dict(user_uuid, intra_extension_uuid)
for data in sub_meta_rules[relation]["action_categories"]:
if data not in action_categories["action_categories"]:
LOG.error("set_sub_meta_rule category {} is not part of action_categories {}".format(
@@ -1206,24 +1213,24 @@ class IntraExtensionManager(manager.Manager):
# Sub-rules functions
@filter_args
@enforce("read", "sub_rules")
- def get_sub_rules(self, user_name, intra_extension_uuid):
+ def get_sub_rules(self, user_uuid, intra_extension_uuid):
return self.driver.get_rules(intra_extension_uuid)
@filter_args
@enforce("read", "sub_rules")
@enforce("write", "sub_rules")
- def set_sub_rule(self, user_name, intra_extension_uuid, relation, sub_rule):
+ def set_sub_rule(self, user_uuid, intra_extension_uuid, relation, sub_rule):
for item in sub_rule:
if type(item) not in (str, unicode, bool):
raise IntraExtensionError("Bad input data (sub_rule).")
ref_rules = self.driver.get_rules(intra_extension_uuid)
_sub_rule = list(sub_rule)
- if relation not in self.get_sub_meta_rule_relations(user_name, intra_extension_uuid)["sub_meta_rule_relations"]:
+ if relation not in self.get_sub_meta_rule_relations(user_uuid, intra_extension_uuid)["sub_meta_rule_relations"]:
raise IntraExtensionError("Bad input data (rules).")
# filter strings in sub_rule
sub_rule = [filter_input(x) for x in sub_rule]
# check if length of sub_rule is correct from metadata_sub_rule
- metadata_sub_rule = self.get_sub_meta_rule(user_name, intra_extension_uuid)
+ metadata_sub_rule = self.get_sub_meta_rule(user_uuid, intra_extension_uuid)
metadata_sub_rule_length = len(metadata_sub_rule['sub_meta_rules'][relation]["subject_categories"]) + \
len(metadata_sub_rule['sub_meta_rules'][relation]["action_categories"]) + \
len(metadata_sub_rule['sub_meta_rules'][relation]["object_categories"]) + 1
@@ -1234,12 +1241,12 @@ class IntraExtensionManager(manager.Manager):
for category in metadata_sub_rule['sub_meta_rules'][relation]["subject_categories"]:
item = _sub_rule.pop(0)
if item not in self.get_subject_category_scope_dict(
- user_name,
+ user_uuid,
intra_extension_uuid, category)["subject_category_scope"][category].keys():
raise IntraExtensionError("Bad subject value in sub_rule {}/{}".format(category, item))
for category in metadata_sub_rule['sub_meta_rules'][relation]["action_categories"]:
action_categories = self.get_action_category_scope_dict(
- user_name,
+ user_uuid,
intra_extension_uuid, category)["action_category_scope"][category]
item = _sub_rule.pop(0)
if item not in action_categories.keys():
@@ -1248,7 +1255,7 @@ class IntraExtensionManager(manager.Manager):
for category in metadata_sub_rule['sub_meta_rules'][relation]["object_categories"]:
item = _sub_rule.pop(0)
if item not in self.get_object_category_scope_dict(
- user_name,
+ user_uuid,
intra_extension_uuid, category)["object_category_scope"][category].keys():
raise IntraExtensionError("Bad object value in sub_rule {}/{}".format(category, item))
# check if relation is already there
@@ -1261,7 +1268,7 @@ class IntraExtensionManager(manager.Manager):
@filter_args
@enforce("read", "sub_rules")
@enforce("write", "sub_rules")
- def del_sub_rule(self, user_name, intra_extension_uuid, relation_name, rule):
+ def del_sub_rule(self, user_uuid, intra_extension_uuid, relation_name, rule):
ref_rules = self.driver.get_rules(intra_extension_uuid)
rule = rule.split("+")
for index, _item in enumerate(rule):
@@ -1300,124 +1307,124 @@ class IntraExtensionAuthzManager(IntraExtensionManager):
def delete_intra_extension(self, intra_extension_id):
raise AdminException()
- def set_subject_dict(self, user_name, intra_extension_uuid, subject_dict):
+ def set_subject_dict(self, user_uuid, intra_extension_uuid, subject_dict):
raise SubjectAddNotAuthorized()
- def add_subject_dict(self, user_name, intra_extension_uuid, subject_uuid):
+ def add_subject_dict(self, user_uuid, intra_extension_uuid, subject_uuid):
raise SubjectAddNotAuthorized()
- def del_subject(self, user_name, intra_extension_uuid, subject_uuid):
+ def del_subject(self, user_uuid, intra_extension_uuid, subject_uuid):
raise SubjectDelNotAuthorized()
- def set_object_dict(self, user_name, intra_extension_uuid, object_dict):
+ def set_object_dict(self, user_uuid, intra_extension_uuid, object_dict):
raise ObjectAddNotAuthorized()
- def add_object_dict(self, user_name, intra_extension_uuid, object_name):
+ def add_object_dict(self, user_uuid, intra_extension_uuid, object_name):
raise ObjectAddNotAuthorized()
- def del_object(self, user_name, intra_extension_uuid, object_uuid):
+ def del_object(self, user_uuid, intra_extension_uuid, object_uuid):
raise ObjectDelNotAuthorized()
- def set_action_dict(self, user_name, intra_extension_uuid, action_dict):
+ def set_action_dict(self, user_uuid, intra_extension_uuid, action_dict):
raise ActionAddNotAuthorized()
- def add_action_dict(self, user_name, intra_extension_uuid, action_name):
+ def add_action_dict(self, user_uuid, intra_extension_uuid, action_name):
raise ActionAddNotAuthorized()
- def del_action(self, user_name, intra_extension_uuid, action_uuid):
+ def del_action(self, user_uuid, intra_extension_uuid, action_uuid):
raise ActionDelNotAuthorized()
- def set_subject_category_dict(self, user_name, intra_extension_uuid, subject_category):
+ def set_subject_category_dict(self, user_uuid, intra_extension_uuid, subject_category):
raise SubjectCategoryAddNotAuthorized()
- def add_subject_category_dict(self, user_name, intra_extension_uuid, subject_category_name):
+ def add_subject_category_dict(self, user_uuid, intra_extension_uuid, subject_category_name):
raise SubjectCategoryAddNotAuthorized()
- def del_subject_category(self, user_name, intra_extension_uuid, subject_uuid):
+ def del_subject_category(self, user_uuid, intra_extension_uuid, subject_uuid):
raise SubjectCategoryDelNotAuthorized()
- def set_object_category_dict(self, user_name, intra_extension_uuid, object_category):
+ def set_object_category_dict(self, user_uuid, intra_extension_uuid, object_category):
raise ObjectCategoryAddNotAuthorized()
- def add_object_category_dict(self, user_name, intra_extension_uuid, object_category_name):
+ def add_object_category_dict(self, user_uuid, intra_extension_uuid, object_category_name):
raise ObjectCategoryAddNotAuthorized()
- def del_object_category(self, user_name, intra_extension_uuid, object_uuid):
+ def del_object_category(self, user_uuid, intra_extension_uuid, object_uuid):
raise ObjectCategoryDelNotAuthorized()
- def set_action_category_dict(self, user_name, intra_extension_uuid, action_category):
+ def set_action_category_dict(self, user_uuid, intra_extension_uuid, action_category):
raise ActionCategoryAddNotAuthorized()
- def add_action_category_dict(self, user_name, intra_extension_uuid, action_category_name):
+ def add_action_category_dict(self, user_uuid, intra_extension_uuid, action_category_name):
raise ActionCategoryAddNotAuthorized()
- def del_action_category(self, user_name, intra_extension_uuid, action_uuid):
+ def del_action_category(self, user_uuid, intra_extension_uuid, action_uuid):
raise ActionCategoryDelNotAuthorized()
- def set_subject_category_scope_dict(self, user_name, intra_extension_uuid, category, scope):
+ def set_subject_category_scope_dict(self, user_uuid, intra_extension_uuid, category, scope):
raise SubjectCategoryScopeAddNotAuthorized()
- def add_subject_category_scope_dict(self, user_name, intra_extension_uuid, subject_category, scope_name):
+ def add_subject_category_scope_dict(self, user_uuid, intra_extension_uuid, subject_category, scope_name):
raise SubjectCategoryScopeAddNotAuthorized()
- def del_subject_category_scope(self, user_name, intra_extension_uuid, subject_category, subject_category_scope):
+ def del_subject_category_scope(self, user_uuid, intra_extension_uuid, subject_category, subject_category_scope):
raise SubjectCategoryScopeDelNotAuthorized()
- def set_object_category_scope_dict(self, user_name, intra_extension_uuid, category, scope):
+ def set_object_category_scope_dict(self, user_uuid, intra_extension_uuid, category, scope):
raise ObjectCategoryScopeAddNotAuthorized()
- def add_object_category_scope_dict(self, user_name, intra_extension_uuid, object_category, scope_name):
+ def add_object_category_scope_dict(self, user_uuid, intra_extension_uuid, object_category, scope_name):
raise ObjectCategoryScopeAddNotAuthorized()
- def del_object_category_scope(self, user_name, intra_extension_uuid, object_category, object_category_scope):
+ def del_object_category_scope(self, user_uuid, intra_extension_uuid, object_category, object_category_scope):
raise ObjectCategoryScopeDelNotAuthorized()
- def set_action_category_scope_dict(self, user_name, intra_extension_uuid, category, scope):
+ def set_action_category_scope_dict(self, user_uuid, intra_extension_uuid, category, scope):
raise ActionCategoryScopeAddNotAuthorized()
- def add_action_category_scope_dict(self, user_name, intra_extension_uuid, action_category, scope_name):
+ def add_action_category_scope_dict(self, user_uuid, intra_extension_uuid, action_category, scope_name):
raise ActionCategoryScopeAddNotAuthorized()
- def del_action_category_scope(self, user_name, intra_extension_uuid, action_category, action_category_scope):
+ def del_action_category_scope(self, user_uuid, intra_extension_uuid, action_category, action_category_scope):
raise ActionCategoryScopeDelNotAuthorized()
- def set_subject_category_assignment_dict(self, user_name, intra_extension_uuid, subject_uuid, assignment_dict):
+ def set_subject_category_assignment_dict(self, user_uuid, intra_extension_uuid, subject_uuid, assignment_dict):
raise SubjectCategoryAssignmentAddNotAuthorized()
- def del_subject_category_assignment(self, user_name, intra_extension_uuid, subject_uuid, category_uuid, scope_uuid):
+ def del_subject_category_assignment(self, user_uuid, intra_extension_uuid, subject_uuid, category_uuid, scope_uuid):
raise SubjectCategoryAssignmentAddNotAuthorized()
- def add_subject_category_assignment_dict(self, user_name, intra_extension_uuid, subject_uuid, category_uuid, scope_uuid):
+ def add_subject_category_assignment_dict(self, user_uuid, intra_extension_uuid, subject_uuid, category_uuid, scope_uuid):
raise SubjectCategoryAssignmentDelNotAuthorized()
- def set_object_category_assignment_dict(self, user_name, intra_extension_uuid, object_uuid, assignment_dict):
+ def set_object_category_assignment_dict(self, user_uuid, intra_extension_uuid, object_uuid, assignment_dict):
raise ObjectCategoryAssignmentAddNotAuthorized()
- def del_object_category_assignment(self, user_name, intra_extension_uuid, object_uuid, category_uuid, scope_uuid):
+ def del_object_category_assignment(self, user_uuid, intra_extension_uuid, object_uuid, category_uuid, scope_uuid):
raise ObjectCategoryAssignmentAddNotAuthorized()
- def add_object_category_assignment_dict(self, user_name, intra_extension_uuid, object_uuid, category_uuid, scope_uuid):
+ def add_object_category_assignment_dict(self, user_uuid, intra_extension_uuid, object_uuid, category_uuid, scope_uuid):
raise ObjectCategoryAssignmentDelNotAuthorized()
- def set_action_category_assignment_dict(self, user_name, intra_extension_uuid, action_uuid, assignment_dict):
+ def set_action_category_assignment_dict(self, user_uuid, intra_extension_uuid, action_uuid, assignment_dict):
raise ActionCategoryAssignmentAddNotAuthorized()
- def del_action_category_assignment(self, user_name, intra_extension_uuid, action_uuid, category_uuid, scope_uuid):
+ def del_action_category_assignment(self, user_uuid, intra_extension_uuid, action_uuid, category_uuid, scope_uuid):
raise ActionCategoryAssignmentAddNotAuthorized()
- def add_action_category_assignment_dict(self, user_name, intra_extension_uuid, action_uuid, category_uuid, scope_uuid):
+ def add_action_category_assignment_dict(self, user_uuid, intra_extension_uuid, action_uuid, category_uuid, scope_uuid):
raise ActionCategoryAssignmentDelNotAuthorized()
- def set_aggregation_algorithm(self, user_name, intra_extension_uuid, aggregation_algorithm):
+ def set_aggregation_algorithm(self, user_uuid, intra_extension_uuid, aggregation_algorithm):
raise MetaRuleAddNotAuthorized()
- def set_sub_meta_rule(self, user_name, intra_extension_uuid, sub_meta_rules):
+ def set_sub_meta_rule(self, user_uuid, intra_extension_uuid, sub_meta_rules):
raise MetaRuleAddNotAuthorized()
- def set_sub_rule(self, user_name, intra_extension_uuid, relation, sub_rule):
+ def set_sub_rule(self, user_uuid, intra_extension_uuid, relation, sub_rule):
raise RuleAddNotAuthorized()
- def del_sub_rule(self, user_name, intra_extension_uuid, relation_name, rule):
+ def del_sub_rule(self, user_uuid, intra_extension_uuid, relation_name, rule):
raise RuleAddNotAuthorized()
@dependency.provider('admin_api')
diff --git a/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_admin.py b/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_admin.py
index 6426bf84..684b9695 100644
--- a/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_admin.py
+++ b/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_admin.py
@@ -62,8 +62,9 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
def create_intra_extension(self, policy_model="policy_rbac_admin"):
# Create the admin user because IntraExtension needs it
- self.admin = self.identity_api.create_user(USER)
+ #self.admin = self.identity_api.create_user(USER)
IE["policymodel"] = policy_model
+ IE["name"] = uuid.uuid4().hex
self.ref = self.manager.load_intra_extension(IE)
self.assertIsInstance(self.ref, dict)
self.create_tenant(self.ref["id"])
@@ -98,6 +99,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
self.manager.delete_intra_extension(self.ref["id"])
def test_subjects(self):
+ self.create_user("admin")
self.create_intra_extension()
subjects = self.manager.get_subject_dict("admin", self.ref["id"])
@@ -145,6 +147,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
self.assertIn(new_subject["id"], subjects["subjects"])
def test_objects(self):
+ self.create_user("admin")
self.create_intra_extension()
objects = self.manager.get_object_dict("admin", self.ref["id"])
@@ -155,7 +158,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
self.assertEqual(self.ref["id"], objects["intra_extension_uuid"])
self.assertIsInstance(objects["objects"], dict)
- new_object = self.create_user()
+ new_object = {"id": uuid.uuid4().hex, "name": "my_object"}
new_objects = dict()
new_objects[new_object["id"]] = new_object["name"]
objects = self.manager.set_object_dict("admin", self.ref["id"], new_objects)
@@ -193,6 +196,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
self.assertIn(new_object["id"], objects["objects"])
def test_actions(self):
+ self.create_user("admin")
self.create_intra_extension()
actions = self.manager.get_action_dict("admin", self.ref["id"])
@@ -203,7 +207,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
self.assertEqual(self.ref["id"], actions["intra_extension_uuid"])
self.assertIsInstance(actions["actions"], dict)
- new_action = self.create_user()
+ new_action = {"id": uuid.uuid4().hex, "name": "my_action"}
new_actions = dict()
new_actions[new_action["id"]] = new_action["name"]
actions = self.manager.set_action_dict("admin", self.ref["id"], new_actions)
@@ -241,6 +245,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
self.assertIn(new_action["id"], actions["actions"])
def test_subject_categories(self):
+ self.create_user("admin")
self.create_intra_extension()
subject_categories = self.manager.get_subject_category_dict("admin", self.ref["id"])
@@ -294,6 +299,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
self.assertIn(new_subject_category["id"], subject_categories["subject_categories"])
def test_object_categories(self):
+ self.create_user("admin")
self.create_intra_extension()
object_categories = self.manager.get_object_category_dict("admin", self.ref["id"])
@@ -347,6 +353,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
self.assertIn(new_object_category["id"], object_categories["object_categories"])
def test_action_categories(self):
+ self.create_user("admin")
self.create_intra_extension()
action_categories = self.manager.get_action_category_dict("admin", self.ref["id"])
@@ -400,6 +407,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
self.assertIn(new_action_category["id"], action_categories["action_categories"])
def test_subject_category_scope(self):
+ self.create_user("admin")
self.create_intra_extension()
subject_categories = self.manager.set_subject_category_dict(
@@ -479,6 +487,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
self.assertNotIn(new_subject_category_scope_uuid, subject_category_scope["subject_category_scope"])
def test_object_category_scope(self):
+ self.create_user("admin")
self.create_intra_extension()
object_categories = self.manager.set_object_category_dict(
@@ -558,6 +567,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
self.assertNotIn(new_object_category_scope_uuid, object_category_scope["object_category_scope"])
def test_action_category_scope(self):
+ self.create_user("admin")
self.create_intra_extension()
action_categories = self.manager.set_action_category_dict(
@@ -637,6 +647,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
self.assertNotIn(new_action_category_scope_uuid, action_category_scope["action_category_scope"])
def test_subject_category_assignment(self):
+ self.create_user("admin")
self.create_intra_extension()
new_subject = self.create_user()
@@ -784,9 +795,10 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
subject_category_assignments["subject_category_assignments"][new_subject["id"]])
def test_object_category_assignment(self):
+ self.create_user("admin")
self.create_intra_extension()
- new_object = self.create_user()
+ new_object = {"id": uuid.uuid4().hex, "name": "my_object"}
new_objects = dict()
new_objects[new_object["id"]] = new_object["name"]
objects = self.manager.set_object_dict("admin", self.ref["id"], new_objects)
@@ -931,9 +943,10 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
object_category_assignments["object_category_assignments"][new_object["id"]])
def test_action_category_assignment(self):
+ self.create_user("admin")
self.create_intra_extension()
- new_action = self.create_user()
+ new_action = {"id": uuid.uuid4().hex, "name": "my_action"}
new_actions = dict()
new_actions[new_action["id"]] = new_action["name"]
actions = self.manager.set_action_dict("admin", self.ref["id"], new_actions)
@@ -1078,6 +1091,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
action_category_assignments["action_category_assignments"][new_action["id"]])
def test_sub_meta_rules(self):
+ self.create_user("admin")
self.create_intra_extension()
aggregation_algorithms = self.manager.get_aggregation_algorithms("admin", self.ref["id"])
@@ -1152,6 +1166,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
)
def test_sub_rules(self):
+ self.create_user("admin")
self.create_intra_extension()
sub_meta_rules = self.manager.get_sub_meta_rule("admin", self.ref["id"])
@@ -1166,7 +1181,6 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
self.assertIn(relation, self.manager.get_sub_meta_rule_relations("admin", self.ref["id"])["sub_meta_rule_relations"])
rules[relation] = list()
for rule in sub_rules["rules"][relation]:
- print(rule)
for cat, cat_func, func_name in (
("subject_categories", self.manager.get_subject_category_scope_dict, "subject_category_scope"),
("action_categories", self.manager.get_action_category_scope_dict, "action_category_scope"),
@@ -1179,7 +1193,6 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
cat_value
)
a_scope = rule.pop(0)
- print(a_scope)
if type(a_scope) is not bool:
self.assertIn(a_scope, scope[func_name][cat_value])
@@ -1242,7 +1255,7 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase):
return {
"moonlog_api": LogManager(),
"tenant_api": TenantManager(),
- "resource_api": resource.Manager(),
+ # "resource_api": resource.Manager(),
}
def config_overrides(self):
@@ -1282,8 +1295,9 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase):
def create_intra_extension(self, policy_model="policy_rbac_authz"):
IE["policymodel"] = policy_model
+ IE["name"] = uuid.uuid4().hex
ref = self.admin_manager.load_intra_extension(IE)
- self.assertIsInstance(self.ref, dict)
+ self.assertIsInstance(ref, dict)
return ref
def test_subjects(self):
@@ -2222,7 +2236,7 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase):
ref = self.create_intra_extension()
demo_user = self.create_user("demo")
- new_object = self.create_user()
+ new_object = {"id": uuid.uuid4().hex, "name": "my_object"}
new_objects = dict()
new_objects[new_object["id"]] = new_object["name"]
objects = self.manager.set_object_dict(admin_user["id"], ref["id"], new_objects)
@@ -2402,7 +2416,7 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase):
ref = self.create_intra_extension()
demo_user = self.create_user("demo")
- new_action = self.create_user()
+ new_action = {"id": uuid.uuid4().hex, "name": "my_action"}
new_actions = dict()
new_actions[new_action["id"]] = new_action["name"]
actions = self.manager.set_action_dict(admin_user["id"], ref["id"], new_actions)
@@ -2702,7 +2716,6 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase):
self.assertIn(relation, self.manager.get_sub_meta_rule_relations(admin_user["id"], ref["id"])["sub_meta_rule_relations"])
rules[relation] = list()
for rule in sub_rules["rules"][relation]:
- print(rule)
for cat, cat_func, func_name in (
("subject_categories", self.manager.get_subject_category_scope_dict, "subject_category_scope"),
("action_categories", self.manager.get_action_category_scope_dict, "action_category_scope"),
@@ -2715,7 +2728,6 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase):
cat_value
)
a_scope = rule.pop(0)
- print(a_scope)
if type(a_scope) is not bool:
self.assertIn(a_scope, scope[func_name][cat_value])
diff --git a/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_authz.py b/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_authz.py
index 64a2d38f..4752632b 100644
--- a/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_authz.py
+++ b/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_authz.py
@@ -48,7 +48,7 @@ class TestIntraExtensionAuthzManagerAuthz(tests.TestCase):
return {
"moonlog_api": LogManager(),
"tenant_api": TenantManager(),
- "resource_api": resource.Manager(),
+ # "resource_api": resource.Manager(),
}
def config_overrides(self):
@@ -88,8 +88,9 @@ class TestIntraExtensionAuthzManagerAuthz(tests.TestCase):
def create_intra_extension(self, policy_model="policy_rbac_authz"):
IE["policymodel"] = policy_model
+ IE["name"] = uuid.uuid4().hex
ref = self.admin_manager.load_intra_extension(IE)
- self.assertIsInstance(self.ref, dict)
+ self.assertIsInstance(ref, dict)
return ref
def test_tenant_exceptions(self):
@@ -337,10 +338,13 @@ class TestIntraExtensionAuthzManagerAuthz(tests.TestCase):
self.assertEqual(True, result)
def test_subjects(self):
- ref = self.create_intra_extension()
admin_user = self.create_user()
+ tenant = self.create_tenant()
+ ref = self.create_intra_extension("policy_rbac_authz")
+ ref_admin = self.create_intra_extension("policy_rbac_admin")
+ self.create_mapping(tenant, ref["id"], ref_admin["id"])
- subjects = self.manager.get_subject_dict(admin_user["id"], ref["id"])
+ subjects = self.manager.get_subject_dict(admin_user["id"], tenant["id"])
self.assertIsInstance(subjects, dict)
self.assertIn("subjects", subjects)
self.assertIn("id", subjects)
@@ -369,10 +373,13 @@ class TestIntraExtensionAuthzManagerAuthz(tests.TestCase):
admin_user["id"], ref["id"], new_subject["id"])
def test_objects(self):
- ref = self.create_intra_extension()
admin_user = self.create_user()
+ tenant = self.create_tenant()
+ ref = self.create_intra_extension("policy_rbac_authz")
+ ref_admin = self.create_intra_extension("policy_rbac_admin")
+ self.create_mapping(tenant, ref["id"], ref_admin["id"])
- objects = self.manager.get_object_dict(admin_user["id"], ref["id"])
+ objects = self.manager.get_object_dict(admin_user["id"], tenant["id"])
self.assertIsInstance(objects, dict)
self.assertIn("objects", objects)
self.assertIn("id", objects)
@@ -401,10 +408,13 @@ class TestIntraExtensionAuthzManagerAuthz(tests.TestCase):
admin_user["id"], ref["id"], new_object["name"])
def test_actions(self):
- ref = self.create_intra_extension()
admin_user = self.create_user()
+ tenant = self.create_tenant()
+ ref = self.create_intra_extension("policy_rbac_authz")
+ ref_admin = self.create_intra_extension("policy_rbac_admin")
+ self.create_mapping(tenant, ref["id"], ref_admin["id"])
- actions = self.manager.get_action_dict(admin_user["id"], ref["id"])
+ actions = self.manager.get_action_dict(admin_user["id"], tenant["id"])
self.assertIsInstance(actions, dict)
self.assertIn("actions", actions)
self.assertIn("id", actions)
@@ -433,8 +443,11 @@ class TestIntraExtensionAuthzManagerAuthz(tests.TestCase):
admin_user["id"], ref["id"], new_action["id"])
def test_subject_categories(self):
- ref = self.create_intra_extension()
admin_user = self.create_user()
+ tenant = self.create_tenant()
+ ref = self.create_intra_extension("policy_rbac_authz")
+ ref_admin = self.create_intra_extension("policy_rbac_admin")
+ self.create_mapping(tenant, ref["id"], ref_admin["id"])
subject_categories = self.manager.get_subject_category_dict(admin_user["id"], ref["id"])
self.assertIsInstance(subject_categories, dict)
@@ -465,8 +478,11 @@ class TestIntraExtensionAuthzManagerAuthz(tests.TestCase):
admin_user["id"], ref["id"], new_subject_category["name"])
def test_object_categories(self):
- ref = self.create_intra_extension()
admin_user = self.create_user()
+ tenant = self.create_tenant()
+ ref = self.create_intra_extension("policy_rbac_authz")
+ ref_admin = self.create_intra_extension("policy_rbac_admin")
+ self.create_mapping(tenant, ref["id"], ref_admin["id"])
object_categories = self.manager.get_object_category_dict(admin_user["id"], ref["id"])
self.assertIsInstance(object_categories, dict)
@@ -497,8 +513,11 @@ class TestIntraExtensionAuthzManagerAuthz(tests.TestCase):
admin_user["id"], ref["id"], new_object_category["name"])
def test_action_categories(self):
- ref = self.create_intra_extension()
admin_user = self.create_user()
+ tenant = self.create_tenant()
+ ref = self.create_intra_extension("policy_rbac_authz")
+ ref_admin = self.create_intra_extension("policy_rbac_admin")
+ self.create_mapping(tenant, ref["id"], ref_admin["id"])
action_categories = self.manager.get_action_category_dict(admin_user["id"], ref["id"])
self.assertIsInstance(action_categories, dict)
@@ -529,8 +548,11 @@ class TestIntraExtensionAuthzManagerAuthz(tests.TestCase):
admin_user["id"], ref["id"], new_action_category["name"])
def test_subject_category_scope(self):
- ref = self.create_intra_extension()
admin_user = self.create_user()
+ tenant = self.create_tenant()
+ ref = self.create_intra_extension("policy_rbac_authz")
+ ref_admin = self.create_intra_extension("policy_rbac_admin")
+ self.create_mapping(tenant, ref["id"], ref_admin["id"])
subject_categories = self.admin_manager.set_subject_category_dict(
admin_user["id"],
@@ -574,8 +596,11 @@ class TestIntraExtensionAuthzManagerAuthz(tests.TestCase):
admin_user["id"], ref["id"], subject_category, new_subject_category_scope[new_subject_category_scope_uuid])
def test_object_category_scope(self):
- ref = self.create_intra_extension()
admin_user = self.create_user()
+ tenant = self.create_tenant()
+ ref = self.create_intra_extension("policy_rbac_authz")
+ ref_admin = self.create_intra_extension("policy_rbac_admin")
+ self.create_mapping(tenant, ref["id"], ref_admin["id"])
object_categories = self.admin_manager.set_object_category_dict(
admin_user["id"],
@@ -619,8 +644,11 @@ class TestIntraExtensionAuthzManagerAuthz(tests.TestCase):
admin_user["id"], ref["id"], object_category, new_object_category_scope[new_object_category_scope_uuid])
def test_action_category_scope(self):
- ref = self.create_intra_extension()
admin_user = self.create_user()
+ tenant = self.create_tenant()
+ ref = self.create_intra_extension("policy_rbac_authz")
+ ref_admin = self.create_intra_extension("policy_rbac_admin")
+ self.create_mapping(tenant, ref["id"], ref_admin["id"])
action_categories = self.admin_manager.set_action_category_dict(
admin_user["id"],
@@ -664,8 +692,11 @@ class TestIntraExtensionAuthzManagerAuthz(tests.TestCase):
admin_user["id"], ref["id"], action_category, new_action_category_scope[new_action_category_scope_uuid])
def test_subject_category_assignment(self):
- ref = self.create_intra_extension()
admin_user = self.create_user()
+ tenant = self.create_tenant()
+ ref = self.create_intra_extension("policy_rbac_authz")
+ ref_admin = self.create_intra_extension("policy_rbac_admin")
+ self.create_mapping(tenant, ref["id"], ref_admin["id"])
new_subject = self.create_user()
new_subjects = dict()
@@ -761,8 +792,11 @@ class TestIntraExtensionAuthzManagerAuthz(tests.TestCase):
new_subject_category_scope_uuid)
def test_object_category_assignment(self):
- ref = self.create_intra_extension()
admin_user = self.create_user()
+ tenant = self.create_tenant()
+ ref = self.create_intra_extension("policy_rbac_authz")
+ ref_admin = self.create_intra_extension("policy_rbac_admin")
+ self.create_mapping(tenant, ref["id"], ref_admin["id"])
new_object = {"id": uuid.uuid4().hex, "name": "my_object"}
new_objects = dict()
@@ -858,8 +892,11 @@ class TestIntraExtensionAuthzManagerAuthz(tests.TestCase):
new_object_category_scope_uuid)
def test_action_category_assignment(self):
- ref = self.create_intra_extension()
admin_user = self.create_user()
+ tenant = self.create_tenant()
+ ref = self.create_intra_extension("policy_rbac_authz")
+ ref_admin = self.create_intra_extension("policy_rbac_admin")
+ self.create_mapping(tenant, ref["id"], ref_admin["id"])
new_action = {"id": uuid.uuid4().hex, "name": "my_action"}
new_actions = dict()
@@ -955,8 +992,11 @@ class TestIntraExtensionAuthzManagerAuthz(tests.TestCase):
new_action_category_scope_uuid)
def test_sub_meta_rules(self):
- ref = self.create_intra_extension()
admin_user = self.create_user()
+ tenant = self.create_tenant()
+ ref = self.create_intra_extension("policy_rbac_authz")
+ ref_admin = self.create_intra_extension("policy_rbac_admin")
+ self.create_mapping(tenant, ref["id"], ref_admin["id"])
aggregation_algorithms = self.manager.get_aggregation_algorithms(admin_user["id"], ref["id"])
self.assertIsInstance(aggregation_algorithms, dict)
@@ -1021,14 +1061,17 @@ class TestIntraExtensionAuthzManagerAuthz(tests.TestCase):
self.assertEqual(ref["id"], subject_categories["intra_extension_uuid"])
self.assertIn(new_subject_category["id"], subject_categories["subject_categories"])
metarule[relation]["subject_categories"].append(new_subject_category["id"])
- self.MetaRuleAddNotAuthorized(
- AdminException,
+ self.assertRaises(
+ MetaRuleAddNotAuthorized,
self.manager.set_sub_meta_rule,
admin_user["id"], ref["id"], metarule)
def test_sub_rules(self):
- ref = self.create_intra_extension()
admin_user = self.create_user()
+ tenant = self.create_tenant()
+ ref = self.create_intra_extension("policy_rbac_authz")
+ ref_admin = self.create_intra_extension("policy_rbac_admin")
+ self.create_mapping(tenant, ref["id"], ref_admin["id"])
sub_meta_rules = self.manager.get_sub_meta_rule(admin_user["id"], ref["id"])
self.assertIsInstance(sub_meta_rules, dict)