summaryrefslogtreecommitdiffstats
path: root/keystone-moon
diff options
context:
space:
mode:
authorasteroide <thomas.duval@orange.com>2015-08-31 12:00:56 +0200
committerasteroide <thomas.duval@orange.com>2015-08-31 12:00:56 +0200
commit26e753254f3e43399cc76e62892908b7742415e8 (patch)
tree9f7b41a6ae862288cb5b083c8627ce5e415568cc /keystone-moon
parent67c5b73910f5fc437429c356978081b252a59480 (diff)
Fix all tests.
Change-Id: I62fcce5942dee7ed5755fe20d012e4a0d5c535c9
Diffstat (limited to 'keystone-moon')
-rw-r--r--keystone-moon/etc/keystone.conf.sample5
-rw-r--r--keystone-moon/keystone/contrib/moon/algorithms.py34
-rw-r--r--keystone-moon/keystone/contrib/moon/backends/memory.py17
-rw-r--r--keystone-moon/keystone/contrib/moon/backends/sql.py23
-rw-r--r--keystone-moon/keystone/contrib/moon/core.py601
-rw-r--r--keystone-moon/keystone/contrib/moon/extension.py740
-rw-r--r--keystone-moon/keystone/contrib/moon/migrate_repo/versions/001_moon.py12
-rw-r--r--keystone-moon/keystone/tests/moon/unit/__init__.py10
-rw-r--r--keystone-moon/keystone/tests/moon/unit/test_unit_core_configuration.py21
-rw-r--r--keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_admin.py402
-rw-r--r--keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_authz.py515
-rw-r--r--keystone-moon/keystone/tests/moon/unit/test_unit_core_log.py58
-rw-r--r--keystone-moon/keystone/tests/moon/unit/test_unit_core_tenant.py181
13 files changed, 927 insertions, 1692 deletions
diff --git a/keystone-moon/etc/keystone.conf.sample b/keystone-moon/etc/keystone.conf.sample
index b3c741c8..2e062a5a 100644
--- a/keystone-moon/etc/keystone.conf.sample
+++ b/keystone-moon/etc/keystone.conf.sample
@@ -1700,9 +1700,6 @@
# Moon Log driver. (string value)
#log_driver = keystone.contrib.moon.backends.flat.LogConnector
-# SuperExtension backend driver. (string value)
-#superextension_driver = keystone.contrib.moon.backends.flat.SuperExtensionConnector
-
# IntraExtension backend driver. (string value)
#intraextension_driver = keystone.contrib.moon.backends.sql.IntraExtensionConnector
@@ -1713,4 +1710,4 @@
#policy_directory = /etc/keystone/policies
# Local directory where SuperExtension configuration is stored. (string value)
-#super_extension_directory = /etc/keystone/super_extension
+#root_policy_directory = /etc/keystone/policies/policy_root
diff --git a/keystone-moon/keystone/contrib/moon/algorithms.py b/keystone-moon/keystone/contrib/moon/algorithms.py
index 8644e02d..30305fc1 100644
--- a/keystone-moon/keystone/contrib/moon/algorithms.py
+++ b/keystone-moon/keystone/contrib/moon/algorithms.py
@@ -22,18 +22,19 @@ sub_meta_rule_dict = {
}
rule_dict = [
- ["high", "vm_admin", "medium"],
- ["high", "vm_admin", "low"],
- ["medium", "vm_admin", "low"],
- ["high", "vm_access", "high"],
- ["high", "vm_access", "medium"],
- ["high", "vm_access", "low"],
- ["medium", "vm_access", "medium"],
- ["medium", "vm_access", "low"],
- ["low", "vm_access", "low"]
+ ["high", "vm_admin", "medium", True],
+ ["high", "vm_admin", "low", True],
+ ["medium", "vm_admin", "low", True],
+ ["high", "vm_access", "high", True],
+ ["high", "vm_access", "medium", True],
+ ["high", "vm_access", "low", True],
+ ["medium", "vm_access", "medium", True],
+ ["medium", "vm_access", "low", True],
+ ["low", "vm_access", "low", True]
]
"""
+
def inclusion(authz_buffer, sub_meta_rule_dict, rule_list):
_cat = []
for subject_cat in sub_meta_rule_dict['subject_categories']:
@@ -46,14 +47,10 @@ def inclusion(authz_buffer, sub_meta_rule_dict, rule_list):
if object_cat in authz_buffer['object_assignments']:
_cat.append(authz_buffer['object_assignments'][object_cat])
- print("authz_buffer", authz_buffer)
- print("rule_list", rule_list)
- print("_cat", _cat)
for _element in itertools.product(*_cat):
# Add the boolean at the end
_element = list(_element)
_element.append(True)
- print("_element", _element)
if _element in rule_list:
return True
@@ -66,6 +63,13 @@ def comparison(authz_buffer, sub_meta_rule_dict, rule_list):
def all_true(decision_buffer):
for _rule in decision_buffer:
- if decision_buffer[_rule] is False:
+ if decision_buffer[_rule] == False:
return False
- return True \ No newline at end of file
+ return True
+
+
+def one_true(decision_buffer):
+ for _rule in decision_buffer:
+ if decision_buffer[_rule] == True:
+ return True
+ return False
diff --git a/keystone-moon/keystone/contrib/moon/backends/memory.py b/keystone-moon/keystone/contrib/moon/backends/memory.py
index 675240e5..7a996847 100644
--- a/keystone-moon/keystone/contrib/moon/backends/memory.py
+++ b/keystone-moon/keystone/contrib/moon/backends/memory.py
@@ -6,6 +6,7 @@
from uuid import uuid4
from glob import glob
import os
+import json
from keystone import config
from keystone.contrib.moon.core import ConfigurationDriver
@@ -19,12 +20,12 @@ class ConfigurationConnector(ConfigurationDriver):
super(ConfigurationConnector, self).__init__()
self.aggregation_algorithms_dict = dict()
self.aggregation_algorithms_dict[uuid4().hex] = {'name': 'all_true', 'description': 'all_true'}
+ self.aggregation_algorithms_dict[uuid4().hex] = {'name': 'one_true', 'description': 'one_true'}
self.sub_meta_rule_algorithms_dict = dict()
self.sub_meta_rule_algorithms_dict[uuid4().hex] = {'name': 'inclusion', 'description': 'inclusion'}
self.sub_meta_rule_algorithms_dict[uuid4().hex] = {'name': 'comparison', 'description': 'comparison'}
def get_policy_templates_dict(self):
- # TODO (dthom): this function should return a dictionary of all policy templates as:
"""
:return: {
template_id1: {name: template_name, description: template_description},
@@ -33,11 +34,15 @@ class ConfigurationConnector(ConfigurationDriver):
}
"""
nodes = glob(os.path.join(CONF.moon.policy_directory, "*"))
- return {
- "authz_templates": [os.path.basename(n) for n in nodes if os.path.isdir(n)]
- }
-
- def get_aggregation_algorithm_dict(self):
+ templates = dict()
+ for node in nodes:
+ templates[os.path.basename(node)] = dict()
+ metadata = json.load(open(os.path.join(node, "metadata.json")))
+ templates[os.path.basename(node)]["name"] = metadata["name"]
+ templates[os.path.basename(node)]["description"] = metadata["description"]
+ return templates
+
+ def get_aggregation_algorithms_dict(self):
return self.aggregation_algorithms_dict
def get_sub_meta_rule_algorithms_dict(self):
diff --git a/keystone-moon/keystone/contrib/moon/backends/sql.py b/keystone-moon/keystone/contrib/moon/backends/sql.py
index 7a75af39..cb64c1f7 100644
--- a/keystone-moon/keystone/contrib/moon/backends/sql.py
+++ b/keystone-moon/keystone/contrib/moon/backends/sql.py
@@ -887,7 +887,7 @@ class IntraExtensionConnector(IntraExtensionDriver):
def set_aggregation_algorithm_dict(self, intra_extension_id, aggregation_algorithm_id, aggregation_algorithm_dict):
with sql.transaction() as session:
query = session.query(AggregationAlgorithm)
- query = query.filter_by(intra_extension_id=intra_extension_id, id=aggregation_algorithm_id)
+ query = query.filter_by(intra_extension_id=intra_extension_id)
ref = query.first()
new_ref = AggregationAlgorithm.from_dict(
{
@@ -896,21 +896,18 @@ class IntraExtensionConnector(IntraExtensionDriver):
'intra_extension_id': intra_extension_id
}
)
- if not ref:
- session.add(new_ref)
- else:
- for attr in AggregationAlgorithm.attributes:
- if attr != 'id':
- setattr(ref, attr, getattr(new_ref, attr))
+ if ref:
+ session.delete(ref)
+ session.add(new_ref)
session.flush()
return self.get_aggregation_algorithm_dict(intra_extension_id)
- # def del_aggregation_algorithm(self, intra_extension_id, aggregation_algorithm_id):
- # with sql.transaction() as session:
- # query = session.query(AggregationAlgorithm)
- # query = query.filter_by(intra_extension_id=intra_extension_id, id=aggregation_algorithm_id)
- # ref = query.first()
- # session.delete(ref)
+ def del_aggregation_algorithm(self, intra_extension_id, aggregation_algorithm_id):
+ with sql.transaction() as session:
+ query = session.query(AggregationAlgorithm)
+ query = query.filter_by(intra_extension_id=intra_extension_id, id=aggregation_algorithm_id)
+ ref = query.first()
+ session.delete(ref)
# Getter and Setter for sub_meta_rule
diff --git a/keystone-moon/keystone/contrib/moon/core.py b/keystone-moon/keystone/contrib/moon/core.py
index d82c9fcc..a1255fe2 100644
--- a/keystone-moon/keystone/contrib/moon/core.py
+++ b/keystone-moon/keystone/contrib/moon/core.py
@@ -25,9 +25,9 @@ from keystone.contrib.moon.algorithms import *
CONF = config.CONF
LOG = log.getLogger(__name__)
-ADMIN_ID = None # default user_id for internal invocation
-ROOT_EXTENSION_ID = None
-ROOT_EXTENSION_MODEL = "policy_root"
+# ADMIN_ID = None # default user_id for internal invocation
+# ROOT_EXTENSION_ID = None
+# ROOT_EXTENSION_MODEL = "policy_root"
_OPTS = [
@@ -52,9 +52,9 @@ _OPTS = [
cfg.StrOpt('policy_directory',
default='/etc/keystone/policies',
help='Local directory where all policies are stored.'),
- cfg.StrOpt('super_extension_directory',
- default='/etc/keystone/super_extension',
- help='Local directory where SuperExtension configuration is stored.'),
+ cfg.StrOpt('root_policy_directory',
+ default='policy_root',
+ help='Local directory where Root IntraExtension configuration is stored.'),
]
CONF.register_opts(_OPTS, group='moon')
@@ -108,29 +108,29 @@ def enforce(action_names, object_name, **extra):
_action_name_list = action_names
_object_name = object_name
- def get_root_extension(self, args, kwargs):
- if not ROOT_EXTENSION_ID:
- global ROOT_EXTENSION_MODEL, ROOT_EXTENSION_ID, ADMIN_ID
- try:
- # if it is the first time we passed here, the root extension may be not initialized
- # specially during unittest. So we raise RootExtensionNotInitialized to authorize the
- # current creation process
- if 'intra_extension_dict' in kwargs:
- intra_extension_dict = kwargs['intra_extension_dict']
- else:
- intra_extension_dict = args[2]
- if isinstance(intra_extension_dict, dict) and \
- "model" in intra_extension_dict and \
- intra_extension_dict["model"] == "policy_root":
- raise RootExtensionNotInitialized()
- except KeyError:
- pass
- return ROOT_EXTENSION_ID
+ # def get_root_extension(self, args, kwargs):
+ # if not ROOT_EXTENSION_ID:
+ # global ROOT_EXTENSION_MODEL, ROOT_EXTENSION_ID, ADMIN_ID
+ # try:
+ # # if it is the first time we passed here, the root extension may be not initialized
+ # # specially during unittest. So we raise RootExtensionNotInitialized to authorize the
+ # # current creation process
+ # if 'intra_extension_dict' in kwargs:
+ # intra_extension_dict = kwargs['intra_extension_dict']
+ # else:
+ # intra_extension_dict = args[2]
+ # if isinstance(intra_extension_dict, dict) and \
+ # "model" in intra_extension_dict and \
+ # intra_extension_dict["model"] == "policy_root":
+ # raise RootExtensionNotInitialized()
+ # except KeyError:
+ # pass
+ # return ROOT_EXTENSION_ID
def wrap(func):
def wrapped(*args, **kwargs):
- global ADMIN_ID, ROOT_EXTENSION_ID
+ # global ADMIN_ID, ROOT_EXTENSION_ID
returned_value_for_func = None
self = args[0]
try:
@@ -140,46 +140,42 @@ def enforce(action_names, object_name, **extra):
intra_extension_id = None
intra_admin_extension_id = None
- try:
- intra_root_extension_id = get_root_extension(self, args, kwargs)
- # FIXME (asteroide): intra_root_extension_id is not used at all...
- except RootExtensionNotInitialized:
- # Root extension is not initialized, the current requested function must be the creation
- # of this root extension
- returned_value_for_func = func(*args, **kwargs)
- # after the creation, we must update ROOT_EXTENSION_ID and ADMIN_ID
- intra_extensions_dict = self.admin_api.driver.get_intra_extensions_dict()
- for ext in intra_extensions_dict:
- if intra_extensions_dict[ext]["model"] == ROOT_EXTENSION_MODEL:
- ROOT_EXTENSION_ID = ext
- break
- if not ROOT_EXTENSION_ID:
- raise RootExtensionUnknown()
- subjects_dict = self.admin_api.driver.get_subjects_dict(returned_value_for_func['id'])
- for subject_id in subjects_dict:
- if subjects_dict[subject_id]["name"] == "admin":
- ADMIN_ID = subject_id
- break
- if not ADMIN_ID:
- raise RootExtensionUnknown()
- # if all is OK, return values from func (creation of the root extension)
- return returned_value_for_func
+ # try:
+ intra_root_extension_id = self.root_api.get_root_extension_id()
+ # except RootExtensionNotInitialized:
+ # # Root extension is not initialized, the current requested function must be the creation
+ # # of this root extension
+ # returned_value_for_func = func(*args, **kwargs)
+ # # after the creation, we must update ROOT_EXTENSION_ID and ADMIN_ID
+ # intra_extensions_dict = self.admin_api.driver.get_intra_extensions_dict()
+ # for ext in intra_extensions_dict:
+ # if intra_extensions_dict[ext]["model"] == ROOT_EXTENSION_MODEL:
+ # ROOT_EXTENSION_ID = ext
+ # break
+ # if not ROOT_EXTENSION_ID:
+ # raise RootExtensionUnknown()
+ # subjects_dict = self.admin_api.driver.get_subjects_dict(returned_value_for_func['id'])
+ # for subject_id in subjects_dict:
+ # if subjects_dict[subject_id]["name"] == "admin":
+ # ADMIN_ID = subject_id
+ # break
+ # if not ADMIN_ID:
+ # raise RootExtensionUnknown()
+ # # if all is OK, return values from func (creation of the root extension)
+ # return returned_value_for_func
try:
intra_extension_id = args[2]
except IndexError:
- print("IndexError", kwargs)
if 'intra_extension_id' in kwargs:
intra_extension_id = kwargs['intra_extension_id']
else:
- print("in else", intra_root_extension_id)
intra_extension_id = intra_root_extension_id
- if ADMIN_ID and user_id == ADMIN_ID:
+ if user_id == self.root_api.get_root_admin_id():
# TODO: check if there is no security hole here
returned_value_for_func = func(*args, **kwargs)
else:
intra_extensions_dict = self.admin_api.driver.get_intra_extensions_dict()
- print(intra_extension_id, intra_extensions_dict)
if intra_extension_id not in intra_extensions_dict:
raise IntraExtensionUnknown()
tenants_dict = self.tenant_api.driver.get_tenants_dict()
@@ -213,7 +209,10 @@ def enforce(action_names, object_name, **extra):
# if we found the object in intra_root_extension_id, so we change the intra_admin_extension_id
# into intra_root_extension_id and we modify the ID of the subject
subjects_dict = self.admin_api.driver.get_subjects_dict(intra_admin_extension_id)
- subject_name = subjects_dict[user_id]["name"]
+ try:
+ subject_name = subjects_dict[user_id]["name"]
+ except KeyError:
+ raise SubjectUnknown()
intra_admin_extension_id = intra_root_extension_id
subjects_dict = self.admin_api.driver.get_subjects_dict(intra_admin_extension_id)
user_id = None
@@ -221,7 +220,7 @@ def enforce(action_names, object_name, **extra):
if subjects_dict[_subject_id]["name"] == subject_name:
user_id = _subject_id
if not user_id:
- raise SubjectUnknown("Subject Unknown for Root intraExtension...")
+ raise SubjectUnknown("Subject {} Unknown for Root IntraExtension...".format(subject_name))
if type(_action_name_list) in (str, unicode):
action_name_list = (_action_name_list, )
else:
@@ -256,7 +255,7 @@ def enforce(action_names, object_name, **extra):
@dependency.provider('configuration_api')
-@dependency.requires('moonlog_api', 'admin_api', 'tenant_api')
+@dependency.requires('moonlog_api', 'admin_api', 'tenant_api', 'root_api')
class ConfigurationManager(manager.Manager):
def __init__(self):
@@ -278,7 +277,7 @@ class ConfigurationManager(manager.Manager):
def get_policy_template_id_from_name(self, user_id, policy_template_name):
policy_templates_dict = self.driver.get_policy_templates_dict()
for policy_template_id in policy_templates_dict:
- if policy_templates_dict[policy_template_id]['name'] is policy_template_name:
+ if policy_templates_dict[policy_template_id]['name'] == policy_template_name:
return policy_template_id
return None
@@ -298,7 +297,7 @@ class ConfigurationManager(manager.Manager):
def get_aggregation_algorithm_id_from_name(self, user_id, aggregation_algorithm_name):
aggregation_algorithms_dict = self.driver.get_aggregation_algorithms_dict()
for aggregation_algorithm_id in aggregation_algorithms_dict:
- if aggregation_algorithms_dict[aggregation_algorithm_id]['name'] is aggregation_algorithm_name:
+ if aggregation_algorithms_dict[aggregation_algorithm_id]['name'] == aggregation_algorithm_name:
return aggregation_algorithm_id
return None
@@ -318,13 +317,13 @@ class ConfigurationManager(manager.Manager):
def get_sub_meta_rule_algorithm_id_from_name(self, sub_meta_rule_algorithm_name):
sub_meta_rule_algorithms_dict = self.driver.get_sub_meta_rule_algorithms_dict()
for sub_meta_rule_algorithm_id in sub_meta_rule_algorithms_dict:
- if sub_meta_rule_algorithms_dict[sub_meta_rule_algorithm_id]['name'] is sub_meta_rule_algorithm_name:
+ if sub_meta_rule_algorithms_dict[sub_meta_rule_algorithm_id]['name'] == sub_meta_rule_algorithm_name:
return sub_meta_rule_algorithm_id
return None
@dependency.provider('tenant_api')
-@dependency.requires('moonlog_api', 'admin_api', 'configuration_api')
+@dependency.requires('moonlog_api', 'admin_api', 'configuration_api', 'root_api', 'resource_api')
class TenantManager(manager.Manager):
def __init__(self):
@@ -348,38 +347,66 @@ class TenantManager(manager.Manager):
"""
return self.driver.get_tenants_dict()
+ def __get_keystone_tenant_dict(self, tenant_id="", tenant_name=""):
+ tenants = self.resource_api.list_projects()
+ for tenant in tenants:
+ if tenant_id and tenant_id == tenant['id']:
+ return tenant
+ if tenant_name and tenant_name == tenant['name']:
+ return tenant
+ if not tenant_id:
+ tenant_id = uuid4().hex
+ if not tenant_name:
+ tenant_name = tenant_id
+ tenant = {
+ "id": tenant_id,
+ "name": tenant_name,
+ "description": "Auto generated tenant from Moon platform",
+ "enabled": True,
+ "domain_id": "default"
+ }
+ keystone_tenant = self.resource_api.create_project(tenant["id"], tenant)
+ return keystone_tenant
+
@filter_input
@enforce(("read", "write"), "tenants")
def add_tenant_dict(self, user_id, tenant_dict):
tenants_dict = self.driver.get_tenants_dict()
for tenant_id in tenants_dict:
- if tenants_dict[tenant_id]['name'] is tenant_dict['name']:
+ if tenants_dict[tenant_id]['name'] == tenant_dict['name']:
raise TenantAddedNameExisting()
+ # Check (and eventually sync) Keystone tenant
+ if 'id' not in tenant_dict:
+ tenant_dict['id'] = None
+ keystone_tenant = self.__get_keystone_tenant_dict(tenant_dict['id'], tenant_dict['name'])
+ tenant_dict.update(keystone_tenant)
# Sync users between intra_authz_extension and intra_admin_extension
if tenant_dict['intra_admin_extension_id']:
if not tenant_dict['intra_authz_extension_id']:
raise TenantNoIntraAuthzExtension()
- authz_subjects_dict = self.admin_api.get_subjects_dict(ADMIN_ID, tenant_dict['intra_authz_extension_id'])
- admin_subjects_dict = self.admin_api.get_subjects_dict(ADMIN_ID, tenant_dict['intra_admin_extension_id'])
- for _subject_id in authz_subjects_dict:
- if _subject_id not in admin_subjects_dict:
- self.admin_api.add_subject_dict(ADMIN_ID, tenant_dict['intra_admin_extension_id'], authz_subjects_dict[_subject_id])
- for _subject_id in admin_subjects_dict:
- if _subject_id not in authz_subjects_dict:
- self.admin_api.add_subject_dict(ADMIN_ID, tenant_dict['intra_authz_extension_id'], admin_subjects_dict[_subject_id])
-
- # TODO (dthom): check whether we can replace the below code by the above one
- # authz_subjects_dict = self.admin_api.get_subjects_dict(ADMIN_ID, tenant_dict['intra_authz_extension_id'])
- # authz_subject_names_list = [authz_subjects_dict[subject_id]["name"] for subject_id in authz_subjects_dict]
- # admin_subjects_dict = self.admin_api.get_subjects_dict(ADMIN_ID, tenant_dict['intra_admin_extension_id'])
- # admin_subject_names_list = [admin_subjects_dict[subject_id]["name"] for subject_id in admin_subjects_dict]
+ # authz_subjects_dict = self.admin_api.get_subjects_dict(self.root_api.get_root_admin_id(), tenant_dict['intra_authz_extension_id'])
+ # admin_subjects_dict = self.admin_api.get_subjects_dict(self.root_api.get_root_admin_id(), tenant_dict['intra_admin_extension_id'])
# for _subject_id in authz_subjects_dict:
- # if authz_subjects_dict[_subject_id]["name"] not in admin_subject_names_list:
- # self.admin_api.add_subject_dict(ADMIN_ID, tenant_dict['intra_admin_extension_id'], authz_subjects_dict[_subject_id])
+ # if _subject_id not in admin_subjects_dict:
+ # self.admin_api.add_subject_dict(self.root_api.get_root_admin_id(), tenant_dict['intra_admin_extension_id'], authz_subjects_dict[_subject_id])
# for _subject_id in admin_subjects_dict:
- # if admin_subjects_dict[_subject_id]["name"] not in authz_subject_names_list:
- # self.admin_api.add_subject_dict(ADMIN_ID, tenant_dict['intra_authz_extension_id'], admin_subjects_dict[_subject_id])
+ # if _subject_id not in authz_subjects_dict:
+ # self.admin_api.add_subject_dict(self.root_api.get_root_admin_id(), tenant_dict['intra_authz_extension_id'], admin_subjects_dict[_subject_id])
+
+ # TODO (ateroide): check whether we can replace the below code by the above one
+ # NOTE (ateroide): at a first glance: no, subject_id changes depending on which intra_extesion is used
+ # we must use name which is constant.
+ authz_subjects_dict = self.admin_api.get_subjects_dict(self.root_api.get_root_admin_id(), tenant_dict['intra_authz_extension_id'])
+ authz_subject_names_list = [authz_subjects_dict[subject_id]["name"] for subject_id in authz_subjects_dict]
+ admin_subjects_dict = self.admin_api.get_subjects_dict(self.root_api.get_root_admin_id(), tenant_dict['intra_admin_extension_id'])
+ admin_subject_names_list = [admin_subjects_dict[subject_id]["name"] for subject_id in admin_subjects_dict]
+ for _subject_id in authz_subjects_dict:
+ if authz_subjects_dict[_subject_id]["name"] not in admin_subject_names_list:
+ self.admin_api.add_subject_dict(self.root_api.get_root_admin_id(), tenant_dict['intra_admin_extension_id'], authz_subjects_dict[_subject_id])
+ for _subject_id in admin_subjects_dict:
+ if admin_subjects_dict[_subject_id]["name"] not in authz_subject_names_list:
+ self.admin_api.add_subject_dict(self.root_api.get_root_admin_id(), tenant_dict['intra_authz_extension_id'], admin_subjects_dict[_subject_id])
return self.driver.add_tenant_dict(tenant_dict['id'], tenant_dict)
@@ -409,50 +436,20 @@ class TenantManager(manager.Manager):
if tenant_dict['intra_admin_extension_id']:
if not tenant_dict['intra_authz_extension_id']:
raise TenantNoIntraAuthzExtension
- authz_subjects_dict = self.admin_api.get_subjects_dict(ADMIN_ID, tenant_dict['intra_authz_extension_id'])
- admin_subjects_dict = self.admin_api.get_subjects_dict(ADMIN_ID, tenant_dict['intra_admin_extension_id'])
+ authz_subjects_dict = self.admin_api.get_subjects_dict(self.root_api.get_root_admin_id(), tenant_dict['intra_authz_extension_id'])
+ authz_subject_names_list = [authz_subjects_dict[subject_id]["name"] for subject_id in authz_subjects_dict]
+ admin_subjects_dict = self.admin_api.get_subjects_dict(self.root_api.get_root_admin_id(), tenant_dict['intra_admin_extension_id'])
+ admin_subject_names_list = [admin_subjects_dict[subject_id]["name"] for subject_id in admin_subjects_dict]
for _subject_id in authz_subjects_dict:
- if _subject_id not in admin_subjects_dict:
- self.admin_api.add_subject_dict(ADMIN_ID, tenant_dict['intra_admin_extension_id'], authz_subjects_dict[_subject_id])
+ if authz_subjects_dict[_subject_id]["name"] not in admin_subject_names_list:
+ self.admin_api.add_subject_dict(self.root_api.get_root_admin_id(), tenant_dict['intra_admin_extension_id'], authz_subjects_dict[_subject_id])
for _subject_id in admin_subjects_dict:
- if _subject_id not in authz_subjects_dict:
- self.admin_api.add_subject_dict(ADMIN_ID, tenant_dict['intra_authz_extension_id'], admin_subjects_dict[_subject_id])
+ if admin_subjects_dict[_subject_id]["name"] not in authz_subject_names_list:
+ self.admin_api.add_subject_dict(self.root_api.get_root_admin_id(), tenant_dict['intra_authz_extension_id'], admin_subjects_dict[_subject_id])
return self.driver.set_tenant_dict(tenant_id, tenant_dict)
- # TODO (dthom): move the following 2 functions to perimeter functions
- @filter_input
- def get_subject_dict_from_keystone_id(self, tenant_id, intra_extension_id, keystone_id):
- tenants_dict = self.driver.get_tenants_dict()
- if tenant_id not in tenants_dict:
- raise TenantUnknown()
- if intra_extension_id not in (tenants_dict[tenant_id]['intra_authz_extension_id'],
- tenants_dict[tenant_id]['intra_admin_extension_id'], ):
- raise IntraExtensionUnknown()
- # Note (asteroide): We used ADMIN_ID because the user requesting this information may only know his keystone_id
- # and not the subject ID in the requested intra_extension.
- subjects_dict = self.admin_api.get_subjects_dict(ADMIN_ID, intra_extension_id)
- for subject_id in subjects_dict:
- if keystone_id is subjects_dict[subject_id]['keystone_id']:
- return {subject_id: subjects_dict[subject_id]}
-
- @filter_input
- def get_subject_dict_from_keystone_name(self, tenant_id, intra_extension_id, keystone_name):
- tenants_dict = self.driver.get_tenants_dict()
- if tenant_id not in tenants_dict:
- raise TenantUnknown()
- if intra_extension_id not in (tenants_dict[tenant_id]['intra_authz_extension_id'],
- tenants_dict[tenant_id]['intra_admin_extension_id'], ):
- raise IntraExtensionUnknown()
- # Note (asteroide): We used ADMIN_ID because the user requesting this information may only know his
- # keystone_name and not the subject ID in the requested intra_extension.
- subjects_dict = self.admin_api.get_subjects_dict(ADMIN_ID, intra_extension_id)
- for subject_id in subjects_dict:
- if keystone_name is subjects_dict[subject_id]['keystone_name']:
- return {subject_id: subjects_dict[subject_id]}
-
-
-@dependency.requires('identity_api', 'tenant_api', 'configuration_api', 'authz_api', 'admin_api', 'moonlog_api')
+@dependency.requires('identity_api', 'tenant_api', 'configuration_api', 'authz_api', 'admin_api', 'moonlog_api', 'root_api')
class IntraExtensionManager(manager.Manager):
def __init__(self):
@@ -501,6 +498,7 @@ class IntraExtensionManager(manager.Manager):
authz_buffer['subject_assignments'] = dict()
authz_buffer['object_assignments'] = dict()
authz_buffer['action_assignments'] = dict()
+
for _subject_category in meta_data_dict['subject_categories']:
authz_buffer['subject_assignments'][_subject_category] = list(subject_assignment_dict[_subject_category])
for _object_category in meta_data_dict['object_categories']:
@@ -543,6 +541,8 @@ class IntraExtensionManager(manager.Manager):
aggregation_algorithm_id = aggregation_algorithm_dict.keys()[0]
if aggregation_algorithm_dict[aggregation_algorithm_id]['name'] == 'all_true':
decision = all_true(decision_buffer)
+ elif aggregation_algorithm_dict[aggregation_algorithm_id]['name'] == 'one_true':
+ decision = one_true(decision_buffer)
if not decision:
raise AuthzException("{} {}-{}-{}".format(intra_extension_id, subject_id, action_id, object_id))
return decision
@@ -607,7 +607,12 @@ class IntraExtensionManager(manager.Manager):
subject_dict = dict()
# We suppose that all subjects can be mapped to a true user in Keystone
for _subject in json_perimeter['subjects']:
- keystone_user = self.identity_api.get_user_by_name(_subject, "default")
+ try:
+ keystone_user = self.identity_api.get_user_by_name(_subject, "default")
+ except exception.UserNotFound:
+ # TODO (asteroide): must add a configuration option to allow that exception
+ # maybe a debug option for unittest
+ keystone_user = {'id': "", 'name': _subject}
subject_id = uuid4().hex
subject_dict[subject_id] = keystone_user
subject_dict[subject_id]['keystone_id'] = keystone_user["id"]
@@ -774,8 +779,6 @@ class IntraExtensionManager(manager.Manager):
sub_rule_id = self.driver.get_uuid_from_name(intra_extension_dict["id"],
sub_rule_name,
self.driver.SUB_META_RULE)
- # print(sub_rule_name)
- # print(self.get_sub_meta_rule_relations("admin", ie["id"]))
# if sub_rule_name not in self.get_sub_meta_rule_relations("admin", ie["id"])["sub_meta_rule_relations"]:
# raise IntraExtensionException("Bad sub_rule_name name {} in rules".format(sub_rule_name))
rules[sub_rule_id] = list()
@@ -833,6 +836,32 @@ class IntraExtensionManager(manager.Manager):
self.__load_rule_file(ie_dict, template_dir)
return ref
+ def load_root_intra_extension_dict(self, policy_template):
+ # Note (asteroide): Only one root Extension is authorized
+ # and this extension is created at the very beginning of the server
+ # so we don't need to use enforce here
+ for key in self.driver.get_intra_extensions_dict():
+ # Note (asteroide): if there is at least one Intra Extension, it implies that
+ # the Root Intra Extension had already been created...
+ return
+ ie_dict = dict()
+ ie_dict['id'] = uuid4().hex
+ ie_dict["name"] = "policy_root"
+ ie_dict["model"] = filter_input(policy_template)
+ ie_dict["genre"] = "admin"
+ ie_dict["description"] = "policy_root"
+ ref = self.driver.set_intra_extension_dict(ie_dict['id'], ie_dict)
+ self.moonlog_api.debug("Creation of IE: {}".format(ref))
+ # read the template given by "model" and populate default variables
+ template_dir = os.path.join(CONF.moon.policy_directory, ie_dict["model"])
+ self.__load_metadata_file(ie_dict, template_dir)
+ self.__load_perimeter_file(ie_dict, template_dir)
+ self.__load_scope_file(ie_dict, template_dir)
+ self.__load_assignment_file(ie_dict, template_dir)
+ self.__load_metarule_file(ie_dict, template_dir)
+ self.__load_rule_file(ie_dict, template_dir)
+ return ref
+
@enforce("read", "intra_extensions")
def get_intra_extension_dict(self, user_id, intra_extension_id):
"""
@@ -858,7 +887,7 @@ class IntraExtensionManager(manager.Manager):
for rule_id in self.driver.get_rules_dict(intra_extension_id, sub_meta_rule_id):
self.driver.del_rule(intra_extension_id, sub_meta_rule_id, rule_id)
self.driver.del_sub_meta_rule(intra_extension_id, sub_meta_rule_id)
- for aggregation_algorithm_id in self.driver.get_aggregation_algorithms_dict(intra_extension_id):
+ for aggregation_algorithm_id in self.driver.get_aggregation_algorithm_dict(intra_extension_id):
self.driver.del_aggregation_algorithm(intra_extension_id, aggregation_algorithm_id)
for subject_id in self.driver.get_subjects_dict(intra_extension_id):
self.driver.del_subject(intra_extension_id, subject_id)
@@ -1049,6 +1078,7 @@ class IntraExtensionManager(manager.Manager):
def add_subject_dict(self, user_id, intra_extension_id, subject_dict):
subjects_dict = self.driver.get_subjects_dict(intra_extension_id)
for subject_id in subjects_dict:
+ print(subjects_dict[subject_id]["name"], subject_dict['name'])
if subjects_dict[subject_id]["name"] == subject_dict['name']:
raise SubjectNameExisting()
# Next line will raise an error if user is not present in Keystone database
@@ -1091,6 +1121,37 @@ class IntraExtensionManager(manager.Manager):
return self.driver.set_subject_dict(intra_extension_id, subject_dict["id"], subject_dict)
@filter_input
+ def get_subject_dict_from_keystone_id(self, tenant_id, intra_extension_id, keystone_id):
+ tenants_dict = self.tenant_api.driver.get_tenants_dict()
+ if tenant_id not in tenants_dict:
+ raise TenantUnknown()
+ if intra_extension_id not in (tenants_dict[tenant_id]['intra_authz_extension_id'],
+ tenants_dict[tenant_id]['intra_admin_extension_id'], ):
+ raise IntraExtensionUnknown()
+ # Note (asteroide): We used self.root_api.get_root_admin_id() because the user requesting this information
+ # may only know his keystone_id and not the subject ID in the requested intra_extension.
+ subjects_dict = self.get_subjects_dict(self.root_api.get_root_admin_id(), intra_extension_id)
+ for subject_id in subjects_dict:
+ if keystone_id == subjects_dict[subject_id]['keystone_id']:
+ return {subject_id: subjects_dict[subject_id]}
+
+ @filter_input
+ def get_subject_dict_from_keystone_name(self, tenant_id, intra_extension_id, keystone_name):
+ tenants_dict = self.tenant_api.driver.get_tenants_dict()
+ if tenant_id not in tenants_dict:
+ raise TenantUnknown()
+ if intra_extension_id not in (tenants_dict[tenant_id]['intra_authz_extension_id'],
+ tenants_dict[tenant_id]['intra_admin_extension_id'], ):
+ raise IntraExtensionUnknown()
+ # Note (asteroide): We used self.root_api.get_root_admin_id() because the user requesting this information
+ # may only know his keystone_name and not the subject ID in the requested intra_extension.
+ subjects_dict = self.get_subjects_dict(self.root_api.get_root_admin_id(), intra_extension_id)
+ for subject_id in subjects_dict:
+ if keystone_name == subjects_dict[subject_id]['keystone_name']:
+ return {subject_id: subjects_dict[subject_id]}
+
+
+ @filter_input
@enforce("read", "objects")
def get_objects_dict(self, user_id, intra_extension_id):
return self.driver.get_objects_dict(intra_extension_id)
@@ -1539,7 +1600,7 @@ class IntraExtensionManager(manager.Manager):
@enforce(("read", "write"), "aggregation_algorithm")
def set_aggregation_algorithm_dict(self, user_id, intra_extension_id, aggregation_algorithm_id, aggregation_algorithm_dict):
if aggregation_algorithm_id:
- if aggregation_algorithm_id not in self.configuration_api.get_aggregation_algorithms_dict(ADMIN_ID):
+ if aggregation_algorithm_id not in self.configuration_api.get_aggregation_algorithms_dict(self.root_api.get_root_admin_id()):
raise AggregationAlgorithmUnknown()
else:
aggregation_algorithm_id = uuid4().hex
@@ -1577,7 +1638,9 @@ class IntraExtensionManager(manager.Manager):
sub_meta_rule_dict['action_categories'] == sub_meta_rules_dict[_sub_meta_rule_id]["action_categories"] and \
sub_meta_rule_dict['algorithm'] == sub_meta_rules_dict[_sub_meta_rule_id]["algorithm"]:
raise SubMetaRuleExisting()
- if sub_meta_rule_dict['algorithm'] not in self.configuration_api.get_sub_meta_rule_algorithms_dict(user_id):
+ algorithm_names = map(lambda x: x['name'],
+ self.configuration_api.get_sub_meta_rule_algorithms_dict(user_id).values())
+ if sub_meta_rule_dict['algorithm'] not in algorithm_names:
raise SubMetaRuleAlgorithmNotExisting()
sub_meta_rule_id = uuid4().hex
# TODO (dthom): add new sub-meta-rule to rule dict
@@ -1682,10 +1745,10 @@ class IntraExtensionAuthzManager(IntraExtensionManager):
elif genre == "admin":
genre = "intra_admin_extension_id"
- tenants_dict = self.tenant_api.get_tenants_dict(ADMIN_ID)
+ tenants_dict = self.tenant_api.get_tenants_dict(self.root_api.get_root_admin_id())
tenant_id = None
for _tenant_id in tenants_dict:
- if tenants_dict[_tenant_id]["name"] is tenant_name:
+ if tenants_dict[_tenant_id]["name"] == tenant_name:
tenant_id = _tenant_id
break
if not tenant_id:
@@ -1697,8 +1760,9 @@ class IntraExtensionAuthzManager(IntraExtensionManager):
subjects_dict = self.driver.get_subjects_dict(intra_extension_id)
subject_id = None
for _subject_id in subjects_dict:
- if subjects_dict[_subject_id]['keystone_name'] is subject_name:
- subject_id = subjects_dict[_subject_id]['keystone_id']
+ if subjects_dict[_subject_id]['keystone_name'] == subject_name:
+ # subject_id = subjects_dict[_subject_id]['keystone_id']
+ subject_id = _subject_id
break
if not subject_id:
raise SubjectUnknown()
@@ -1725,7 +1789,7 @@ class IntraExtensionAuthzManager(IntraExtensionManager):
def add_subject_dict(self, user_id, intra_extension_id, subject_dict):
subject = super(IntraExtensionAuthzManager, self).add_subject_dict(user_id, intra_extension_id, subject_dict)
subject_id, subject_value = subject.iteritems().next()
- tenants_dict = self.tenant_api.get_tenants_dict(ADMIN_ID)
+ tenants_dict = self.tenant_api.get_tenants_dict(self.root_api.get_root_admin_id())
for tenant_id in tenants_dict:
if tenants_dict[tenant_id]["intra_authz_extension_id"] == intra_extension_id:
_subjects = self.driver.get_subjects_dict(tenants_dict[tenant_id]["intra_admin_extension_id"])
@@ -1742,7 +1806,7 @@ class IntraExtensionAuthzManager(IntraExtensionManager):
def del_subject(self, user_id, intra_extension_id, subject_id):
subject_name = self.driver.get_subjects_dict(intra_extension_id)[subject_id]["name"]
super(IntraExtensionAuthzManager, self).del_subject(user_id, intra_extension_id, subject_id)
- tenants_dict = self.tenant_api.get_tenants_dict(ADMIN_ID)
+ tenants_dict = self.tenant_api.get_tenants_dict(self.root_api.get_root_admin_id())
for tenant_id in tenants_dict:
if tenants_dict[tenant_id]["intra_authz_extension_id"] == intra_extension_id:
subject_id = self.driver.get_uuid_from_name(tenants_dict[tenant_id]["intra_admin_extension_id"],
@@ -1760,7 +1824,7 @@ class IntraExtensionAuthzManager(IntraExtensionManager):
def set_subject_dict(self, user_id, intra_extension_id, subject_id, subject_dict):
subject = super(IntraExtensionAuthzManager, self).set_subject_dict(user_id, intra_extension_id, subject_dict)
subject_id, subject_value = subject.iteritems().next()
- tenants_dict = self.tenant_api.get_tenants_dict(ADMIN_ID)
+ tenants_dict = self.tenant_api.get_tenants_dict(self.root_api.get_root_admin_id())
for tenant_id in tenants_dict:
if tenants_dict[tenant_id]["intra_authz_extension_id"] == intra_extension_id:
self.driver.set_subject_dict(tenants_dict[tenant_id]["intra_admin_extension_id"], uuid4().hex, subject_value)
@@ -1770,110 +1834,110 @@ class IntraExtensionAuthzManager(IntraExtensionManager):
break
return subject
- # def add_subject_category(self, user_id, intra_extension_id, subject_category_dict):
- # raise AuthzException()
- #
- # def del_subject_category(self, user_id, intra_extension_id, subject_category_id):
- # raise AuthzException()
- #
- # def set_subject_category(self, user_id, intra_extension_id, subject_category_id, subject_category_dict):
- # raise AuthzException()
- #
- # def add_object_category(self, user_id, intra_extension_id, object_category_dict):
- # raise AuthzException()
- #
- # def del_object_category(self, user_id, intra_extension_id, object_category_id):
- # raise AuthzException()
- #
- # def add_action_category(self, user_id, intra_extension_id, action_category_name):
- # raise AuthzException()
- #
- # def del_action_category(self, user_id, intra_extension_id, action_category_id):
- # raise AuthzException()
- #
- # def add_object_dict(self, user_id, intra_extension_id, object_name):
- # raise AuthzException()
- #
- # def set_object_dict(self, user_id, intra_extension_id, object_id, object_dict):
- # raise AuthzException()
- #
- # def del_object(self, user_id, intra_extension_id, object_id):
- # raise AuthzException()
- #
- # def add_action_dict(self, user_id, intra_extension_id, action_name):
- # raise AuthzException()
- #
- # def set_action_dict(self, user_id, intra_extension_id, action_id, action_dict):
- # raise AuthzException()
- #
- # def del_action(self, user_id, intra_extension_id, action_id):
- # raise AuthzException()
- #
- # def add_subject_scope_dict(self, user_id, intra_extension_id, subject_category_id, subject_scope_dict):
- # raise AuthzException()
- #
- # def del_subject_scope(self, user_id, intra_extension_id, subject_category_id, subject_scope_id):
- # raise AuthzException()
- #
- # def set_subject_scope_dict(self, user_id, intra_extension_id, subject_category_id, subject_scope_id, subject_scope_name):
- # raise AuthzException()
- #
- # def add_object_scope_dict(self, user_id, intra_extension_id, object_category_id, object_scope_name):
- # raise AuthzException()
- #
- # def del_object_scope(self, user_id, intra_extension_id, object_category_id, object_scope_id):
- # raise AuthzException()
- #
- # def set_object_scope_dict(self, user_id, intra_extension_id, object_category_id, object_scope_id, object_scope_name):
- # raise AuthzException()
- #
- # def add_action_scope_dict(self, user_id, intra_extension_id, action_category_id, action_scope_name):
- # raise AuthzException()
- #
- # def del_action_scope(self, user_id, intra_extension_id, action_category_id, action_scope_id):
- # raise AuthzException()
- #
- # def add_subject_assignment_list(self, user_id, intra_extension_id, subject_id, subject_category_id, subject_scope_id):
- # raise AuthzException()
- #
- # def del_subject_assignment(self, user_id, intra_extension_id, subject_id, subject_category_id, subject_scope_id):
- # raise AuthzException()
- #
- # def add_object_assignment_list(self, user_id, intra_extension_id, object_id, object_category_id, object_scope_id):
- # raise AuthzException()
- #
- # def del_object_assignment(self, user_id, intra_extension_id, object_id, object_category_id, object_scope_id):
- # raise AuthzException()
- #
- # def add_action_assignment_list(self, user_id, intra_extension_id, action_id, action_category_id, action_scope_id):
- # raise AuthzException()
- #
- # def del_action_assignment(self, user_id, intra_extension_id, action_id, action_category_id, action_scope_id):
- # raise AuthzException()
- #
- # def set_aggregation_algorithm_dict(self, user_id, intra_extension_id, aggregation_algorithm_id, aggregation_algorithm_dict):
- # raise AuthzException()
- #
- # def del_aggregation_algorithm_dict(self, user_id, intra_extension_id, aggregation_algorithm_id):
- # raise AuthzException()
- #
- # def add_sub_meta_rule_dict(self, user_id, intra_extension_id, sub_meta_rule_dict):
- # raise AuthzException()
- #
- # def del_sub_meta_rule(self, user_id, intra_extension_id, sub_meta_rule_id):
- # raise AuthzException()
- #
- # def set_sub_meta_rule_dict(self, user_id, intra_extension_id, sub_meta_rule_id, sub_meta_rule_dict):
- # raise AuthzException()
- #
- # def add_rule_dict(self, user_id, intra_extension_id, sub_meta_rule_id, rule_list):
- # raise AuthzException()
- #
- # def del_rule(self, user_id, intra_extension_id, sub_meta_rule_id, rule_id):
- # raise AuthzException()
- #
- # def set_rule_dict(self, user_id, intra_extension_id, sub_meta_rule_id, rule_id, rule_list):
- # raise AuthzException()
+ def add_subject_category(self, user_id, intra_extension_id, subject_category_dict):
+ raise AuthzException()
+
+ def del_subject_category(self, user_id, intra_extension_id, subject_category_id):
+ raise AuthzException()
+
+ def set_subject_category(self, user_id, intra_extension_id, subject_category_id, subject_category_dict):
+ raise AuthzException()
+
+ def add_object_category(self, user_id, intra_extension_id, object_category_dict):
+ raise AuthzException()
+
+ def del_object_category(self, user_id, intra_extension_id, object_category_id):
+ raise AuthzException()
+
+ def add_action_category(self, user_id, intra_extension_id, action_category_name):
+ raise AuthzException()
+
+ def del_action_category(self, user_id, intra_extension_id, action_category_id):
+ raise AuthzException()
+
+ def add_object_dict(self, user_id, intra_extension_id, object_name):
+ raise AuthzException()
+
+ def set_object_dict(self, user_id, intra_extension_id, object_id, object_dict):
+ raise AuthzException()
+
+ def del_object(self, user_id, intra_extension_id, object_id):
+ raise AuthzException()
+
+ def add_action_dict(self, user_id, intra_extension_id, action_name):
+ raise AuthzException()
+
+ def set_action_dict(self, user_id, intra_extension_id, action_id, action_dict):
+ raise AuthzException()
+
+ def del_action(self, user_id, intra_extension_id, action_id):
+ raise AuthzException()
+
+ def add_subject_scope_dict(self, user_id, intra_extension_id, subject_category_id, subject_scope_dict):
+ raise AuthzException()
+
+ def del_subject_scope(self, user_id, intra_extension_id, subject_category_id, subject_scope_id):
+ raise AuthzException()
+
+ def set_subject_scope_dict(self, user_id, intra_extension_id, subject_category_id, subject_scope_id, subject_scope_name):
+ raise AuthzException()
+
+ def add_object_scope_dict(self, user_id, intra_extension_id, object_category_id, object_scope_name):
+ raise AuthzException()
+
+ def del_object_scope(self, user_id, intra_extension_id, object_category_id, object_scope_id):
+ raise AuthzException()
+
+ def set_object_scope_dict(self, user_id, intra_extension_id, object_category_id, object_scope_id, object_scope_name):
+ raise AuthzException()
+
+ def add_action_scope_dict(self, user_id, intra_extension_id, action_category_id, action_scope_name):
+ raise AuthzException()
+
+ def del_action_scope(self, user_id, intra_extension_id, action_category_id, action_scope_id):
+ raise AuthzException()
+
+ def add_subject_assignment_list(self, user_id, intra_extension_id, subject_id, subject_category_id, subject_scope_id):
+ raise AuthzException()
+
+ def del_subject_assignment(self, user_id, intra_extension_id, subject_id, subject_category_id, subject_scope_id):
+ raise AuthzException()
+
+ def add_object_assignment_list(self, user_id, intra_extension_id, object_id, object_category_id, object_scope_id):
+ raise AuthzException()
+
+ def del_object_assignment(self, user_id, intra_extension_id, object_id, object_category_id, object_scope_id):
+ raise AuthzException()
+
+ def add_action_assignment_list(self, user_id, intra_extension_id, action_id, action_category_id, action_scope_id):
+ raise AuthzException()
+
+ def del_action_assignment(self, user_id, intra_extension_id, action_id, action_category_id, action_scope_id):
+ raise AuthzException()
+
+ def set_aggregation_algorithm_dict(self, user_id, intra_extension_id, aggregation_algorithm_id, aggregation_algorithm_dict):
+ raise AuthzException()
+
+ def del_aggregation_algorithm_dict(self, user_id, intra_extension_id, aggregation_algorithm_id):
+ raise AuthzException()
+
+ def add_sub_meta_rule_dict(self, user_id, intra_extension_id, sub_meta_rule_dict):
+ raise AuthzException()
+
+ def del_sub_meta_rule(self, user_id, intra_extension_id, sub_meta_rule_id):
+ raise AuthzException()
+
+ def set_sub_meta_rule_dict(self, user_id, intra_extension_id, sub_meta_rule_id, sub_meta_rule_dict):
+ raise AuthzException()
+
+ def add_rule_dict(self, user_id, intra_extension_id, sub_meta_rule_id, rule_list):
+ raise AuthzException()
+
+ def del_rule(self, user_id, intra_extension_id, sub_meta_rule_id, rule_id):
+ raise AuthzException()
+
+ def set_rule_dict(self, user_id, intra_extension_id, sub_meta_rule_id, rule_id, rule_list):
+ raise AuthzException()
@dependency.provider('admin_api')
@@ -1885,7 +1949,7 @@ class IntraExtensionAdminManager(IntraExtensionManager):
def add_subject_dict(self, user_id, intra_extension_id, subject_dict):
subject = super(IntraExtensionAdminManager, self).add_subject_dict(user_id, intra_extension_id, subject_dict)
subject_id, subject_value = subject.iteritems().next()
- tenants_dict = self.tenant_api.get_tenants_dict(ADMIN_ID)
+ tenants_dict = self.tenant_api.get_tenants_dict(self.root_api.get_root_admin_id())
for tenant_id in tenants_dict:
if tenants_dict[tenant_id]["intra_authz_extension_id"] == intra_extension_id:
_subjects = self.driver.get_subjects_dict(tenants_dict[tenant_id]["intra_admin_extension_id"])
@@ -1902,7 +1966,7 @@ class IntraExtensionAdminManager(IntraExtensionManager):
def del_subject(self, user_id, intra_extension_id, subject_id):
subject_name = self.driver.get_subjects_dict(intra_extension_id)[subject_id]["name"]
super(IntraExtensionAdminManager, self).del_subject(user_id, intra_extension_id, subject_id)
- tenants_dict = self.tenant_api.get_tenants_dict(ADMIN_ID)
+ tenants_dict = self.tenant_api.get_tenants_dict(self.root_api.get_root_admin_id())
for tenant_id in tenants_dict:
if tenants_dict[tenant_id]["intra_authz_extension_id"] == intra_extension_id:
subject_id = self.driver.get_uuid_from_name(tenants_dict[tenant_id]["intra_admin_extension_id"],
@@ -1920,7 +1984,7 @@ class IntraExtensionAdminManager(IntraExtensionManager):
def set_subject_dict(self, user_id, intra_extension_id, subject_id, subject_dict):
subject = super(IntraExtensionAdminManager, self).set_subject_dict(user_id, intra_extension_id, subject_dict)
subject_id, subject_value = subject.iteritems().next()
- tenants_dict = self.tenant_api.get_tenants_dict(ADMIN_ID)
+ tenants_dict = self.tenant_api.get_tenants_dict(self.root_api.get_root_admin_id())
for tenant_id in tenants_dict:
if tenants_dict[tenant_id]["intra_authz_extension_id"] == intra_extension_id:
self.driver.set_subject_dict(tenants_dict[tenant_id]["intra_admin_extension_id"], uuid4().hex, subject_value)
@@ -1931,27 +1995,74 @@ class IntraExtensionAdminManager(IntraExtensionManager):
return subject
def add_object_dict(self, user_id, intra_extension_id, object_name):
- raise ObjectsWriteNoAuthorized()
+ if "admin" == self.get_intra_extension_dict(self.root_api.get_root_admin_id(), intra_extension_id)['genre']:
+ raise ObjectsWriteNoAuthorized()
+ return super(IntraExtensionAdminManager, self).add_object_dict(user_id, intra_extension_id, object_name)
def set_object_dict(self, user_id, intra_extension_id, object_id, object_dict):
- raise ObjectsWriteNoAuthorized()
+ if "admin" == self.get_intra_extension_dict(self.root_api.get_root_admin_id(), intra_extension_id)['genre']:
+ raise ObjectsWriteNoAuthorized()
+ return super(IntraExtensionAdminManager, self).set_object_dict(user_id, intra_extension_id, object_id, object_dict)
def del_object(self, user_id, intra_extension_id, object_id):
- raise ObjectsWriteNoAuthorized()
+ if "admin" == self.get_intra_extension_dict(self.root_api.get_root_admin_id(), intra_extension_id)['genre']:
+ raise ObjectsWriteNoAuthorized()
+ return super(IntraExtensionAdminManager, self).del_object(user_id, intra_extension_id, object_id)
def add_action_dict(self, user_id, intra_extension_id, action_name):
- raise ActionsWriteNoAuthorized()
+ if "admin" == self.get_intra_extension_dict(self.root_api.get_root_admin_id(), intra_extension_id)['genre']:
+ raise ActionsWriteNoAuthorized()
+ return super(IntraExtensionAdminManager, self).add_action_dict(user_id, intra_extension_id, action_name)
def set_action_dict(self, user_id, intra_extension_id, action_id, action_dict):
- raise ActionsWriteNoAuthorized()
+ if "admin" == self.get_intra_extension_dict(self.root_api.get_root_admin_id(), intra_extension_id)['genre']:
+ raise ActionsWriteNoAuthorized()
+ return super(IntraExtensionAdminManager, self).set_action_dict(user_id, intra_extension_id, action_id, action_dict)
def del_action(self, user_id, intra_extension_id, action_id):
- raise ActionsWriteNoAuthorized()
+ if "admin" == self.get_intra_extension_dict(self.root_api.get_root_admin_id(), intra_extension_id)['genre']:
+ raise ActionsWriteNoAuthorized()
+ return super(IntraExtensionAdminManager, self).del_action(user_id, intra_extension_id, action_id)
+
+
+@dependency.provider('root_api')
+@dependency.requires('moonlog_api', 'admin_api', 'tenant_api')
+class IntraExtensionRootManager(IntraExtensionManager):
+
+ def __init__(self):
+ super(IntraExtensionRootManager, self).__init__()
+ extensions = self.admin_api.driver.get_intra_extensions_dict()
+ for extension_id, extension_dict in extensions.iteritems():
+ if extension_dict["model"] == CONF.moon.root_policy_directory:
+ self.root_extension_id = extension_id
+ else:
+ extension = self.admin_api.load_root_intra_extension_dict(CONF.moon.root_policy_directory)
+ self.root_extension_id = extension['id']
+ self.root_admin_id = self.__compute_admin_id_for_root_extension()
+
+ def get_root_extension_dict(self):
+ """
+
+ :return: {id: {"name": "xxx"}}
+ """
+ return {self.root_extension_id: self.admin_api.driver.get_intra_extensions_dict()[self.root_extension_id]}
+
+ def __compute_admin_id_for_root_extension(self):
+ for subject_id, subject_dict in self.admin_api.driver.get_subjects_dict(self.root_extension_id).iteritems():
+ if subject_dict["name"] == "admin":
+ return subject_id
+ raise RootExtensionNotInitialized()
+
+ def get_root_extension_id(self):
+ return self.root_extension_id
+
+ def get_root_admin_id(self):
+ return self.root_admin_id
@dependency.provider('moonlog_api')
# Next line is mandatory in order to force keystone to process dependencies.
-@dependency.requires('identity_api', 'tenant_api', 'configuration_api', 'authz_api', 'admin_api')
+@dependency.requires('identity_api', 'tenant_api', 'configuration_api', 'authz_api', 'admin_api', 'root_api')
class LogManager(manager.Manager):
def __init__(self):
diff --git a/keystone-moon/keystone/contrib/moon/extension.py b/keystone-moon/keystone/contrib/moon/extension.py
deleted file mode 100644
index efee55c5..00000000
--- a/keystone-moon/keystone/contrib/moon/extension.py
+++ /dev/null
@@ -1,740 +0,0 @@
-# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
-# This software is distributed under the terms and conditions of the 'Apache-2.0'
-# license which can be found in the file 'LICENSE' in this package distribution
-# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
-
-import os.path
-import copy
-import json
-import itertools
-from uuid import uuid4
-import logging
-
-LOG = logging.getLogger("moon.authz")
-
-
-class Metadata:
-
- def __init__(self):
- self.__name = ''
- self.__model = ''
- self.__genre = ''
- self.__description = ''
- self.__subject_categories = list()
- self.__object_categories = list()
- self.__meta_rule = dict()
- self.__meta_rule['sub_meta_rules'] = list()
- self.__meta_rule['aggregation'] = ''
-
- def load_from_json(self, extension_setting_dir):
- metadata_path = os.path.join(extension_setting_dir, 'metadata.json')
- f = open(metadata_path)
- json_metadata = json.load(f)
- self.__name = json_metadata['name']
- self.__model = json_metadata['model']
- self.__genre = json_metadata['genre']
- self.__description = json_metadata['description']
- self.__subject_categories = copy.deepcopy(json_metadata['subject_categories'])
- self.__object_categories = copy.deepcopy(json_metadata['object_categories'])
- self.__meta_rule = copy.deepcopy(json_metadata['meta_rule'])
-
- def get_name(self):
- return self.__name
-
- def get_genre(self):
- return self.__genre
-
- def get_model(self):
- return self.__model
-
- def get_subject_categories(self):
- return self.__subject_categories
-
- def get_object_categories(self):
- return self.__object_categories
-
- def get_meta_rule(self):
- return self.__meta_rule
-
- def get_meta_rule_aggregation(self):
- return self.__meta_rule['aggregation']
-
- def get_data(self):
- data = dict()
- data["name"] = self.get_name()
- data["model"] = self.__model
- data["genre"] = self.__genre
- data["description"] = self.__description
- data["subject_categories"] = self.get_subject_categories()
- data["object_categories"] = self.get_object_categories()
- data["meta_rule"] = dict(self.get_meta_rule())
- return data
-
- def set_data(self, data):
- self.__name = data["name"]
- self.__model = data["model"]
- self.__genre = data["genre"]
- self.__description = data["description"]
- self.__subject_categories = list(data["subject_categories"])
- self.__object_categories = list(data["object_categories"])
- self.__meta_rule = dict(data["meta_rule"])
-
-
-class Configuration:
- def __init__(self):
- self.__subject_category_values = dict()
- # examples: { "role": {"admin", "dev", }, }
- self.__object_category_values = dict()
- self.__rules = list()
-
- def load_from_json(self, extension_setting_dir):
- configuration_path = os.path.join(extension_setting_dir, 'configuration.json')
- f = open(configuration_path)
- json_configuration = json.load(f)
- self.__subject_category_values = copy.deepcopy(json_configuration['subject_category_values'])
- self.__object_category_values = copy.deepcopy(json_configuration['object_category_values'])
- self.__rules = copy.deepcopy(json_configuration['rules']) # TODO: currently a list, will be a dict with sub-meta-rule as key
-
- def get_subject_category_values(self):
- return self.__subject_category_values
-
- def get_object_category_values(self):
- return self.__object_category_values
-
- def get_rules(self):
- return self.__rules
-
- def get_data(self):
- data = dict()
- data["subject_category_values"] = self.get_subject_category_values()
- data["object_category_values"] = self.get_object_category_values()
- data["rules"] = self.get_rules()
- return data
-
- def set_data(self, data):
- self.__subject_category_values = list(data["subject_category_values"])
- self.__object_category_values = list(data["object_category_values"])
- self.__rules = list(data["rules"])
-
-
-class Perimeter:
- def __init__(self):
- self.__subjects = list()
- self.__objects = list()
-
- def load_from_json(self, extension_setting_dir):
- perimeter_path = os.path.join(extension_setting_dir, 'perimeter.json')
- f = open(perimeter_path)
- json_perimeter = json.load(f)
- self.__subjects = copy.deepcopy(json_perimeter['subjects'])
- self.__objects = copy.deepcopy(json_perimeter['objects'])
- # print(self.__subjects)
- # print(self.__objects)
-
- def get_subjects(self):
- return self.__subjects
-
- def get_objects(self):
- return self.__objects
-
- def get_data(self):
- data = dict()
- data["subjects"] = self.get_subjects()
- data["objects"] = self.get_objects()
- return data
-
- def set_data(self, data):
- self.__subjects = list(data["subjects"])
- self.__objects = list(data["objects"])
-
-
-class Assignment:
- def __init__(self):
- self.__subject_category_assignments = dict()
- # examples: { "role": {"user1": {"dev"}, "user2": {"admin",}}, } TODO: limit to one value for each attr
- self.__object_category_assignments = dict()
-
- def load_from_json(self, extension_setting_dir):
- assignment_path = os.path.join(extension_setting_dir, 'assignment.json')
- f = open(assignment_path)
- json_assignment = json.load(f)
-
- self.__subject_category_assignments = dict(copy.deepcopy(json_assignment['subject_category_assignments']))
- self.__object_category_assignments = dict(copy.deepcopy(json_assignment['object_category_assignments']))
-
- def get_subject_category_assignments(self):
- return self.__subject_category_assignments
-
- def get_object_category_assignments(self):
- return self.__object_category_assignments
-
- def get_data(self):
- data = dict()
- data["subject_category_assignments"] = self.get_subject_category_assignments()
- data["object_category_assignments"] = self.get_object_category_assignments()
- return data
-
- def set_data(self, data):
- self.__subject_category_assignments = list(data["subject_category_assignments"])
- self.__object_category_assignments = list(data["object_category_assignments"])
-
-
-class AuthzData:
- def __init__(self, sub, obj, act):
- self.validation = "False" # "OK, KO, Out of Scope" # "auth": False,
- self.subject = sub
- self.object = str(obj)
- self.action = str(act)
- self.type = "" # intra-tenant, inter-tenant, Out of Scope
- self.subject_attrs = dict()
- self.object_attrs = dict()
- self.requesting_tenant = "" # "subject_tenant": subject_tenant,
- self.requested_tenant = "" # "object_tenant": object_tenant,
-
- def __str__(self):
- return """AuthzData:
- validation={}
- subject={}
- object={}
- action={}
- """.format(self.validation, self.subject, self.object, self.action)
-
-
-class Extension:
- def __init__(self):
- self.metadata = Metadata()
- self.configuration = Configuration()
- self.perimeter = Perimeter()
- self.assignment = Assignment()
-
- def load_from_json(self, extension_setting_dir):
- self.metadata.load_from_json(extension_setting_dir)
- self.configuration.load_from_json(extension_setting_dir)
- self.perimeter.load_from_json(extension_setting_dir)
- self.assignment.load_from_json(extension_setting_dir)
-
- def get_name(self):
- return self.metadata.get_name()
-
- def get_genre(self):
- return self.metadata.get_genre()
-
- def authz(self, sub, obj, act):
- authz_data = AuthzData(sub, obj, act)
- # authz_logger.warning('extension/authz request: [sub {}, obj {}, act {}]'.format(sub, obj, act))
-
- if authz_data.subject in self.perimeter.get_subjects() and authz_data.object in self.perimeter.get_objects():
-
- for subject_category in self.metadata.get_subject_categories():
- authz_data.subject_attrs[subject_category] = copy.copy(
- # self.assignment.get_subject_category_attr(subject_category, sub)
- self.assignment.get_subject_category_assignments()[subject_category][sub]
- )
- # authz_logger.warning('extension/authz subject attribute: [subject attr: {}]'.format(
- # #self.assignment.get_subject_category_attr(subject_category, sub))
- # self.assignment.get_subject_category_assignments()[subject_category][sub])
- # )
-
- for object_category in self.metadata.get_object_categories():
- if object_category == 'action':
- authz_data.object_attrs[object_category] = [act]
- # authz_logger.warning('extension/authz object attribute: [object attr: {}]'.format([act]))
- else:
- authz_data.object_attrs[object_category] = copy.copy(
- self.assignment.get_object_category_assignments()[object_category][obj]
- )
- # authz_logger.warning('extension/authz object attribute: [object attr: {}]'.format(
- # self.assignment.get_object_category_assignments()[object_category][obj])
- # )
-
- _aggregation_data = dict()
-
- for sub_meta_rule in self.metadata.get_meta_rule()["sub_meta_rules"].values():
- _tmp_relation_args = list()
-
- for sub_subject_category in sub_meta_rule["subject_categories"]:
- _tmp_relation_args.append(authz_data.subject_attrs[sub_subject_category])
-
- for sub_object_category in sub_meta_rule["object_categories"]:
- _tmp_relation_args.append(authz_data.object_attrs[sub_object_category])
-
- _relation_args = list(itertools.product(*_tmp_relation_args))
-
- if sub_meta_rule['relation'] == 'relation_super': # TODO: replace by Prolog Engine
- _aggregation_data['relation_super'] = dict()
- _aggregation_data['relation_super']['result'] = False
- for _relation_arg in _relation_args:
- if list(_relation_arg) in self.configuration.get_rules()[sub_meta_rule['relation']]:
- # authz_logger.warning(
- # 'extension/authz relation super OK: [sub_sl: {}, obj_sl: {}, action: {}]'.format(
- # _relation_arg[0], _relation_arg[1], _relation_arg[2]
- # )
- # )
- _aggregation_data['relation_super']['result'] = True
- break
- _aggregation_data['relation_super']['status'] = 'finished'
-
- elif sub_meta_rule['relation'] == 'permission':
- _aggregation_data['permission'] = dict()
- _aggregation_data['permission']['result'] = False
- for _relation_arg in _relation_args:
- if list(_relation_arg) in self.configuration.get_rules()[sub_meta_rule['relation']]:
- # authz_logger.warning(
- # 'extension/authz relation permission OK: [role: {}, object: {}, action: {}]'.format(
- # _relation_arg[0], _relation_arg[1], _relation_arg[2]
- # )
- # )
- _aggregation_data['permission']['result'] = True
- break
- _aggregation_data['permission']['status'] = 'finished'
-
- if self.metadata.get_meta_rule_aggregation() == 'and_true_aggregation':
- authz_data.validation = "OK"
- for relation in _aggregation_data:
- if _aggregation_data[relation]['status'] == 'finished' \
- and _aggregation_data[relation]['result'] == False:
- authz_data.validation = "KO"
- else:
- authz_data.validation = 'Out of Scope'
-
- return authz_data.validation
-
- # ---------------- metadate api ----------------
-
- def get_subject_categories(self):
- return self.metadata.get_subject_categories()
-
- def add_subject_category(self, category_id):
- if category_id in self.get_subject_categories():
- return "[ERROR] Add Subject Category: Subject Category Exists"
- else:
- self.get_subject_categories().append(category_id)
- self.configuration.get_subject_category_values()[category_id] = list()
- self.assignment.get_subject_category_assignments()[category_id] = dict()
- return self.get_subject_categories()
-
- def del_subject_category(self, category_id):
- if category_id in self.get_subject_categories():
- self.configuration.get_subject_category_values().pop(category_id)
- self.assignment.get_subject_category_assignments().pop(category_id)
- self.get_subject_categories().remove(category_id)
- return self.get_subject_categories()
- else:
- return "[ERROR] Del Subject Category: Subject Category Unknown"
-
- def get_object_categories(self):
- return self.metadata.get_object_categories()
-
- def add_object_category(self, category_id):
- if category_id in self.get_object_categories():
- return "[ERROR] Add Object Category: Object Category Exists"
- else:
- self.get_object_categories().append(category_id)
- self.configuration.get_object_category_values()[category_id] = list()
- self.assignment.get_object_category_assignments()[category_id] = dict()
- return self.get_object_categories()
-
- def del_object_category(self, category_id):
- if category_id in self.get_object_categories():
- self.configuration.get_object_category_values().pop(category_id)
- self.assignment.get_object_category_assignments().pop(category_id)
- self.get_object_categories().remove(category_id)
- return self.get_object_categories()
- else:
- return "[ERROR] Del Object Category: Object Category Unknown"
-
- def get_meta_rule(self):
- return self.metadata.get_meta_rule()
-
- # ---------------- configuration api ----------------
-
- def get_subject_category_values(self, category_id):
- return self.configuration.get_subject_category_values()[category_id]
-
- def add_subject_category_value(self, category_id, category_value):
- if category_value in self.configuration.get_subject_category_values()[category_id]:
- return "[ERROR] Add Subject Category Value: Subject Category Value Exists"
- else:
- self.configuration.get_subject_category_values()[category_id].append(category_value)
- return self.configuration.get_subject_category_values()[category_id]
-
- def del_subject_category_value(self, category_id, category_value):
- if category_value in self.configuration.get_subject_category_values()[category_id]:
- self.configuration.get_subject_category_values()[category_id].remove(category_value)
- return self.configuration.get_subject_category_values()[category_id]
- else:
- return "[ERROR] Del Subject Category Value: Subject Category Value Unknown"
-
- def get_object_category_values(self, category_id):
- return self.configuration.get_object_category_values()[category_id]
-
- def add_object_category_value(self, category_id, category_value):
- if category_value in self.configuration.get_object_category_values()[category_id]:
- return "[ERROR] Add Object Category Value: Object Category Value Exists"
- else:
- self.configuration.get_object_category_values()[category_id].append(category_value)
- return self.configuration.get_object_category_values()[category_id]
-
- def del_object_category_value(self, category_id, category_value):
- if category_value in self.configuration.get_object_category_values()[category_id]:
- self.configuration.get_object_category_values()[category_id].remove(category_value)
- return self.configuration.get_object_category_values()[category_id]
- else:
- return "[ERROR] Del Object Category Value: Object Category Value Unknown"
-
- def get_meta_rules(self):
- return self.metadata.get_meta_rule()
-
- def _build_rule_from_list(self, relation, rule):
- rule = list(rule)
- _rule = dict()
- _rule["sub_cat_value"] = dict()
- _rule["obj_cat_value"] = dict()
- if relation in self.metadata.get_meta_rule()["sub_meta_rules"]:
- _rule["sub_cat_value"][relation] = dict()
- _rule["obj_cat_value"][relation] = dict()
- for s_category in self.metadata.get_meta_rule()["sub_meta_rules"][relation]["subject_categories"]:
- _rule["sub_cat_value"][relation][s_category] = rule.pop(0)
- for o_category in self.metadata.get_meta_rule()["sub_meta_rules"][relation]["object_categories"]:
- _rule["obj_cat_value"][relation][o_category] = rule.pop(0)
- return _rule
-
- def get_rules(self, full=False):
- if not full:
- return self.configuration.get_rules()
- rules = dict()
- for key in self.configuration.get_rules():
- rules[key] = map(lambda x: self._build_rule_from_list(key, x), self.configuration.get_rules()[key])
- return rules
-
- def add_rule(self, sub_cat_value_dict, obj_cat_value_dict):
- for _relation in self.metadata.get_meta_rule()["sub_meta_rules"]:
- _sub_rule = list()
- for sub_subject_category in self.metadata.get_meta_rule()["sub_meta_rules"][_relation]["subject_categories"]:
- try:
- if sub_cat_value_dict[_relation][sub_subject_category] \
- in self.configuration.get_subject_category_values()[sub_subject_category]:
- _sub_rule.append(sub_cat_value_dict[_relation][sub_subject_category])
- else:
- return "[Error] Add Rule: Subject Category Value Unknown"
- except KeyError as e:
- # DThom: sometimes relation attribute is buggy, I don't know why...
- print(e)
-
- #BUG: when adding a new category in rules despite it was previously adding
- # data = {
- # "sub_cat_value":
- # {"relation_super":
- # {"subject_security_level": "high", "AMH_CAT": "AMH_VAL"}
- # },
- # "obj_cat_value":
- # {"relation_super":
- # {"object_security_level": "medium"}
- # }
- # }
- # traceback = """
- # Traceback (most recent call last):
- # File "/moon/gui/views_json.py", line 20, in wrapped
- # result = function(*args, **kwargs)
- # File "/moon/gui/views_json.py", line 429, in rules
- # obj_cat_value=filter_input(data["obj_cat_value"]))
- # File "/usr/local/lib/python2.7/dist-packages/moon/core/pap/core.py", line 380, in add_rule
- # obj_cat_value)
- # File "/usr/local/lib/python2.7/dist-packages/moon/core/pdp/extension.py", line 414, in add_rule
- # if obj_cat_value_dict[_relation][sub_object_category] \
- # KeyError: u'action'
- # """
- for sub_object_category in self.metadata.get_meta_rule()["sub_meta_rules"][_relation]["object_categories"]:
- if obj_cat_value_dict[_relation][sub_object_category] \
- in self.configuration.get_object_category_values()[sub_object_category]:
- _sub_rule.append(obj_cat_value_dict[_relation][sub_object_category])
- else:
- return "[Error] Add Rule: Object Category Value Unknown"
-
- if _sub_rule in self.configuration.get_rules()[_relation]:
- return "[Error] Add Rule: Rule Exists"
- else:
- self.configuration.get_rules()[_relation].append(_sub_rule)
- return {
- sub_cat_value_dict.keys()[0]: ({
- "sub_cat_value": copy.deepcopy(sub_cat_value_dict),
- "obj_cat_value": copy.deepcopy(obj_cat_value_dict)
- }, )
- }
- return self.configuration.get_rules()
-
- def del_rule(self, sub_cat_value_dict, obj_cat_value_dict):
- for _relation in self.metadata.get_meta_rule()["sub_meta_rules"]:
- _sub_rule = list()
- for sub_subject_category in self.metadata.get_meta_rule()["sub_meta_rules"][_relation]["subject_categories"]:
- _sub_rule.append(sub_cat_value_dict[_relation][sub_subject_category])
-
- for sub_object_category in self.metadata.get_meta_rule()["sub_meta_rules"][_relation]["object_categories"]:
- _sub_rule.append(obj_cat_value_dict[_relation][sub_object_category])
-
- if _sub_rule in self.configuration.get_rules()[_relation]:
- self.configuration.get_rules()[_relation].remove(_sub_rule)
- else:
- return "[Error] Del Rule: Rule Unknown"
- return self.configuration.get_rules()
-
- # ---------------- perimeter api ----------------
-
- def get_subjects(self):
- return self.perimeter.get_subjects()
-
- def get_objects(self):
- return self.perimeter.get_objects()
-
- def add_subject(self, subject_id):
- if subject_id in self.perimeter.get_subjects():
- return "[ERROR] Add Subject: Subject Exists"
- else:
- self.perimeter.get_subjects().append(subject_id)
- return self.perimeter.get_subjects()
-
- def del_subject(self, subject_id):
- if subject_id in self.perimeter.get_subjects():
- self.perimeter.get_subjects().remove(subject_id)
- return self.perimeter.get_subjects()
- else:
- return "[ERROR] Del Subject: Subject Unknown"
-
- def add_object(self, object_id):
- if object_id in self.perimeter.get_objects():
- return "[ERROR] Add Object: Object Exists"
- else:
- self.perimeter.get_objects().append(object_id)
- return self.perimeter.get_objects()
-
- def del_object(self, object_id):
- if object_id in self.perimeter.get_objects():
- self.perimeter.get_objects().remove(object_id)
- return self.perimeter.get_objects()
- else:
- return "[ERROR] Del Object: Object Unknown"
-
- # ---------------- assignment api ----------------
-
- def get_subject_assignments(self, category_id):
- if category_id in self.metadata.get_subject_categories():
- return self.assignment.get_subject_category_assignments()[category_id]
- else:
- return "[ERROR] Get Subject Assignment: Subject Category Unknown"
-
- def add_subject_assignment(self, category_id, subject_id, category_value):
- if category_id in self.metadata.get_subject_categories():
- if subject_id in self.perimeter.get_subjects():
- if category_value in self.configuration.get_subject_category_values()[category_id]:
- if category_id in self.assignment.get_subject_category_assignments().keys():
- if subject_id in self.assignment.get_subject_category_assignments()[category_id].keys():
- if category_value in self.assignment.get_subject_category_assignments()[category_id][subject_id]:
- return "[ERROR] Add Subject Assignment: Subject Assignment Exists"
- else:
- self.assignment.get_subject_category_assignments()[category_id][subject_id].extend([category_value])
- else:
- self.assignment.get_subject_category_assignments()[category_id][subject_id] = [category_value]
- else:
- self.assignment.get_subject_category_assignments()[category_id] = {subject_id: [category_value]}
- return self.assignment.get_subject_category_assignments()
- else:
- return "[ERROR] Add Subject Assignment: Subject Category Value Unknown"
- else:
- return "[ERROR] Add Subject Assignment: Subject Unknown"
- else:
- return "[ERROR] Add Subject Assignment: Subject Category Unknown"
-
- def del_subject_assignment(self, category_id, subject_id, category_value):
- if category_id in self.metadata.get_subject_categories():
- if subject_id in self.perimeter.get_subjects():
- if category_value in self.configuration.get_subject_category_values()[category_id]:
- if len(self.assignment.get_subject_category_assignments()[category_id][subject_id]) >= 2:
- self.assignment.get_subject_category_assignments()[category_id][subject_id].remove(category_value)
- else:
- self.assignment.get_subject_category_assignments()[category_id].pop(subject_id)
- return self.assignment.get_subject_category_assignments()
- else:
- return "[ERROR] Del Subject Assignment: Assignment Unknown"
- else:
- return "[ERROR] Del Subject Assignment: Subject Unknown"
- else:
- return "[ERROR] Del Subject Assignment: Subject Category Unknown"
-
- def get_object_assignments(self, category_id):
- if category_id in self.metadata.get_object_categories():
- return self.assignment.get_object_category_assignments()[category_id]
- else:
- return "[ERROR] Get Object Assignment: Object Category Unknown"
-
- def add_object_assignment(self, category_id, object_id, category_value):
- if category_id in self.metadata.get_object_categories():
- if object_id in self.perimeter.get_objects():
- if category_value in self.configuration.get_object_category_values()[category_id]:
- if category_id in self.assignment.get_object_category_assignments().keys():
- if object_id in self.assignment.get_object_category_assignments()[category_id].keys():
- if category_value in self.assignment.get_object_category_assignments()[category_id][object_id]:
- return "[ERROR] Add Object Assignment: Object Assignment Exists"
- else:
- self.assignment.get_object_category_assignments()[category_id][object_id].extend([category_value])
- else:
- self.assignment.get_object_category_assignments()[category_id][object_id] = [category_value]
- else:
- self.assignment.get_object_category_assignments()[category_id] = {object_id: [category_value]}
- return self.assignment.get_object_category_assignments()
- else:
- return "[ERROR] Add Object Assignment: Object Category Value Unknown"
- else:
- return "[ERROR] Add Object Assignment: Object Unknown"
- else:
- return "[ERROR] Add Object Assignment: Object Category Unknown"
-
- def del_object_assignment(self, category_id, object_id, category_value):
- if category_id in self.metadata.get_object_categories():
- if object_id in self.perimeter.get_objects():
- if category_value in self.configuration.get_object_category_values()[category_id]:
- if len(self.assignment.get_object_category_assignments()[category_id][object_id]) >= 2:
- self.assignment.get_object_category_assignments()[category_id][object_id].remove(category_value)
- else:
- self.assignment.get_object_category_assignments()[category_id].pop(object_id)
- return self.assignment.get_object_category_assignments()
- else:
- return "[ERROR] Del Object Assignment: Assignment Unknown"
- else:
- return "[ERROR] Del Object Assignment: Object Unknown"
- else:
- return "[ERROR] Del Object Assignment: Object Category Unknown"
-
- # ---------------- inter-extension API ----------------
-
- def create_requesting_collaboration(self, sub_list, vent_uuid, act):
- _sub_cat_values = dict()
- _obj_cat_values = dict()
-
- if type(self.add_object(vent_uuid)) is not list:
- return "[Error] Create Requesting Collaboration: No Success"
- for _relation in self.get_meta_rule()["sub_meta_rules"]:
- for _sub_cat_id in self.get_meta_rule()["sub_meta_rules"][_relation]["subject_categories"]:
- _sub_cat_value = str(uuid4())
- if type(self.add_subject_category_value(_sub_cat_id, _sub_cat_value)) is not list:
- return "[Error] Create Requesting Collaboration: No Success"
- _sub_cat_values[_relation] = {_sub_cat_id: _sub_cat_value}
- for _sub in sub_list:
- if type(self.add_subject_assignment(_sub_cat_id, _sub, _sub_cat_value)) is not dict:
- return "[Error] Create Requesting Collaboration: No Success"
-
- for _obj_cat_id in self.get_meta_rule()["sub_meta_rules"][_relation]["object_categories"]:
- if _obj_cat_id == 'action':
- _obj_cat_values[_relation][_obj_cat_id] = act
- else:
- _obj_cat_value = str(uuid4())
- if type(self.add_object_category_value(_obj_cat_id, _obj_cat_value)) is not list:
- return "[Error] Create Requesting Collaboration: No Success"
- if type(self.add_object_assignment(_obj_cat_id, vent_uuid, _obj_cat_value)) is not dict:
- return "[Error] Create Requesting Collaboration: No Success"
- _obj_cat_values[_relation] = {_obj_cat_id: _obj_cat_value}
-
- _rule = self.add_rule(_sub_cat_values, _obj_cat_values)
- if type(_rule) is not dict:
- return "[Error] Create Requesting Collaboration: No Success"
- return {"subject_category_value_dict": _sub_cat_values, "object_category_value_dict": _obj_cat_values,
- "rule": _rule}
-
- def destroy_requesting_collaboration(self, sub_list, vent_uuid, sub_cat_value_dict, obj_cat_value_dict):
- for _relation in self.get_meta_rule()["sub_meta_rules"]:
- for _sub_cat_id in self.get_meta_rule()["sub_meta_rules"][_relation]["subject_categories"]:
- for _sub in sub_list:
- if type(self.del_subject_assignment(_sub_cat_id, _sub, sub_cat_value_dict[_relation][_sub_cat_id]))\
- is not dict:
- return "[Error] Destroy Requesting Collaboration: No Success"
- if type(self.del_subject_category_value(_sub_cat_id, sub_cat_value_dict[_relation][_sub_cat_id])) \
- is not list:
- return "[Error] Destroy Requesting Collaboration: No Success"
-
- for _obj_cat_id in self.get_meta_rule()["sub_meta_rules"][_relation]["object_categories"]:
- if _obj_cat_id == "action":
- pass # TODO: reconsidering the action as object attribute
- else:
- if type(self.del_object_assignment(_obj_cat_id, vent_uuid, obj_cat_value_dict[_relation][_obj_cat_id])) is not dict:
- return "[Error] Destroy Requesting Collaboration: No Success"
- if type(self.del_object_category_value(_obj_cat_id, obj_cat_value_dict[_relation][_obj_cat_id])) is not list:
- return "[Error] Destroy Requesting Collaboration: No Success"
-
- if type(self.del_rule(sub_cat_value_dict, obj_cat_value_dict)) is not dict:
- return "[Error] Destroy Requesting Collaboration: No Success"
- if type(self.del_object(vent_uuid)) is not list:
- return "[Error] Destroy Requesting Collaboration: No Success"
- return "[Destroy Requesting Collaboration] OK"
-
- def create_requested_collaboration(self, vent_uuid, obj_list, act):
- _sub_cat_values = dict()
- _obj_cat_values = dict()
-
- if type(self.add_subject(vent_uuid)) is not list:
- return "[Error] Create Requested Collaboration: No Success"
-
- for _relation in self.get_meta_rule()["sub_meta_rules"]:
- for _sub_cat_id in self.get_meta_rule()["sub_meta_rules"][_relation]["subject_categories"]:
- _sub_cat_value = str(uuid4())
- if type(self.add_subject_category_value(_sub_cat_id, _sub_cat_value)) is not list:
- return "[Error] Create Requested Collaboration: No Success"
- _sub_cat_values[_relation] = {_sub_cat_id: _sub_cat_value}
- if type(self.add_subject_assignment(_sub_cat_id, vent_uuid, _sub_cat_value)) is not dict:
- return "[Error] Create Requested Collaboration: No Success"
-
- for _obj_cat_id in self.get_meta_rule()["sub_meta_rules"][_relation]["object_categories"]:
- if _obj_cat_id == 'action':
- _obj_cat_values[_relation][_obj_cat_id] = act
- else:
- _obj_cat_value = str(uuid4())
- if type(self.add_object_category_value(_obj_cat_id, _obj_cat_value)) is not list:
- return "[Error] Create Requested Collaboration: No Success"
- _obj_cat_values[_relation] = {_obj_cat_id: _obj_cat_value}
- for _obj in obj_list:
- if type(self.add_object_assignment(_obj_cat_id, _obj, _obj_cat_value)) is not dict:
- return "[Error] Create Requested Collaboration: No Success"
-
- _rule = self.add_rule(_sub_cat_values, _obj_cat_values)
- if type(_rule) is not dict:
- return "[Error] Create Requested Collaboration: No Success"
- return {"subject_category_value_dict": _sub_cat_values, "object_category_value_dict": _obj_cat_values,
- "rule": _rule}
-
- def destroy_requested_collaboration(self, vent_uuid, obj_list, sub_cat_value_dict, obj_cat_value_dict):
- for _relation in self.get_meta_rule()["sub_meta_rules"]:
- for _sub_cat_id in self.get_meta_rule()["sub_meta_rules"][_relation]["subject_categories"]:
- if type(self.del_subject_assignment(_sub_cat_id, vent_uuid, sub_cat_value_dict[_relation][_sub_cat_id])) is not dict:
- return "[Error] Destroy Requested Collaboration: No Success"
- if type(self.del_subject_category_value(_sub_cat_id, sub_cat_value_dict[_relation][_sub_cat_id])) is not list:
- return "[Error] Destroy Requested Collaboration: No Success"
-
- for _obj_cat_id in self.get_meta_rule()["sub_meta_rules"][_relation]["object_categories"]:
- if _obj_cat_id == "action":
- pass # TODO: reconsidering the action as object attribute
- else:
- for _obj in obj_list:
- if type(self.del_object_assignment(_obj_cat_id, _obj, obj_cat_value_dict[_relation][_obj_cat_id])) is not dict:
- return "[Error] Destroy Requested Collaboration: No Success"
- if type(self.del_object_category_value(_obj_cat_id, obj_cat_value_dict[_relation][_obj_cat_id])) is not list:
- return "[Error] Destroy Requested Collaboration: No Success"
-
- if type(self.del_rule(sub_cat_value_dict, obj_cat_value_dict)) is not dict:
- return "[Error] Destroy Requested Collaboration: No Success"
- if type(self.del_subject(vent_uuid)) is not list:
- return "[Error] Destroy Requested Collaboration: No Success"
- return "[Destroy Requested Collaboration] OK"
-
- # ---------------- sync_db api ----------------
-
- def get_data(self):
- data = dict()
- data["metadata"] = self.metadata.get_data()
- data["configuration"] = self.configuration.get_data()
- data["perimeter"] = self.perimeter.get_data()
- data["assignment"] = self.assignment.get_data()
- return data
-
- def set_data(self, extension_data):
- self.metadata.set_data(extension_data["metadata"])
- self.configuration.set_data(extension_data["configuration"])
- self.perimeter.set_data(extension_data["perimeter"])
- self.assignment.set_data(extension_data["assignment"])
diff --git a/keystone-moon/keystone/contrib/moon/migrate_repo/versions/001_moon.py b/keystone-moon/keystone/contrib/moon/migrate_repo/versions/001_moon.py
index af4d80bb..4fd26bef 100644
--- a/keystone-moon/keystone/contrib/moon/migrate_repo/versions/001_moon.py
+++ b/keystone-moon/keystone/contrib/moon/migrate_repo/versions/001_moon.py
@@ -21,11 +21,11 @@ def upgrade(migrate_engine):
mysql_charset='utf8')
intra_extension_table.create(migrate_engine, checkfirst=True)
- intra_extension_table.insert().values(id=uuid4().hex, intra_extension={
- 'name': "Root Extension",
- 'description': "The root intra extension",
- 'model': 'admin'
- })
+ # intra_extension_table.insert().values(id=uuid4().hex, intra_extension={
+ # 'name': "Root Extension",
+ # 'description': "The root intra extension",
+ # 'model': 'admin'
+ # })
tenant_table = sql.Table(
'tenants',
@@ -195,8 +195,6 @@ def upgrade(migrate_engine):
mysql_charset='utf8')
rules_table.create(migrate_engine, checkfirst=True)
- # TODO: load root_extension
-
def downgrade(migrate_engine):
meta = sql.MetaData()
diff --git a/keystone-moon/keystone/tests/moon/unit/__init__.py b/keystone-moon/keystone/tests/moon/unit/__init__.py
index 0cd835ce..54c9252e 100644
--- a/keystone-moon/keystone/tests/moon/unit/__init__.py
+++ b/keystone-moon/keystone/tests/moon/unit/__init__.py
@@ -3,7 +3,6 @@
# license which can be found in the file 'LICENSE' in this package distribution
# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
import uuid
-from keystone.contrib.moon.core import ADMIN_ID
USER = {
'name': 'admin',
@@ -25,10 +24,8 @@ def create_intra_extension(self, policy_model="policy_authz"):
if "authz" in policy_model:
genre = "authz"
IE["genre"] = genre
- # force re-initialization of the ADMIN_ID variable
- from keystone.contrib.moon.core import ADMIN_ID
- self.ADMIN_ID = ADMIN_ID
- ref = self.admin_api.load_intra_extension_dict(self.ADMIN_ID, intra_extension_dict=IE)
+ ref = self.admin_api.load_intra_extension_dict(self.root_api.get_root_admin_id(),
+ intra_extension_dict=IE)
self.assertIsInstance(ref, dict)
return ref
@@ -62,7 +59,6 @@ def create_user(self, username="TestAdminIntraExtensionManagerUser"):
def create_mapping(self, tenant_name=None, authz_id=None, admin_id=None):
- from keystone.contrib.moon.core import ADMIN_ID
if not tenant_name:
tenant_name = uuid.uuid4().hex
@@ -76,7 +72,7 @@ def create_mapping(self, tenant_name=None, authz_id=None, admin_id=None):
"domain_id": "default"
}
keystone_tenant = self.resource_api.create_project(tenant["id"], tenant)
- mapping = self.tenant_api.add_tenant_dict(ADMIN_ID, tenant)
+ mapping = self.tenant_api.add_tenant_dict(self.root_api.get_root_admin_id(), tenant)
self.assertIsInstance(mapping, dict)
self.assertIn("intra_authz_extension_id", mapping[tenant["id"]])
self.assertIn("intra_admin_extension_id", mapping[tenant["id"]])
diff --git a/keystone-moon/keystone/tests/moon/unit/test_unit_core_configuration.py b/keystone-moon/keystone/tests/moon/unit/test_unit_core_configuration.py
index 1d612b7d..0be52c18 100644
--- a/keystone-moon/keystone/tests/moon/unit/test_unit_core_configuration.py
+++ b/keystone-moon/keystone/tests/moon/unit/test_unit_core_configuration.py
@@ -12,7 +12,6 @@ from keystone.contrib.moon.core import ConfigurationManager
from keystone.tests.unit.ksfixtures import database
from keystone.contrib.moon.exception import *
from keystone.tests.unit import default_fixtures
-from keystone.contrib.moon.core import ADMIN_ID
from keystone.contrib.moon.core import LogManager
from keystone.contrib.moon.core import IntraExtensionAdminManager
from keystone.tests.moon.unit import *
@@ -26,15 +25,18 @@ class TestConfigurationManager(tests.TestCase):
def setUp(self):
self.useFixture(database.Database())
super(TestConfigurationManager, self).setUp()
- self.load_backends()
self.load_fixtures(default_fixtures)
+ self.load_backends()
+ domain = {'id': "default", 'name': "default"}
+ self.resource_api.create_domain(domain['id'], domain)
self.admin = create_user(self, username="admin")
self.demo = create_user(self, username="demo")
- self.root_intra_extension = create_intra_extension(self, policy_model="policy_root")
- # force re-initialization of the ADMIN_ID variable
- from keystone.contrib.moon.core import ADMIN_ID
- self.ADMIN_ID = ADMIN_ID
- self.manager = self.configuration_api
+ self.root_intra_extension = self.root_api.get_root_extension_dict()
+ self.root_intra_extension_id = self.root_intra_extension.keys()[0]
+ self.ADMIN_ID = self.root_api.get_root_admin_id()
+ self.authz_manager = self.authz_api
+ self.admin_manager = self.admin_api
+ self.configuration_manager = self.configuration_api
def load_extra_backends(self):
return {
@@ -60,10 +62,9 @@ class TestConfigurationManager(tests.TestCase):
policy_directory=self.policy_directory)
def test_get_policy_template_dict(self):
- data = self.manager.get_policy_templates_dict(self.ADMIN_ID)
+ data = self.configuration_manager.get_policy_templates_dict(self.ADMIN_ID)
self.assertIsInstance(data, dict)
- self.assertIn("authz_templates", data)
- self.assertIn("policy_root", data["authz_templates"])
+ self.assertIn("policy_root", data)
# def test_get_aggregation_algorithm_dict(self):
# admin_intra_extension = create_intra_extension(self, policy_model="policy_admin")
diff --git a/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_admin.py b/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_admin.py
index 60122b9d..e76173e7 100644
--- a/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_admin.py
+++ b/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_admin.py
@@ -16,7 +16,6 @@ from keystone import resource
from keystone.contrib.moon.exception import *
from keystone.tests.unit import default_fixtures
from keystone.contrib.moon.core import LogManager, TenantManager
-from keystone.contrib.moon.core import ADMIN_ID
from keystone.tests.moon.unit import *
CONF = cfg.CONF
@@ -33,6 +32,7 @@ IE = {
"description": "a simple description."
}
+
@dependency.requires('admin_api', 'authz_api', 'tenant_api', 'configuration_api', 'moonlog_api')
class TestIntraExtensionAdminManagerOK(tests.TestCase):
@@ -40,15 +40,16 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
def setUp(self):
self.useFixture(database.Database())
super(TestIntraExtensionAdminManagerOK, self).setUp()
- self.load_backends()
self.load_fixtures(default_fixtures)
+ self.load_backends()
+ domain = {'id': "default", 'name': "default"}
+ self.resource_api.create_domain(domain['id'], domain)
self.admin = create_user(self, username="admin")
self.demo = create_user(self, username="demo")
- self.root_intra_extension = create_intra_extension(self, policy_model="policy_root")
- # force re-initialization of the ADMIN_ID variable
- from keystone.contrib.moon.core import ADMIN_ID
- self.ADMIN_ID = ADMIN_ID
- self.manager = self.authz_api
+ self.root_intra_extension = self.root_api.get_root_extension_dict()
+ self.root_intra_extension_id = self.root_intra_extension.keys()[0]
+ self.ADMIN_ID = self.root_api.get_root_admin_id()
+ self.authz_manager = self.authz_api
self.admin_manager = self.admin_api
def __get_key_from_value(self, value, values_dict):
@@ -74,7 +75,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
policy_directory=self.policy_directory)
def delete_admin_intra_extension(self):
- self.manager.del_intra_extension(self.ref["id"])
+ self.authz_manager.del_intra_extension(self.ref["id"])
def test_subjects(self):
authz_ie_dict = create_intra_extension(self, "policy_authz")
@@ -82,12 +83,10 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
- subjects = self.manager.get_subjects_dict(admin_subject_id, authz_ie_dict["id"])
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ subjects = self.authz_manager.get_subjects_dict(admin_subject_id, authz_ie_dict["id"])
self.assertIsInstance(subjects, dict)
for key, value in subjects.iteritems():
self.assertIsInstance(value, dict)
@@ -112,7 +111,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
# Delete the new subject
self.admin_manager.del_subject(admin_subject_id, authz_ie_dict["id"], new_subject["id"])
- subjects = self.manager.get_subjects_dict(admin_subject_id, authz_ie_dict["id"])
+ subjects = self.authz_manager.get_subjects_dict(admin_subject_id, authz_ie_dict["id"])
for key, value in subjects.iteritems():
self.assertIsInstance(value, dict)
self.assertIn("name", value)
@@ -125,12 +124,10 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
- objects = self.manager.get_objects_dict(admin_subject_id, authz_ie_dict["id"])
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ objects = self.authz_manager.get_objects_dict(admin_subject_id, authz_ie_dict["id"])
objects_id_list = []
self.assertIsInstance(objects, dict)
for key, value in objects.iteritems():
@@ -145,12 +142,10 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
- actions = self.manager.get_actions_dict(admin_subject_id, authz_ie_dict["id"])
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ actions = self.authz_manager.get_actions_dict(admin_subject_id, authz_ie_dict["id"])
actions_id_list = []
self.assertIsInstance(actions, dict)
for key, value in actions.iteritems():
@@ -165,12 +160,10 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
- subject_categories = self.manager.get_subject_categories_dict(admin_subject_id, authz_ie_dict["id"])
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ subject_categories = self.authz_manager.get_subject_categories_dict(admin_subject_id, authz_ie_dict["id"])
self.assertIsInstance(subject_categories, dict)
for key, value in subject_categories.iteritems():
self.assertIsInstance(value, dict)
@@ -192,7 +185,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
# Delete the new subject_category
self.admin_manager.del_subject_category(admin_subject_id, authz_ie_dict["id"], new_subject_category["id"])
- subject_categories = self.manager.get_subject_categories_dict(admin_subject_id, authz_ie_dict["id"])
+ subject_categories = self.authz_manager.get_subject_categories_dict(admin_subject_id, authz_ie_dict["id"])
for key, value in subject_categories.iteritems():
self.assertIsInstance(value, dict)
self.assertIn("name", value)
@@ -205,12 +198,10 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
- object_categories = self.manager.get_object_categories_dict(admin_subject_id, authz_ie_dict["id"])
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ object_categories = self.authz_manager.get_object_categories_dict(admin_subject_id, authz_ie_dict["id"])
self.assertIsInstance(object_categories, dict)
for key, value in object_categories.iteritems():
self.assertIsInstance(value, dict)
@@ -233,7 +224,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
# Delete the new object_category
self.admin_manager.del_object_category(admin_subject_id, authz_ie_dict["id"], new_object_category["id"])
- object_categories = self.manager.get_object_categories_dict(admin_subject_id, authz_ie_dict["id"])
+ object_categories = self.authz_manager.get_object_categories_dict(admin_subject_id, authz_ie_dict["id"])
for key, value in object_categories.iteritems():
self.assertIsInstance(value, dict)
self.assertIn("name", value)
@@ -246,12 +237,10 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
- action_categories = self.manager.get_action_categories_dict(admin_subject_id, authz_ie_dict["id"])
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ action_categories = self.authz_manager.get_action_categories_dict(admin_subject_id, authz_ie_dict["id"])
self.assertIsInstance(action_categories, dict)
for key, value in action_categories.iteritems():
self.assertIsInstance(value, dict)
@@ -274,7 +263,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
# Delete the new action_category
self.admin_manager.del_action_category(admin_subject_id, authz_ie_dict["id"], new_action_category["id"])
- action_categories = self.manager.get_action_categories_dict(admin_subject_id, authz_ie_dict["id"])
+ action_categories = self.authz_manager.get_action_categories_dict(admin_subject_id, authz_ie_dict["id"])
for key, value in action_categories.iteritems():
self.assertIsInstance(value, dict)
self.assertIn("name", value)
@@ -287,11 +276,11 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
+ # demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
+ # {"name": "demo", "description": "demo"})
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
subject_categories = self.admin_manager.add_subject_category_dict(
admin_subject_id,
@@ -304,7 +293,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
for subject_category_id in subject_categories:
- subject_category_scope = self.manager.get_subject_scopes_dict(
+ subject_category_scope = self.authz_manager.get_subject_scopes_dict(
admin_subject_id,
authz_ie_dict["id"],
subject_category_id)
@@ -350,11 +339,11 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
+ # demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
+ # {"name": "demo", "description": "demo"})
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
object_categories = self.admin_manager.add_object_category_dict(
admin_subject_id,
@@ -367,7 +356,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
for object_category_id in object_categories:
- object_category_scope = self.manager.get_object_scopes_dict(
+ object_category_scope = self.authz_manager.get_object_scopes_dict(
admin_subject_id,
authz_ie_dict["id"],
object_category_id)
@@ -413,11 +402,11 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
+ # demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
+ # {"name": "demo", "description": "demo"})
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
action_categories = self.admin_manager.add_action_category_dict(
admin_subject_id,
@@ -430,7 +419,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
for action_category_id in action_categories:
- action_category_scope = self.manager.get_action_scopes_dict(
+ action_category_scope = self.authz_manager.get_action_scopes_dict(
admin_subject_id,
authz_ie_dict["id"],
action_category_id)
@@ -476,17 +465,17 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
admin_authz_subject_id, admin_authz_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], authz_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], authz_ie_dict['id'], 'admin').iteritems().next()
+ # demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
+ # {"name": "demo", "description": "demo"})
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
demo_authz_subject_id, demo_authz_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], authz_ie_dict['id'], 'demo').iteritems().next()
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], authz_ie_dict['id'], 'demo').iteritems().next()
- subjects_dict = self.manager.get_subjects_dict(admin_subject_id, authz_ie_dict["id"])
+ subjects_dict = self.authz_manager.get_subjects_dict(admin_subject_id, authz_ie_dict["id"])
subject_categories = self.admin_manager.add_subject_category_dict(
admin_subject_id,
@@ -498,7 +487,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
)
for subject_category_id in subject_categories:
- subject_category_scope = self.manager.get_subject_scopes_dict(
+ subject_category_scope = self.authz_manager.get_subject_scopes_dict(
admin_subject_id,
authz_ie_dict["id"],
subject_category_id)
@@ -529,7 +518,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
new_subject_category_scope_2)
subject_category_scope_2_id = subject_category_scope_2.keys()[0]
- subject_category_assignments = self.manager.get_subject_assignment_list(
+ subject_category_assignments = self.authz_manager.get_subject_assignment_list(
admin_subject_id,
authz_ie_dict["id"],
admin_authz_subject_id,
@@ -538,7 +527,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
self.assertIsInstance(subject_category_assignments, list)
self.assertEqual([], subject_category_assignments)
- subject_category_assignments = self.manager.get_subject_assignment_list(
+ subject_category_assignments = self.authz_manager.get_subject_assignment_list(
admin_subject_id,
authz_ie_dict["id"],
demo_authz_subject_id,
@@ -599,13 +588,13 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
+ # demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
+ # {"name": "demo", "description": "demo"})
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
- objects_dict = self.manager.get_objects_dict(admin_subject_id, authz_ie_dict["id"])
+ objects_dict = self.authz_manager.get_objects_dict(admin_subject_id, authz_ie_dict["id"])
object_vm1_id = None
object_vm2_id = None
@@ -627,7 +616,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
)
for object_category_id in object_categories:
- object_category_scope = self.manager.get_object_scopes_dict(
+ object_category_scope = self.authz_manager.get_object_scopes_dict(
admin_subject_id,
authz_ie_dict["id"],
object_category_id)
@@ -658,7 +647,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
new_object_category_scope_2)
object_category_scope_2_id = object_category_scope_2.keys()[0]
- object_category_assignments = self.manager.get_object_assignment_list(
+ object_category_assignments = self.authz_manager.get_object_assignment_list(
admin_subject_id,
authz_ie_dict["id"],
object_vm1_id,
@@ -667,7 +656,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
self.assertIsInstance(object_category_assignments, list)
self.assertEqual([], object_category_assignments)
- object_category_assignments = self.manager.get_object_assignment_list(
+ object_category_assignments = self.authz_manager.get_object_assignment_list(
admin_subject_id,
authz_ie_dict["id"],
object_vm2_id,
@@ -728,13 +717,13 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
+ # demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
+ # {"name": "demo", "description": "demo"})
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
- actions_dict = self.manager.get_actions_dict(admin_subject_id, authz_ie_dict["id"])
+ actions_dict = self.authz_manager.get_actions_dict(admin_subject_id, authz_ie_dict["id"])
action_upload_id = None
action_list_id = None
@@ -756,7 +745,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
)
for action_category_id in action_categories:
- action_category_scope = self.manager.get_action_scopes_dict(
+ action_category_scope = self.authz_manager.get_action_scopes_dict(
admin_subject_id,
authz_ie_dict["id"],
action_category_id)
@@ -787,7 +776,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
new_action_category_scope_2)
action_category_scope_2_id = action_category_scope_2.keys()[0]
- action_category_assignments = self.manager.get_action_assignment_list(
+ action_category_assignments = self.authz_manager.get_action_assignment_list(
admin_subject_id,
authz_ie_dict["id"],
action_upload_id,
@@ -796,7 +785,7 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
self.assertIsInstance(action_category_assignments, list)
self.assertEqual([], action_category_assignments)
- action_category_assignments = self.manager.get_action_assignment_list(
+ action_category_assignments = self.authz_manager.get_action_assignment_list(
admin_subject_id,
authz_ie_dict["id"],
action_list_id,
@@ -857,11 +846,11 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.admin_manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
+ # demo_subject_dict = self.admin_manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
+ # {"name": "demo", "description": "demo"})
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
aggregation_algorithms = self.admin_manager.get_aggregation_algorithm_dict(admin_subject_id, authz_ie_dict["id"])
for key, value in aggregation_algorithms.iteritems():
@@ -899,11 +888,11 @@ class TestIntraExtensionAdminManagerOK(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.admin_manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
+ # demo_subject_dict = self.admin_manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
+ # {"name": "demo", "description": "demo"})
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
sub_meta_rules = self.admin_manager.get_sub_meta_rules_dict(admin_subject_id, authz_ie_dict["id"])
self.assertIsInstance(sub_meta_rules, dict)
@@ -978,15 +967,16 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase):
def setUp(self):
self.useFixture(database.Database())
super(TestIntraExtensionAdminManagerKO, self).setUp()
- self.load_backends()
self.load_fixtures(default_fixtures)
+ self.load_backends()
+ domain = {'id': "default", 'name': "default"}
+ self.resource_api.create_domain(domain['id'], domain)
self.admin = create_user(self, username="admin")
self.demo = create_user(self, username="demo")
- self.root_intra_extension = create_intra_extension(self, policy_model="policy_root")
- # force re-initialization of the ADMIN_ID variable
- from keystone.contrib.moon.core import ADMIN_ID
- self.ADMIN_ID = ADMIN_ID
- self.manager = self.authz_api
+ self.root_intra_extension = self.root_api.get_root_extension_dict()
+ self.root_intra_extension_id = self.root_intra_extension.keys()[0]
+ self.ADMIN_ID = self.root_api.get_root_admin_id()
+ self.authz_manager = self.authz_api
self.admin_manager = self.admin_api
def __get_key_from_value(self, value, values_dict):
@@ -1017,12 +1007,12 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
+ # demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
+ # {"name": "demo", "description": "demo"})
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
- subjects = self.manager.get_subjects_dict(admin_subject_id, authz_ie_dict["id"])
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ subjects = self.authz_manager.get_subjects_dict(admin_subject_id, authz_ie_dict["id"])
self.assertIsInstance(subjects, dict)
for key, value in subjects.iteritems():
self.assertIsInstance(value, dict)
@@ -1035,7 +1025,7 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase):
new_subject = {"name": "subject_test", "description": "subject_test"}
self.assertRaises(
AuthzException,
- self.manager.add_subject_dict,
+ self.authz_manager.add_subject_dict,
demo_subject_id, admin_ie_dict["id"], new_subject)
subjects = self.admin_manager.add_subject_dict(admin_subject_id, authz_ie_dict["id"], new_subject)
@@ -1052,11 +1042,11 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase):
# Delete the new subject
self.assertRaises(
AuthzException,
- self.manager.del_subject,
+ self.authz_manager.del_subject,
demo_subject_id, authz_ie_dict["id"], new_subject["id"])
self.admin_manager.del_subject(admin_subject_id, authz_ie_dict["id"], new_subject["id"])
- subjects = self.manager.get_subjects_dict(admin_subject_id, authz_ie_dict["id"])
+ subjects = self.authz_manager.get_subjects_dict(admin_subject_id, authz_ie_dict["id"])
for key, value in subjects.iteritems():
self.assertIsInstance(value, dict)
self.assertIn("name", value)
@@ -1069,12 +1059,12 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
+ # demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
+ # {"name": "demo", "description": "demo"})
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
- objects = self.manager.get_objects_dict(admin_subject_id, authz_ie_dict["id"])
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ objects = self.authz_manager.get_objects_dict(admin_subject_id, authz_ie_dict["id"])
objects_id_list = []
self.assertIsInstance(objects, dict)
for key, value in objects.iteritems():
@@ -1087,35 +1077,35 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase):
new_object = {"name": "object_test", "description": "object_test"}
self.assertRaises(
AuthzException,
- self.manager.add_object_dict,
+ self.authz_manager.add_object_dict,
demo_subject_id, admin_ie_dict["id"], new_object)
self.assertRaises(
ObjectsWriteNoAuthorized,
self.admin_manager.add_object_dict,
- admin_subject_id, authz_ie_dict["id"], new_object
+ admin_subject_id, admin_ie_dict["id"], new_object
)
# Delete the new object
for key in objects_id_list:
self.assertRaises(
AuthzException,
- self.manager.del_object,
+ self.authz_manager.del_object,
demo_subject_id, authz_ie_dict["id"], key)
self.assertRaises(
AuthzException,
- self.manager.del_object,
+ self.authz_manager.del_object,
admin_subject_id, authz_ie_dict["id"], key)
for key in objects_id_list:
self.assertRaises(
ObjectsWriteNoAuthorized,
self.admin_manager.del_object,
- demo_subject_id, authz_ie_dict["id"], key)
+ demo_subject_id, admin_ie_dict["id"], key)
self.assertRaises(
ObjectsWriteNoAuthorized,
self.admin_manager.del_object,
- admin_subject_id, authz_ie_dict["id"], key)
+ admin_subject_id, admin_ie_dict["id"], key)
def test_actions(self):
authz_ie_dict = create_intra_extension(self, "policy_authz")
@@ -1123,12 +1113,12 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
+ # demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
+ # {"name": "demo", "description": "demo"})
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
- actions = self.manager.get_actions_dict(admin_subject_id, authz_ie_dict["id"])
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ actions = self.authz_manager.get_actions_dict(admin_subject_id, authz_ie_dict["id"])
actions_id_list = []
self.assertIsInstance(actions, dict)
for key, value in actions.iteritems():
@@ -1141,35 +1131,35 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase):
new_action = {"name": "action_test", "description": "action_test"}
self.assertRaises(
AuthzException,
- self.manager.add_action_dict,
+ self.authz_manager.add_action_dict,
demo_subject_id, admin_ie_dict["id"], new_action)
self.assertRaises(
ActionsWriteNoAuthorized,
self.admin_manager.add_action_dict,
- admin_subject_id, authz_ie_dict["id"], new_action
+ admin_subject_id, admin_ie_dict["id"], new_action
)
# Delete all actions
for key in actions_id_list:
self.assertRaises(
AuthzException,
- self.manager.del_action,
+ self.authz_manager.del_action,
demo_subject_id, authz_ie_dict["id"], key)
self.assertRaises(
AuthzException,
- self.manager.del_action,
+ self.authz_manager.del_action,
admin_subject_id, authz_ie_dict["id"], key)
for key in actions_id_list:
self.assertRaises(
ActionsWriteNoAuthorized,
self.admin_manager.del_action,
- demo_subject_id, authz_ie_dict["id"], key)
+ demo_subject_id, admin_ie_dict["id"], key)
self.assertRaises(
ActionsWriteNoAuthorized,
self.admin_manager.del_action,
- admin_subject_id, authz_ie_dict["id"], key)
+ admin_subject_id, admin_ie_dict["id"], key)
def test_subject_categories(self):
authz_ie_dict = create_intra_extension(self, "policy_authz")
@@ -1177,12 +1167,12 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
+ # demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
+ # {"name": "demo", "description": "demo"})
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
- subject_categories = self.manager.get_subject_categories_dict(admin_subject_id, authz_ie_dict["id"])
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ subject_categories = self.authz_manager.get_subject_categories_dict(admin_subject_id, authz_ie_dict["id"])
self.assertIsInstance(subject_categories, dict)
for key, value in subject_categories.iteritems():
self.assertIsInstance(value, dict)
@@ -1192,7 +1182,7 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase):
new_subject_category = {"name": "subject_category_test", "description": "subject_category_test"}
self.assertRaises(
AuthzException,
- self.manager.add_subject_category_dict,
+ self.authz_manager.add_subject_category_dict,
demo_subject_id, admin_ie_dict["id"], new_subject_category)
subject_categories = self.admin_manager.add_subject_category_dict(admin_subject_id, authz_ie_dict["id"], new_subject_category)
@@ -1209,11 +1199,11 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase):
# Delete the new subject_category
self.assertRaises(
AuthzException,
- self.manager.del_subject_category,
+ self.authz_manager.del_subject_category,
demo_subject_id, authz_ie_dict["id"], new_subject_category["id"])
self.admin_manager.del_subject_category(admin_subject_id, authz_ie_dict["id"], new_subject_category["id"])
- subject_categories = self.manager.get_subject_categories_dict(admin_subject_id, authz_ie_dict["id"])
+ subject_categories = self.authz_manager.get_subject_categories_dict(admin_subject_id, authz_ie_dict["id"])
for key, value in subject_categories.iteritems():
self.assertIsInstance(value, dict)
self.assertIn("name", value)
@@ -1226,12 +1216,12 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
+ # demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
+ # {"name": "demo", "description": "demo"})
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
- object_categories = self.manager.get_object_categories_dict(admin_subject_id, authz_ie_dict["id"])
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ object_categories = self.authz_manager.get_object_categories_dict(admin_subject_id, authz_ie_dict["id"])
self.assertIsInstance(object_categories, dict)
for key, value in object_categories.iteritems():
self.assertIsInstance(value, dict)
@@ -1241,7 +1231,7 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase):
new_object_category = {"name": "object_category_test", "description": "object_category_test"}
self.assertRaises(
AuthzException,
- self.manager.add_object_category_dict,
+ self.authz_manager.add_object_category_dict,
demo_subject_id, admin_ie_dict["id"], new_object_category)
object_categories = self.admin_manager.add_object_category_dict(admin_subject_id, authz_ie_dict["id"], new_object_category)
@@ -1258,11 +1248,11 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase):
# Delete the new object_category
self.assertRaises(
AuthzException,
- self.manager.del_object_category,
+ self.authz_manager.del_object_category,
demo_subject_id, authz_ie_dict["id"], new_object_category["id"])
self.admin_manager.del_object_category(admin_subject_id, authz_ie_dict["id"], new_object_category["id"])
- object_categories = self.manager.get_object_categories_dict(admin_subject_id, authz_ie_dict["id"])
+ object_categories = self.authz_manager.get_object_categories_dict(admin_subject_id, authz_ie_dict["id"])
for key, value in object_categories.iteritems():
self.assertIsInstance(value, dict)
self.assertIn("name", value)
@@ -1275,12 +1265,12 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
+ # demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
+ # {"name": "demo", "description": "demo"})
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
- action_categories = self.manager.get_action_categories_dict(admin_subject_id, authz_ie_dict["id"])
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ action_categories = self.authz_manager.get_action_categories_dict(admin_subject_id, authz_ie_dict["id"])
self.assertIsInstance(action_categories, dict)
for key, value in action_categories.iteritems():
self.assertIsInstance(value, dict)
@@ -1290,7 +1280,7 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase):
new_action_category = {"name": "action_category_test", "description": "action_category_test"}
self.assertRaises(
AuthzException,
- self.manager.add_action_category_dict,
+ self.authz_manager.add_action_category_dict,
demo_subject_id, admin_ie_dict["id"], new_action_category)
action_categories = self.admin_manager.add_action_category_dict(admin_subject_id, authz_ie_dict["id"], new_action_category)
@@ -1307,11 +1297,11 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase):
# Delete the new action_category
self.assertRaises(
AuthzException,
- self.manager.del_action_category,
+ self.authz_manager.del_action_category,
demo_subject_id, authz_ie_dict["id"], new_action_category["id"])
self.admin_manager.del_action_category(admin_subject_id, authz_ie_dict["id"], new_action_category["id"])
- action_categories = self.manager.get_action_categories_dict(admin_subject_id, authz_ie_dict["id"])
+ action_categories = self.authz_manager.get_action_categories_dict(admin_subject_id, authz_ie_dict["id"])
for key, value in action_categories.iteritems():
self.assertIsInstance(value, dict)
self.assertIn("name", value)
@@ -1324,11 +1314,11 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
+ # demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
+ # {"name": "demo", "description": "demo"})
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
subject_categories = self.admin_manager.add_subject_category_dict(
admin_subject_id,
@@ -1341,7 +1331,7 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase):
for subject_category_id in subject_categories:
- subject_category_scope = self.manager.get_subject_scopes_dict(
+ subject_category_scope = self.authz_manager.get_subject_scopes_dict(
admin_subject_id,
authz_ie_dict["id"],
subject_category_id)
@@ -1396,11 +1386,11 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
+ # demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
+ # {"name": "demo", "description": "demo"})
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
object_categories = self.admin_manager.add_object_category_dict(
admin_subject_id,
@@ -1413,7 +1403,7 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase):
for object_category_id in object_categories:
- object_category_scope = self.manager.get_object_scopes_dict(
+ object_category_scope = self.authz_manager.get_object_scopes_dict(
admin_subject_id,
authz_ie_dict["id"],
object_category_id)
@@ -1468,11 +1458,11 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
+ # demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
+ # {"name": "demo", "description": "demo"})
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
action_categories = self.admin_manager.add_action_category_dict(
admin_subject_id,
@@ -1485,7 +1475,7 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase):
for action_category_id in action_categories:
- action_category_scope = self.manager.get_action_scopes_dict(
+ action_category_scope = self.authz_manager.get_action_scopes_dict(
admin_subject_id,
authz_ie_dict["id"],
action_category_id)
@@ -1540,17 +1530,17 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
admin_authz_subject_id, admin_authz_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], authz_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], authz_ie_dict['id'], 'admin').iteritems().next()
+ # demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
+ # {"name": "demo", "description": "demo"})
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
demo_authz_subject_id, demo_authz_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], authz_ie_dict['id'], 'demo').iteritems().next()
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], authz_ie_dict['id'], 'demo').iteritems().next()
- subjects_dict = self.manager.get_subjects_dict(admin_subject_id, authz_ie_dict["id"])
+ subjects_dict = self.authz_manager.get_subjects_dict(admin_subject_id, authz_ie_dict["id"])
subject_categories = self.admin_manager.add_subject_category_dict(
admin_subject_id,
@@ -1562,7 +1552,7 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase):
)
for subject_category_id in subject_categories:
- subject_category_scope = self.manager.get_subject_scopes_dict(
+ subject_category_scope = self.authz_manager.get_subject_scopes_dict(
admin_subject_id,
authz_ie_dict["id"],
subject_category_id)
@@ -1593,7 +1583,7 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase):
new_subject_category_scope_2)
subject_category_scope_2_id = subject_category_scope_2.keys()[0]
- subject_category_assignments = self.manager.get_subject_assignment_list(
+ subject_category_assignments = self.authz_manager.get_subject_assignment_list(
admin_subject_id,
authz_ie_dict["id"],
admin_authz_subject_id,
@@ -1602,7 +1592,7 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase):
self.assertIsInstance(subject_category_assignments, list)
self.assertEqual([], subject_category_assignments)
- subject_category_assignments = self.manager.get_subject_assignment_list(
+ subject_category_assignments = self.authz_manager.get_subject_assignment_list(
admin_subject_id,
authz_ie_dict["id"],
demo_authz_subject_id,
@@ -1613,14 +1603,14 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase):
self.assertRaises(
AuthzException,
- self.manager.add_subject_assignment_list,
+ self.authz_manager.add_subject_assignment_list,
demo_subject_id, authz_ie_dict["id"],
admin_authz_subject_id, subject_category_id, subject_category_scope_1_id
)
self.assertRaises(
AuthzException,
- self.manager.add_subject_assignment_list,
+ self.authz_manager.add_subject_assignment_list,
demo_subject_id, authz_ie_dict["id"],
demo_authz_subject_id, subject_category_id, subject_category_scope_2_id
)
@@ -1692,13 +1682,13 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
+ # demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
+ # {"name": "demo", "description": "demo"})
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
- objects_dict = self.manager.get_objects_dict(admin_subject_id, authz_ie_dict["id"])
+ objects_dict = self.authz_manager.get_objects_dict(admin_subject_id, authz_ie_dict["id"])
object_vm1_id = None
object_vm2_id = None
@@ -1720,7 +1710,7 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase):
)
for object_category_id in object_categories:
- object_category_scope = self.manager.get_object_scopes_dict(
+ object_category_scope = self.authz_manager.get_object_scopes_dict(
admin_subject_id,
authz_ie_dict["id"],
object_category_id)
@@ -1751,7 +1741,7 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase):
new_object_category_scope_2)
object_category_scope_2_id = object_category_scope_2.keys()[0]
- object_category_assignments = self.manager.get_object_assignment_list(
+ object_category_assignments = self.authz_manager.get_object_assignment_list(
admin_subject_id,
authz_ie_dict["id"],
object_vm1_id,
@@ -1760,7 +1750,7 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase):
self.assertIsInstance(object_category_assignments, list)
self.assertEqual([], object_category_assignments)
- object_category_assignments = self.manager.get_object_assignment_list(
+ object_category_assignments = self.authz_manager.get_object_assignment_list(
admin_subject_id,
authz_ie_dict["id"],
object_vm2_id,
@@ -1771,14 +1761,14 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase):
self.assertRaises(
AuthzException,
- self.manager.add_object_assignment_list,
+ self.authz_manager.add_object_assignment_list,
demo_subject_id, authz_ie_dict["id"],
object_vm1_id, object_category_id, object_category_scope_1_id
)
self.assertRaises(
AuthzException,
- self.manager.add_object_assignment_list,
+ self.authz_manager.add_object_assignment_list,
demo_subject_id, authz_ie_dict["id"],
object_vm2_id, object_category_id, object_category_scope_2_id
)
@@ -1850,13 +1840,13 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
+ # demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
+ # {"name": "demo", "description": "demo"})
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
- actions_dict = self.manager.get_actions_dict(admin_subject_id, authz_ie_dict["id"])
+ actions_dict = self.authz_manager.get_actions_dict(admin_subject_id, authz_ie_dict["id"])
action_upload_id = None
action_list_id = None
@@ -1878,7 +1868,7 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase):
)
for action_category_id in action_categories:
- action_category_scope = self.manager.get_action_scopes_dict(
+ action_category_scope = self.authz_manager.get_action_scopes_dict(
admin_subject_id,
authz_ie_dict["id"],
action_category_id)
@@ -1909,7 +1899,7 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase):
new_action_category_scope_2)
action_category_scope_2_id = action_category_scope_2.keys()[0]
- action_category_assignments = self.manager.get_action_assignment_list(
+ action_category_assignments = self.authz_manager.get_action_assignment_list(
admin_subject_id,
authz_ie_dict["id"],
action_upload_id,
@@ -1918,7 +1908,7 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase):
self.assertIsInstance(action_category_assignments, list)
self.assertEqual([], action_category_assignments)
- action_category_assignments = self.manager.get_action_assignment_list(
+ action_category_assignments = self.authz_manager.get_action_assignment_list(
admin_subject_id,
authz_ie_dict["id"],
action_list_id,
@@ -1929,14 +1919,14 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase):
self.assertRaises(
AuthzException,
- self.manager.add_action_assignment_list,
+ self.authz_manager.add_action_assignment_list,
demo_subject_id, authz_ie_dict["id"],
action_upload_id, action_category_id, action_category_scope_1_id
)
self.assertRaises(
AuthzException,
- self.manager.add_action_assignment_list,
+ self.authz_manager.add_action_assignment_list,
demo_subject_id, authz_ie_dict["id"],
action_list_id, action_category_id, action_category_scope_2_id
)
@@ -2008,11 +1998,11 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.admin_manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
+ # demo_subject_dict = self.admin_manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
+ # {"name": "demo", "description": "demo"})
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
aggregation_algorithms = self.admin_manager.get_aggregation_algorithm_dict(admin_subject_id, authz_ie_dict["id"])
for key, value in aggregation_algorithms.iteritems():
@@ -2050,11 +2040,11 @@ class TestIntraExtensionAdminManagerKO(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.admin_manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
+ # demo_subject_dict = self.admin_manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
+ # {"name": "demo", "description": "demo"})
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
sub_meta_rules = self.admin_manager.get_sub_meta_rules_dict(admin_subject_id, authz_ie_dict["id"])
self.assertIsInstance(sub_meta_rules, dict)
diff --git a/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_authz.py b/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_authz.py
index 2f75acaf..c96c00b5 100644
--- a/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_authz.py
+++ b/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_authz.py
@@ -10,12 +10,12 @@ import os
import uuid
from oslo_config import cfg
from keystone.tests import unit as tests
-from keystone.contrib.moon.core import IntraExtensionAdminManager, IntraExtensionAuthzManager
+from keystone.contrib.moon.core import IntraExtensionAdminManager, IntraExtensionAuthzManager, IntraExtensionRootManager
from keystone.tests.unit.ksfixtures import database
from keystone import resource
from keystone.contrib.moon.exception import *
from keystone.tests.unit import default_fixtures
-from keystone.contrib.moon.core import LogManager, TenantManager, ADMIN_ID
+from keystone.contrib.moon.core import LogManager, TenantManager
from keystone.tests.moon.unit import *
CONF = cfg.CONF
@@ -38,15 +38,16 @@ class TestIntraExtensionAuthzManagerAuthzOK(tests.TestCase):
def setUp(self):
self.useFixture(database.Database())
super(TestIntraExtensionAuthzManagerAuthzOK, self).setUp()
- self.load_backends()
self.load_fixtures(default_fixtures)
+ self.load_backends()
+ domain = {'id': "default", 'name': "default"}
+ self.resource_api.create_domain(domain['id'], domain)
self.admin = create_user(self, username="admin")
self.demo = create_user(self, username="demo")
- self.root_intra_extension = create_intra_extension(self, policy_model="policy_root")
- # force re-initialization of the ADMIN_ID variable
- from keystone.contrib.moon.core import ADMIN_ID
- self.ADMIN_ID = ADMIN_ID
- self.manager = self.authz_api
+ self.root_intra_extension = self.root_api.get_root_extension_dict()
+ self.root_intra_extension_id = self.root_intra_extension.keys()[0]
+ self.ADMIN_ID = self.root_api.get_root_admin_id()
+ self.authz_manager = self.authz_api
self.admin_manager = self.admin_api
def __get_key_from_value(self, value, values_dict):
@@ -72,7 +73,7 @@ class TestIntraExtensionAuthzManagerAuthzOK(tests.TestCase):
policy_directory=self.policy_directory)
def delete_admin_intra_extension(self):
- self.manager.del_intra_extension(self.ref["id"])
+ self.authz_manager.del_intra_extension(self.ref["id"])
def test_subjects(self):
authz_ie_dict = create_intra_extension(self, "policy_authz")
@@ -80,12 +81,10 @@ class TestIntraExtensionAuthzManagerAuthzOK(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
- subjects = self.manager.get_subjects_dict(admin_subject_id, authz_ie_dict["id"])
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ subjects = self.authz_manager.get_subjects_dict(admin_subject_id, authz_ie_dict["id"])
self.assertIsInstance(subjects, dict)
for key, value in subjects.iteritems():
self.assertIsInstance(value, dict)
@@ -110,7 +109,7 @@ class TestIntraExtensionAuthzManagerAuthzOK(tests.TestCase):
# Delete the new subject
self.admin_manager.del_subject(admin_subject_id, authz_ie_dict["id"], new_subject["id"])
- subjects = self.manager.get_subjects_dict(admin_subject_id, authz_ie_dict["id"])
+ subjects = self.authz_manager.get_subjects_dict(admin_subject_id, authz_ie_dict["id"])
for key, value in subjects.iteritems():
self.assertIsInstance(value, dict)
self.assertIn("name", value)
@@ -123,12 +122,10 @@ class TestIntraExtensionAuthzManagerAuthzOK(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
- objects = self.manager.get_objects_dict(admin_subject_id, authz_ie_dict["id"])
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ objects = self.authz_manager.get_objects_dict(admin_subject_id, authz_ie_dict["id"])
objects_id_list = []
self.assertIsInstance(objects, dict)
for key, value in objects.iteritems():
@@ -143,12 +140,10 @@ class TestIntraExtensionAuthzManagerAuthzOK(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
- actions = self.manager.get_actions_dict(admin_subject_id, authz_ie_dict["id"])
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ actions = self.authz_manager.get_actions_dict(admin_subject_id, authz_ie_dict["id"])
actions_id_list = []
self.assertIsInstance(actions, dict)
for key, value in actions.iteritems():
@@ -163,12 +158,10 @@ class TestIntraExtensionAuthzManagerAuthzOK(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
- subject_categories = self.manager.get_subject_categories_dict(admin_subject_id, authz_ie_dict["id"])
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ subject_categories = self.authz_manager.get_subject_categories_dict(admin_subject_id, authz_ie_dict["id"])
self.assertIsInstance(subject_categories, dict)
for key, value in subject_categories.iteritems():
self.assertIsInstance(value, dict)
@@ -190,7 +183,7 @@ class TestIntraExtensionAuthzManagerAuthzOK(tests.TestCase):
# Delete the new subject_category
self.admin_manager.del_subject_category(admin_subject_id, authz_ie_dict["id"], new_subject_category["id"])
- subject_categories = self.manager.get_subject_categories_dict(admin_subject_id, authz_ie_dict["id"])
+ subject_categories = self.authz_manager.get_subject_categories_dict(admin_subject_id, authz_ie_dict["id"])
for key, value in subject_categories.iteritems():
self.assertIsInstance(value, dict)
self.assertIn("name", value)
@@ -203,12 +196,10 @@ class TestIntraExtensionAuthzManagerAuthzOK(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
- object_categories = self.manager.get_object_categories_dict(admin_subject_id, authz_ie_dict["id"])
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ object_categories = self.authz_manager.get_object_categories_dict(admin_subject_id, authz_ie_dict["id"])
self.assertIsInstance(object_categories, dict)
for key, value in object_categories.iteritems():
self.assertIsInstance(value, dict)
@@ -231,7 +222,7 @@ class TestIntraExtensionAuthzManagerAuthzOK(tests.TestCase):
# Delete the new object_category
self.admin_manager.del_object_category(admin_subject_id, authz_ie_dict["id"], new_object_category["id"])
- object_categories = self.manager.get_object_categories_dict(admin_subject_id, authz_ie_dict["id"])
+ object_categories = self.authz_manager.get_object_categories_dict(admin_subject_id, authz_ie_dict["id"])
for key, value in object_categories.iteritems():
self.assertIsInstance(value, dict)
self.assertIn("name", value)
@@ -244,12 +235,10 @@ class TestIntraExtensionAuthzManagerAuthzOK(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
- action_categories = self.manager.get_action_categories_dict(admin_subject_id, authz_ie_dict["id"])
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ action_categories = self.authz_manager.get_action_categories_dict(admin_subject_id, authz_ie_dict["id"])
self.assertIsInstance(action_categories, dict)
for key, value in action_categories.iteritems():
self.assertIsInstance(value, dict)
@@ -272,7 +261,7 @@ class TestIntraExtensionAuthzManagerAuthzOK(tests.TestCase):
# Delete the new action_category
self.admin_manager.del_action_category(admin_subject_id, authz_ie_dict["id"], new_action_category["id"])
- action_categories = self.manager.get_action_categories_dict(admin_subject_id, authz_ie_dict["id"])
+ action_categories = self.authz_manager.get_action_categories_dict(admin_subject_id, authz_ie_dict["id"])
for key, value in action_categories.iteritems():
self.assertIsInstance(value, dict)
self.assertIn("name", value)
@@ -285,11 +274,9 @@ class TestIntraExtensionAuthzManagerAuthzOK(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
subject_categories = self.admin_manager.add_subject_category_dict(
admin_subject_id,
@@ -302,7 +289,7 @@ class TestIntraExtensionAuthzManagerAuthzOK(tests.TestCase):
for subject_category_id in subject_categories:
- subject_category_scope = self.manager.get_subject_scopes_dict(
+ subject_category_scope = self.authz_manager.get_subject_scopes_dict(
admin_subject_id,
authz_ie_dict["id"],
subject_category_id)
@@ -348,11 +335,9 @@ class TestIntraExtensionAuthzManagerAuthzOK(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
object_categories = self.admin_manager.add_object_category_dict(
admin_subject_id,
@@ -365,7 +350,7 @@ class TestIntraExtensionAuthzManagerAuthzOK(tests.TestCase):
for object_category_id in object_categories:
- object_category_scope = self.manager.get_object_scopes_dict(
+ object_category_scope = self.authz_manager.get_object_scopes_dict(
admin_subject_id,
authz_ie_dict["id"],
object_category_id)
@@ -411,11 +396,9 @@ class TestIntraExtensionAuthzManagerAuthzOK(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
action_categories = self.admin_manager.add_action_category_dict(
admin_subject_id,
@@ -428,7 +411,7 @@ class TestIntraExtensionAuthzManagerAuthzOK(tests.TestCase):
for action_category_id in action_categories:
- action_category_scope = self.manager.get_action_scopes_dict(
+ action_category_scope = self.authz_manager.get_action_scopes_dict(
admin_subject_id,
authz_ie_dict["id"],
action_category_id)
@@ -474,17 +457,15 @@ class TestIntraExtensionAuthzManagerAuthzOK(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
admin_authz_subject_id, admin_authz_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], authz_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], authz_ie_dict['id'], 'admin').iteritems().next()
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
demo_authz_subject_id, demo_authz_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], authz_ie_dict['id'], 'demo').iteritems().next()
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], authz_ie_dict['id'], 'demo').iteritems().next()
- subjects_dict = self.manager.get_subjects_dict(admin_subject_id, authz_ie_dict["id"])
+ subjects_dict = self.authz_manager.get_subjects_dict(admin_subject_id, authz_ie_dict["id"])
subject_categories = self.admin_manager.add_subject_category_dict(
admin_subject_id,
@@ -496,7 +477,7 @@ class TestIntraExtensionAuthzManagerAuthzOK(tests.TestCase):
)
for subject_category_id in subject_categories:
- subject_category_scope = self.manager.get_subject_scopes_dict(
+ subject_category_scope = self.authz_manager.get_subject_scopes_dict(
admin_subject_id,
authz_ie_dict["id"],
subject_category_id)
@@ -527,7 +508,7 @@ class TestIntraExtensionAuthzManagerAuthzOK(tests.TestCase):
new_subject_category_scope_2)
subject_category_scope_2_id = subject_category_scope_2.keys()[0]
- subject_category_assignments = self.manager.get_subject_assignment_list(
+ subject_category_assignments = self.authz_manager.get_subject_assignment_list(
admin_subject_id,
authz_ie_dict["id"],
admin_authz_subject_id,
@@ -536,7 +517,7 @@ class TestIntraExtensionAuthzManagerAuthzOK(tests.TestCase):
self.assertIsInstance(subject_category_assignments, list)
self.assertEqual([], subject_category_assignments)
- subject_category_assignments = self.manager.get_subject_assignment_list(
+ subject_category_assignments = self.authz_manager.get_subject_assignment_list(
admin_subject_id,
authz_ie_dict["id"],
demo_authz_subject_id,
@@ -597,13 +578,11 @@ class TestIntraExtensionAuthzManagerAuthzOK(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
- objects_dict = self.manager.get_objects_dict(admin_subject_id, authz_ie_dict["id"])
+ objects_dict = self.authz_manager.get_objects_dict(admin_subject_id, authz_ie_dict["id"])
object_vm1_id = None
object_vm2_id = None
@@ -625,7 +604,7 @@ class TestIntraExtensionAuthzManagerAuthzOK(tests.TestCase):
)
for object_category_id in object_categories:
- object_category_scope = self.manager.get_object_scopes_dict(
+ object_category_scope = self.authz_manager.get_object_scopes_dict(
admin_subject_id,
authz_ie_dict["id"],
object_category_id)
@@ -656,7 +635,7 @@ class TestIntraExtensionAuthzManagerAuthzOK(tests.TestCase):
new_object_category_scope_2)
object_category_scope_2_id = object_category_scope_2.keys()[0]
- object_category_assignments = self.manager.get_object_assignment_list(
+ object_category_assignments = self.authz_manager.get_object_assignment_list(
admin_subject_id,
authz_ie_dict["id"],
object_vm1_id,
@@ -665,7 +644,7 @@ class TestIntraExtensionAuthzManagerAuthzOK(tests.TestCase):
self.assertIsInstance(object_category_assignments, list)
self.assertEqual([], object_category_assignments)
- object_category_assignments = self.manager.get_object_assignment_list(
+ object_category_assignments = self.authz_manager.get_object_assignment_list(
admin_subject_id,
authz_ie_dict["id"],
object_vm2_id,
@@ -726,13 +705,11 @@ class TestIntraExtensionAuthzManagerAuthzOK(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
- actions_dict = self.manager.get_actions_dict(admin_subject_id, authz_ie_dict["id"])
+ actions_dict = self.authz_manager.get_actions_dict(admin_subject_id, authz_ie_dict["id"])
action_upload_id = None
action_list_id = None
@@ -754,7 +731,7 @@ class TestIntraExtensionAuthzManagerAuthzOK(tests.TestCase):
)
for action_category_id in action_categories:
- action_category_scope = self.manager.get_action_scopes_dict(
+ action_category_scope = self.authz_manager.get_action_scopes_dict(
admin_subject_id,
authz_ie_dict["id"],
action_category_id)
@@ -785,7 +762,7 @@ class TestIntraExtensionAuthzManagerAuthzOK(tests.TestCase):
new_action_category_scope_2)
action_category_scope_2_id = action_category_scope_2.keys()[0]
- action_category_assignments = self.manager.get_action_assignment_list(
+ action_category_assignments = self.authz_manager.get_action_assignment_list(
admin_subject_id,
authz_ie_dict["id"],
action_upload_id,
@@ -794,7 +771,7 @@ class TestIntraExtensionAuthzManagerAuthzOK(tests.TestCase):
self.assertIsInstance(action_category_assignments, list)
self.assertEqual([], action_category_assignments)
- action_category_assignments = self.manager.get_action_assignment_list(
+ action_category_assignments = self.authz_manager.get_action_assignment_list(
admin_subject_id,
authz_ie_dict["id"],
action_list_id,
@@ -855,11 +832,9 @@ class TestIntraExtensionAuthzManagerAuthzOK(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.admin_manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
aggregation_algorithms = self.admin_manager.get_aggregation_algorithm_dict(admin_subject_id, authz_ie_dict["id"])
for key, value in aggregation_algorithms.iteritems():
@@ -897,11 +872,9 @@ class TestIntraExtensionAuthzManagerAuthzOK(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.admin_manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
sub_meta_rules = self.admin_manager.get_sub_meta_rules_dict(admin_subject_id, authz_ie_dict["id"])
self.assertIsInstance(sub_meta_rules, dict)
@@ -969,23 +942,28 @@ class TestIntraExtensionAuthzManagerAuthzOK(tests.TestCase):
# TODO: add test for the delete function
-@dependency.requires('admin_api', 'authz_api', 'tenant_api', 'configuration_api', 'moonlog_api')
+@dependency.requires('admin_api', 'authz_api', 'tenant_api', 'configuration_api', 'moonlog_api', 'identity_api', 'root_api')
class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
def setUp(self):
self.useFixture(database.Database())
super(TestIntraExtensionAuthzManagerAuthzKO, self).setUp()
- self.load_backends()
self.load_fixtures(default_fixtures)
+ self.load_backends()
+ domain = {'id': "default", 'name': "default"}
+ self.resource_api.create_domain(domain['id'], domain)
self.admin = create_user(self, username="admin")
self.demo = create_user(self, username="demo")
- self.root_intra_extension = create_intra_extension(self, policy_model="policy_root")
- # force re-initialization of the ADMIN_ID variable
- from keystone.contrib.moon.core import ADMIN_ID
- self.ADMIN_ID = ADMIN_ID
- self.manager = self.authz_api
+ self.root_intra_extension = self.root_api.get_root_extension_dict()
+ self.root_intra_extension_id = self.root_intra_extension.keys()[0]
+ self.ADMIN_ID = self.root_api.get_root_admin_id()
+ self.authz_manager = self.authz_api
self.admin_manager = self.admin_api
+ def tearDown(self):
+ # self.admin_manager.del_intra_extension(self.ADMIN_ID, self.root_intra_extension["id"])
+ tests.TestCase.tearDown(self)
+
def __get_key_from_value(self, value, values_dict):
return filter(lambda v: v[1] == value, values_dict.iteritems())[0][0]
@@ -995,70 +973,41 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
"tenant_api": TenantManager(),
"admin_api": IntraExtensionAdminManager(),
"authz_api": IntraExtensionAuthzManager(),
+ "root_api": IntraExtensionRootManager(),
# "resource_api": resource.Manager(),
}
def config_overrides(self):
super(TestIntraExtensionAuthzManagerAuthzKO, self).config_overrides()
self.policy_directory = 'examples/moon/policies'
+ self.root_policy_directory = 'policy_root'
self.config_fixture.config(
group='moon',
intraextension_driver='keystone.contrib.moon.backends.sql.IntraExtensionConnector')
self.config_fixture.config(
group='moon',
policy_directory=self.policy_directory)
-
- def test_tenant_exceptions(self):
- self.assertRaises(
- TenantUnknown,
- self.manager.get_tenant_dict
- )
- self.assertRaises(
- TenantUnknown,
- self.manager.get_tenant_name,
- uuid.uuid4().hex
- )
- self.assertRaises(
- TenantUnknown,
- self.manager.set_tenant_name,
- uuid.uuid4().hex, uuid.uuid4().hex
- )
- self.assertRaises(
- TenantUnknown,
- self.manager.get_extension_uuid,
- uuid.uuid4().hex, "authz"
- )
- self.assertRaises(
- TenantUnknown,
- self.manager.get_extension_uuid,
- uuid.uuid4().hex, "admin"
- )
-
- def test_intra_extension_exceptions(self):
-
- tenant = self.create_tenant()
- self.assertRaises(
- IntraExtensionUnknown,
- self.manager.get_extension_uuid,
- tenant["id"], "authz"
- )
- self.assertRaises(
- IntraExtensionUnknown,
- self.manager.get_extension_uuid,
- tenant["id"], "admin"
- )
- # TODO
+ self.config_fixture.config(
+ group='moon',
+ root_policy_directory=self.root_policy_directory)
def test_delete_admin_intra_extension(self):
+ authz_ie_dict = create_intra_extension(self, "policy_authz")
+ admin_ie_dict = create_intra_extension(self, "policy_admin")
+ tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
+
+ admin_subject_id, admin_subject_dict = \
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
self.assertRaises(
- AdminException,
- self.manager.del_intra_extension,
- self.ref["id"])
+ SubjectUnknown,
+ self.authz_manager.del_intra_extension,
+ uuid.uuid4().hex,
+ admin_ie_dict["id"])
def test_authz_exceptions(self):
self.assertRaises(
TenantUnknown,
- self.manager.authz,
+ self.authz_manager.authz,
uuid.uuid4().hex, uuid.uuid4().hex, uuid.uuid4().hex, uuid.uuid4().hex
)
@@ -1067,19 +1016,17 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
# Test when subject is unknown
self.assertRaises(
SubjectUnknown,
- self.manager.authz,
+ self.authz_manager.authz,
tenant["name"], uuid.uuid4().hex, uuid.uuid4().hex, uuid.uuid4().hex
)
# Test when subject is known but not the object
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], authz_ie_dict['id'], 'demo').iteritems().next()
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], authz_ie_dict['id'], 'demo').iteritems().next()
# self.manager.add_subject_dict(
# admin_subject_id,
@@ -1089,13 +1036,13 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
self.assertRaises(
ObjectUnknown,
- self.manager.authz,
+ self.authz_manager.authz,
tenant["name"], demo_subject_dict["name"], uuid.uuid4().hex, uuid.uuid4().hex
)
# Test when subject and object are known but not the action
my_object = {"name": "my_object", "description": "my_object description"}
- _tmp = self.manager.add_object_dict(
+ _tmp = self.admin_manager.add_object_dict(
admin_subject_id,
authz_ie_dict["id"],
my_object
@@ -1104,13 +1051,13 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
self.assertRaises(
ActionUnknown,
- self.manager.authz,
+ self.authz_manager.authz,
tenant["name"], demo_subject_dict["name"], my_object["name"], uuid.uuid4().hex
)
# Test when subject and object and action are known
my_action = {"name": "my_action", "description": "my_action description"}
- _tmp = self.manager.add_action_dict(
+ _tmp = self.admin_manager.add_action_dict(
admin_subject_id,
authz_ie_dict["id"],
my_action
@@ -1119,13 +1066,13 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
self.assertRaises(
AuthzException,
- self.manager.authz,
+ self.authz_manager.authz,
tenant["name"], demo_subject_dict["name"], my_object["name"], my_action["name"]
)
# Add a subject scope and test ObjectCategoryAssignmentOutOfScope
my_subject_category = {"name": "my_subject_category", "description": "my_subject_category description"}
- _tmp = self.manager.add_subject_category_dict(
+ _tmp = self.admin_manager.add_subject_category_dict(
admin_subject_id,
authz_ie_dict["id"],
my_subject_category
@@ -1133,7 +1080,7 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
my_subject_category["id"] = _tmp.keys()[0]
my_subject_scope = {"name": "my_subject_scope", "description": "my_subject_scope description"}
- _tmp = self.manager.add_subject_scope_dict(
+ _tmp = self.admin_manager.add_subject_scope_dict(
admin_subject_id,
authz_ie_dict["id"],
my_subject_category["id"],
@@ -1143,13 +1090,13 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
self.assertRaises(
AuthzException,
- self.manager.authz,
+ self.authz_manager.authz,
tenant["name"], demo_subject_dict["name"], my_object["name"], my_action["name"]
)
# Add an object scope and test ActionCategoryAssignmentOutOfScope
my_object_category = {"name": "my_object_category", "description": "my_object_category description"}
- _tmp = self.manager.add_object_category_dict(
+ _tmp = self.admin_manager.add_object_category_dict(
admin_subject_id,
authz_ie_dict["id"],
my_object_category
@@ -1157,7 +1104,7 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
my_object_category["id"] = _tmp.keys()[0]
my_object_scope = {"name": "my_object_scope", "description": "my_object_scope description"}
- _tmp = self.manager.add_object_scope_dict(
+ _tmp = self.admin_manager.add_object_scope_dict(
admin_subject_id,
authz_ie_dict["id"],
my_object_category["id"],
@@ -1167,13 +1114,13 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
self.assertRaises(
AuthzException,
- self.manager.authz,
+ self.authz_manager.authz,
tenant["name"], demo_subject_dict["name"], my_object["name"], my_action["name"]
)
# Add an action scope and test SubjectCategoryAssignmentUnknown
my_action_category = {"name": "my_action_category", "description": "my_action_category description"}
- _tmp = self.manager.add_action_category_dict(
+ _tmp = self.admin_manager.add_action_category_dict(
admin_subject_id,
authz_ie_dict["id"],
my_action_category
@@ -1181,7 +1128,7 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
my_action_category["id"] = _tmp.keys()[0]
my_action_scope = {"name": "my_action_scope", "description": "my_action_scope description"}
- _tmp = self.manager.add_action_scope_dict(
+ _tmp = self.admin_manager.add_action_scope_dict(
admin_subject_id,
authz_ie_dict["id"],
my_action_category["id"],
@@ -1191,12 +1138,12 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
self.assertRaises(
AuthzException,
- self.manager.authz,
+ self.authz_manager.authz,
tenant["name"], demo_subject_dict["name"], my_object["name"], my_action["name"]
)
# Add a subject assignment and test ObjectCategoryAssignmentUnknown
- self.manager.add_subject_assignment_list(
+ self.admin_manager.add_subject_assignment_list(
admin_subject_id,
authz_ie_dict["id"],
demo_subject_id,
@@ -1206,12 +1153,12 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
self.assertRaises(
AuthzException,
- self.manager.authz,
+ self.authz_manager.authz,
tenant["name"], demo_subject_dict["name"], my_object["name"], my_action["name"]
)
# Add an object assignment and test ActionCategoryAssignmentUnknown
- self.manager.add_object_assignment_list(
+ self.admin_manager.add_object_assignment_list(
admin_subject_id,
authz_ie_dict["id"],
my_object["id"],
@@ -1221,12 +1168,12 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
self.assertRaises(
AuthzException,
- self.manager.authz,
+ self.authz_manager.authz,
tenant["name"], demo_subject_dict["name"], my_object["name"], my_action["name"]
)
# Add an action assignment and test RuleUnknown
- self.manager.add_action_assignment_list(
+ self.admin_manager.add_action_assignment_list(
admin_subject_id,
authz_ie_dict["id"],
my_action["id"],
@@ -1236,7 +1183,7 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
self.assertRaises(
AuthzException,
- self.manager.authz,
+ self.authz_manager.authz,
tenant["name"], admin_subject_dict["name"], my_object["name"], my_action["name"]
)
@@ -1248,15 +1195,15 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
"action_categories": [my_action_category["id"], ],
"object_categories": [my_object_category["id"], ]
}
- print("my_meta_rule", my_meta_rule)
- sub_meta_rules_dict = self.manager.get_sub_meta_rules_dict(
+ sub_meta_rules_dict = self.authz_manager.get_sub_meta_rules_dict(
admin_subject_id,
authz_ie_dict["id"]
)
+ print("authz_ie_dict[\"id\"]", authz_ie_dict["id"])
self.assertRaises(
SubMetaRuleAlgorithmNotExisting,
- self.manager.add_sub_meta_rule_dict,
+ self.admin_manager.add_sub_meta_rule_dict,
admin_subject_id,
authz_ie_dict["id"],
my_meta_rule
@@ -1264,19 +1211,31 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
# TODO: the next request should be called with demo_subject_id
# but the demo user has no right in the root intra_extension
- algorithms = self.configuration_api.get_sub_meta_rule_algorithms_dict(admin_subject_id)
- for algorithm_id in algorithms:
- if algorithms[algorithm_id]["name"] == "inclusion":
- my_meta_rule["algorithm"] = algorithm_id
+ # algorithms = self.configuration_api.get_sub_meta_rule_algorithms_dict(admin_subject_id)
+ # for algorithm_id in algorithms:
+ # if algorithms[algorithm_id]["name"] == "inclusion":
+ # my_meta_rule["algorithm"] = algorithm_id
+ my_meta_rule['algorithm'] = 'inclusion'
- sub_meta_rule = self.manager.add_sub_meta_rule_dict(
+ sub_meta_rule = self.admin_manager.add_sub_meta_rule_dict(
admin_subject_id,
authz_ie_dict["id"],
my_meta_rule
)
- sub_meta_rule_id, sub_meta_rule_dict = sub_meta_rule.iteritems().next()
-
- rule = self.manager.add_rule_dict(
+ sub_meta_rule_id, sub_meta_rule_dict = None, None
+ for key, value in sub_meta_rule.iteritems():
+ if value["name"] == my_meta_rule["name"]:
+ sub_meta_rule_id, sub_meta_rule_dict = key, value
+ break
+
+ aggregation_algorithms = self.configuration_api.get_aggregation_algorithms_dict(admin_subject_id)
+ for _id in aggregation_algorithms:
+ if aggregation_algorithms[_id]["name"] == "one_true":
+ agg = self.admin_manager.set_aggregation_algorithm_dict(admin_subject_id, authz_ie_dict["id"],
+ _id,
+ aggregation_algorithms[_id])
+
+ rule = self.admin_manager.add_rule_dict(
admin_subject_id,
authz_ie_dict["id"],
sub_meta_rule_id,
@@ -1285,11 +1244,11 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
self.assertRaises(
AuthzException,
- self.manager.authz,
+ self.authz_manager.authz,
tenant["name"], admin_subject_dict["name"], my_object["name"], my_action["name"]
)
- result = self.manager.authz(tenant["name"], demo_subject_dict["name"], my_object["name"], my_action["name"])
+ result = self.authz_manager.authz(tenant["name"], demo_subject_dict["name"], my_object["name"], my_action["name"])
self.assertEqual(True, result)
def test_subjects(self):
@@ -1298,12 +1257,10 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
- subjects = self.manager.get_subjects_dict(admin_subject_id, authz_ie_dict["id"])
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ subjects = self.authz_manager.get_subjects_dict(admin_subject_id, authz_ie_dict["id"])
self.assertIsInstance(subjects, dict)
for key, value in subjects.iteritems():
self.assertIsInstance(value, dict)
@@ -1316,7 +1273,7 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
new_subject = {"name": "subject_test", "description": "subject_test"}
self.assertRaises(
AuthzException,
- self.manager.add_subject_dict,
+ self.admin_manager.add_subject_dict,
demo_subject_id, admin_ie_dict["id"], new_subject)
subjects = self.admin_manager.add_subject_dict(admin_subject_id, authz_ie_dict["id"], new_subject)
@@ -1333,11 +1290,11 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
# Delete the new subject
self.assertRaises(
AuthzException,
- self.manager.del_subject,
+ self.authz_manager.del_subject,
demo_subject_id, authz_ie_dict["id"], new_subject["id"])
self.admin_manager.del_subject(admin_subject_id, authz_ie_dict["id"], new_subject["id"])
- subjects = self.manager.get_subjects_dict(admin_subject_id, authz_ie_dict["id"])
+ subjects = self.authz_manager.get_subjects_dict(admin_subject_id, authz_ie_dict["id"])
for key, value in subjects.iteritems():
self.assertIsInstance(value, dict)
self.assertIn("name", value)
@@ -1350,12 +1307,10 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
- objects = self.manager.get_objects_dict(admin_subject_id, authz_ie_dict["id"])
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ objects = self.authz_manager.get_objects_dict(admin_subject_id, authz_ie_dict["id"])
objects_id_list = []
self.assertIsInstance(objects, dict)
for key, value in objects.iteritems():
@@ -1364,39 +1319,39 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
self.assertIn("name", value)
self.assertIn("description", value)
- create_user(self, "subject_test")
+ # create_user(self, "subject_test")
new_object = {"name": "object_test", "description": "object_test"}
self.assertRaises(
AuthzException,
- self.manager.add_object_dict,
+ self.authz_manager.add_object_dict,
demo_subject_id, admin_ie_dict["id"], new_object)
self.assertRaises(
ObjectsWriteNoAuthorized,
self.admin_manager.add_object_dict,
- admin_subject_id, authz_ie_dict["id"], new_object
+ admin_subject_id, admin_ie_dict["id"], new_object
)
# Delete the new object
for key in objects_id_list:
self.assertRaises(
AuthzException,
- self.manager.del_object,
+ self.authz_manager.del_object,
demo_subject_id, authz_ie_dict["id"], key)
self.assertRaises(
AuthzException,
- self.manager.del_object,
+ self.authz_manager.del_object,
admin_subject_id, authz_ie_dict["id"], key)
for key in objects_id_list:
self.assertRaises(
ObjectsWriteNoAuthorized,
self.admin_manager.del_object,
- demo_subject_id, authz_ie_dict["id"], key)
+ demo_subject_id, admin_ie_dict["id"], key)
self.assertRaises(
ObjectsWriteNoAuthorized,
self.admin_manager.del_object,
- admin_subject_id, authz_ie_dict["id"], key)
+ admin_subject_id, admin_ie_dict["id"], key)
def test_actions(self):
authz_ie_dict = create_intra_extension(self, "policy_authz")
@@ -1404,12 +1359,10 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
- actions = self.manager.get_actions_dict(admin_subject_id, authz_ie_dict["id"])
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ actions = self.authz_manager.get_actions_dict(admin_subject_id, authz_ie_dict["id"])
actions_id_list = []
self.assertIsInstance(actions, dict)
for key, value in actions.iteritems():
@@ -1422,35 +1375,35 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
new_action = {"name": "action_test", "description": "action_test"}
self.assertRaises(
AuthzException,
- self.manager.add_action_dict,
+ self.authz_manager.add_action_dict,
demo_subject_id, admin_ie_dict["id"], new_action)
self.assertRaises(
ActionsWriteNoAuthorized,
self.admin_manager.add_action_dict,
- admin_subject_id, authz_ie_dict["id"], new_action
+ admin_subject_id, admin_ie_dict["id"], new_action
)
# Delete all actions
for key in actions_id_list:
self.assertRaises(
AuthzException,
- self.manager.del_action,
+ self.authz_manager.del_action,
demo_subject_id, authz_ie_dict["id"], key)
self.assertRaises(
AuthzException,
- self.manager.del_action,
+ self.authz_manager.del_action,
admin_subject_id, authz_ie_dict["id"], key)
for key in actions_id_list:
self.assertRaises(
ActionsWriteNoAuthorized,
self.admin_manager.del_action,
- demo_subject_id, authz_ie_dict["id"], key)
+ demo_subject_id, admin_ie_dict["id"], key)
self.assertRaises(
ActionsWriteNoAuthorized,
self.admin_manager.del_action,
- admin_subject_id, authz_ie_dict["id"], key)
+ admin_subject_id, admin_ie_dict["id"], key)
def test_subject_categories(self):
authz_ie_dict = create_intra_extension(self, "policy_authz")
@@ -1458,12 +1411,10 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
- subject_categories = self.manager.get_subject_categories_dict(admin_subject_id, authz_ie_dict["id"])
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ subject_categories = self.authz_manager.get_subject_categories_dict(admin_subject_id, authz_ie_dict["id"])
self.assertIsInstance(subject_categories, dict)
for key, value in subject_categories.iteritems():
self.assertIsInstance(value, dict)
@@ -1473,7 +1424,7 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
new_subject_category = {"name": "subject_category_test", "description": "subject_category_test"}
self.assertRaises(
AuthzException,
- self.manager.add_subject_category_dict,
+ self.authz_manager.add_subject_category_dict,
demo_subject_id, admin_ie_dict["id"], new_subject_category)
subject_categories = self.admin_manager.add_subject_category_dict(admin_subject_id, authz_ie_dict["id"], new_subject_category)
@@ -1490,11 +1441,11 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
# Delete the new subject_category
self.assertRaises(
AuthzException,
- self.manager.del_subject_category,
+ self.authz_manager.del_subject_category,
demo_subject_id, authz_ie_dict["id"], new_subject_category["id"])
self.admin_manager.del_subject_category(admin_subject_id, authz_ie_dict["id"], new_subject_category["id"])
- subject_categories = self.manager.get_subject_categories_dict(admin_subject_id, authz_ie_dict["id"])
+ subject_categories = self.authz_manager.get_subject_categories_dict(admin_subject_id, authz_ie_dict["id"])
for key, value in subject_categories.iteritems():
self.assertIsInstance(value, dict)
self.assertIn("name", value)
@@ -1507,12 +1458,10 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
- object_categories = self.manager.get_object_categories_dict(admin_subject_id, authz_ie_dict["id"])
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ object_categories = self.authz_manager.get_object_categories_dict(admin_subject_id, authz_ie_dict["id"])
self.assertIsInstance(object_categories, dict)
for key, value in object_categories.iteritems():
self.assertIsInstance(value, dict)
@@ -1522,7 +1471,7 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
new_object_category = {"name": "object_category_test", "description": "object_category_test"}
self.assertRaises(
AuthzException,
- self.manager.add_object_category_dict,
+ self.authz_manager.add_object_category_dict,
demo_subject_id, admin_ie_dict["id"], new_object_category)
object_categories = self.admin_manager.add_object_category_dict(admin_subject_id, authz_ie_dict["id"], new_object_category)
@@ -1539,11 +1488,11 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
# Delete the new object_category
self.assertRaises(
AuthzException,
- self.manager.del_object_category,
+ self.authz_manager.del_object_category,
demo_subject_id, authz_ie_dict["id"], new_object_category["id"])
self.admin_manager.del_object_category(admin_subject_id, authz_ie_dict["id"], new_object_category["id"])
- object_categories = self.manager.get_object_categories_dict(admin_subject_id, authz_ie_dict["id"])
+ object_categories = self.authz_manager.get_object_categories_dict(admin_subject_id, authz_ie_dict["id"])
for key, value in object_categories.iteritems():
self.assertIsInstance(value, dict)
self.assertIn("name", value)
@@ -1556,12 +1505,10 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
- action_categories = self.manager.get_action_categories_dict(admin_subject_id, authz_ie_dict["id"])
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ action_categories = self.authz_manager.get_action_categories_dict(admin_subject_id, authz_ie_dict["id"])
self.assertIsInstance(action_categories, dict)
for key, value in action_categories.iteritems():
self.assertIsInstance(value, dict)
@@ -1571,7 +1518,7 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
new_action_category = {"name": "action_category_test", "description": "action_category_test"}
self.assertRaises(
AuthzException,
- self.manager.add_action_category_dict,
+ self.authz_manager.add_action_category_dict,
demo_subject_id, admin_ie_dict["id"], new_action_category)
action_categories = self.admin_manager.add_action_category_dict(admin_subject_id, authz_ie_dict["id"], new_action_category)
@@ -1588,11 +1535,11 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
# Delete the new action_category
self.assertRaises(
AuthzException,
- self.manager.del_action_category,
+ self.authz_manager.del_action_category,
demo_subject_id, authz_ie_dict["id"], new_action_category["id"])
self.admin_manager.del_action_category(admin_subject_id, authz_ie_dict["id"], new_action_category["id"])
- action_categories = self.manager.get_action_categories_dict(admin_subject_id, authz_ie_dict["id"])
+ action_categories = self.authz_manager.get_action_categories_dict(admin_subject_id, authz_ie_dict["id"])
for key, value in action_categories.iteritems():
self.assertIsInstance(value, dict)
self.assertIn("name", value)
@@ -1605,11 +1552,9 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
subject_categories = self.admin_manager.add_subject_category_dict(
admin_subject_id,
@@ -1622,7 +1567,7 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
for subject_category_id in subject_categories:
- subject_category_scope = self.manager.get_subject_scopes_dict(
+ subject_category_scope = self.authz_manager.get_subject_scopes_dict(
admin_subject_id,
authz_ie_dict["id"],
subject_category_id)
@@ -1677,11 +1622,9 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
object_categories = self.admin_manager.add_object_category_dict(
admin_subject_id,
@@ -1694,7 +1637,7 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
for object_category_id in object_categories:
- object_category_scope = self.manager.get_object_scopes_dict(
+ object_category_scope = self.authz_manager.get_object_scopes_dict(
admin_subject_id,
authz_ie_dict["id"],
object_category_id)
@@ -1749,11 +1692,9 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
action_categories = self.admin_manager.add_action_category_dict(
admin_subject_id,
@@ -1766,7 +1707,7 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
for action_category_id in action_categories:
- action_category_scope = self.manager.get_action_scopes_dict(
+ action_category_scope = self.authz_manager.get_action_scopes_dict(
admin_subject_id,
authz_ie_dict["id"],
action_category_id)
@@ -1821,17 +1762,15 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
admin_authz_subject_id, admin_authz_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], authz_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], authz_ie_dict['id'], 'admin').iteritems().next()
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
demo_authz_subject_id, demo_authz_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], authz_ie_dict['id'], 'demo').iteritems().next()
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], authz_ie_dict['id'], 'demo').iteritems().next()
- subjects_dict = self.manager.get_subjects_dict(admin_subject_id, authz_ie_dict["id"])
+ subjects_dict = self.authz_manager.get_subjects_dict(admin_subject_id, authz_ie_dict["id"])
subject_categories = self.admin_manager.add_subject_category_dict(
admin_subject_id,
@@ -1843,7 +1782,7 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
)
for subject_category_id in subject_categories:
- subject_category_scope = self.manager.get_subject_scopes_dict(
+ subject_category_scope = self.authz_manager.get_subject_scopes_dict(
admin_subject_id,
authz_ie_dict["id"],
subject_category_id)
@@ -1874,7 +1813,7 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
new_subject_category_scope_2)
subject_category_scope_2_id = subject_category_scope_2.keys()[0]
- subject_category_assignments = self.manager.get_subject_assignment_list(
+ subject_category_assignments = self.authz_manager.get_subject_assignment_list(
admin_subject_id,
authz_ie_dict["id"],
admin_authz_subject_id,
@@ -1883,7 +1822,7 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
self.assertIsInstance(subject_category_assignments, list)
self.assertEqual([], subject_category_assignments)
- subject_category_assignments = self.manager.get_subject_assignment_list(
+ subject_category_assignments = self.authz_manager.get_subject_assignment_list(
admin_subject_id,
authz_ie_dict["id"],
demo_authz_subject_id,
@@ -1894,14 +1833,14 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
self.assertRaises(
AuthzException,
- self.manager.add_subject_assignment_list,
+ self.authz_manager.add_subject_assignment_list,
demo_subject_id, authz_ie_dict["id"],
admin_authz_subject_id, subject_category_id, subject_category_scope_1_id
)
self.assertRaises(
AuthzException,
- self.manager.add_subject_assignment_list,
+ self.authz_manager.add_subject_assignment_list,
demo_subject_id, authz_ie_dict["id"],
demo_authz_subject_id, subject_category_id, subject_category_scope_2_id
)
@@ -1973,13 +1912,11 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
- objects_dict = self.manager.get_objects_dict(admin_subject_id, authz_ie_dict["id"])
+ objects_dict = self.authz_manager.get_objects_dict(admin_subject_id, authz_ie_dict["id"])
object_vm1_id = None
object_vm2_id = None
@@ -2001,7 +1938,7 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
)
for object_category_id in object_categories:
- object_category_scope = self.manager.get_object_scopes_dict(
+ object_category_scope = self.authz_manager.get_object_scopes_dict(
admin_subject_id,
authz_ie_dict["id"],
object_category_id)
@@ -2032,7 +1969,7 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
new_object_category_scope_2)
object_category_scope_2_id = object_category_scope_2.keys()[0]
- object_category_assignments = self.manager.get_object_assignment_list(
+ object_category_assignments = self.authz_manager.get_object_assignment_list(
admin_subject_id,
authz_ie_dict["id"],
object_vm1_id,
@@ -2041,7 +1978,7 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
self.assertIsInstance(object_category_assignments, list)
self.assertEqual([], object_category_assignments)
- object_category_assignments = self.manager.get_object_assignment_list(
+ object_category_assignments = self.authz_manager.get_object_assignment_list(
admin_subject_id,
authz_ie_dict["id"],
object_vm2_id,
@@ -2052,14 +1989,14 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
self.assertRaises(
AuthzException,
- self.manager.add_object_assignment_list,
+ self.authz_manager.add_object_assignment_list,
demo_subject_id, authz_ie_dict["id"],
object_vm1_id, object_category_id, object_category_scope_1_id
)
self.assertRaises(
AuthzException,
- self.manager.add_object_assignment_list,
+ self.authz_manager.add_object_assignment_list,
demo_subject_id, authz_ie_dict["id"],
object_vm2_id, object_category_id, object_category_scope_2_id
)
@@ -2131,13 +2068,11 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
- actions_dict = self.manager.get_actions_dict(admin_subject_id, authz_ie_dict["id"])
+ actions_dict = self.authz_manager.get_actions_dict(admin_subject_id, authz_ie_dict["id"])
action_upload_id = None
action_list_id = None
@@ -2159,7 +2094,7 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
)
for action_category_id in action_categories:
- action_category_scope = self.manager.get_action_scopes_dict(
+ action_category_scope = self.authz_manager.get_action_scopes_dict(
admin_subject_id,
authz_ie_dict["id"],
action_category_id)
@@ -2190,7 +2125,7 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
new_action_category_scope_2)
action_category_scope_2_id = action_category_scope_2.keys()[0]
- action_category_assignments = self.manager.get_action_assignment_list(
+ action_category_assignments = self.authz_manager.get_action_assignment_list(
admin_subject_id,
authz_ie_dict["id"],
action_upload_id,
@@ -2199,7 +2134,7 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
self.assertIsInstance(action_category_assignments, list)
self.assertEqual([], action_category_assignments)
- action_category_assignments = self.manager.get_action_assignment_list(
+ action_category_assignments = self.authz_manager.get_action_assignment_list(
admin_subject_id,
authz_ie_dict["id"],
action_list_id,
@@ -2210,14 +2145,14 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
self.assertRaises(
AuthzException,
- self.manager.add_action_assignment_list,
+ self.authz_manager.add_action_assignment_list,
demo_subject_id, authz_ie_dict["id"],
action_upload_id, action_category_id, action_category_scope_1_id
)
self.assertRaises(
AuthzException,
- self.manager.add_action_assignment_list,
+ self.authz_manager.add_action_assignment_list,
demo_subject_id, authz_ie_dict["id"],
action_list_id, action_category_id, action_category_scope_2_id
)
@@ -2289,11 +2224,9 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.admin_manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
aggregation_algorithms = self.admin_manager.get_aggregation_algorithm_dict(admin_subject_id, authz_ie_dict["id"])
for key, value in aggregation_algorithms.iteritems():
@@ -2331,11 +2264,9 @@ class TestIntraExtensionAuthzManagerAuthzKO(tests.TestCase):
tenant, mapping = create_mapping(self, "demo", authz_ie_dict['id'], admin_ie_dict['id'])
admin_subject_id, admin_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
- demo_subject_dict = self.admin_manager.add_subject_dict(admin_subject_id, admin_ie_dict["id"],
- {"name": "demo", "description": "demo"})
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'admin').iteritems().next()
demo_subject_id, demo_subject_dict = \
- self.tenant_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
+ self.admin_api.get_subject_dict_from_keystone_name(tenant['id'], admin_ie_dict['id'], 'demo').iteritems().next()
sub_meta_rules = self.admin_manager.get_sub_meta_rules_dict(admin_subject_id, authz_ie_dict["id"])
self.assertIsInstance(sub_meta_rules, dict)
diff --git a/keystone-moon/keystone/tests/moon/unit/test_unit_core_log.py b/keystone-moon/keystone/tests/moon/unit/test_unit_core_log.py
index 17e70018..37d210aa 100644
--- a/keystone-moon/keystone/tests/moon/unit/test_unit_core_log.py
+++ b/keystone-moon/keystone/tests/moon/unit/test_unit_core_log.py
@@ -17,7 +17,7 @@ from keystone import resource
from keystone.contrib.moon.exception import *
from keystone.tests.unit import default_fixtures
from keystone.contrib.moon.core import LogManager, TenantManager
-from keystone.contrib.moon.core import ADMIN_ID
+from keystone.tests.moon.unit import *
CONF = cfg.CONF
@@ -41,15 +41,18 @@ class TestIntraExtensionAdminManager(tests.TestCase):
def setUp(self):
self.useFixture(database.Database())
super(TestIntraExtensionAdminManager, self).setUp()
- self.load_backends()
self.load_fixtures(default_fixtures)
- self.admin = self.create_user(username="admin")
- self.demo = self.create_user(username="demo")
- self.root_intra_extension = self.create_intra_extension(policy_model="policy_root")
- # force re-initialization of the ADMIN_ID variable
- from keystone.contrib.moon.core import ADMIN_ID
- self.ADMIN_ID = ADMIN_ID
- self.manager = IntraExtensionAdminManager()
+ self.load_backends()
+ domain = {'id': "default", 'name': "default"}
+ self.resource_api.create_domain(domain['id'], domain)
+ self.admin = create_user(self, username="admin")
+ self.demo = create_user(self, username="demo")
+ self.root_intra_extension = self.root_api.get_root_extension_dict()
+ self.root_intra_extension_id = self.root_intra_extension.keys()[0]
+ self.ADMIN_ID = self.root_api.get_root_admin_id()
+ self.authz_manager = self.authz_api
+ self.admin_manager = self.admin_api
+ self.tenant_manager = self.tenant_api
def __get_key_from_value(self, value, values_dict):
return filter(lambda v: v[1] == value, values_dict.iteritems())[0][0]
@@ -71,43 +74,6 @@ class TestIntraExtensionAdminManager(tests.TestCase):
group='moon',
policy_directory=self.policy_directory)
- def create_intra_extension(self, policy_model="policy_rbac_admin"):
- # Create the admin user because IntraExtension needs it
- self.admin = self.identity_api.create_user(USER_ADMIN)
- IE["policymodel"] = policy_model
- self.ref = self.manager.load_intra_extension_dict(ADMIN_ID, intra_extension_dict=IE)
- self.assertIsInstance(self.ref, dict)
- self.create_tenant(self.ref["id"])
-
- def create_tenant(self, authz_uuid):
- tenant = {
- "id": uuid.uuid4().hex,
- "name": "TestAuthzIntraExtensionManager",
- "enabled": True,
- "description": "",
- "domain_id": "default"
- }
- project = self.resource_api.create_project(tenant["id"], tenant)
- mapping = self.tenant_api.set_tenant_dict(project["id"], project["name"], authz_uuid, None)
- self.assertIsInstance(mapping, dict)
- self.assertIn("authz", mapping)
- self.assertEqual(mapping["authz"], authz_uuid)
- return mapping
-
- def create_user(self, username="TestAdminIntraExtensionManagerUser"):
- user = {
- "id": uuid.uuid4().hex,
- "name": username,
- "enabled": True,
- "description": "",
- "domain_id": "default"
- }
- _user = self.identity_api.create_user(user)
- return _user
-
- def delete_admin_intra_extension(self):
- self.manager.del_intra_extension(self.ref["id"])
-
def send_logs(self):
log_authz = "Test for authz " + uuid.uuid4().hex
logs = []
diff --git a/keystone-moon/keystone/tests/moon/unit/test_unit_core_tenant.py b/keystone-moon/keystone/tests/moon/unit/test_unit_core_tenant.py
index 995b6a54..3c136ccd 100644
--- a/keystone-moon/keystone/tests/moon/unit/test_unit_core_tenant.py
+++ b/keystone-moon/keystone/tests/moon/unit/test_unit_core_tenant.py
@@ -14,8 +14,8 @@ from keystone.contrib.moon.exception import *
from keystone.tests.unit import default_fixtures
from keystone.contrib.moon.core import LogManager
from keystone.contrib.moon.core import ConfigurationManager
-from keystone.contrib.moon.core import ADMIN_ID
from keystone.common import dependency
+from keystone.tests.moon.unit import *
CONF = cfg.CONF
@@ -37,17 +37,18 @@ class TestTenantManager(tests.TestCase):
def setUp(self):
self.useFixture(database.Database())
super(TestTenantManager, self).setUp()
- self.load_backends()
self.load_fixtures(default_fixtures)
- self.admin = self.create_user(username="admin")
- self.demo = self.create_user(username="demo")
- self.root_intra_extension = self.create_intra_extension(policy_model="policy_root")
- # force re-initialization of the ADMIN_ID variable
- from keystone.contrib.moon.core import ADMIN_ID
- self.ADMIN_ID = ADMIN_ID
- self.manager = self.tenant_api
- # self.configuration_api = self.configuration_api
- # self.configuration_api.init_default_variables()
+ self.load_backends()
+ domain = {'id': "default", 'name': "default"}
+ self.resource_api.create_domain(domain['id'], domain)
+ self.admin = create_user(self, username="admin")
+ self.demo = create_user(self, username="demo")
+ self.root_intra_extension = self.root_api.get_root_extension_dict()
+ self.root_intra_extension_id = self.root_intra_extension.keys()[0]
+ self.ADMIN_ID = self.root_api.get_root_admin_id()
+ self.authz_manager = self.authz_api
+ self.admin_manager = self.admin_api
+ self.tenant_manager = self.tenant_api
def load_extra_backends(self):
return {
@@ -67,30 +68,9 @@ class TestTenantManager(tests.TestCase):
group='moon',
policy_directory=self.policy_directory)
- def create_user(self, username="admin"):
-
- _USER = dict(USER)
- _USER["name"] = username
- return self.identity_api.create_user(_USER)
-
- def create_intra_extension(self, policy_model="policy_authz"):
-
- IE["model"] = policy_model
- IE["name"] = uuid.uuid4().hex
- genre = "admin"
- if "authz" in policy_model:
- genre = "authz"
- IE["genre"] = genre
- # force re-initialization of the ADMIN_ID variable
- from keystone.contrib.moon.core import ADMIN_ID
- self.ADMIN_ID = ADMIN_ID
- ref = self.admin_api.load_intra_extension_dict(self.ADMIN_ID, intra_extension_dict=IE)
- self.assertIsInstance(ref, dict)
- return ref
-
def test_add_tenant(self):
- authz_intra_extension = self.create_intra_extension(policy_model="policy_authz")
- admin_intra_extension = self.create_intra_extension(policy_model="policy_admin")
+ authz_intra_extension = create_intra_extension(self, policy_model="policy_authz")
+ admin_intra_extension = create_intra_extension(self, policy_model="policy_admin")
new_tenant = {
"id": uuid.uuid4().hex,
"name": "demo",
@@ -98,129 +78,128 @@ class TestTenantManager(tests.TestCase):
"intra_authz_extension_id": authz_intra_extension['id'],
"intra_admin_extension_id": admin_intra_extension['id'],
}
- data = self.manager.add_tenant_dict(user_id=self.ADMIN_ID, tenant_dict=new_tenant)
- self.assertEquals(new_tenant["id"], data["id"])
- self.assertEquals(new_tenant["name"], data['tenant']["name"])
- self.assertEquals(new_tenant["intra_authz_extension_id"], data['tenant']["intra_authz_extension_id"])
- self.assertEquals(new_tenant["intra_admin_extension_id"], data['tenant']["intra_admin_extension_id"])
- data = self.manager.get_tenants_dict(self.ADMIN_ID)
+ data = self.tenant_manager.add_tenant_dict(user_id=self.ADMIN_ID, tenant_dict=new_tenant)
+ data_id = data.keys()[0]
+ self.assertEquals(new_tenant["id"], data_id)
+ self.assertEquals(new_tenant["name"], data[data_id]["name"])
+ self.assertEquals(new_tenant["intra_authz_extension_id"], data[data_id]["intra_authz_extension_id"])
+ self.assertEquals(new_tenant["intra_admin_extension_id"], data[data_id]["intra_admin_extension_id"])
+ data = self.tenant_manager.get_tenants_dict(self.ADMIN_ID)
self.assertNotEqual(data, {})
data = self.admin_api.get_intra_extension_dict(self.ADMIN_ID, new_tenant["intra_authz_extension_id"])
- self.assertEquals(new_tenant["intra_authz_extension_id"], data["id"])
+ data_id = data["id"]
+ self.assertEquals(new_tenant["intra_authz_extension_id"], data_id)
data = self.admin_api.get_intra_extension_dict(self.ADMIN_ID, new_tenant["intra_admin_extension_id"])
- self.assertEquals(new_tenant["intra_admin_extension_id"], data["id"])
+ data_id = data["id"]
+ self.assertEquals(new_tenant["intra_admin_extension_id"], data_id)
def test_del_tenant(self):
- authz_intra_extension = self.create_intra_extension(policy_model="policy_authz")
- admin_intra_extension = self.create_intra_extension(policy_model="policy_admin")
+ authz_intra_extension = create_intra_extension(self, policy_model="policy_authz")
+ admin_intra_extension = create_intra_extension(self, policy_model="policy_admin")
new_tenant = {
- "id": uuid.uuid4().hex,
"name": "demo",
"description": uuid.uuid4().hex,
"intra_authz_extension_id": authz_intra_extension['id'],
"intra_admin_extension_id": admin_intra_extension['id'],
}
- data = self.manager.add_tenant_dict(user_id=self.ADMIN_ID, tenant_dict=new_tenant)
- self.assertEquals(new_tenant["id"], data["id"])
- self.assertEquals(new_tenant["name"], data['tenant']["name"])
- self.assertEquals(new_tenant["intra_authz_extension_id"], data['tenant']["intra_authz_extension_id"])
- self.assertEquals(new_tenant["intra_admin_extension_id"], data['tenant']["intra_admin_extension_id"])
- data = self.manager.get_tenants_dict(self.ADMIN_ID)
+ data = self.tenant_manager.add_tenant_dict(user_id=self.ADMIN_ID, tenant_dict=new_tenant)
+ data_id = data.keys()[0]
+ self.assertEquals(new_tenant["name"], data[data_id]["name"])
+ self.assertEquals(new_tenant["intra_authz_extension_id"], data[data_id]["intra_authz_extension_id"])
+ self.assertEquals(new_tenant["intra_admin_extension_id"], data[data_id]["intra_admin_extension_id"])
+ data = self.tenant_manager.get_tenants_dict(self.ADMIN_ID)
self.assertNotEqual(data, {})
- self.manager.del_tenant(self.ADMIN_ID, new_tenant["id"])
- data = self.manager.get_tenants_dict(self.ADMIN_ID)
+ self.tenant_manager.del_tenant(self.ADMIN_ID, data_id)
+ data = self.tenant_manager.get_tenants_dict(self.ADMIN_ID)
self.assertEqual(data, {})
def test_set_tenant(self):
- authz_intra_extension = self.create_intra_extension(policy_model="policy_authz")
- admin_intra_extension = self.create_intra_extension(policy_model="policy_admin")
+ authz_intra_extension = create_intra_extension(self, policy_model="policy_authz")
+ admin_intra_extension = create_intra_extension(self, policy_model="policy_admin")
new_tenant = {
- "id": uuid.uuid4().hex,
"name": "demo",
"description": uuid.uuid4().hex,
"intra_authz_extension_id": authz_intra_extension['id'],
"intra_admin_extension_id": admin_intra_extension['id'],
}
- data = self.manager.add_tenant_dict(user_id=self.ADMIN_ID, tenant_dict=new_tenant)
- self.assertEquals(new_tenant["id"], data["id"])
- self.assertEquals(new_tenant["name"], data['tenant']["name"])
- self.assertEquals(new_tenant["intra_authz_extension_id"], data['tenant']["intra_authz_extension_id"])
- self.assertEquals(new_tenant["intra_admin_extension_id"], data['tenant']["intra_admin_extension_id"])
- data = self.manager.get_tenants_dict(self.ADMIN_ID)
+ data = self.tenant_manager.add_tenant_dict(user_id=self.ADMIN_ID, tenant_dict=new_tenant)
+ data_id = data.keys()[0]
+ self.assertEquals(new_tenant["name"], data[data_id]["name"])
+ self.assertEquals(new_tenant["intra_authz_extension_id"], data[data_id]["intra_authz_extension_id"])
+ self.assertEquals(new_tenant["intra_admin_extension_id"], data[data_id]["intra_admin_extension_id"])
+ data = self.tenant_manager.get_tenants_dict(self.ADMIN_ID)
self.assertNotEqual(data, {})
new_tenant["name"] = "demo2"
- data = self.manager.set_tenant_dict(user_id=self.ADMIN_ID, tenant_id=new_tenant["id"], tenant_dict=new_tenant)
- self.assertEquals(new_tenant["id"], data["id"])
- self.assertEquals(new_tenant["name"], data['tenant']["name"])
- self.assertEquals(new_tenant["intra_authz_extension_id"], data['tenant']["intra_authz_extension_id"])
- self.assertEquals(new_tenant["intra_admin_extension_id"], data['tenant']["intra_admin_extension_id"])
+ print(new_tenant)
+ data = self.tenant_manager.set_tenant_dict(user_id=self.ADMIN_ID, tenant_id=data_id, tenant_dict=new_tenant)
+ data_id = data.keys()[0]
+ self.assertEquals(new_tenant["name"], data[data_id]["name"])
+ self.assertEquals(new_tenant["intra_authz_extension_id"], data[data_id]["intra_authz_extension_id"])
+ self.assertEquals(new_tenant["intra_admin_extension_id"], data[data_id]["intra_admin_extension_id"])
def test_exception_tenant_unknown(self):
- self.assertRaises(TenantUnknown, self.manager.get_tenant_dict, self.ADMIN_ID, uuid.uuid4().hex)
- self.assertRaises(TenantUnknown, self.manager.del_tenant, self.ADMIN_ID, uuid.uuid4().hex)
- self.assertRaises(TenantUnknown, self.manager.set_tenant_dict, self.ADMIN_ID, uuid.uuid4().hex, {})
+ self.assertRaises(TenantUnknown, self.tenant_manager.get_tenant_dict, self.ADMIN_ID, uuid.uuid4().hex)
+ self.assertRaises(TenantUnknown, self.tenant_manager.del_tenant, self.ADMIN_ID, uuid.uuid4().hex)
+ self.assertRaises(TenantUnknown, self.tenant_manager.set_tenant_dict, self.ADMIN_ID, uuid.uuid4().hex, {})
- authz_intra_extension = self.create_intra_extension(policy_model="policy_authz")
- admin_intra_extension = self.create_intra_extension(policy_model="policy_admin")
+ authz_intra_extension = create_intra_extension(self, policy_model="policy_authz")
+ admin_intra_extension = create_intra_extension(self, policy_model="policy_admin")
new_tenant = {
- "id": uuid.uuid4().hex,
"name": "demo",
"description": uuid.uuid4().hex,
"intra_authz_extension_id": authz_intra_extension['id'],
"intra_admin_extension_id": admin_intra_extension['id'],
}
- data = self.manager.add_tenant_dict(user_id=self.ADMIN_ID, tenant_dict=new_tenant)
- self.assertEquals(new_tenant["id"], data["id"])
- self.assertEquals(new_tenant["name"], data['tenant']["name"])
- self.assertEquals(new_tenant["intra_authz_extension_id"], data['tenant']["intra_authz_extension_id"])
- self.assertEquals(new_tenant["intra_admin_extension_id"], data['tenant']["intra_admin_extension_id"])
- data = self.manager.get_tenants_dict(self.ADMIN_ID)
+ data = self.tenant_manager.add_tenant_dict(user_id=self.ADMIN_ID, tenant_dict=new_tenant)
+ data_id = data.keys()[0]
+ self.assertEquals(new_tenant["name"], data[data_id]["name"])
+ self.assertEquals(new_tenant["intra_authz_extension_id"], data[data_id]["intra_authz_extension_id"])
+ self.assertEquals(new_tenant["intra_admin_extension_id"], data[data_id]["intra_admin_extension_id"])
+ data = self.tenant_manager.get_tenants_dict(self.ADMIN_ID)
self.assertNotEqual(data, {})
- self.assertRaises(TenantUnknown, self.manager.get_tenant_dict, self.ADMIN_ID, uuid.uuid4().hex)
+ self.assertRaises(TenantUnknown, self.tenant_manager.get_tenant_dict, self.ADMIN_ID, uuid.uuid4().hex)
def test_exception_tenant_added_name_existing(self):
- authz_intra_extension = self.create_intra_extension(policy_model="policy_authz")
- admin_intra_extension = self.create_intra_extension(policy_model="policy_admin")
+ authz_intra_extension = create_intra_extension(self, policy_model="policy_authz")
+ admin_intra_extension = create_intra_extension(self, policy_model="policy_admin")
new_tenant = {
- "id": uuid.uuid4().hex,
"name": "demo",
"description": uuid.uuid4().hex,
"intra_authz_extension_id": authz_intra_extension['id'],
"intra_admin_extension_id": admin_intra_extension['id'],
}
- data = self.manager.add_tenant_dict(user_id=self.ADMIN_ID, tenant_dict=new_tenant)
- self.assertEquals(new_tenant["id"], data["id"])
- self.assertEquals(new_tenant["name"], data['tenant']["name"])
- self.assertEquals(new_tenant["intra_authz_extension_id"], data['tenant']["intra_authz_extension_id"])
- self.assertEquals(new_tenant["intra_admin_extension_id"], data['tenant']["intra_admin_extension_id"])
- data = self.manager.get_tenants_dict(self.ADMIN_ID)
+ data = self.tenant_manager.add_tenant_dict(user_id=self.ADMIN_ID, tenant_dict=new_tenant)
+ data_id = data.keys()[0]
+ self.assertEquals(new_tenant["name"], data[data_id]["name"])
+ self.assertEquals(new_tenant["intra_authz_extension_id"], data[data_id]["intra_authz_extension_id"])
+ self.assertEquals(new_tenant["intra_admin_extension_id"], data[data_id]["intra_admin_extension_id"])
+ data = self.tenant_manager.get_tenants_dict(self.ADMIN_ID)
self.assertNotEqual(data, {})
- self.assertRaises(TenantAddedNameExisting, self.manager.add_tenant_dict, self.ADMIN_ID, new_tenant)
+ self.assertRaises(TenantAddedNameExisting, self.tenant_manager.add_tenant_dict, self.ADMIN_ID, new_tenant)
def test_exception_tenant_no_intra_extension(self):
- authz_intra_extension = self.create_intra_extension(policy_model="policy_authz")
- admin_intra_extension = self.create_intra_extension(policy_model="policy_admin")
+ authz_intra_extension = create_intra_extension(self, policy_model="policy_authz")
+ admin_intra_extension = create_intra_extension(self, policy_model="policy_admin")
new_tenant = {
- "id": uuid.uuid4().hex,
"name": "demo",
"description": uuid.uuid4().hex,
"intra_authz_extension_id": authz_intra_extension['id'],
"intra_admin_extension_id": admin_intra_extension['id'],
}
new_tenant['intra_authz_extension_id'] = None
- self.assertRaises(TenantNoIntraAuthzExtension, self.manager.add_tenant_dict, self.ADMIN_ID, new_tenant)
+ self.assertRaises(TenantNoIntraAuthzExtension, self.tenant_manager.add_tenant_dict, self.ADMIN_ID, new_tenant)
new_tenant['intra_authz_extension_id'] = authz_intra_extension['id']
- data = self.manager.add_tenant_dict(user_id=self.ADMIN_ID, tenant_dict=new_tenant)
- self.assertEquals(new_tenant["id"], data["id"])
- self.assertEquals(new_tenant["name"], data['tenant']["name"])
- self.assertEquals(new_tenant["intra_authz_extension_id"], data['tenant']["intra_authz_extension_id"])
- self.assertEquals(new_tenant["intra_admin_extension_id"], data['tenant']["intra_admin_extension_id"])
- data = self.manager.get_tenants_dict(self.ADMIN_ID)
+ data = self.tenant_manager.add_tenant_dict(user_id=self.ADMIN_ID, tenant_dict=new_tenant)
+ data_id = data.keys()[0]
+ self.assertEquals(new_tenant["name"], data[data_id]["name"])
+ self.assertEquals(new_tenant["intra_authz_extension_id"], data[data_id]["intra_authz_extension_id"])
+ self.assertEquals(new_tenant["intra_admin_extension_id"], data[data_id]["intra_admin_extension_id"])
+ data = self.tenant_manager.get_tenants_dict(self.ADMIN_ID)
self.assertNotEqual(data, {})
new_tenant['intra_authz_extension_id'] = None
new_tenant['name'] = "demo2"
- self.assertRaises(TenantNoIntraAuthzExtension, self.manager.set_tenant_dict, self.ADMIN_ID, new_tenant["id"], new_tenant)
+ self.assertRaises(TenantNoIntraAuthzExtension, self.tenant_manager.set_tenant_dict, self.ADMIN_ID, data_id, new_tenant)