summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--keystone-moon/keystone/contrib/moon/core.py142
-rw-r--r--keystone-moon/keystone/contrib/moon/exception.py6
-rw-r--r--keystone-moon/keystone/tests/moon/unit/test_unit_core_tenant.py62
3 files changed, 134 insertions, 76 deletions
diff --git a/keystone-moon/keystone/contrib/moon/core.py b/keystone-moon/keystone/contrib/moon/core.py
index 1b37b6df..f69da788 100644
--- a/keystone-moon/keystone/contrib/moon/core.py
+++ b/keystone-moon/keystone/contrib/moon/core.py
@@ -25,9 +25,9 @@ from keystone.contrib.moon.algorithms import *
CONF = config.CONF
LOG = log.getLogger(__name__)
-# TODO: call functions to get these 2 variables
-ADMIN_ID = uuid4().hex # default user_id for internal invocation
-ROOT_EXTENSION_ID = uuid4().hex
+ADMIN_ID = None # default user_id for internal invocation
+ROOT_EXTENSION_ID = None
+ROOT_EXTENSION_MODEL = "policy_root"
_OPTS = [
@@ -107,9 +107,31 @@ def enforce(action_names, object_name, **extra):
_action_name_list = action_names
_object_name = object_name
+ def get_root_extension(self, args, kwargs):
+ if not ROOT_EXTENSION_ID:
+ global ROOT_EXTENSION_MODEL, ROOT_EXTENSION_ID, ADMIN_ID
+ try:
+ # if it is the first time we passed here, the root extension may be not initialized
+ # specially during unittest. So we raise RootExtensionNotInitialized to authorize the
+ # current creation process
+ if 'intra_extension_dict' in kwargs:
+ intra_extension_dict = kwargs['intra_extension_dict']
+ else:
+ intra_extension_dict = args[2]
+ print(intra_extension_dict)
+ if isinstance(intra_extension_dict, dict) and \
+ "model" in intra_extension_dict and \
+ intra_extension_dict["model"] == "policy_root":
+ raise RootExtensionNotInitialized()
+ except KeyError:
+ pass
+ return ROOT_EXTENSION_ID
+
def wrap(func):
def wrapped(*args, **kwargs):
+ global ADMIN_ID, ROOT_EXTENSION_ID
+ returned_value_for_func = None
self = args[0]
try:
user_id = args[1]
@@ -118,64 +140,85 @@ def enforce(action_names, object_name, **extra):
intra_extension_id = None
intra_admin_extension_id = None
- if user_id == ADMIN_ID:
- # TODO: check if there is no security hole here
- return func(*args, **kwargs)
-
+ try:
+ intra_admin_extension_id = get_root_extension(self, args, kwargs)
+ except RootExtensionNotInitialized:
+ returned_value_for_func = func(*args, **kwargs)
+ intra_extensions_dict = self.admin_api.driver.get_intra_extensions_dict()
+ for ext in intra_extensions_dict:
+ if intra_extensions_dict[ext]["model"] == ROOT_EXTENSION_MODEL:
+ ROOT_EXTENSION_ID = ext
+ break
+ if not ROOT_EXTENSION_ID:
+ raise RootExtensionUnknown()
+ print(returned_value_for_func)
+ subjects_dict = self.admin_api.driver.get_subjects_dict(returned_value_for_func['id'])
+ for subject_id in subjects_dict:
+ if subjects_dict[subject_id]["name"] == "admin":
+ ADMIN_ID = subject_id
+ break
+ if not ADMIN_ID:
+ raise RootExtensionUnknown()
+ return returned_value_for_func
try:
intra_extension_id = args[2]
except IndexError:
if 'intra_extension_id' in kwargs:
intra_extension_id = kwargs['intra_extension_id']
- else:
- intra_admin_extension_id = ROOT_EXTENSION_ID
-
- intra_extensions_dict = self.admin_api.driver.get_intra_extensions_dict()
- if intra_extension_id not in intra_extensions_dict:
- raise IntraExtensionUnknown()
- tenants_dict = self.tenant_api.driver.get_tenants_dict(ADMIN_ID)
- for _tenant_id in tenants_dict:
- if tenants_dict[_tenant_id]['intra_authz_extension_id'] is intra_extension_id or \
- tenants_dict[_tenant_id]['intra_admin_extension_id'] is intra_extension_id:
- intra_admin_extension_id = tenants_dict[_tenant_id]['intra_admin_extension_id']
- if not intra_admin_extension_id:
- self.moonlog_api.driver.warning("No Intra_Admin_Extension found, authorization granted by default.")
- return func(*args, **kwargs)
+ # else:
+ # intra_admin_extension_id = get_root_extension(self)
+
+ if ADMIN_ID and user_id == ADMIN_ID:
+ # TODO: check if there is no security hole here
+ returned_value_for_func = func(*args, **kwargs)
else:
- objects_dict = self.admin_api.driver.get_objects_dict(ADMIN_ID, intra_admin_extension_id)
- object_name = intra_extensions_dict[intra_extension_id]['genre'] + '.' + _object_name
- object_id = None
- for _object_id in objects_dict:
- if objects_dict[_object_id]['name'] is object_name:
- object_id = _object_id
- break
- if type(_action_name_list) in (str, unicode):
- action_name_list = (_action_name_list, )
+ intra_extensions_dict = self.admin_api.driver.get_intra_extensions_dict()
+ if intra_extension_id not in intra_extensions_dict:
+ raise IntraExtensionUnknown()
+ tenants_dict = self.tenant_api.driver.get_tenants_dict(ADMIN_ID)
+ for _tenant_id in tenants_dict:
+ if tenants_dict[_tenant_id]['intra_authz_extension_id'] is intra_extension_id or \
+ tenants_dict[_tenant_id]['intra_admin_extension_id'] is intra_extension_id:
+ intra_admin_extension_id = tenants_dict[_tenant_id]['intra_admin_extension_id']
+ if not intra_admin_extension_id:
+ self.moonlog_api.driver.warning("No Intra_Admin_Extension found, authorization granted by default.")
+ returned_value_for_func = func(*args, **kwargs)
else:
- action_name_list = _action_name_list
- actions_dict = self.admin_api.driver.get_actions_dict(ADMIN_ID, intra_admin_extension_id)
- action_id_list = list()
- for _action_name in action_name_list:
- for _action_id in actions_dict:
- if actions_dict[_action_id]['name'] is _action_name:
- action_id_list.append(_action_id)
+ objects_dict = self.admin_api.driver.get_objects_dict(ADMIN_ID, intra_admin_extension_id)
+ object_name = intra_extensions_dict[intra_extension_id]['genre'] + '.' + _object_name
+ object_id = None
+ for _object_id in objects_dict:
+ if objects_dict[_object_id]['name'] is object_name:
+ object_id = _object_id
break
-
- authz_result = False
- for action_id in action_id_list:
- if self.driver.authz(intra_admin_extension_id, user_id, object_id, action_id):
- authz_result = True
+ if type(_action_name_list) in (str, unicode):
+ action_name_list = (_action_name_list, )
else:
- authz_result = False
- break
- if authz_result:
- return func(*args, **kwargs)
+ action_name_list = _action_name_list
+ actions_dict = self.admin_api.driver.get_actions_dict(ADMIN_ID, intra_admin_extension_id)
+ action_id_list = list()
+ for _action_name in action_name_list:
+ for _action_id in actions_dict:
+ if actions_dict[_action_id]['name'] is _action_name:
+ action_id_list.append(_action_id)
+ break
+
+ authz_result = False
+ for action_id in action_id_list:
+ if self.driver.authz(intra_admin_extension_id, user_id, object_id, action_id):
+ authz_result = True
+ else:
+ authz_result = False
+ break
+ if authz_result:
+ returned_value_for_func = func(*args, **kwargs)
+ return returned_value_for_func
return wrapped
return wrap
@dependency.provider('configuration_api')
-@dependency.requires('moonlog_api')
+@dependency.requires('moonlog_api', 'admin_api')
class ConfigurationManager(manager.Manager):
def __init__(self):
@@ -230,7 +273,7 @@ class ConfigurationManager(manager.Manager):
return None
@dependency.provider('tenant_api')
-@dependency.requires('moonlog_api', 'admin_api')
+@dependency.requires('moonlog_api', 'admin_api', 'configuration_api')
class TenantManager(manager.Manager):
def __init__(self):
@@ -259,8 +302,6 @@ class TenantManager(manager.Manager):
def add_tenant_dict(self, user_id, tenant_dict):
tenants_dict = self.driver.get_tenants_dict()
for tenant_id in tenants_dict:
- print(tenants_dict[tenant_id])
- print(tenant_dict)
if tenants_dict[tenant_id]['name'] == tenant_dict['name']:
raise TenantAddedNameExisting()
@@ -1580,6 +1621,7 @@ class IntraExtensionAuthzManager(IntraExtensionManager):
@dependency.provider('admin_api')
+# @dependency.requires('configuration_api')
class IntraExtensionAdminManager(IntraExtensionManager):
def __init__(self):
diff --git a/keystone-moon/keystone/contrib/moon/exception.py b/keystone-moon/keystone/contrib/moon/exception.py
index a53a3397..75ccd187 100644
--- a/keystone-moon/keystone/contrib/moon/exception.py
+++ b/keystone-moon/keystone/contrib/moon/exception.py
@@ -97,6 +97,12 @@ class RootExtensionUnknown(IntraExtensionUnknown):
title = 'Root Extension Unknown'
logger = "Error"
+class RootExtensionNotInitialized(IntraExtensionException):
+ message_format = _("The root_extension is not initialized.")
+ code = 400
+ title = 'Root Extension Not Initialized'
+ logger = "Error"
+
class IntraExtensionCreationError(IntraExtensionException):
message_format = _("The arguments for the creation of this Extension were malformed.")
diff --git a/keystone-moon/keystone/tests/moon/unit/test_unit_core_tenant.py b/keystone-moon/keystone/tests/moon/unit/test_unit_core_tenant.py
index d122b25b..6d656488 100644
--- a/keystone-moon/keystone/tests/moon/unit/test_unit_core_tenant.py
+++ b/keystone-moon/keystone/tests/moon/unit/test_unit_core_tenant.py
@@ -13,6 +13,7 @@ from keystone.tests.unit.ksfixtures import database
from keystone.contrib.moon.exception import *
from keystone.tests.unit import default_fixtures
from keystone.contrib.moon.core import LogManager
+from keystone.contrib.moon.core import ConfigurationManager
from keystone.contrib.moon.core import ADMIN_ID
from keystone.common import dependency
@@ -31,6 +32,7 @@ IE = {
@dependency.requires('admin_api')
class TestTenantManager(tests.TestCase):
+ ADMIN_ID = None
def setUp(self):
self.useFixture(database.Database())
@@ -39,8 +41,13 @@ class TestTenantManager(tests.TestCase):
self.load_fixtures(default_fixtures)
self.admin = self.create_user(username="admin")
self.demo = self.create_user(username="demo")
- self.manager = TenantManager()
self.root_intra_extension = self.create_intra_extension(policy_model="policy_root")
+ # force re-initialization of the ADMIN_ID variable
+ from keystone.contrib.moon.core import ADMIN_ID
+ self.ADMIN_ID = ADMIN_ID
+ self.manager = TenantManager()
+ self.configuration_api = ConfigurationManager()
+ # self.configuration_api.init_default_variables()
def load_extra_backends(self):
return {
@@ -74,7 +81,10 @@ class TestTenantManager(tests.TestCase):
if "authz" in policy_model:
genre = "authz"
IE["genre"] = genre
- ref = self.admin_api.load_intra_extension_dict(ADMIN_ID, intra_extension_dict=IE)
+ # force re-initialization of the ADMIN_ID variable
+ from keystone.contrib.moon.core import ADMIN_ID
+ self.ADMIN_ID = ADMIN_ID
+ ref = self.admin_api.load_intra_extension_dict(self.ADMIN_ID, intra_extension_dict=IE)
self.assertIsInstance(ref, dict)
return ref
@@ -88,16 +98,16 @@ class TestTenantManager(tests.TestCase):
"intra_authz_extension": authz_intra_extension['id'],
"intra_admin_extension": admin_intra_extension['id'],
}
- data = self.manager.add_tenant_dict(user_id=ADMIN_ID, tenant_dict=new_tenant)
+ data = self.manager.add_tenant_dict(user_id=self.ADMIN_ID, tenant_dict=new_tenant)
self.assertEquals(new_tenant["id"], data["id"])
self.assertEquals(new_tenant["name"], data['tenant']["name"])
self.assertEquals(new_tenant["intra_authz_extension"], data['tenant']["intra_authz_extension"])
self.assertEquals(new_tenant["intra_admin_extension"], data['tenant']["intra_admin_extension"])
- data = self.manager.get_tenants_dict(ADMIN_ID)
+ data = self.manager.get_tenants_dict(self.ADMIN_ID)
self.assertNotEqual(data, {})
- data = self.admin_api.get_intra_extension_dict(ADMIN_ID, new_tenant["intra_authz_extension"])
+ data = self.admin_api.get_intra_extension_dict(self.ADMIN_ID, new_tenant["intra_authz_extension"])
self.assertEquals(new_tenant["intra_authz_extension"], data["id"])
- data = self.admin_api.get_intra_extension_dict(ADMIN_ID, new_tenant["intra_admin_extension"])
+ data = self.admin_api.get_intra_extension_dict(self.ADMIN_ID, new_tenant["intra_admin_extension"])
self.assertEquals(new_tenant["intra_admin_extension"], data["id"])
def test_del_tenant(self):
@@ -110,15 +120,15 @@ class TestTenantManager(tests.TestCase):
"intra_authz_extension": authz_intra_extension['id'],
"intra_admin_extension": admin_intra_extension['id'],
}
- data = self.manager.add_tenant_dict(user_id=ADMIN_ID, tenant_dict=new_tenant)
+ data = self.manager.add_tenant_dict(user_id=self.ADMIN_ID, tenant_dict=new_tenant)
self.assertEquals(new_tenant["id"], data["id"])
self.assertEquals(new_tenant["name"], data['tenant']["name"])
self.assertEquals(new_tenant["intra_authz_extension"], data['tenant']["intra_authz_extension"])
self.assertEquals(new_tenant["intra_admin_extension"], data['tenant']["intra_admin_extension"])
- data = self.manager.get_tenants_dict(ADMIN_ID)
+ data = self.manager.get_tenants_dict(self.ADMIN_ID)
self.assertNotEqual(data, {})
- self.manager.del_tenant(ADMIN_ID, new_tenant["id"])
- data = self.manager.get_tenants_dict(ADMIN_ID)
+ self.manager.del_tenant(self.ADMIN_ID, new_tenant["id"])
+ data = self.manager.get_tenants_dict(self.ADMIN_ID)
self.assertEqual(data, {})
def test_set_tenant(self):
@@ -131,25 +141,25 @@ class TestTenantManager(tests.TestCase):
"intra_authz_extension": authz_intra_extension['id'],
"intra_admin_extension": admin_intra_extension['id'],
}
- data = self.manager.add_tenant_dict(user_id=ADMIN_ID, tenant_dict=new_tenant)
+ data = self.manager.add_tenant_dict(user_id=self.ADMIN_ID, tenant_dict=new_tenant)
self.assertEquals(new_tenant["id"], data["id"])
self.assertEquals(new_tenant["name"], data['tenant']["name"])
self.assertEquals(new_tenant["intra_authz_extension"], data['tenant']["intra_authz_extension"])
self.assertEquals(new_tenant["intra_admin_extension"], data['tenant']["intra_admin_extension"])
- data = self.manager.get_tenants_dict(ADMIN_ID)
+ data = self.manager.get_tenants_dict(self.ADMIN_ID)
self.assertNotEqual(data, {})
new_tenant["name"] = "demo2"
- data = self.manager.set_tenant_dict(user_id=ADMIN_ID, tenant_id=new_tenant["id"], tenant_dict=new_tenant)
+ data = self.manager.set_tenant_dict(user_id=self.ADMIN_ID, tenant_id=new_tenant["id"], tenant_dict=new_tenant)
self.assertEquals(new_tenant["id"], data["id"])
self.assertEquals(new_tenant["name"], data['tenant']["name"])
self.assertEquals(new_tenant["intra_authz_extension"], data['tenant']["intra_authz_extension"])
self.assertEquals(new_tenant["intra_admin_extension"], data['tenant']["intra_admin_extension"])
def test_exception_tenant_unknown(self):
- self.assertRaises(TenantUnknown, self.manager.get_tenant_dict, ADMIN_ID, uuid.uuid4().hex)
- self.assertRaises(TenantUnknown, self.manager.del_tenant, ADMIN_ID, uuid.uuid4().hex)
- self.assertRaises(TenantUnknown, self.manager.set_tenant_dict, ADMIN_ID, uuid.uuid4().hex, {})
+ self.assertRaises(TenantUnknown, self.manager.get_tenant_dict, self.ADMIN_ID, uuid.uuid4().hex)
+ self.assertRaises(TenantUnknown, self.manager.del_tenant, self.ADMIN_ID, uuid.uuid4().hex)
+ self.assertRaises(TenantUnknown, self.manager.set_tenant_dict, self.ADMIN_ID, uuid.uuid4().hex, {})
authz_intra_extension = self.create_intra_extension(policy_model="policy_authz")
admin_intra_extension = self.create_intra_extension(policy_model="policy_admin")
@@ -160,15 +170,15 @@ class TestTenantManager(tests.TestCase):
"intra_authz_extension": authz_intra_extension['id'],
"intra_admin_extension": admin_intra_extension['id'],
}
- data = self.manager.add_tenant_dict(user_id=ADMIN_ID, tenant_dict=new_tenant)
+ data = self.manager.add_tenant_dict(user_id=self.ADMIN_ID, tenant_dict=new_tenant)
self.assertEquals(new_tenant["id"], data["id"])
self.assertEquals(new_tenant["name"], data['tenant']["name"])
self.assertEquals(new_tenant["intra_authz_extension"], data['tenant']["intra_authz_extension"])
self.assertEquals(new_tenant["intra_admin_extension"], data['tenant']["intra_admin_extension"])
- data = self.manager.get_tenants_dict(ADMIN_ID)
+ data = self.manager.get_tenants_dict(self.ADMIN_ID)
self.assertNotEqual(data, {})
- self.assertRaises(TenantUnknown, self.manager.get_tenant_dict, ADMIN_ID, uuid.uuid4().hex)
+ self.assertRaises(TenantUnknown, self.manager.get_tenant_dict, self.ADMIN_ID, uuid.uuid4().hex)
def test_exception_tenant_added_name_existing(self):
authz_intra_extension = self.create_intra_extension(policy_model="policy_authz")
@@ -180,15 +190,15 @@ class TestTenantManager(tests.TestCase):
"intra_authz_extension": authz_intra_extension['id'],
"intra_admin_extension": admin_intra_extension['id'],
}
- data = self.manager.add_tenant_dict(user_id=ADMIN_ID, tenant_dict=new_tenant)
+ data = self.manager.add_tenant_dict(user_id=self.ADMIN_ID, tenant_dict=new_tenant)
self.assertEquals(new_tenant["id"], data["id"])
self.assertEquals(new_tenant["name"], data['tenant']["name"])
self.assertEquals(new_tenant["intra_authz_extension"], data['tenant']["intra_authz_extension"])
self.assertEquals(new_tenant["intra_admin_extension"], data['tenant']["intra_admin_extension"])
- data = self.manager.get_tenants_dict(ADMIN_ID)
+ data = self.manager.get_tenants_dict(self.ADMIN_ID)
self.assertNotEqual(data, {})
- self.assertRaises(TenantAddedNameExisting, self.manager.add_tenant_dict, ADMIN_ID, new_tenant)
+ self.assertRaises(TenantAddedNameExisting, self.manager.add_tenant_dict, self.ADMIN_ID, new_tenant)
def test_exception_tenant_no_intra_extension(self):
authz_intra_extension = self.create_intra_extension(policy_model="policy_authz")
@@ -201,16 +211,16 @@ class TestTenantManager(tests.TestCase):
"intra_admin_extension": admin_intra_extension['id'],
}
new_tenant['intra_authz_extension'] = None
- self.assertRaises(TenantNoIntraAuthzExtension, self.manager.add_tenant_dict, ADMIN_ID, new_tenant)
+ self.assertRaises(TenantNoIntraAuthzExtension, self.manager.add_tenant_dict, self.ADMIN_ID, new_tenant)
new_tenant['intra_authz_extension'] = authz_intra_extension['id']
- data = self.manager.add_tenant_dict(user_id=ADMIN_ID, tenant_dict=new_tenant)
+ data = self.manager.add_tenant_dict(user_id=self.ADMIN_ID, tenant_dict=new_tenant)
self.assertEquals(new_tenant["id"], data["id"])
self.assertEquals(new_tenant["name"], data['tenant']["name"])
self.assertEquals(new_tenant["intra_authz_extension"], data['tenant']["intra_authz_extension"])
self.assertEquals(new_tenant["intra_admin_extension"], data['tenant']["intra_admin_extension"])
- data = self.manager.get_tenants_dict(ADMIN_ID)
+ data = self.manager.get_tenants_dict(self.ADMIN_ID)
self.assertNotEqual(data, {})
new_tenant['intra_authz_extension'] = None
new_tenant['name'] = "demo2"
- self.assertRaises(TenantNoIntraAuthzExtension, self.manager.set_tenant_dict, ADMIN_ID, new_tenant["id"], new_tenant)
+ self.assertRaises(TenantNoIntraAuthzExtension, self.manager.set_tenant_dict, self.ADMIN_ID, new_tenant["id"], new_tenant)