From 2e7b4f2027a1147ca28301e4f88adf8274b39a1f Mon Sep 17 00:00:00 2001 From: DUVAL Thomas Date: Thu, 9 Jun 2016 09:11:50 +0200 Subject: Update Keystone core to Mitaka. Change-Id: Ia10d6add16f4a9d25d1f42d420661c46332e69db --- .../keystone/tests/unit/resource/test_core.py | 692 +++++++++++++++++++++ 1 file changed, 692 insertions(+) create mode 100644 keystone-moon/keystone/tests/unit/resource/test_core.py (limited to 'keystone-moon/keystone/tests/unit/resource/test_core.py') diff --git a/keystone-moon/keystone/tests/unit/resource/test_core.py b/keystone-moon/keystone/tests/unit/resource/test_core.py new file mode 100644 index 00000000..2eb87e4c --- /dev/null +++ b/keystone-moon/keystone/tests/unit/resource/test_core.py @@ -0,0 +1,692 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import copy +import uuid + +import mock +from testtools import matchers + +from oslo_config import cfg +from oslotest import mockpatch + +from keystone import exception +from keystone.tests import unit +from keystone.tests.unit.ksfixtures import database + + +CONF = cfg.CONF + + +class TestResourceManagerNoFixtures(unit.SQLDriverOverrides, unit.TestCase): + + def setUp(self): + super(TestResourceManagerNoFixtures, self).setUp() + self.useFixture(database.Database(self.sql_driver_version_overrides)) + self.load_backends() + + def test_ensure_default_domain_exists(self): + # When there's no default domain, ensure_default_domain_exists creates + # it. + + # First make sure there's no default domain. + self.assertRaises( + exception.DomainNotFound, + self.resource_api.get_domain, CONF.identity.default_domain_id) + + self.resource_api.ensure_default_domain_exists() + default_domain = self.resource_api.get_domain( + CONF.identity.default_domain_id) + + expected_domain = { + 'id': CONF.identity.default_domain_id, + 'name': 'Default', + 'enabled': True, + 'description': 'Domain created automatically to support V2.0 ' + 'operations.', + } + self.assertEqual(expected_domain, default_domain) + + def test_ensure_default_domain_exists_already_exists(self): + # When there's already a default domain, ensure_default_domain_exists + # doesn't do anything. + + name = uuid.uuid4().hex + description = uuid.uuid4().hex + domain_attrs = { + 'id': CONF.identity.default_domain_id, + 'name': name, + 'description': description, + } + self.resource_api.create_domain(CONF.identity.default_domain_id, + domain_attrs) + + self.resource_api.ensure_default_domain_exists() + + default_domain = self.resource_api.get_domain( + CONF.identity.default_domain_id) + + expected_domain = { + 'id': CONF.identity.default_domain_id, + 'name': name, + 'enabled': True, + 'description': description, + } + + self.assertEqual(expected_domain, default_domain) + + def test_ensure_default_domain_exists_fails(self): + # When there's an unexpected exception creating domain it's passed on. + + self.useFixture(mockpatch.PatchObject( + self.resource_api, 'create_domain', + side_effect=exception.UnexpectedError)) + + self.assertRaises(exception.UnexpectedError, + self.resource_api.ensure_default_domain_exists) + + def test_update_project_name_conflict(self): + name = uuid.uuid4().hex + description = uuid.uuid4().hex + domain_attrs = { + 'id': CONF.identity.default_domain_id, + 'name': name, + 'description': description, + } + domain = self.resource_api.create_domain( + CONF.identity.default_domain_id, domain_attrs) + project1 = unit.new_project_ref(domain_id=domain['id'], + name=uuid.uuid4().hex) + self.resource_api.create_project(project1['id'], project1) + project2 = unit.new_project_ref(domain_id=domain['id'], + name=uuid.uuid4().hex) + project = self.resource_api.create_project(project2['id'], project2) + + self.assertRaises(exception.Conflict, + self.resource_api.update_project, + project['id'], {'name': project1['name']}) + + +class DomainConfigDriverTests(object): + + def _domain_config_crud(self, sensitive): + domain = uuid.uuid4().hex + group = uuid.uuid4().hex + option = uuid.uuid4().hex + value = uuid.uuid4().hex + self.driver.create_config_option( + domain, group, option, value, sensitive) + res = self.driver.get_config_option( + domain, group, option, sensitive) + config = {'group': group, 'option': option, 'value': value} + self.assertEqual(config, res) + + value = uuid.uuid4().hex + self.driver.update_config_option( + domain, group, option, value, sensitive) + res = self.driver.get_config_option( + domain, group, option, sensitive) + config = {'group': group, 'option': option, 'value': value} + self.assertEqual(config, res) + + self.driver.delete_config_options( + domain, group, option, sensitive) + self.assertRaises(exception.DomainConfigNotFound, + self.driver.get_config_option, + domain, group, option, sensitive) + # ...and silent if we try to delete it again + self.driver.delete_config_options( + domain, group, option, sensitive) + + def test_whitelisted_domain_config_crud(self): + self._domain_config_crud(sensitive=False) + + def test_sensitive_domain_config_crud(self): + self._domain_config_crud(sensitive=True) + + def _list_domain_config(self, sensitive): + """Test listing by combination of domain, group & option.""" + config1 = {'group': uuid.uuid4().hex, 'option': uuid.uuid4().hex, + 'value': uuid.uuid4().hex} + # Put config2 in the same group as config1 + config2 = {'group': config1['group'], 'option': uuid.uuid4().hex, + 'value': uuid.uuid4().hex} + config3 = {'group': uuid.uuid4().hex, 'option': uuid.uuid4().hex, + 'value': 100} + domain = uuid.uuid4().hex + + for config in [config1, config2, config3]: + self.driver.create_config_option( + domain, config['group'], config['option'], + config['value'], sensitive) + + # Try listing all items from a domain + res = self.driver.list_config_options( + domain, sensitive=sensitive) + self.assertThat(res, matchers.HasLength(3)) + for res_entry in res: + self.assertIn(res_entry, [config1, config2, config3]) + + # Try listing by domain and group + res = self.driver.list_config_options( + domain, group=config1['group'], sensitive=sensitive) + self.assertThat(res, matchers.HasLength(2)) + for res_entry in res: + self.assertIn(res_entry, [config1, config2]) + + # Try listing by domain, group and option + res = self.driver.list_config_options( + domain, group=config2['group'], + option=config2['option'], sensitive=sensitive) + self.assertThat(res, matchers.HasLength(1)) + self.assertEqual(config2, res[0]) + + def test_list_whitelisted_domain_config_crud(self): + self._list_domain_config(False) + + def test_list_sensitive_domain_config_crud(self): + self._list_domain_config(True) + + def _delete_domain_configs(self, sensitive): + """Test deleting by combination of domain, group & option.""" + config1 = {'group': uuid.uuid4().hex, 'option': uuid.uuid4().hex, + 'value': uuid.uuid4().hex} + # Put config2 and config3 in the same group as config1 + config2 = {'group': config1['group'], 'option': uuid.uuid4().hex, + 'value': uuid.uuid4().hex} + config3 = {'group': config1['group'], 'option': uuid.uuid4().hex, + 'value': uuid.uuid4().hex} + config4 = {'group': uuid.uuid4().hex, 'option': uuid.uuid4().hex, + 'value': uuid.uuid4().hex} + domain = uuid.uuid4().hex + + for config in [config1, config2, config3, config4]: + self.driver.create_config_option( + domain, config['group'], config['option'], + config['value'], sensitive) + + # Try deleting by domain, group and option + res = self.driver.delete_config_options( + domain, group=config2['group'], + option=config2['option'], sensitive=sensitive) + res = self.driver.list_config_options( + domain, sensitive=sensitive) + self.assertThat(res, matchers.HasLength(3)) + for res_entry in res: + self.assertIn(res_entry, [config1, config3, config4]) + + # Try deleting by domain and group + res = self.driver.delete_config_options( + domain, group=config4['group'], sensitive=sensitive) + res = self.driver.list_config_options( + domain, sensitive=sensitive) + self.assertThat(res, matchers.HasLength(2)) + for res_entry in res: + self.assertIn(res_entry, [config1, config3]) + + # Try deleting all items from a domain + res = self.driver.delete_config_options( + domain, sensitive=sensitive) + res = self.driver.list_config_options( + domain, sensitive=sensitive) + self.assertThat(res, matchers.HasLength(0)) + + def test_delete_whitelisted_domain_configs(self): + self._delete_domain_configs(False) + + def test_delete_sensitive_domain_configs(self): + self._delete_domain_configs(True) + + def _create_domain_config_twice(self, sensitive): + """Test conflict error thrown if create the same option twice.""" + config = {'group': uuid.uuid4().hex, 'option': uuid.uuid4().hex, + 'value': uuid.uuid4().hex} + domain = uuid.uuid4().hex + + self.driver.create_config_option( + domain, config['group'], config['option'], + config['value'], sensitive=sensitive) + self.assertRaises(exception.Conflict, + self.driver.create_config_option, + domain, config['group'], config['option'], + config['value'], sensitive=sensitive) + + def test_create_whitelisted_domain_config_twice(self): + self._create_domain_config_twice(False) + + def test_create_sensitive_domain_config_twice(self): + self._create_domain_config_twice(True) + + +class DomainConfigTests(object): + + def setUp(self): + self.domain = unit.new_domain_ref() + self.resource_api.create_domain(self.domain['id'], self.domain) + self.addCleanup(self.clean_up_domain) + + def clean_up_domain(self): + # NOTE(henry-nash): Deleting the domain will also delete any domain + # configs for this domain. + self.domain['enabled'] = False + self.resource_api.update_domain(self.domain['id'], self.domain) + self.resource_api.delete_domain(self.domain['id']) + del self.domain + + def test_create_domain_config_including_sensitive_option(self): + config = {'ldap': {'url': uuid.uuid4().hex, + 'user_tree_dn': uuid.uuid4().hex, + 'password': uuid.uuid4().hex}} + self.domain_config_api.create_config(self.domain['id'], config) + + # password is sensitive, so check that the whitelisted portion and + # the sensitive piece have been stored in the appropriate locations. + res = self.domain_config_api.get_config(self.domain['id']) + config_whitelisted = copy.deepcopy(config) + config_whitelisted['ldap'].pop('password') + self.assertEqual(config_whitelisted, res) + res = self.domain_config_api.driver.get_config_option( + self.domain['id'], 'ldap', 'password', sensitive=True) + self.assertEqual(config['ldap']['password'], res['value']) + + # Finally, use the non-public API to get back the whole config + res = self.domain_config_api.get_config_with_sensitive_info( + self.domain['id']) + self.assertEqual(config, res) + + def test_get_partial_domain_config(self): + config = {'ldap': {'url': uuid.uuid4().hex, + 'user_tree_dn': uuid.uuid4().hex, + 'password': uuid.uuid4().hex}, + 'identity': {'driver': uuid.uuid4().hex}} + self.domain_config_api.create_config(self.domain['id'], config) + + res = self.domain_config_api.get_config(self.domain['id'], + group='identity') + config_partial = copy.deepcopy(config) + config_partial.pop('ldap') + self.assertEqual(config_partial, res) + res = self.domain_config_api.get_config( + self.domain['id'], group='ldap', option='user_tree_dn') + self.assertEqual({'user_tree_dn': config['ldap']['user_tree_dn']}, res) + # ...but we should fail to get a sensitive option + self.assertRaises(exception.DomainConfigNotFound, + self.domain_config_api.get_config, self.domain['id'], + group='ldap', option='password') + + def test_delete_partial_domain_config(self): + config = {'ldap': {'url': uuid.uuid4().hex, + 'user_tree_dn': uuid.uuid4().hex, + 'password': uuid.uuid4().hex}, + 'identity': {'driver': uuid.uuid4().hex}} + self.domain_config_api.create_config(self.domain['id'], config) + + self.domain_config_api.delete_config( + self.domain['id'], group='identity') + config_partial = copy.deepcopy(config) + config_partial.pop('identity') + config_partial['ldap'].pop('password') + res = self.domain_config_api.get_config(self.domain['id']) + self.assertEqual(config_partial, res) + + self.domain_config_api.delete_config( + self.domain['id'], group='ldap', option='url') + config_partial = copy.deepcopy(config_partial) + config_partial['ldap'].pop('url') + res = self.domain_config_api.get_config(self.domain['id']) + self.assertEqual(config_partial, res) + + def test_get_options_not_in_domain_config(self): + self.assertRaises(exception.DomainConfigNotFound, + self.domain_config_api.get_config, self.domain['id']) + config = {'ldap': {'url': uuid.uuid4().hex}} + + self.domain_config_api.create_config(self.domain['id'], config) + + self.assertRaises(exception.DomainConfigNotFound, + self.domain_config_api.get_config, self.domain['id'], + group='identity') + self.assertRaises(exception.DomainConfigNotFound, + self.domain_config_api.get_config, self.domain['id'], + group='ldap', option='user_tree_dn') + + def test_get_sensitive_config(self): + config = {'ldap': {'url': uuid.uuid4().hex, + 'user_tree_dn': uuid.uuid4().hex, + 'password': uuid.uuid4().hex}, + 'identity': {'driver': uuid.uuid4().hex}} + res = self.domain_config_api.get_config_with_sensitive_info( + self.domain['id']) + self.assertEqual({}, res) + self.domain_config_api.create_config(self.domain['id'], config) + res = self.domain_config_api.get_config_with_sensitive_info( + self.domain['id']) + self.assertEqual(config, res) + + def test_update_partial_domain_config(self): + config = {'ldap': {'url': uuid.uuid4().hex, + 'user_tree_dn': uuid.uuid4().hex, + 'password': uuid.uuid4().hex}, + 'identity': {'driver': uuid.uuid4().hex}} + self.domain_config_api.create_config(self.domain['id'], config) + + # Try updating a group + new_config = {'ldap': {'url': uuid.uuid4().hex, + 'user_filter': uuid.uuid4().hex}} + res = self.domain_config_api.update_config( + self.domain['id'], new_config, group='ldap') + expected_config = copy.deepcopy(config) + expected_config['ldap']['url'] = new_config['ldap']['url'] + expected_config['ldap']['user_filter'] = ( + new_config['ldap']['user_filter']) + expected_full_config = copy.deepcopy(expected_config) + expected_config['ldap'].pop('password') + res = self.domain_config_api.get_config(self.domain['id']) + self.assertEqual(expected_config, res) + # The sensitive option should still exist + res = self.domain_config_api.get_config_with_sensitive_info( + self.domain['id']) + self.assertEqual(expected_full_config, res) + + # Try updating a single whitelisted option + self.domain_config_api.delete_config(self.domain['id']) + self.domain_config_api.create_config(self.domain['id'], config) + new_config = {'url': uuid.uuid4().hex} + res = self.domain_config_api.update_config( + self.domain['id'], new_config, group='ldap', option='url') + + # Make sure whitelisted and full config is updated + expected_whitelisted_config = copy.deepcopy(config) + expected_whitelisted_config['ldap']['url'] = new_config['url'] + expected_full_config = copy.deepcopy(expected_whitelisted_config) + expected_whitelisted_config['ldap'].pop('password') + self.assertEqual(expected_whitelisted_config, res) + res = self.domain_config_api.get_config(self.domain['id']) + self.assertEqual(expected_whitelisted_config, res) + res = self.domain_config_api.get_config_with_sensitive_info( + self.domain['id']) + self.assertEqual(expected_full_config, res) + + # Try updating a single sensitive option + self.domain_config_api.delete_config(self.domain['id']) + self.domain_config_api.create_config(self.domain['id'], config) + new_config = {'password': uuid.uuid4().hex} + res = self.domain_config_api.update_config( + self.domain['id'], new_config, group='ldap', option='password') + # The whitelisted config should not have changed... + expected_whitelisted_config = copy.deepcopy(config) + expected_full_config = copy.deepcopy(config) + expected_whitelisted_config['ldap'].pop('password') + self.assertEqual(expected_whitelisted_config, res) + res = self.domain_config_api.get_config(self.domain['id']) + self.assertEqual(expected_whitelisted_config, res) + expected_full_config['ldap']['password'] = new_config['password'] + res = self.domain_config_api.get_config_with_sensitive_info( + self.domain['id']) + # ...but the sensitive piece should have. + self.assertEqual(expected_full_config, res) + + def test_update_invalid_partial_domain_config(self): + config = {'ldap': {'url': uuid.uuid4().hex, + 'user_tree_dn': uuid.uuid4().hex, + 'password': uuid.uuid4().hex}, + 'identity': {'driver': uuid.uuid4().hex}} + # An extra group, when specifying one group should fail + self.assertRaises(exception.InvalidDomainConfig, + self.domain_config_api.update_config, + self.domain['id'], config, group='ldap') + # An extra option, when specifying one option should fail + self.assertRaises(exception.InvalidDomainConfig, + self.domain_config_api.update_config, + self.domain['id'], config['ldap'], + group='ldap', option='url') + + # Now try the right number of groups/options, but just not + # ones that are in the config provided + config = {'ldap': {'user_tree_dn': uuid.uuid4().hex}} + self.assertRaises(exception.InvalidDomainConfig, + self.domain_config_api.update_config, + self.domain['id'], config, group='identity') + self.assertRaises(exception.InvalidDomainConfig, + self.domain_config_api.update_config, + self.domain['id'], config['ldap'], group='ldap', + option='url') + + # Now some valid groups/options, but just not ones that are in the + # existing config + config = {'ldap': {'user_tree_dn': uuid.uuid4().hex}} + self.domain_config_api.create_config(self.domain['id'], config) + config_wrong_group = {'identity': {'driver': uuid.uuid4().hex}} + self.assertRaises(exception.DomainConfigNotFound, + self.domain_config_api.update_config, + self.domain['id'], config_wrong_group, + group='identity') + config_wrong_option = {'url': uuid.uuid4().hex} + self.assertRaises(exception.DomainConfigNotFound, + self.domain_config_api.update_config, + self.domain['id'], config_wrong_option, + group='ldap', option='url') + + # And finally just some bad groups/options + bad_group = uuid.uuid4().hex + config = {bad_group: {'user': uuid.uuid4().hex}} + self.assertRaises(exception.InvalidDomainConfig, + self.domain_config_api.update_config, + self.domain['id'], config, group=bad_group, + option='user') + bad_option = uuid.uuid4().hex + config = {'ldap': {bad_option: uuid.uuid4().hex}} + self.assertRaises(exception.InvalidDomainConfig, + self.domain_config_api.update_config, + self.domain['id'], config, group='ldap', + option=bad_option) + + def test_create_invalid_domain_config(self): + self.assertRaises(exception.InvalidDomainConfig, + self.domain_config_api.create_config, + self.domain['id'], {}) + config = {uuid.uuid4().hex: uuid.uuid4().hex} + self.assertRaises(exception.InvalidDomainConfig, + self.domain_config_api.create_config, + self.domain['id'], config) + config = {uuid.uuid4().hex: {uuid.uuid4().hex: uuid.uuid4().hex}} + self.assertRaises(exception.InvalidDomainConfig, + self.domain_config_api.create_config, + self.domain['id'], config) + config = {'ldap': {uuid.uuid4().hex: uuid.uuid4().hex}} + self.assertRaises(exception.InvalidDomainConfig, + self.domain_config_api.create_config, + self.domain['id'], config) + # Try an option that IS in the standard conf, but neither whitelisted + # or marked as sensitive + config = {'identity': {'user_tree_dn': uuid.uuid4().hex}} + self.assertRaises(exception.InvalidDomainConfig, + self.domain_config_api.create_config, + self.domain['id'], config) + + def test_delete_invalid_partial_domain_config(self): + config = {'ldap': {'url': uuid.uuid4().hex}} + self.domain_config_api.create_config(self.domain['id'], config) + # Try deleting a group not in the config + self.assertRaises(exception.DomainConfigNotFound, + self.domain_config_api.delete_config, + self.domain['id'], group='identity') + # Try deleting an option not in the config + self.assertRaises(exception.DomainConfigNotFound, + self.domain_config_api.delete_config, + self.domain['id'], + group='ldap', option='user_tree_dn') + + def test_sensitive_substitution_in_domain_config(self): + # Create a config that contains a whitelisted option that requires + # substitution of a sensitive option. + config = {'ldap': {'url': 'my_url/%(password)s', + 'user_tree_dn': uuid.uuid4().hex, + 'password': uuid.uuid4().hex}, + 'identity': {'driver': uuid.uuid4().hex}} + self.domain_config_api.create_config(self.domain['id'], config) + + # Read back the config with the internal method and ensure that the + # substitution has taken place. + res = self.domain_config_api.get_config_with_sensitive_info( + self.domain['id']) + expected_url = ( + config['ldap']['url'] % {'password': config['ldap']['password']}) + self.assertEqual(expected_url, res['ldap']['url']) + + def test_invalid_sensitive_substitution_in_domain_config(self): + """Check that invalid substitutions raise warnings.""" + mock_log = mock.Mock() + + invalid_option_config = { + 'ldap': {'user_tree_dn': uuid.uuid4().hex, + 'password': uuid.uuid4().hex}, + 'identity': {'driver': uuid.uuid4().hex}} + + for invalid_option in ['my_url/%(passssword)s', + 'my_url/%(password', + 'my_url/%(password)', + 'my_url/%(password)d']: + invalid_option_config['ldap']['url'] = invalid_option + self.domain_config_api.create_config( + self.domain['id'], invalid_option_config) + + with mock.patch('keystone.resource.core.LOG', mock_log): + res = self.domain_config_api.get_config_with_sensitive_info( + self.domain['id']) + mock_log.warning.assert_any_call(mock.ANY) + self.assertEqual( + invalid_option_config['ldap']['url'], res['ldap']['url']) + + def test_escaped_sequence_in_domain_config(self): + """Check that escaped '%(' doesn't get interpreted.""" + mock_log = mock.Mock() + + escaped_option_config = { + 'ldap': {'url': 'my_url/%%(password)s', + 'user_tree_dn': uuid.uuid4().hex, + 'password': uuid.uuid4().hex}, + 'identity': {'driver': uuid.uuid4().hex}} + + self.domain_config_api.create_config( + self.domain['id'], escaped_option_config) + + with mock.patch('keystone.resource.core.LOG', mock_log): + res = self.domain_config_api.get_config_with_sensitive_info( + self.domain['id']) + self.assertFalse(mock_log.warn.called) + # The escaping '%' should have been removed + self.assertEqual('my_url/%(password)s', res['ldap']['url']) + + @unit.skip_if_cache_disabled('domain_config') + def test_cache_layer_get_sensitive_config(self): + config = {'ldap': {'url': uuid.uuid4().hex, + 'user_tree_dn': uuid.uuid4().hex, + 'password': uuid.uuid4().hex}, + 'identity': {'driver': uuid.uuid4().hex}} + self.domain_config_api.create_config(self.domain['id'], config) + # cache the result + res = self.domain_config_api.get_config_with_sensitive_info( + self.domain['id']) + self.assertEqual(config, res) + + # delete, bypassing domain config manager api + self.domain_config_api.delete_config_options(self.domain['id']) + self.domain_config_api.delete_config_options(self.domain['id'], + sensitive=True) + + self.assertDictEqual( + res, self.domain_config_api.get_config_with_sensitive_info( + self.domain['id'])) + self.domain_config_api.get_config_with_sensitive_info.invalidate( + self.domain_config_api, self.domain['id']) + self.assertDictEqual( + {}, + self.domain_config_api.get_config_with_sensitive_info( + self.domain['id'])) + + def test_delete_domain_deletes_configs(self): + """Test domain deletion clears the domain configs.""" + domain = unit.new_domain_ref() + self.resource_api.create_domain(domain['id'], domain) + config = {'ldap': {'url': uuid.uuid4().hex, + 'user_tree_dn': uuid.uuid4().hex, + 'password': uuid.uuid4().hex}} + self.domain_config_api.create_config(domain['id'], config) + + # Now delete the domain + domain['enabled'] = False + self.resource_api.update_domain(domain['id'], domain) + self.resource_api.delete_domain(domain['id']) + + # Check domain configs have also been deleted + self.assertRaises( + exception.DomainConfigNotFound, + self.domain_config_api.get_config, + domain['id']) + + # The get_config_with_sensitive_info does not throw an exception if + # the config is empty, it just returns an empty dict + self.assertDictEqual( + {}, + self.domain_config_api.get_config_with_sensitive_info( + domain['id'])) + + def test_config_registration(self): + type = uuid.uuid4().hex + self.domain_config_api.obtain_registration( + self.domain['id'], type) + self.domain_config_api.release_registration( + self.domain['id'], type=type) + + # Make sure that once someone has it, nobody else can get it. + # This includes the domain who already has it. + self.domain_config_api.obtain_registration( + self.domain['id'], type) + self.assertFalse( + self.domain_config_api.obtain_registration( + self.domain['id'], type)) + + # Make sure we can read who does have it + self.assertEqual( + self.domain['id'], + self.domain_config_api.read_registration(type)) + + # Make sure releasing it is silent if the domain specified doesn't + # have the registration + domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + self.resource_api.create_domain(domain2['id'], domain2) + self.domain_config_api.release_registration( + domain2['id'], type=type) + + # If nobody has the type registered, then trying to read it should + # raise ConfigRegistrationNotFound + self.domain_config_api.release_registration( + self.domain['id'], type=type) + self.assertRaises(exception.ConfigRegistrationNotFound, + self.domain_config_api.read_registration, + type) + + # Finally check multiple registrations are cleared if you free the + # registration without specifying the type + type2 = uuid.uuid4().hex + self.domain_config_api.obtain_registration( + self.domain['id'], type) + self.domain_config_api.obtain_registration( + self.domain['id'], type2) + self.domain_config_api.release_registration(self.domain['id']) + self.assertRaises(exception.ConfigRegistrationNotFound, + self.domain_config_api.read_registration, + type) + self.assertRaises(exception.ConfigRegistrationNotFound, + self.domain_config_api.read_registration, + type2) -- cgit 1.2.3-korg