diff options
Diffstat (limited to 'keystone-moon/keystone/tests/unit/backend')
10 files changed, 1109 insertions, 0 deletions
diff --git a/keystone-moon/keystone/tests/unit/backend/__init__.py b/keystone-moon/keystone/tests/unit/backend/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/keystone-moon/keystone/tests/unit/backend/__init__.py diff --git a/keystone-moon/keystone/tests/unit/backend/core_ldap.py b/keystone-moon/keystone/tests/unit/backend/core_ldap.py new file mode 100644 index 00000000..9d6b23e1 --- /dev/null +++ b/keystone-moon/keystone/tests/unit/backend/core_ldap.py @@ -0,0 +1,161 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import ldap + +from oslo_config import cfg + +from keystone.common import cache +from keystone.common import ldap as common_ldap +from keystone.common.ldap import core as common_ldap_core +from keystone.common import sql +from keystone.tests import unit as tests +from keystone.tests.unit import default_fixtures +from keystone.tests.unit import fakeldap +from keystone.tests.unit.ksfixtures import database + + +CONF = cfg.CONF + + +def create_group_container(identity_api): + # Create the groups base entry (ou=Groups,cn=example,cn=com) + group_api = identity_api.driver.group + conn = group_api.get_connection() + dn = 'ou=Groups,cn=example,cn=com' + conn.add_s(dn, [('objectclass', ['organizationalUnit']), + ('ou', ['Groups'])]) + + +class BaseBackendLdapCommon(object): + """Mixin class to set up generic LDAP backends.""" + + def setUp(self): + super(BaseBackendLdapCommon, self).setUp() + + common_ldap.register_handler('fake://', fakeldap.FakeLdap) + self.load_backends() + self.load_fixtures(default_fixtures) + + self.addCleanup(common_ldap_core._HANDLERS.clear) + self.addCleanup(self.clear_database) + + def _get_domain_fixture(self): + """Domains in LDAP are read-only, so just return the static one.""" + return self.resource_api.get_domain(CONF.identity.default_domain_id) + + def clear_database(self): + for shelf in fakeldap.FakeShelves: + fakeldap.FakeShelves[shelf].clear() + + def reload_backends(self, domain_id): + # Only one backend unless we are using separate domain backends + self.load_backends() + + def get_config(self, domain_id): + # Only one conf structure unless we are using separate domain backends + return CONF + + def config_overrides(self): + super(BaseBackendLdapCommon, self).config_overrides() + self.config_fixture.config( + group='identity', + driver='keystone.identity.backends.ldap.Identity') + + def config_files(self): + config_files = super(BaseBackendLdapCommon, self).config_files() + config_files.append(tests.dirs.tests_conf('backend_ldap.conf')) + return config_files + + def get_user_enabled_vals(self, user): + user_dn = ( + self.identity_api.driver.user._id_to_dn_string(user['id'])) + enabled_attr_name = CONF.ldap.user_enabled_attribute + + ldap_ = self.identity_api.driver.user.get_connection() + res = ldap_.search_s(user_dn, + ldap.SCOPE_BASE, + u'(sn=%s)' % user['name']) + if enabled_attr_name in res[0][1]: + return res[0][1][enabled_attr_name] + else: + return None + + +class BaseBackendLdap(object): + """Mixin class to set up an all-LDAP configuration.""" + def setUp(self): + # NOTE(dstanek): The database must be setup prior to calling the + # parent's setUp. The parent's setUp uses services (like + # credentials) that require a database. + self.useFixture(database.Database()) + super(BaseBackendLdap, self).setUp() + + def load_fixtures(self, fixtures): + # Override super impl since need to create group container. + create_group_container(self.identity_api) + super(BaseBackendLdap, self).load_fixtures(fixtures) + + +class BaseBackendLdapIdentitySqlEverythingElse(tests.SQLDriverOverrides): + """Mixin base for Identity LDAP, everything else SQL backend tests.""" + + def config_files(self): + config_files = super(BaseBackendLdapIdentitySqlEverythingElse, + self).config_files() + config_files.append(tests.dirs.tests_conf('backend_ldap_sql.conf')) + return config_files + + def setUp(self): + self.useFixture(database.Database()) + super(BaseBackendLdapIdentitySqlEverythingElse, self).setUp() + self.clear_database() + self.load_backends() + cache.configure_cache_region(cache.REGION) + self.engine = sql.get_engine() + self.addCleanup(sql.cleanup) + + sql.ModelBase.metadata.create_all(bind=self.engine) + self.addCleanup(sql.ModelBase.metadata.drop_all, bind=self.engine) + + self.load_fixtures(default_fixtures) + # defaulted by the data load + self.user_foo['enabled'] = True + + def config_overrides(self): + super(BaseBackendLdapIdentitySqlEverythingElse, + self).config_overrides() + self.config_fixture.config( + group='identity', + driver='keystone.identity.backends.ldap.Identity') + self.config_fixture.config( + group='resource', + driver='keystone.resource.backends.sql.Resource') + self.config_fixture.config( + group='assignment', + driver='keystone.assignment.backends.sql.Assignment') + + +class BaseBackendLdapIdentitySqlEverythingElseWithMapping(object): + """Mixin base class to test mapping of default LDAP backend. + + The default configuration is not to enable mapping when using a single + backend LDAP driver. However, a cloud provider might want to enable + the mapping, hence hiding the LDAP IDs from any clients of keystone. + Setting backward_compatible_ids to False will enable this mapping. + + """ + def config_overrides(self): + super(BaseBackendLdapIdentitySqlEverythingElseWithMapping, + self).config_overrides() + self.config_fixture.config(group='identity_mapping', + backward_compatible_ids=False) diff --git a/keystone-moon/keystone/tests/unit/backend/core_sql.py b/keystone-moon/keystone/tests/unit/backend/core_sql.py new file mode 100644 index 00000000..9cbd858e --- /dev/null +++ b/keystone-moon/keystone/tests/unit/backend/core_sql.py @@ -0,0 +1,53 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import sqlalchemy + +from keystone.common import sql +from keystone.tests import unit as tests +from keystone.tests.unit import default_fixtures +from keystone.tests.unit.ksfixtures import database + + +class BaseBackendSqlTests(tests.SQLDriverOverrides, tests.TestCase): + + def setUp(self): + super(BaseBackendSqlTests, self).setUp() + self.useFixture(database.Database()) + self.load_backends() + + # populate the engine with tables & fixtures + self.load_fixtures(default_fixtures) + # defaulted by the data load + self.user_foo['enabled'] = True + + def config_files(self): + config_files = super(BaseBackendSqlTests, self).config_files() + config_files.append(tests.dirs.tests_conf('backend_sql.conf')) + return config_files + + +class BaseBackendSqlModels(BaseBackendSqlTests): + + def select_table(self, name): + table = sqlalchemy.Table(name, + sql.ModelBase.metadata, + autoload=True) + s = sqlalchemy.select([table]) + return s + + def assertExpectedSchema(self, table, cols): + table = self.select_table(table) + for col, type_, length in cols: + self.assertIsInstance(table.c[col].type, type_) + if length: + self.assertEqual(length, table.c[col].type.length) diff --git a/keystone-moon/keystone/tests/unit/backend/domain_config/__init__.py b/keystone-moon/keystone/tests/unit/backend/domain_config/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/keystone-moon/keystone/tests/unit/backend/domain_config/__init__.py diff --git a/keystone-moon/keystone/tests/unit/backend/domain_config/core.py b/keystone-moon/keystone/tests/unit/backend/domain_config/core.py new file mode 100644 index 00000000..da2e9bd9 --- /dev/null +++ b/keystone-moon/keystone/tests/unit/backend/domain_config/core.py @@ -0,0 +1,523 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import copy +import uuid + +import mock +from testtools import matchers + +from keystone import exception + + +class DomainConfigTests(object): + + def setUp(self): + self.domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + self.resource_api.create_domain(self.domain['id'], self.domain) + self.addCleanup(self.clean_up_domain) + + def clean_up_domain(self): + # NOTE(henry-nash): Deleting the domain will also delete any domain + # configs for this domain. + self.domain['enabled'] = False + self.resource_api.update_domain(self.domain['id'], self.domain) + self.resource_api.delete_domain(self.domain['id']) + del self.domain + + def _domain_config_crud(self, sensitive): + group = uuid.uuid4().hex + option = uuid.uuid4().hex + value = uuid.uuid4().hex + self.domain_config_api.create_config_option( + self.domain['id'], group, option, value, sensitive) + res = self.domain_config_api.get_config_option( + self.domain['id'], group, option, sensitive) + config = {'group': group, 'option': option, 'value': value} + self.assertEqual(config, res) + + value = uuid.uuid4().hex + self.domain_config_api.update_config_option( + self.domain['id'], group, option, value, sensitive) + res = self.domain_config_api.get_config_option( + self.domain['id'], group, option, sensitive) + config = {'group': group, 'option': option, 'value': value} + self.assertEqual(config, res) + + self.domain_config_api.delete_config_options( + self.domain['id'], group, option, sensitive) + self.assertRaises(exception.DomainConfigNotFound, + self.domain_config_api.get_config_option, + self.domain['id'], group, option, sensitive) + # ...and silent if we try to delete it again + self.domain_config_api.delete_config_options( + self.domain['id'], group, option, sensitive) + + def test_whitelisted_domain_config_crud(self): + self._domain_config_crud(sensitive=False) + + def test_sensitive_domain_config_crud(self): + self._domain_config_crud(sensitive=True) + + def _list_domain_config(self, sensitive): + """Test listing by combination of domain, group & option.""" + + config1 = {'group': uuid.uuid4().hex, 'option': uuid.uuid4().hex, + 'value': uuid.uuid4().hex} + # Put config2 in the same group as config1 + config2 = {'group': config1['group'], 'option': uuid.uuid4().hex, + 'value': uuid.uuid4().hex} + config3 = {'group': uuid.uuid4().hex, 'option': uuid.uuid4().hex, + 'value': 100} + for config in [config1, config2, config3]: + self.domain_config_api.create_config_option( + self.domain['id'], config['group'], config['option'], + config['value'], sensitive) + + # Try listing all items from a domain + res = self.domain_config_api.list_config_options( + self.domain['id'], sensitive=sensitive) + self.assertThat(res, matchers.HasLength(3)) + for res_entry in res: + self.assertIn(res_entry, [config1, config2, config3]) + + # Try listing by domain and group + res = self.domain_config_api.list_config_options( + self.domain['id'], group=config1['group'], sensitive=sensitive) + self.assertThat(res, matchers.HasLength(2)) + for res_entry in res: + self.assertIn(res_entry, [config1, config2]) + + # Try listing by domain, group and option + res = self.domain_config_api.list_config_options( + self.domain['id'], group=config2['group'], + option=config2['option'], sensitive=sensitive) + self.assertThat(res, matchers.HasLength(1)) + self.assertEqual(config2, res[0]) + + def test_list_whitelisted_domain_config_crud(self): + self._list_domain_config(False) + + def test_list_sensitive_domain_config_crud(self): + self._list_domain_config(True) + + def _delete_domain_configs(self, sensitive): + """Test deleting by combination of domain, group & option.""" + + config1 = {'group': uuid.uuid4().hex, 'option': uuid.uuid4().hex, + 'value': uuid.uuid4().hex} + # Put config2 and config3 in the same group as config1 + config2 = {'group': config1['group'], 'option': uuid.uuid4().hex, + 'value': uuid.uuid4().hex} + config3 = {'group': config1['group'], 'option': uuid.uuid4().hex, + 'value': uuid.uuid4().hex} + config4 = {'group': uuid.uuid4().hex, 'option': uuid.uuid4().hex, + 'value': uuid.uuid4().hex} + for config in [config1, config2, config3, config4]: + self.domain_config_api.create_config_option( + self.domain['id'], config['group'], config['option'], + config['value'], sensitive) + + # Try deleting by domain, group and option + res = self.domain_config_api.delete_config_options( + self.domain['id'], group=config2['group'], + option=config2['option'], sensitive=sensitive) + res = self.domain_config_api.list_config_options( + self.domain['id'], sensitive=sensitive) + self.assertThat(res, matchers.HasLength(3)) + for res_entry in res: + self.assertIn(res_entry, [config1, config3, config4]) + + # Try deleting by domain and group + res = self.domain_config_api.delete_config_options( + self.domain['id'], group=config4['group'], sensitive=sensitive) + res = self.domain_config_api.list_config_options( + self.domain['id'], sensitive=sensitive) + self.assertThat(res, matchers.HasLength(2)) + for res_entry in res: + self.assertIn(res_entry, [config1, config3]) + + # Try deleting all items from a domain + res = self.domain_config_api.delete_config_options( + self.domain['id'], sensitive=sensitive) + res = self.domain_config_api.list_config_options( + self.domain['id'], sensitive=sensitive) + self.assertThat(res, matchers.HasLength(0)) + + def test_delete_whitelisted_domain_configs(self): + self._delete_domain_configs(False) + + def test_delete_sensitive_domain_configs(self): + self._delete_domain_configs(True) + + def _create_domain_config_twice(self, sensitive): + """Test conflict error thrown if create the same option twice.""" + + config = {'group': uuid.uuid4().hex, 'option': uuid.uuid4().hex, + 'value': uuid.uuid4().hex} + + self.domain_config_api.create_config_option( + self.domain['id'], config['group'], config['option'], + config['value'], sensitive=sensitive) + self.assertRaises(exception.Conflict, + self.domain_config_api.create_config_option, + self.domain['id'], config['group'], config['option'], + config['value'], sensitive=sensitive) + + def test_create_whitelisted_domain_config_twice(self): + self._create_domain_config_twice(False) + + def test_create_sensitive_domain_config_twice(self): + self._create_domain_config_twice(True) + + def test_delete_domain_deletes_configs(self): + """Test domain deletion clears the domain configs.""" + + domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + self.resource_api.create_domain(domain['id'], domain) + config1 = {'group': uuid.uuid4().hex, 'option': uuid.uuid4().hex, + 'value': uuid.uuid4().hex} + # Put config2 in the same group as config1 + config2 = {'group': config1['group'], 'option': uuid.uuid4().hex, + 'value': uuid.uuid4().hex} + self.domain_config_api.create_config_option( + domain['id'], config1['group'], config1['option'], + config1['value']) + self.domain_config_api.create_config_option( + domain['id'], config2['group'], config2['option'], + config2['value'], sensitive=True) + res = self.domain_config_api.list_config_options( + domain['id']) + self.assertThat(res, matchers.HasLength(1)) + res = self.domain_config_api.list_config_options( + domain['id'], sensitive=True) + self.assertThat(res, matchers.HasLength(1)) + + # Now delete the domain + domain['enabled'] = False + self.resource_api.update_domain(domain['id'], domain) + self.resource_api.delete_domain(domain['id']) + + # Check domain configs have also been deleted + res = self.domain_config_api.list_config_options( + domain['id']) + self.assertThat(res, matchers.HasLength(0)) + res = self.domain_config_api.list_config_options( + domain['id'], sensitive=True) + self.assertThat(res, matchers.HasLength(0)) + + def test_create_domain_config_including_sensitive_option(self): + config = {'ldap': {'url': uuid.uuid4().hex, + 'user_tree_dn': uuid.uuid4().hex, + 'password': uuid.uuid4().hex}} + self.domain_config_api.create_config(self.domain['id'], config) + + # password is sensitive, so check that the whitelisted portion and + # the sensitive piece have been stored in the appropriate locations. + res = self.domain_config_api.get_config(self.domain['id']) + config_whitelisted = copy.deepcopy(config) + config_whitelisted['ldap'].pop('password') + self.assertEqual(config_whitelisted, res) + res = self.domain_config_api.get_config_option( + self.domain['id'], 'ldap', 'password', sensitive=True) + self.assertEqual(config['ldap']['password'], res['value']) + + # Finally, use the non-public API to get back the whole config + res = self.domain_config_api.get_config_with_sensitive_info( + self.domain['id']) + self.assertEqual(config, res) + + def test_get_partial_domain_config(self): + config = {'ldap': {'url': uuid.uuid4().hex, + 'user_tree_dn': uuid.uuid4().hex, + 'password': uuid.uuid4().hex}, + 'identity': {'driver': uuid.uuid4().hex}} + self.domain_config_api.create_config(self.domain['id'], config) + + res = self.domain_config_api.get_config(self.domain['id'], + group='identity') + config_partial = copy.deepcopy(config) + config_partial.pop('ldap') + self.assertEqual(config_partial, res) + res = self.domain_config_api.get_config( + self.domain['id'], group='ldap', option='user_tree_dn') + self.assertEqual({'user_tree_dn': config['ldap']['user_tree_dn']}, res) + # ...but we should fail to get a sensitive option + self.assertRaises(exception.DomainConfigNotFound, + self.domain_config_api.get_config, self.domain['id'], + group='ldap', option='password') + + def test_delete_partial_domain_config(self): + config = {'ldap': {'url': uuid.uuid4().hex, + 'user_tree_dn': uuid.uuid4().hex, + 'password': uuid.uuid4().hex}, + 'identity': {'driver': uuid.uuid4().hex}} + self.domain_config_api.create_config(self.domain['id'], config) + + self.domain_config_api.delete_config( + self.domain['id'], group='identity') + config_partial = copy.deepcopy(config) + config_partial.pop('identity') + config_partial['ldap'].pop('password') + res = self.domain_config_api.get_config(self.domain['id']) + self.assertEqual(config_partial, res) + + self.domain_config_api.delete_config( + self.domain['id'], group='ldap', option='url') + config_partial = copy.deepcopy(config_partial) + config_partial['ldap'].pop('url') + res = self.domain_config_api.get_config(self.domain['id']) + self.assertEqual(config_partial, res) + + def test_get_options_not_in_domain_config(self): + self.assertRaises(exception.DomainConfigNotFound, + self.domain_config_api.get_config, self.domain['id']) + config = {'ldap': {'url': uuid.uuid4().hex}} + + self.domain_config_api.create_config(self.domain['id'], config) + + self.assertRaises(exception.DomainConfigNotFound, + self.domain_config_api.get_config, self.domain['id'], + group='identity') + self.assertRaises(exception.DomainConfigNotFound, + self.domain_config_api.get_config, self.domain['id'], + group='ldap', option='user_tree_dn') + + def test_get_sensitive_config(self): + config = {'ldap': {'url': uuid.uuid4().hex, + 'user_tree_dn': uuid.uuid4().hex, + 'password': uuid.uuid4().hex}, + 'identity': {'driver': uuid.uuid4().hex}} + res = self.domain_config_api.get_config_with_sensitive_info( + self.domain['id']) + self.assertEqual({}, res) + self.domain_config_api.create_config(self.domain['id'], config) + res = self.domain_config_api.get_config_with_sensitive_info( + self.domain['id']) + self.assertEqual(config, res) + + def test_update_partial_domain_config(self): + config = {'ldap': {'url': uuid.uuid4().hex, + 'user_tree_dn': uuid.uuid4().hex, + 'password': uuid.uuid4().hex}, + 'identity': {'driver': uuid.uuid4().hex}} + self.domain_config_api.create_config(self.domain['id'], config) + + # Try updating a group + new_config = {'ldap': {'url': uuid.uuid4().hex, + 'user_filter': uuid.uuid4().hex}} + res = self.domain_config_api.update_config( + self.domain['id'], new_config, group='ldap') + expected_config = copy.deepcopy(config) + expected_config['ldap']['url'] = new_config['ldap']['url'] + expected_config['ldap']['user_filter'] = ( + new_config['ldap']['user_filter']) + expected_full_config = copy.deepcopy(expected_config) + expected_config['ldap'].pop('password') + res = self.domain_config_api.get_config(self.domain['id']) + self.assertEqual(expected_config, res) + # The sensitive option should still existsss + res = self.domain_config_api.get_config_with_sensitive_info( + self.domain['id']) + self.assertEqual(expected_full_config, res) + + # Try updating a single whitelisted option + self.domain_config_api.delete_config(self.domain['id']) + self.domain_config_api.create_config(self.domain['id'], config) + new_config = {'url': uuid.uuid4().hex} + res = self.domain_config_api.update_config( + self.domain['id'], new_config, group='ldap', option='url') + + # Make sure whitelisted and full config is updated + expected_whitelisted_config = copy.deepcopy(config) + expected_whitelisted_config['ldap']['url'] = new_config['url'] + expected_full_config = copy.deepcopy(expected_whitelisted_config) + expected_whitelisted_config['ldap'].pop('password') + self.assertEqual(expected_whitelisted_config, res) + res = self.domain_config_api.get_config(self.domain['id']) + self.assertEqual(expected_whitelisted_config, res) + res = self.domain_config_api.get_config_with_sensitive_info( + self.domain['id']) + self.assertEqual(expected_full_config, res) + + # Try updating a single sensitive option + self.domain_config_api.delete_config(self.domain['id']) + self.domain_config_api.create_config(self.domain['id'], config) + new_config = {'password': uuid.uuid4().hex} + res = self.domain_config_api.update_config( + self.domain['id'], new_config, group='ldap', option='password') + # The whitelisted config should not have changed... + expected_whitelisted_config = copy.deepcopy(config) + expected_full_config = copy.deepcopy(config) + expected_whitelisted_config['ldap'].pop('password') + self.assertEqual(expected_whitelisted_config, res) + res = self.domain_config_api.get_config(self.domain['id']) + self.assertEqual(expected_whitelisted_config, res) + expected_full_config['ldap']['password'] = new_config['password'] + res = self.domain_config_api.get_config_with_sensitive_info( + self.domain['id']) + # ...but the sensitive piece should have. + self.assertEqual(expected_full_config, res) + + def test_update_invalid_partial_domain_config(self): + config = {'ldap': {'url': uuid.uuid4().hex, + 'user_tree_dn': uuid.uuid4().hex, + 'password': uuid.uuid4().hex}, + 'identity': {'driver': uuid.uuid4().hex}} + # An extra group, when specifying one group should fail + self.assertRaises(exception.InvalidDomainConfig, + self.domain_config_api.update_config, + self.domain['id'], config, group='ldap') + # An extra option, when specifying one option should fail + self.assertRaises(exception.InvalidDomainConfig, + self.domain_config_api.update_config, + self.domain['id'], config['ldap'], + group='ldap', option='url') + + # Now try the right number of groups/options, but just not + # ones that are in the config provided + config = {'ldap': {'user_tree_dn': uuid.uuid4().hex}} + self.assertRaises(exception.InvalidDomainConfig, + self.domain_config_api.update_config, + self.domain['id'], config, group='identity') + self.assertRaises(exception.InvalidDomainConfig, + self.domain_config_api.update_config, + self.domain['id'], config['ldap'], group='ldap', + option='url') + + # Now some valid groups/options, but just not ones that are in the + # existing config + config = {'ldap': {'user_tree_dn': uuid.uuid4().hex}} + self.domain_config_api.create_config(self.domain['id'], config) + config_wrong_group = {'identity': {'driver': uuid.uuid4().hex}} + self.assertRaises(exception.DomainConfigNotFound, + self.domain_config_api.update_config, + self.domain['id'], config_wrong_group, + group='identity') + config_wrong_option = {'url': uuid.uuid4().hex} + self.assertRaises(exception.DomainConfigNotFound, + self.domain_config_api.update_config, + self.domain['id'], config_wrong_option, + group='ldap', option='url') + + # And finally just some bad groups/options + bad_group = uuid.uuid4().hex + config = {bad_group: {'user': uuid.uuid4().hex}} + self.assertRaises(exception.InvalidDomainConfig, + self.domain_config_api.update_config, + self.domain['id'], config, group=bad_group, + option='user') + bad_option = uuid.uuid4().hex + config = {'ldap': {bad_option: uuid.uuid4().hex}} + self.assertRaises(exception.InvalidDomainConfig, + self.domain_config_api.update_config, + self.domain['id'], config, group='ldap', + option=bad_option) + + def test_create_invalid_domain_config(self): + self.assertRaises(exception.InvalidDomainConfig, + self.domain_config_api.create_config, + self.domain['id'], {}) + config = {uuid.uuid4().hex: uuid.uuid4().hex} + self.assertRaises(exception.InvalidDomainConfig, + self.domain_config_api.create_config, + self.domain['id'], config) + config = {uuid.uuid4().hex: {uuid.uuid4().hex: uuid.uuid4().hex}} + self.assertRaises(exception.InvalidDomainConfig, + self.domain_config_api.create_config, + self.domain['id'], config) + config = {'ldap': {uuid.uuid4().hex: uuid.uuid4().hex}} + self.assertRaises(exception.InvalidDomainConfig, + self.domain_config_api.create_config, + self.domain['id'], config) + # Try an option that IS in the standard conf, but neither whitelisted + # or marked as sensitive + config = {'ldap': {'role_tree_dn': uuid.uuid4().hex}} + self.assertRaises(exception.InvalidDomainConfig, + self.domain_config_api.create_config, + self.domain['id'], config) + + def test_delete_invalid_partial_domain_config(self): + config = {'ldap': {'url': uuid.uuid4().hex}} + self.domain_config_api.create_config(self.domain['id'], config) + # Try deleting a group not in the config + self.assertRaises(exception.DomainConfigNotFound, + self.domain_config_api.delete_config, + self.domain['id'], group='identity') + # Try deleting an option not in the config + self.assertRaises(exception.DomainConfigNotFound, + self.domain_config_api.delete_config, + self.domain['id'], + group='ldap', option='user_tree_dn') + + def test_sensitive_substitution_in_domain_config(self): + # Create a config that contains a whitelisted option that requires + # substitution of a sensitive option. + config = {'ldap': {'url': 'my_url/%(password)s', + 'user_tree_dn': uuid.uuid4().hex, + 'password': uuid.uuid4().hex}, + 'identity': {'driver': uuid.uuid4().hex}} + self.domain_config_api.create_config(self.domain['id'], config) + + # Read back the config with the internal method and ensure that the + # substitution has taken place. + res = self.domain_config_api.get_config_with_sensitive_info( + self.domain['id']) + expected_url = ( + config['ldap']['url'] % {'password': config['ldap']['password']}) + self.assertEqual(expected_url, res['ldap']['url']) + + def test_invalid_sensitive_substitution_in_domain_config(self): + """Check that invalid substitutions raise warnings.""" + + mock_log = mock.Mock() + + invalid_option_config = { + 'ldap': {'user_tree_dn': uuid.uuid4().hex, + 'password': uuid.uuid4().hex}, + 'identity': {'driver': uuid.uuid4().hex}} + + for invalid_option in ['my_url/%(passssword)s', + 'my_url/%(password', + 'my_url/%(password)', + 'my_url/%(password)d']: + invalid_option_config['ldap']['url'] = invalid_option + self.domain_config_api.create_config( + self.domain['id'], invalid_option_config) + + with mock.patch('keystone.resource.core.LOG', mock_log): + res = self.domain_config_api.get_config_with_sensitive_info( + self.domain['id']) + mock_log.warn.assert_any_call(mock.ANY) + self.assertEqual( + invalid_option_config['ldap']['url'], res['ldap']['url']) + + def test_escaped_sequence_in_domain_config(self): + """Check that escaped '%(' doesn't get interpreted.""" + + mock_log = mock.Mock() + + escaped_option_config = { + 'ldap': {'url': 'my_url/%%(password)s', + 'user_tree_dn': uuid.uuid4().hex, + 'password': uuid.uuid4().hex}, + 'identity': {'driver': uuid.uuid4().hex}} + + self.domain_config_api.create_config( + self.domain['id'], escaped_option_config) + + with mock.patch('keystone.resource.core.LOG', mock_log): + res = self.domain_config_api.get_config_with_sensitive_info( + self.domain['id']) + self.assertFalse(mock_log.warn.called) + # The escaping '%' should have been removed + self.assertEqual('my_url/%(password)s', res['ldap']['url']) diff --git a/keystone-moon/keystone/tests/unit/backend/domain_config/test_sql.py b/keystone-moon/keystone/tests/unit/backend/domain_config/test_sql.py new file mode 100644 index 00000000..6459ede1 --- /dev/null +++ b/keystone-moon/keystone/tests/unit/backend/domain_config/test_sql.py @@ -0,0 +1,41 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +from keystone.common import sql +from keystone.tests.unit.backend import core_sql +from keystone.tests.unit.backend.domain_config import core + + +class SqlDomainConfigModels(core_sql.BaseBackendSqlModels): + + def test_whitelisted_model(self): + cols = (('domain_id', sql.String, 64), + ('group', sql.String, 255), + ('option', sql.String, 255), + ('value', sql.JsonBlob, None)) + self.assertExpectedSchema('whitelisted_config', cols) + + def test_sensitive_model(self): + cols = (('domain_id', sql.String, 64), + ('group', sql.String, 255), + ('option', sql.String, 255), + ('value', sql.JsonBlob, None)) + self.assertExpectedSchema('sensitive_config', cols) + + +class SqlDomainConfig(core_sql.BaseBackendSqlTests, core.DomainConfigTests): + def setUp(self): + super(SqlDomainConfig, self).setUp() + # core.DomainConfigTests is effectively a mixin class, so make sure we + # call its setup + core.DomainConfigTests.setUp(self) diff --git a/keystone-moon/keystone/tests/unit/backend/role/__init__.py b/keystone-moon/keystone/tests/unit/backend/role/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/keystone-moon/keystone/tests/unit/backend/role/__init__.py diff --git a/keystone-moon/keystone/tests/unit/backend/role/core.py b/keystone-moon/keystone/tests/unit/backend/role/core.py new file mode 100644 index 00000000..f6e47fe9 --- /dev/null +++ b/keystone-moon/keystone/tests/unit/backend/role/core.py @@ -0,0 +1,130 @@ +# Copyright 2012 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import copy +import uuid + +from keystone import exception +from keystone.tests import unit as tests +from keystone.tests.unit import default_fixtures + + +class RoleTests(object): + + def test_get_role_404(self): + self.assertRaises(exception.RoleNotFound, + self.role_api.get_role, + uuid.uuid4().hex) + + def test_create_duplicate_role_name_fails(self): + role = {'id': 'fake1', + 'name': 'fake1name'} + self.role_api.create_role('fake1', role) + role['id'] = 'fake2' + self.assertRaises(exception.Conflict, + self.role_api.create_role, + 'fake2', + role) + + def test_rename_duplicate_role_name_fails(self): + role1 = { + 'id': 'fake1', + 'name': 'fake1name' + } + role2 = { + 'id': 'fake2', + 'name': 'fake2name' + } + self.role_api.create_role('fake1', role1) + self.role_api.create_role('fake2', role2) + role1['name'] = 'fake2name' + self.assertRaises(exception.Conflict, + self.role_api.update_role, + 'fake1', + role1) + + def test_role_crud(self): + role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + self.role_api.create_role(role['id'], role) + role_ref = self.role_api.get_role(role['id']) + role_ref_dict = {x: role_ref[x] for x in role_ref} + self.assertDictEqual(role_ref_dict, role) + + role['name'] = uuid.uuid4().hex + updated_role_ref = self.role_api.update_role(role['id'], role) + role_ref = self.role_api.get_role(role['id']) + role_ref_dict = {x: role_ref[x] for x in role_ref} + self.assertDictEqual(role_ref_dict, role) + self.assertDictEqual(role_ref_dict, updated_role_ref) + + self.role_api.delete_role(role['id']) + self.assertRaises(exception.RoleNotFound, + self.role_api.get_role, + role['id']) + + def test_update_role_404(self): + role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + self.assertRaises(exception.RoleNotFound, + self.role_api.update_role, + role['id'], + role) + + def test_list_roles(self): + roles = self.role_api.list_roles() + self.assertEqual(len(default_fixtures.ROLES), len(roles)) + role_ids = set(role['id'] for role in roles) + expected_role_ids = set(role['id'] for role in default_fixtures.ROLES) + self.assertEqual(expected_role_ids, role_ids) + + @tests.skip_if_cache_disabled('role') + def test_cache_layer_role_crud(self): + role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + role_id = role['id'] + # Create role + self.role_api.create_role(role_id, role) + role_ref = self.role_api.get_role(role_id) + updated_role_ref = copy.deepcopy(role_ref) + updated_role_ref['name'] = uuid.uuid4().hex + # Update role, bypassing the role api manager + self.role_api.driver.update_role(role_id, updated_role_ref) + # Verify get_role still returns old ref + self.assertDictEqual(role_ref, self.role_api.get_role(role_id)) + # Invalidate Cache + self.role_api.get_role.invalidate(self.role_api, role_id) + # Verify get_role returns the new role_ref + self.assertDictEqual(updated_role_ref, + self.role_api.get_role(role_id)) + # Update role back to original via the assignment api manager + self.role_api.update_role(role_id, role_ref) + # Verify get_role returns the original role ref + self.assertDictEqual(role_ref, self.role_api.get_role(role_id)) + # Delete role bypassing the role api manager + self.role_api.driver.delete_role(role_id) + # Verify get_role still returns the role_ref + self.assertDictEqual(role_ref, self.role_api.get_role(role_id)) + # Invalidate cache + self.role_api.get_role.invalidate(self.role_api, role_id) + # Verify RoleNotFound is now raised + self.assertRaises(exception.RoleNotFound, + self.role_api.get_role, + role_id) + # recreate role + self.role_api.create_role(role_id, role) + self.role_api.get_role(role_id) + # delete role via the assignment api manager + self.role_api.delete_role(role_id) + # verity RoleNotFound is now raised + self.assertRaises(exception.RoleNotFound, + self.role_api.get_role, + role_id) diff --git a/keystone-moon/keystone/tests/unit/backend/role/test_ldap.py b/keystone-moon/keystone/tests/unit/backend/role/test_ldap.py new file mode 100644 index 00000000..ba4b7c6e --- /dev/null +++ b/keystone-moon/keystone/tests/unit/backend/role/test_ldap.py @@ -0,0 +1,161 @@ +# -*- coding: utf-8 -*- +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import uuid + +from oslo_config import cfg + +from keystone import exception +from keystone.tests import unit as tests +from keystone.tests.unit.backend import core_ldap +from keystone.tests.unit.backend.role import core as core_role +from keystone.tests.unit import default_fixtures + + +CONF = cfg.CONF + + +class LdapRoleCommon(core_ldap.BaseBackendLdapCommon, core_role.RoleTests): + """Tests that should be run in every LDAP configuration. + + Include additional tests that are unique to LDAP (or need to be overridden) + which should be run for all the various LDAP configurations we test. + + """ + pass + + +class LdapRole(LdapRoleCommon, core_ldap.BaseBackendLdap, tests.TestCase): + """Test in an all-LDAP configuration. + + Include additional tests that are unique to LDAP (or need to be overridden) + which only need to be run in a basic LDAP configurations. + + """ + def test_configurable_allowed_role_actions(self): + role = {'id': u'fäké1', 'name': u'fäké1'} + self.role_api.create_role(u'fäké1', role) + role_ref = self.role_api.get_role(u'fäké1') + self.assertEqual(u'fäké1', role_ref['id']) + + role['name'] = u'fäké2' + self.role_api.update_role(u'fäké1', role) + + self.role_api.delete_role(u'fäké1') + self.assertRaises(exception.RoleNotFound, + self.role_api.get_role, + u'fäké1') + + def test_configurable_forbidden_role_actions(self): + self.config_fixture.config( + group='ldap', role_allow_create=False, role_allow_update=False, + role_allow_delete=False) + self.load_backends() + + role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + self.assertRaises(exception.ForbiddenAction, + self.role_api.create_role, + role['id'], + role) + + self.role_member['name'] = uuid.uuid4().hex + self.assertRaises(exception.ForbiddenAction, + self.role_api.update_role, + self.role_member['id'], + self.role_member) + + self.assertRaises(exception.ForbiddenAction, + self.role_api.delete_role, + self.role_member['id']) + + def test_role_filter(self): + role_ref = self.role_api.get_role(self.role_member['id']) + self.assertDictEqual(role_ref, self.role_member) + + self.config_fixture.config(group='ldap', + role_filter='(CN=DOES_NOT_MATCH)') + self.load_backends() + # NOTE(morganfainberg): CONF.ldap.role_filter will not be + # dynamically changed at runtime. This invalidate is a work-around for + # the expectation that it is safe to change config values in tests that + # could affect what the drivers would return up to the manager. This + # solves this assumption when working with aggressive (on-create) + # cache population. + self.role_api.get_role.invalidate(self.role_api, + self.role_member['id']) + self.assertRaises(exception.RoleNotFound, + self.role_api.get_role, + self.role_member['id']) + + def test_role_attribute_mapping(self): + self.config_fixture.config(group='ldap', role_name_attribute='ou') + self.clear_database() + self.load_backends() + self.load_fixtures(default_fixtures) + # NOTE(morganfainberg): CONF.ldap.role_name_attribute will not be + # dynamically changed at runtime. This invalidate is a work-around for + # the expectation that it is safe to change config values in tests that + # could affect what the drivers would return up to the manager. This + # solves this assumption when working with aggressive (on-create) + # cache population. + self.role_api.get_role.invalidate(self.role_api, + self.role_member['id']) + role_ref = self.role_api.get_role(self.role_member['id']) + self.assertEqual(self.role_member['id'], role_ref['id']) + self.assertEqual(self.role_member['name'], role_ref['name']) + + self.config_fixture.config(group='ldap', role_name_attribute='sn') + self.load_backends() + # NOTE(morganfainberg): CONF.ldap.role_name_attribute will not be + # dynamically changed at runtime. This invalidate is a work-around for + # the expectation that it is safe to change config values in tests that + # could affect what the drivers would return up to the manager. This + # solves this assumption when working with aggressive (on-create) + # cache population. + self.role_api.get_role.invalidate(self.role_api, + self.role_member['id']) + role_ref = self.role_api.get_role(self.role_member['id']) + self.assertEqual(self.role_member['id'], role_ref['id']) + self.assertNotIn('name', role_ref) + + def test_role_attribute_ignore(self): + self.config_fixture.config(group='ldap', + role_attribute_ignore=['name']) + self.clear_database() + self.load_backends() + self.load_fixtures(default_fixtures) + # NOTE(morganfainberg): CONF.ldap.role_attribute_ignore will not be + # dynamically changed at runtime. This invalidate is a work-around for + # the expectation that it is safe to change config values in tests that + # could affect what the drivers would return up to the manager. This + # solves this assumption when working with aggressive (on-create) + # cache population. + self.role_api.get_role.invalidate(self.role_api, + self.role_member['id']) + role_ref = self.role_api.get_role(self.role_member['id']) + self.assertEqual(self.role_member['id'], role_ref['id']) + self.assertNotIn('name', role_ref) + + +class LdapIdentitySqlEverythingElseRole( + core_ldap.BaseBackendLdapIdentitySqlEverythingElse, LdapRoleCommon, + tests.TestCase): + """Test Identity in LDAP, Everything else in SQL.""" + pass + + +class LdapIdentitySqlEverythingElseWithMappingRole( + LdapIdentitySqlEverythingElseRole, + core_ldap.BaseBackendLdapIdentitySqlEverythingElseWithMapping): + """Test ID mapping of default LDAP backend.""" + pass diff --git a/keystone-moon/keystone/tests/unit/backend/role/test_sql.py b/keystone-moon/keystone/tests/unit/backend/role/test_sql.py new file mode 100644 index 00000000..79ff148a --- /dev/null +++ b/keystone-moon/keystone/tests/unit/backend/role/test_sql.py @@ -0,0 +1,40 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import uuid + +from keystone.common import sql +from keystone import exception +from keystone.tests.unit.backend import core_sql +from keystone.tests.unit.backend.role import core + + +class SqlRoleModels(core_sql.BaseBackendSqlModels): + + def test_role_model(self): + cols = (('id', sql.String, 64), + ('name', sql.String, 255)) + self.assertExpectedSchema('role', cols) + + +class SqlRole(core_sql.BaseBackendSqlTests, core.RoleTests): + + def test_create_null_role_name(self): + role = {'id': uuid.uuid4().hex, + 'name': None} + self.assertRaises(exception.UnexpectedError, + self.role_api.create_role, + role['id'], + role) + self.assertRaises(exception.RoleNotFound, + self.role_api.get_role, + role['id']) |