From 920a49cfa055733d575282973e23558c33087a4a Mon Sep 17 00:00:00 2001 From: RHE Date: Fri, 24 Nov 2017 13:54:26 +0100 Subject: remove keystone-moon Change-Id: I80d7c9b669f19d5f6607e162de8e0e55c2f80fdd Signed-off-by: RHE --- .../keystone/tests/unit/catalog/__init__.py | 0 .../keystone/tests/unit/catalog/test_backends.py | 588 --------------------- .../keystone/tests/unit/catalog/test_core.py | 100 ---- 3 files changed, 688 deletions(-) delete mode 100644 keystone-moon/keystone/tests/unit/catalog/__init__.py delete mode 100644 keystone-moon/keystone/tests/unit/catalog/test_backends.py delete mode 100644 keystone-moon/keystone/tests/unit/catalog/test_core.py (limited to 'keystone-moon/keystone/tests/unit/catalog') diff --git a/keystone-moon/keystone/tests/unit/catalog/__init__.py b/keystone-moon/keystone/tests/unit/catalog/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/keystone-moon/keystone/tests/unit/catalog/test_backends.py b/keystone-moon/keystone/tests/unit/catalog/test_backends.py deleted file mode 100644 index 55898015..00000000 --- a/keystone-moon/keystone/tests/unit/catalog/test_backends.py +++ /dev/null @@ -1,588 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import copy -import uuid - -import mock -from six.moves import range -from testtools import matchers - -from keystone.catalog import core -from keystone.common import driver_hints -from keystone import exception -from keystone.tests import unit - - -class CatalogTests(object): - - _legacy_endpoint_id_in_endpoint = True - _enabled_default_to_true_when_creating_endpoint = False - - def test_region_crud(self): - # create - region_id = '0' * 255 - new_region = unit.new_region_ref(id=region_id) - res = self.catalog_api.create_region(new_region) - - # Ensure that we don't need to have a - # parent_region_id in the original supplied - # ref dict, but that it will be returned from - # the endpoint, with None value. - expected_region = new_region.copy() - expected_region['parent_region_id'] = None - self.assertDictEqual(expected_region, res) - - # Test adding another region with the one above - # as its parent. We will check below whether deleting - # the parent successfully deletes any child regions. - parent_region_id = region_id - new_region = unit.new_region_ref(parent_region_id=parent_region_id) - region_id = new_region['id'] - res = self.catalog_api.create_region(new_region) - self.assertDictEqual(new_region, res) - - # list - regions = self.catalog_api.list_regions() - self.assertThat(regions, matchers.HasLength(2)) - region_ids = [x['id'] for x in regions] - self.assertIn(parent_region_id, region_ids) - self.assertIn(region_id, region_ids) - - # update - region_desc_update = {'description': uuid.uuid4().hex} - res = self.catalog_api.update_region(region_id, region_desc_update) - expected_region = new_region.copy() - expected_region['description'] = region_desc_update['description'] - self.assertDictEqual(expected_region, res) - - # delete - self.catalog_api.delete_region(parent_region_id) - self.assertRaises(exception.RegionNotFound, - self.catalog_api.delete_region, - parent_region_id) - self.assertRaises(exception.RegionNotFound, - self.catalog_api.get_region, - parent_region_id) - # Ensure the child is also gone... - self.assertRaises(exception.RegionNotFound, - self.catalog_api.get_region, - region_id) - - def _create_region_with_parent_id(self, parent_id=None): - new_region = unit.new_region_ref(parent_region_id=parent_id) - self.catalog_api.create_region(new_region) - return new_region - - def test_list_regions_filtered_by_parent_region_id(self): - new_region = self._create_region_with_parent_id() - parent_id = new_region['id'] - new_region = self._create_region_with_parent_id(parent_id) - new_region = self._create_region_with_parent_id(parent_id) - - # filter by parent_region_id - hints = driver_hints.Hints() - hints.add_filter('parent_region_id', parent_id) - regions = self.catalog_api.list_regions(hints) - for region in regions: - self.assertEqual(parent_id, region['parent_region_id']) - - @unit.skip_if_cache_disabled('catalog') - def test_cache_layer_region_crud(self): - new_region = unit.new_region_ref() - region_id = new_region['id'] - self.catalog_api.create_region(new_region.copy()) - updated_region = copy.deepcopy(new_region) - updated_region['description'] = uuid.uuid4().hex - # cache the result - self.catalog_api.get_region(region_id) - # update the region bypassing catalog_api - self.catalog_api.driver.update_region(region_id, updated_region) - self.assertDictContainsSubset(new_region, - self.catalog_api.get_region(region_id)) - self.catalog_api.get_region.invalidate(self.catalog_api, region_id) - self.assertDictContainsSubset(updated_region, - self.catalog_api.get_region(region_id)) - # delete the region - self.catalog_api.driver.delete_region(region_id) - # still get the old region - self.assertDictContainsSubset(updated_region, - self.catalog_api.get_region(region_id)) - self.catalog_api.get_region.invalidate(self.catalog_api, region_id) - self.assertRaises(exception.RegionNotFound, - self.catalog_api.get_region, region_id) - - @unit.skip_if_cache_disabled('catalog') - def test_invalidate_cache_when_updating_region(self): - new_region = unit.new_region_ref() - region_id = new_region['id'] - self.catalog_api.create_region(new_region) - - # cache the region - self.catalog_api.get_region(region_id) - - # update the region via catalog_api - new_description = {'description': uuid.uuid4().hex} - self.catalog_api.update_region(region_id, new_description) - - # assert that we can get the new region - current_region = self.catalog_api.get_region(region_id) - self.assertEqual(new_description['description'], - current_region['description']) - - def test_create_region_with_duplicate_id(self): - new_region = unit.new_region_ref() - self.catalog_api.create_region(new_region) - # Create region again with duplicate id - self.assertRaises(exception.Conflict, - self.catalog_api.create_region, - new_region) - - def test_get_region_returns_not_found(self): - self.assertRaises(exception.RegionNotFound, - self.catalog_api.get_region, - uuid.uuid4().hex) - - def test_delete_region_returns_not_found(self): - self.assertRaises(exception.RegionNotFound, - self.catalog_api.delete_region, - uuid.uuid4().hex) - - def test_create_region_invalid_parent_region_returns_not_found(self): - new_region = unit.new_region_ref(parent_region_id='nonexisting') - self.assertRaises(exception.RegionNotFound, - self.catalog_api.create_region, - new_region) - - def test_avoid_creating_circular_references_in_regions_update(self): - region_one = self._create_region_with_parent_id() - - # self circle: region_one->region_one - self.assertRaises(exception.CircularRegionHierarchyError, - self.catalog_api.update_region, - region_one['id'], - {'parent_region_id': region_one['id']}) - - # region_one->region_two->region_one - region_two = self._create_region_with_parent_id(region_one['id']) - self.assertRaises(exception.CircularRegionHierarchyError, - self.catalog_api.update_region, - region_one['id'], - {'parent_region_id': region_two['id']}) - - # region_one region_two->region_three->region_four->region_two - region_three = self._create_region_with_parent_id(region_two['id']) - region_four = self._create_region_with_parent_id(region_three['id']) - self.assertRaises(exception.CircularRegionHierarchyError, - self.catalog_api.update_region, - region_two['id'], - {'parent_region_id': region_four['id']}) - - @mock.patch.object(core.CatalogDriverV8, - "_ensure_no_circle_in_hierarchical_regions") - def test_circular_regions_can_be_deleted(self, mock_ensure_on_circle): - # turn off the enforcement so that cycles can be created for the test - mock_ensure_on_circle.return_value = None - - region_one = self._create_region_with_parent_id() - - # self circle: region_one->region_one - self.catalog_api.update_region( - region_one['id'], - {'parent_region_id': region_one['id']}) - self.catalog_api.delete_region(region_one['id']) - self.assertRaises(exception.RegionNotFound, - self.catalog_api.get_region, - region_one['id']) - - # region_one->region_two->region_one - region_one = self._create_region_with_parent_id() - region_two = self._create_region_with_parent_id(region_one['id']) - self.catalog_api.update_region( - region_one['id'], - {'parent_region_id': region_two['id']}) - self.catalog_api.delete_region(region_one['id']) - self.assertRaises(exception.RegionNotFound, - self.catalog_api.get_region, - region_one['id']) - self.assertRaises(exception.RegionNotFound, - self.catalog_api.get_region, - region_two['id']) - - # region_one->region_two->region_three->region_one - region_one = self._create_region_with_parent_id() - region_two = self._create_region_with_parent_id(region_one['id']) - region_three = self._create_region_with_parent_id(region_two['id']) - self.catalog_api.update_region( - region_one['id'], - {'parent_region_id': region_three['id']}) - self.catalog_api.delete_region(region_two['id']) - self.assertRaises(exception.RegionNotFound, - self.catalog_api.get_region, - region_two['id']) - self.assertRaises(exception.RegionNotFound, - self.catalog_api.get_region, - region_one['id']) - self.assertRaises(exception.RegionNotFound, - self.catalog_api.get_region, - region_three['id']) - - def test_service_crud(self): - # create - new_service = unit.new_service_ref() - service_id = new_service['id'] - res = self.catalog_api.create_service(service_id, new_service) - self.assertDictEqual(new_service, res) - - # list - services = self.catalog_api.list_services() - self.assertIn(service_id, [x['id'] for x in services]) - - # update - service_name_update = {'name': uuid.uuid4().hex} - res = self.catalog_api.update_service(service_id, service_name_update) - expected_service = new_service.copy() - expected_service['name'] = service_name_update['name'] - self.assertDictEqual(expected_service, res) - - # delete - self.catalog_api.delete_service(service_id) - self.assertRaises(exception.ServiceNotFound, - self.catalog_api.delete_service, - service_id) - self.assertRaises(exception.ServiceNotFound, - self.catalog_api.get_service, - service_id) - - def _create_random_service(self): - new_service = unit.new_service_ref() - service_id = new_service['id'] - return self.catalog_api.create_service(service_id, new_service) - - def test_service_filtering(self): - target_service = self._create_random_service() - unrelated_service1 = self._create_random_service() - unrelated_service2 = self._create_random_service() - - # filter by type - hint_for_type = driver_hints.Hints() - hint_for_type.add_filter(name="type", value=target_service['type']) - services = self.catalog_api.list_services(hint_for_type) - - self.assertEqual(1, len(services)) - filtered_service = services[0] - self.assertEqual(target_service['type'], filtered_service['type']) - self.assertEqual(target_service['id'], filtered_service['id']) - - # filter should have been removed, since it was already used by the - # backend - self.assertEqual(0, len(hint_for_type.filters)) - - # the backend shouldn't filter by name, since this is handled by the - # front end - hint_for_name = driver_hints.Hints() - hint_for_name.add_filter(name="name", value=target_service['name']) - services = self.catalog_api.list_services(hint_for_name) - - self.assertEqual(3, len(services)) - - # filter should still be there, since it wasn't used by the backend - self.assertEqual(1, len(hint_for_name.filters)) - - self.catalog_api.delete_service(target_service['id']) - self.catalog_api.delete_service(unrelated_service1['id']) - self.catalog_api.delete_service(unrelated_service2['id']) - - @unit.skip_if_cache_disabled('catalog') - def test_cache_layer_service_crud(self): - new_service = unit.new_service_ref() - service_id = new_service['id'] - res = self.catalog_api.create_service(service_id, new_service) - self.assertDictEqual(new_service, res) - self.catalog_api.get_service(service_id) - updated_service = copy.deepcopy(new_service) - updated_service['description'] = uuid.uuid4().hex - # update bypassing catalog api - self.catalog_api.driver.update_service(service_id, updated_service) - self.assertDictContainsSubset(new_service, - self.catalog_api.get_service(service_id)) - self.catalog_api.get_service.invalidate(self.catalog_api, service_id) - self.assertDictContainsSubset(updated_service, - self.catalog_api.get_service(service_id)) - - # delete bypassing catalog api - self.catalog_api.driver.delete_service(service_id) - self.assertDictContainsSubset(updated_service, - self.catalog_api.get_service(service_id)) - self.catalog_api.get_service.invalidate(self.catalog_api, service_id) - self.assertRaises(exception.ServiceNotFound, - self.catalog_api.delete_service, - service_id) - self.assertRaises(exception.ServiceNotFound, - self.catalog_api.get_service, - service_id) - - @unit.skip_if_cache_disabled('catalog') - def test_invalidate_cache_when_updating_service(self): - new_service = unit.new_service_ref() - service_id = new_service['id'] - self.catalog_api.create_service(service_id, new_service) - - # cache the service - self.catalog_api.get_service(service_id) - - # update the service via catalog api - new_type = {'type': uuid.uuid4().hex} - self.catalog_api.update_service(service_id, new_type) - - # assert that we can get the new service - current_service = self.catalog_api.get_service(service_id) - self.assertEqual(new_type['type'], current_service['type']) - - def test_delete_service_with_endpoint(self): - # create a service - service = unit.new_service_ref() - self.catalog_api.create_service(service['id'], service) - - # create an endpoint attached to the service - endpoint = unit.new_endpoint_ref(service_id=service['id'], - region_id=None) - self.catalog_api.create_endpoint(endpoint['id'], endpoint) - - # deleting the service should also delete the endpoint - self.catalog_api.delete_service(service['id']) - self.assertRaises(exception.EndpointNotFound, - self.catalog_api.get_endpoint, - endpoint['id']) - self.assertRaises(exception.EndpointNotFound, - self.catalog_api.delete_endpoint, - endpoint['id']) - - def test_cache_layer_delete_service_with_endpoint(self): - service = unit.new_service_ref() - self.catalog_api.create_service(service['id'], service) - - # create an endpoint attached to the service - endpoint = unit.new_endpoint_ref(service_id=service['id'], - region_id=None) - self.catalog_api.create_endpoint(endpoint['id'], endpoint) - # cache the result - self.catalog_api.get_service(service['id']) - self.catalog_api.get_endpoint(endpoint['id']) - # delete the service bypassing catalog api - self.catalog_api.driver.delete_service(service['id']) - self.assertDictContainsSubset(endpoint, - self.catalog_api. - get_endpoint(endpoint['id'])) - self.assertDictContainsSubset(service, - self.catalog_api. - get_service(service['id'])) - self.catalog_api.get_endpoint.invalidate(self.catalog_api, - endpoint['id']) - self.assertRaises(exception.EndpointNotFound, - self.catalog_api.get_endpoint, - endpoint['id']) - self.assertRaises(exception.EndpointNotFound, - self.catalog_api.delete_endpoint, - endpoint['id']) - # multiple endpoints associated with a service - second_endpoint = unit.new_endpoint_ref(service_id=service['id'], - region_id=None) - self.catalog_api.create_service(service['id'], service) - self.catalog_api.create_endpoint(endpoint['id'], endpoint) - self.catalog_api.create_endpoint(second_endpoint['id'], - second_endpoint) - self.catalog_api.delete_service(service['id']) - self.assertRaises(exception.EndpointNotFound, - self.catalog_api.get_endpoint, - endpoint['id']) - self.assertRaises(exception.EndpointNotFound, - self.catalog_api.delete_endpoint, - endpoint['id']) - self.assertRaises(exception.EndpointNotFound, - self.catalog_api.get_endpoint, - second_endpoint['id']) - self.assertRaises(exception.EndpointNotFound, - self.catalog_api.delete_endpoint, - second_endpoint['id']) - - def test_get_service_returns_not_found(self): - self.assertRaises(exception.ServiceNotFound, - self.catalog_api.get_service, - uuid.uuid4().hex) - - def test_delete_service_returns_not_found(self): - self.assertRaises(exception.ServiceNotFound, - self.catalog_api.delete_service, - uuid.uuid4().hex) - - def test_create_endpoint_nonexistent_service(self): - endpoint = unit.new_endpoint_ref(service_id=uuid.uuid4().hex, - region_id=None) - self.assertRaises(exception.ValidationError, - self.catalog_api.create_endpoint, - endpoint['id'], - endpoint) - - def test_update_endpoint_nonexistent_service(self): - dummy_service, enabled_endpoint, dummy_disabled_endpoint = ( - self._create_endpoints()) - new_endpoint = unit.new_endpoint_ref(service_id=uuid.uuid4().hex) - self.assertRaises(exception.ValidationError, - self.catalog_api.update_endpoint, - enabled_endpoint['id'], - new_endpoint) - - def test_create_endpoint_nonexistent_region(self): - service = unit.new_service_ref() - self.catalog_api.create_service(service['id'], service) - - endpoint = unit.new_endpoint_ref(service_id=service['id']) - self.assertRaises(exception.ValidationError, - self.catalog_api.create_endpoint, - endpoint['id'], - endpoint) - - def test_update_endpoint_nonexistent_region(self): - dummy_service, enabled_endpoint, dummy_disabled_endpoint = ( - self._create_endpoints()) - new_endpoint = unit.new_endpoint_ref(service_id=uuid.uuid4().hex) - self.assertRaises(exception.ValidationError, - self.catalog_api.update_endpoint, - enabled_endpoint['id'], - new_endpoint) - - def test_get_endpoint_returns_not_found(self): - self.assertRaises(exception.EndpointNotFound, - self.catalog_api.get_endpoint, - uuid.uuid4().hex) - - def test_delete_endpoint_returns_not_found(self): - self.assertRaises(exception.EndpointNotFound, - self.catalog_api.delete_endpoint, - uuid.uuid4().hex) - - def test_create_endpoint(self): - service = unit.new_service_ref() - self.catalog_api.create_service(service['id'], service) - - endpoint = unit.new_endpoint_ref(service_id=service['id'], - region_id=None) - self.catalog_api.create_endpoint(endpoint['id'], endpoint.copy()) - - def test_update_endpoint(self): - dummy_service_ref, endpoint_ref, dummy_disabled_endpoint_ref = ( - self._create_endpoints()) - res = self.catalog_api.update_endpoint(endpoint_ref['id'], - {'interface': 'private'}) - expected_endpoint = endpoint_ref.copy() - expected_endpoint['enabled'] = True - expected_endpoint['interface'] = 'private' - if self._legacy_endpoint_id_in_endpoint: - expected_endpoint['legacy_endpoint_id'] = None - if self._enabled_default_to_true_when_creating_endpoint: - expected_endpoint['enabled'] = True - self.assertDictEqual(expected_endpoint, res) - - def _create_endpoints(self): - # Creates a service and 2 endpoints for the service in the same region. - # The 'public' interface is enabled and the 'internal' interface is - # disabled. - - def create_endpoint(service_id, region, **kwargs): - ref = unit.new_endpoint_ref( - service_id=service_id, - region_id=region, - url='http://localhost/%s' % uuid.uuid4().hex, - **kwargs) - - self.catalog_api.create_endpoint(ref['id'], ref) - return ref - - # Create a service for use with the endpoints. - service_ref = unit.new_service_ref() - service_id = service_ref['id'] - self.catalog_api.create_service(service_id, service_ref) - - region = unit.new_region_ref() - self.catalog_api.create_region(region) - - # Create endpoints - enabled_endpoint_ref = create_endpoint(service_id, region['id']) - disabled_endpoint_ref = create_endpoint( - service_id, region['id'], enabled=False, interface='internal') - - return service_ref, enabled_endpoint_ref, disabled_endpoint_ref - - def test_list_endpoints(self): - service = unit.new_service_ref() - self.catalog_api.create_service(service['id'], service) - - expected_ids = set([uuid.uuid4().hex for _ in range(3)]) - for endpoint_id in expected_ids: - endpoint = unit.new_endpoint_ref(service_id=service['id'], - id=endpoint_id, - region_id=None) - self.catalog_api.create_endpoint(endpoint['id'], endpoint) - - endpoints = self.catalog_api.list_endpoints() - self.assertEqual(expected_ids, set(e['id'] for e in endpoints)) - - def test_get_catalog_endpoint_disabled(self): - """Get back only enabled endpoints when get the v2 catalog.""" - service_ref, enabled_endpoint_ref, dummy_disabled_endpoint_ref = ( - self._create_endpoints()) - - user_id = uuid.uuid4().hex - project_id = uuid.uuid4().hex - catalog = self.catalog_api.get_catalog(user_id, project_id) - - exp_entry = { - 'id': enabled_endpoint_ref['id'], - 'name': service_ref['name'], - 'publicURL': enabled_endpoint_ref['url'], - } - - region = enabled_endpoint_ref['region_id'] - self.assertEqual(exp_entry, catalog[region][service_ref['type']]) - - def test_get_v3_catalog_endpoint_disabled(self): - """Get back only enabled endpoints when get the v3 catalog.""" - enabled_endpoint_ref = self._create_endpoints()[1] - - user_id = uuid.uuid4().hex - project_id = uuid.uuid4().hex - catalog = self.catalog_api.get_v3_catalog(user_id, project_id) - - endpoint_ids = [x['id'] for x in catalog[0]['endpoints']] - self.assertEqual([enabled_endpoint_ref['id']], endpoint_ids) - - @unit.skip_if_cache_disabled('catalog') - def test_invalidate_cache_when_updating_endpoint(self): - service = unit.new_service_ref() - self.catalog_api.create_service(service['id'], service) - - # create an endpoint attached to the service - endpoint = unit.new_endpoint_ref(service_id=service['id'], - region_id=None) - self.catalog_api.create_endpoint(endpoint['id'], endpoint) - - # cache the endpoint - self.catalog_api.get_endpoint(endpoint['id']) - - # update the endpoint via catalog api - new_url = {'url': uuid.uuid4().hex} - self.catalog_api.update_endpoint(endpoint['id'], new_url) - - # assert that we can get the new endpoint - current_endpoint = self.catalog_api.get_endpoint(endpoint['id']) - self.assertEqual(new_url['url'], current_endpoint['url']) diff --git a/keystone-moon/keystone/tests/unit/catalog/test_core.py b/keystone-moon/keystone/tests/unit/catalog/test_core.py deleted file mode 100644 index b04b0bb7..00000000 --- a/keystone-moon/keystone/tests/unit/catalog/test_core.py +++ /dev/null @@ -1,100 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import uuid - -from keystone.catalog import core -from keystone import exception -from keystone.tests import unit - - -class FormatUrlTests(unit.BaseTestCase): - - def test_successful_formatting(self): - url_template = ('http://$(public_bind_host)s:$(admin_port)d/' - '$(tenant_id)s/$(user_id)s/$(project_id)s') - project_id = uuid.uuid4().hex - values = {'public_bind_host': 'server', 'admin_port': 9090, - 'tenant_id': 'A', 'user_id': 'B', 'project_id': project_id} - actual_url = core.format_url(url_template, values) - - expected_url = 'http://server:9090/A/B/%s' % (project_id,) - self.assertEqual(expected_url, actual_url) - - def test_raises_malformed_on_missing_key(self): - self.assertRaises(exception.MalformedEndpoint, - core.format_url, - "http://$(public_bind_host)s/$(public_port)d", - {"public_bind_host": "1"}) - - def test_raises_malformed_on_wrong_type(self): - self.assertRaises(exception.MalformedEndpoint, - core.format_url, - "http://$(public_bind_host)d", - {"public_bind_host": "something"}) - - def test_raises_malformed_on_incomplete_format(self): - self.assertRaises(exception.MalformedEndpoint, - core.format_url, - "http://$(public_bind_host)", - {"public_bind_host": "1"}) - - def test_formatting_a_non_string(self): - def _test(url_template): - self.assertRaises(exception.MalformedEndpoint, - core.format_url, - url_template, - {}) - - _test(None) - _test(object()) - - def test_substitution_with_key_not_allowed(self): - # If the url template contains a substitution that's not in the allowed - # list then MalformedEndpoint is raised. - # For example, admin_token isn't allowed. - url_template = ('http://$(public_bind_host)s:$(public_port)d/' - '$(tenant_id)s/$(user_id)s/$(admin_token)s') - values = {'public_bind_host': 'server', 'public_port': 9090, - 'tenant_id': 'A', 'user_id': 'B', 'admin_token': 'C'} - self.assertRaises(exception.MalformedEndpoint, - core.format_url, - url_template, - values) - - def test_substitution_with_allowed_tenant_keyerror(self): - # No value of 'tenant_id' is passed into url_template. - # mod: format_url will return None instead of raising - # "MalformedEndpoint" exception. - # This is intentional behavior since we don't want to skip - # all the later endpoints once there is an URL of endpoint - # trying to replace 'tenant_id' with None. - url_template = ('http://$(public_bind_host)s:$(admin_port)d/' - '$(tenant_id)s/$(user_id)s') - values = {'public_bind_host': 'server', 'admin_port': 9090, - 'user_id': 'B'} - self.assertIsNone(core.format_url(url_template, values, - silent_keyerror_failures=['tenant_id'])) - - def test_substitution_with_allowed_project_keyerror(self): - # No value of 'project_id' is passed into url_template. - # mod: format_url will return None instead of raising - # "MalformedEndpoint" exception. - # This is intentional behavior since we don't want to skip - # all the later endpoints once there is an URL of endpoint - # trying to replace 'project_id' with None. - url_template = ('http://$(public_bind_host)s:$(admin_port)d/' - '$(project_id)s/$(user_id)s') - values = {'public_bind_host': 'server', 'admin_port': 9090, - 'user_id': 'B'} - self.assertIsNone(core.format_url(url_template, values, - silent_keyerror_failures=['project_id'])) -- cgit 1.2.3-korg