From 920a49cfa055733d575282973e23558c33087a4a Mon Sep 17 00:00:00 2001 From: RHE Date: Fri, 24 Nov 2017 13:54:26 +0100 Subject: remove keystone-moon Change-Id: I80d7c9b669f19d5f6607e162de8e0e55c2f80fdd Signed-off-by: RHE --- .../keystone/tests/unit/resource/test_core.py | 692 --------------------- 1 file changed, 692 deletions(-) delete mode 100644 keystone-moon/keystone/tests/unit/resource/test_core.py (limited to 'keystone-moon/keystone/tests/unit/resource/test_core.py') diff --git a/keystone-moon/keystone/tests/unit/resource/test_core.py b/keystone-moon/keystone/tests/unit/resource/test_core.py deleted file mode 100644 index 2eb87e4c..00000000 --- a/keystone-moon/keystone/tests/unit/resource/test_core.py +++ /dev/null @@ -1,692 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import copy -import uuid - -import mock -from testtools import matchers - -from oslo_config import cfg -from oslotest import mockpatch - -from keystone import exception -from keystone.tests import unit -from keystone.tests.unit.ksfixtures import database - - -CONF = cfg.CONF - - -class TestResourceManagerNoFixtures(unit.SQLDriverOverrides, unit.TestCase): - - def setUp(self): - super(TestResourceManagerNoFixtures, self).setUp() - self.useFixture(database.Database(self.sql_driver_version_overrides)) - self.load_backends() - - def test_ensure_default_domain_exists(self): - # When there's no default domain, ensure_default_domain_exists creates - # it. - - # First make sure there's no default domain. - self.assertRaises( - exception.DomainNotFound, - self.resource_api.get_domain, CONF.identity.default_domain_id) - - self.resource_api.ensure_default_domain_exists() - default_domain = self.resource_api.get_domain( - CONF.identity.default_domain_id) - - expected_domain = { - 'id': CONF.identity.default_domain_id, - 'name': 'Default', - 'enabled': True, - 'description': 'Domain created automatically to support V2.0 ' - 'operations.', - } - self.assertEqual(expected_domain, default_domain) - - def test_ensure_default_domain_exists_already_exists(self): - # When there's already a default domain, ensure_default_domain_exists - # doesn't do anything. - - name = uuid.uuid4().hex - description = uuid.uuid4().hex - domain_attrs = { - 'id': CONF.identity.default_domain_id, - 'name': name, - 'description': description, - } - self.resource_api.create_domain(CONF.identity.default_domain_id, - domain_attrs) - - self.resource_api.ensure_default_domain_exists() - - default_domain = self.resource_api.get_domain( - CONF.identity.default_domain_id) - - expected_domain = { - 'id': CONF.identity.default_domain_id, - 'name': name, - 'enabled': True, - 'description': description, - } - - self.assertEqual(expected_domain, default_domain) - - def test_ensure_default_domain_exists_fails(self): - # When there's an unexpected exception creating domain it's passed on. - - self.useFixture(mockpatch.PatchObject( - self.resource_api, 'create_domain', - side_effect=exception.UnexpectedError)) - - self.assertRaises(exception.UnexpectedError, - self.resource_api.ensure_default_domain_exists) - - def test_update_project_name_conflict(self): - name = uuid.uuid4().hex - description = uuid.uuid4().hex - domain_attrs = { - 'id': CONF.identity.default_domain_id, - 'name': name, - 'description': description, - } - domain = self.resource_api.create_domain( - CONF.identity.default_domain_id, domain_attrs) - project1 = unit.new_project_ref(domain_id=domain['id'], - name=uuid.uuid4().hex) - self.resource_api.create_project(project1['id'], project1) - project2 = unit.new_project_ref(domain_id=domain['id'], - name=uuid.uuid4().hex) - project = self.resource_api.create_project(project2['id'], project2) - - self.assertRaises(exception.Conflict, - self.resource_api.update_project, - project['id'], {'name': project1['name']}) - - -class DomainConfigDriverTests(object): - - def _domain_config_crud(self, sensitive): - domain = uuid.uuid4().hex - group = uuid.uuid4().hex - option = uuid.uuid4().hex - value = uuid.uuid4().hex - self.driver.create_config_option( - domain, group, option, value, sensitive) - res = self.driver.get_config_option( - domain, group, option, sensitive) - config = {'group': group, 'option': option, 'value': value} - self.assertEqual(config, res) - - value = uuid.uuid4().hex - self.driver.update_config_option( - domain, group, option, value, sensitive) - res = self.driver.get_config_option( - domain, group, option, sensitive) - config = {'group': group, 'option': option, 'value': value} - self.assertEqual(config, res) - - self.driver.delete_config_options( - domain, group, option, sensitive) - self.assertRaises(exception.DomainConfigNotFound, - self.driver.get_config_option, - domain, group, option, sensitive) - # ...and silent if we try to delete it again - self.driver.delete_config_options( - domain, group, option, sensitive) - - def test_whitelisted_domain_config_crud(self): - self._domain_config_crud(sensitive=False) - - def test_sensitive_domain_config_crud(self): - self._domain_config_crud(sensitive=True) - - def _list_domain_config(self, sensitive): - """Test listing by combination of domain, group & option.""" - config1 = {'group': uuid.uuid4().hex, 'option': uuid.uuid4().hex, - 'value': uuid.uuid4().hex} - # Put config2 in the same group as config1 - config2 = {'group': config1['group'], 'option': uuid.uuid4().hex, - 'value': uuid.uuid4().hex} - config3 = {'group': uuid.uuid4().hex, 'option': uuid.uuid4().hex, - 'value': 100} - domain = uuid.uuid4().hex - - for config in [config1, config2, config3]: - self.driver.create_config_option( - domain, config['group'], config['option'], - config['value'], sensitive) - - # Try listing all items from a domain - res = self.driver.list_config_options( - domain, sensitive=sensitive) - self.assertThat(res, matchers.HasLength(3)) - for res_entry in res: - self.assertIn(res_entry, [config1, config2, config3]) - - # Try listing by domain and group - res = self.driver.list_config_options( - domain, group=config1['group'], sensitive=sensitive) - self.assertThat(res, matchers.HasLength(2)) - for res_entry in res: - self.assertIn(res_entry, [config1, config2]) - - # Try listing by domain, group and option - res = self.driver.list_config_options( - domain, group=config2['group'], - option=config2['option'], sensitive=sensitive) - self.assertThat(res, matchers.HasLength(1)) - self.assertEqual(config2, res[0]) - - def test_list_whitelisted_domain_config_crud(self): - self._list_domain_config(False) - - def test_list_sensitive_domain_config_crud(self): - self._list_domain_config(True) - - def _delete_domain_configs(self, sensitive): - """Test deleting by combination of domain, group & option.""" - config1 = {'group': uuid.uuid4().hex, 'option': uuid.uuid4().hex, - 'value': uuid.uuid4().hex} - # Put config2 and config3 in the same group as config1 - config2 = {'group': config1['group'], 'option': uuid.uuid4().hex, - 'value': uuid.uuid4().hex} - config3 = {'group': config1['group'], 'option': uuid.uuid4().hex, - 'value': uuid.uuid4().hex} - config4 = {'group': uuid.uuid4().hex, 'option': uuid.uuid4().hex, - 'value': uuid.uuid4().hex} - domain = uuid.uuid4().hex - - for config in [config1, config2, config3, config4]: - self.driver.create_config_option( - domain, config['group'], config['option'], - config['value'], sensitive) - - # Try deleting by domain, group and option - res = self.driver.delete_config_options( - domain, group=config2['group'], - option=config2['option'], sensitive=sensitive) - res = self.driver.list_config_options( - domain, sensitive=sensitive) - self.assertThat(res, matchers.HasLength(3)) - for res_entry in res: - self.assertIn(res_entry, [config1, config3, config4]) - - # Try deleting by domain and group - res = self.driver.delete_config_options( - domain, group=config4['group'], sensitive=sensitive) - res = self.driver.list_config_options( - domain, sensitive=sensitive) - self.assertThat(res, matchers.HasLength(2)) - for res_entry in res: - self.assertIn(res_entry, [config1, config3]) - - # Try deleting all items from a domain - res = self.driver.delete_config_options( - domain, sensitive=sensitive) - res = self.driver.list_config_options( - domain, sensitive=sensitive) - self.assertThat(res, matchers.HasLength(0)) - - def test_delete_whitelisted_domain_configs(self): - self._delete_domain_configs(False) - - def test_delete_sensitive_domain_configs(self): - self._delete_domain_configs(True) - - def _create_domain_config_twice(self, sensitive): - """Test conflict error thrown if create the same option twice.""" - config = {'group': uuid.uuid4().hex, 'option': uuid.uuid4().hex, - 'value': uuid.uuid4().hex} - domain = uuid.uuid4().hex - - self.driver.create_config_option( - domain, config['group'], config['option'], - config['value'], sensitive=sensitive) - self.assertRaises(exception.Conflict, - self.driver.create_config_option, - domain, config['group'], config['option'], - config['value'], sensitive=sensitive) - - def test_create_whitelisted_domain_config_twice(self): - self._create_domain_config_twice(False) - - def test_create_sensitive_domain_config_twice(self): - self._create_domain_config_twice(True) - - -class DomainConfigTests(object): - - def setUp(self): - self.domain = unit.new_domain_ref() - self.resource_api.create_domain(self.domain['id'], self.domain) - self.addCleanup(self.clean_up_domain) - - def clean_up_domain(self): - # NOTE(henry-nash): Deleting the domain will also delete any domain - # configs for this domain. - self.domain['enabled'] = False - self.resource_api.update_domain(self.domain['id'], self.domain) - self.resource_api.delete_domain(self.domain['id']) - del self.domain - - def test_create_domain_config_including_sensitive_option(self): - config = {'ldap': {'url': uuid.uuid4().hex, - 'user_tree_dn': uuid.uuid4().hex, - 'password': uuid.uuid4().hex}} - self.domain_config_api.create_config(self.domain['id'], config) - - # password is sensitive, so check that the whitelisted portion and - # the sensitive piece have been stored in the appropriate locations. - res = self.domain_config_api.get_config(self.domain['id']) - config_whitelisted = copy.deepcopy(config) - config_whitelisted['ldap'].pop('password') - self.assertEqual(config_whitelisted, res) - res = self.domain_config_api.driver.get_config_option( - self.domain['id'], 'ldap', 'password', sensitive=True) - self.assertEqual(config['ldap']['password'], res['value']) - - # Finally, use the non-public API to get back the whole config - res = self.domain_config_api.get_config_with_sensitive_info( - self.domain['id']) - self.assertEqual(config, res) - - def test_get_partial_domain_config(self): - config = {'ldap': {'url': uuid.uuid4().hex, - 'user_tree_dn': uuid.uuid4().hex, - 'password': uuid.uuid4().hex}, - 'identity': {'driver': uuid.uuid4().hex}} - self.domain_config_api.create_config(self.domain['id'], config) - - res = self.domain_config_api.get_config(self.domain['id'], - group='identity') - config_partial = copy.deepcopy(config) - config_partial.pop('ldap') - self.assertEqual(config_partial, res) - res = self.domain_config_api.get_config( - self.domain['id'], group='ldap', option='user_tree_dn') - self.assertEqual({'user_tree_dn': config['ldap']['user_tree_dn']}, res) - # ...but we should fail to get a sensitive option - self.assertRaises(exception.DomainConfigNotFound, - self.domain_config_api.get_config, self.domain['id'], - group='ldap', option='password') - - def test_delete_partial_domain_config(self): - config = {'ldap': {'url': uuid.uuid4().hex, - 'user_tree_dn': uuid.uuid4().hex, - 'password': uuid.uuid4().hex}, - 'identity': {'driver': uuid.uuid4().hex}} - self.domain_config_api.create_config(self.domain['id'], config) - - self.domain_config_api.delete_config( - self.domain['id'], group='identity') - config_partial = copy.deepcopy(config) - config_partial.pop('identity') - config_partial['ldap'].pop('password') - res = self.domain_config_api.get_config(self.domain['id']) - self.assertEqual(config_partial, res) - - self.domain_config_api.delete_config( - self.domain['id'], group='ldap', option='url') - config_partial = copy.deepcopy(config_partial) - config_partial['ldap'].pop('url') - res = self.domain_config_api.get_config(self.domain['id']) - self.assertEqual(config_partial, res) - - def test_get_options_not_in_domain_config(self): - self.assertRaises(exception.DomainConfigNotFound, - self.domain_config_api.get_config, self.domain['id']) - config = {'ldap': {'url': uuid.uuid4().hex}} - - self.domain_config_api.create_config(self.domain['id'], config) - - self.assertRaises(exception.DomainConfigNotFound, - self.domain_config_api.get_config, self.domain['id'], - group='identity') - self.assertRaises(exception.DomainConfigNotFound, - self.domain_config_api.get_config, self.domain['id'], - group='ldap', option='user_tree_dn') - - def test_get_sensitive_config(self): - config = {'ldap': {'url': uuid.uuid4().hex, - 'user_tree_dn': uuid.uuid4().hex, - 'password': uuid.uuid4().hex}, - 'identity': {'driver': uuid.uuid4().hex}} - res = self.domain_config_api.get_config_with_sensitive_info( - self.domain['id']) - self.assertEqual({}, res) - self.domain_config_api.create_config(self.domain['id'], config) - res = self.domain_config_api.get_config_with_sensitive_info( - self.domain['id']) - self.assertEqual(config, res) - - def test_update_partial_domain_config(self): - config = {'ldap': {'url': uuid.uuid4().hex, - 'user_tree_dn': uuid.uuid4().hex, - 'password': uuid.uuid4().hex}, - 'identity': {'driver': uuid.uuid4().hex}} - self.domain_config_api.create_config(self.domain['id'], config) - - # Try updating a group - new_config = {'ldap': {'url': uuid.uuid4().hex, - 'user_filter': uuid.uuid4().hex}} - res = self.domain_config_api.update_config( - self.domain['id'], new_config, group='ldap') - expected_config = copy.deepcopy(config) - expected_config['ldap']['url'] = new_config['ldap']['url'] - expected_config['ldap']['user_filter'] = ( - new_config['ldap']['user_filter']) - expected_full_config = copy.deepcopy(expected_config) - expected_config['ldap'].pop('password') - res = self.domain_config_api.get_config(self.domain['id']) - self.assertEqual(expected_config, res) - # The sensitive option should still exist - res = self.domain_config_api.get_config_with_sensitive_info( - self.domain['id']) - self.assertEqual(expected_full_config, res) - - # Try updating a single whitelisted option - self.domain_config_api.delete_config(self.domain['id']) - self.domain_config_api.create_config(self.domain['id'], config) - new_config = {'url': uuid.uuid4().hex} - res = self.domain_config_api.update_config( - self.domain['id'], new_config, group='ldap', option='url') - - # Make sure whitelisted and full config is updated - expected_whitelisted_config = copy.deepcopy(config) - expected_whitelisted_config['ldap']['url'] = new_config['url'] - expected_full_config = copy.deepcopy(expected_whitelisted_config) - expected_whitelisted_config['ldap'].pop('password') - self.assertEqual(expected_whitelisted_config, res) - res = self.domain_config_api.get_config(self.domain['id']) - self.assertEqual(expected_whitelisted_config, res) - res = self.domain_config_api.get_config_with_sensitive_info( - self.domain['id']) - self.assertEqual(expected_full_config, res) - - # Try updating a single sensitive option - self.domain_config_api.delete_config(self.domain['id']) - self.domain_config_api.create_config(self.domain['id'], config) - new_config = {'password': uuid.uuid4().hex} - res = self.domain_config_api.update_config( - self.domain['id'], new_config, group='ldap', option='password') - # The whitelisted config should not have changed... - expected_whitelisted_config = copy.deepcopy(config) - expected_full_config = copy.deepcopy(config) - expected_whitelisted_config['ldap'].pop('password') - self.assertEqual(expected_whitelisted_config, res) - res = self.domain_config_api.get_config(self.domain['id']) - self.assertEqual(expected_whitelisted_config, res) - expected_full_config['ldap']['password'] = new_config['password'] - res = self.domain_config_api.get_config_with_sensitive_info( - self.domain['id']) - # ...but the sensitive piece should have. - self.assertEqual(expected_full_config, res) - - def test_update_invalid_partial_domain_config(self): - config = {'ldap': {'url': uuid.uuid4().hex, - 'user_tree_dn': uuid.uuid4().hex, - 'password': uuid.uuid4().hex}, - 'identity': {'driver': uuid.uuid4().hex}} - # An extra group, when specifying one group should fail - self.assertRaises(exception.InvalidDomainConfig, - self.domain_config_api.update_config, - self.domain['id'], config, group='ldap') - # An extra option, when specifying one option should fail - self.assertRaises(exception.InvalidDomainConfig, - self.domain_config_api.update_config, - self.domain['id'], config['ldap'], - group='ldap', option='url') - - # Now try the right number of groups/options, but just not - # ones that are in the config provided - config = {'ldap': {'user_tree_dn': uuid.uuid4().hex}} - self.assertRaises(exception.InvalidDomainConfig, - self.domain_config_api.update_config, - self.domain['id'], config, group='identity') - self.assertRaises(exception.InvalidDomainConfig, - self.domain_config_api.update_config, - self.domain['id'], config['ldap'], group='ldap', - option='url') - - # Now some valid groups/options, but just not ones that are in the - # existing config - config = {'ldap': {'user_tree_dn': uuid.uuid4().hex}} - self.domain_config_api.create_config(self.domain['id'], config) - config_wrong_group = {'identity': {'driver': uuid.uuid4().hex}} - self.assertRaises(exception.DomainConfigNotFound, - self.domain_config_api.update_config, - self.domain['id'], config_wrong_group, - group='identity') - config_wrong_option = {'url': uuid.uuid4().hex} - self.assertRaises(exception.DomainConfigNotFound, - self.domain_config_api.update_config, - self.domain['id'], config_wrong_option, - group='ldap', option='url') - - # And finally just some bad groups/options - bad_group = uuid.uuid4().hex - config = {bad_group: {'user': uuid.uuid4().hex}} - self.assertRaises(exception.InvalidDomainConfig, - self.domain_config_api.update_config, - self.domain['id'], config, group=bad_group, - option='user') - bad_option = uuid.uuid4().hex - config = {'ldap': {bad_option: uuid.uuid4().hex}} - self.assertRaises(exception.InvalidDomainConfig, - self.domain_config_api.update_config, - self.domain['id'], config, group='ldap', - option=bad_option) - - def test_create_invalid_domain_config(self): - self.assertRaises(exception.InvalidDomainConfig, - self.domain_config_api.create_config, - self.domain['id'], {}) - config = {uuid.uuid4().hex: uuid.uuid4().hex} - self.assertRaises(exception.InvalidDomainConfig, - self.domain_config_api.create_config, - self.domain['id'], config) - config = {uuid.uuid4().hex: {uuid.uuid4().hex: uuid.uuid4().hex}} - self.assertRaises(exception.InvalidDomainConfig, - self.domain_config_api.create_config, - self.domain['id'], config) - config = {'ldap': {uuid.uuid4().hex: uuid.uuid4().hex}} - self.assertRaises(exception.InvalidDomainConfig, - self.domain_config_api.create_config, - self.domain['id'], config) - # Try an option that IS in the standard conf, but neither whitelisted - # or marked as sensitive - config = {'identity': {'user_tree_dn': uuid.uuid4().hex}} - self.assertRaises(exception.InvalidDomainConfig, - self.domain_config_api.create_config, - self.domain['id'], config) - - def test_delete_invalid_partial_domain_config(self): - config = {'ldap': {'url': uuid.uuid4().hex}} - self.domain_config_api.create_config(self.domain['id'], config) - # Try deleting a group not in the config - self.assertRaises(exception.DomainConfigNotFound, - self.domain_config_api.delete_config, - self.domain['id'], group='identity') - # Try deleting an option not in the config - self.assertRaises(exception.DomainConfigNotFound, - self.domain_config_api.delete_config, - self.domain['id'], - group='ldap', option='user_tree_dn') - - def test_sensitive_substitution_in_domain_config(self): - # Create a config that contains a whitelisted option that requires - # substitution of a sensitive option. - config = {'ldap': {'url': 'my_url/%(password)s', - 'user_tree_dn': uuid.uuid4().hex, - 'password': uuid.uuid4().hex}, - 'identity': {'driver': uuid.uuid4().hex}} - self.domain_config_api.create_config(self.domain['id'], config) - - # Read back the config with the internal method and ensure that the - # substitution has taken place. - res = self.domain_config_api.get_config_with_sensitive_info( - self.domain['id']) - expected_url = ( - config['ldap']['url'] % {'password': config['ldap']['password']}) - self.assertEqual(expected_url, res['ldap']['url']) - - def test_invalid_sensitive_substitution_in_domain_config(self): - """Check that invalid substitutions raise warnings.""" - mock_log = mock.Mock() - - invalid_option_config = { - 'ldap': {'user_tree_dn': uuid.uuid4().hex, - 'password': uuid.uuid4().hex}, - 'identity': {'driver': uuid.uuid4().hex}} - - for invalid_option in ['my_url/%(passssword)s', - 'my_url/%(password', - 'my_url/%(password)', - 'my_url/%(password)d']: - invalid_option_config['ldap']['url'] = invalid_option - self.domain_config_api.create_config( - self.domain['id'], invalid_option_config) - - with mock.patch('keystone.resource.core.LOG', mock_log): - res = self.domain_config_api.get_config_with_sensitive_info( - self.domain['id']) - mock_log.warning.assert_any_call(mock.ANY) - self.assertEqual( - invalid_option_config['ldap']['url'], res['ldap']['url']) - - def test_escaped_sequence_in_domain_config(self): - """Check that escaped '%(' doesn't get interpreted.""" - mock_log = mock.Mock() - - escaped_option_config = { - 'ldap': {'url': 'my_url/%%(password)s', - 'user_tree_dn': uuid.uuid4().hex, - 'password': uuid.uuid4().hex}, - 'identity': {'driver': uuid.uuid4().hex}} - - self.domain_config_api.create_config( - self.domain['id'], escaped_option_config) - - with mock.patch('keystone.resource.core.LOG', mock_log): - res = self.domain_config_api.get_config_with_sensitive_info( - self.domain['id']) - self.assertFalse(mock_log.warn.called) - # The escaping '%' should have been removed - self.assertEqual('my_url/%(password)s', res['ldap']['url']) - - @unit.skip_if_cache_disabled('domain_config') - def test_cache_layer_get_sensitive_config(self): - config = {'ldap': {'url': uuid.uuid4().hex, - 'user_tree_dn': uuid.uuid4().hex, - 'password': uuid.uuid4().hex}, - 'identity': {'driver': uuid.uuid4().hex}} - self.domain_config_api.create_config(self.domain['id'], config) - # cache the result - res = self.domain_config_api.get_config_with_sensitive_info( - self.domain['id']) - self.assertEqual(config, res) - - # delete, bypassing domain config manager api - self.domain_config_api.delete_config_options(self.domain['id']) - self.domain_config_api.delete_config_options(self.domain['id'], - sensitive=True) - - self.assertDictEqual( - res, self.domain_config_api.get_config_with_sensitive_info( - self.domain['id'])) - self.domain_config_api.get_config_with_sensitive_info.invalidate( - self.domain_config_api, self.domain['id']) - self.assertDictEqual( - {}, - self.domain_config_api.get_config_with_sensitive_info( - self.domain['id'])) - - def test_delete_domain_deletes_configs(self): - """Test domain deletion clears the domain configs.""" - domain = unit.new_domain_ref() - self.resource_api.create_domain(domain['id'], domain) - config = {'ldap': {'url': uuid.uuid4().hex, - 'user_tree_dn': uuid.uuid4().hex, - 'password': uuid.uuid4().hex}} - self.domain_config_api.create_config(domain['id'], config) - - # Now delete the domain - domain['enabled'] = False - self.resource_api.update_domain(domain['id'], domain) - self.resource_api.delete_domain(domain['id']) - - # Check domain configs have also been deleted - self.assertRaises( - exception.DomainConfigNotFound, - self.domain_config_api.get_config, - domain['id']) - - # The get_config_with_sensitive_info does not throw an exception if - # the config is empty, it just returns an empty dict - self.assertDictEqual( - {}, - self.domain_config_api.get_config_with_sensitive_info( - domain['id'])) - - def test_config_registration(self): - type = uuid.uuid4().hex - self.domain_config_api.obtain_registration( - self.domain['id'], type) - self.domain_config_api.release_registration( - self.domain['id'], type=type) - - # Make sure that once someone has it, nobody else can get it. - # This includes the domain who already has it. - self.domain_config_api.obtain_registration( - self.domain['id'], type) - self.assertFalse( - self.domain_config_api.obtain_registration( - self.domain['id'], type)) - - # Make sure we can read who does have it - self.assertEqual( - self.domain['id'], - self.domain_config_api.read_registration(type)) - - # Make sure releasing it is silent if the domain specified doesn't - # have the registration - domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(domain2['id'], domain2) - self.domain_config_api.release_registration( - domain2['id'], type=type) - - # If nobody has the type registered, then trying to read it should - # raise ConfigRegistrationNotFound - self.domain_config_api.release_registration( - self.domain['id'], type=type) - self.assertRaises(exception.ConfigRegistrationNotFound, - self.domain_config_api.read_registration, - type) - - # Finally check multiple registrations are cleared if you free the - # registration without specifying the type - type2 = uuid.uuid4().hex - self.domain_config_api.obtain_registration( - self.domain['id'], type) - self.domain_config_api.obtain_registration( - self.domain['id'], type2) - self.domain_config_api.release_registration(self.domain['id']) - self.assertRaises(exception.ConfigRegistrationNotFound, - self.domain_config_api.read_registration, - type) - self.assertRaises(exception.ConfigRegistrationNotFound, - self.domain_config_api.read_registration, - type2) -- cgit 1.2.3-korg