diff options
author | Wojciech Dec <wdec@cisco.com> | 2016-08-16 19:27:01 +0200 |
---|---|---|
committer | Wojciech Dec <wdec@cisco.com> | 2016-08-16 19:29:27 +0200 |
commit | c3b2c2a9a22bac5cf17813c589444d3abebaa23b (patch) | |
tree | 68c2fc0cb8c32cbb8fabf69ac81e1e0ba50cff2a /networking-odl/networking_odl/tests/unit/common | |
parent | 3285c8e93ea59d98b392591ef6dfa5b1de3bb92d (diff) |
Adding Mitaka networking-old module with the ODL topology based port
binding resolution mechanism from https://review.openstack.org/333186
Change-Id: I10d400aac9bb639c146527f0f93e6925cb74d9de
Signed-off-by: Wojciech Dec <wdec@cisco.com>
Diffstat (limited to 'networking-odl/networking_odl/tests/unit/common')
5 files changed, 655 insertions, 0 deletions
diff --git a/networking-odl/networking_odl/tests/unit/common/__init__.py b/networking-odl/networking_odl/tests/unit/common/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/networking-odl/networking_odl/tests/unit/common/__init__.py diff --git a/networking-odl/networking_odl/tests/unit/common/test_cache.py b/networking-odl/networking_odl/tests/unit/common/test_cache.py new file mode 100644 index 0000000..b702455 --- /dev/null +++ b/networking-odl/networking_odl/tests/unit/common/test_cache.py @@ -0,0 +1,242 @@ +# Copyright (c) 2015 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock + +from neutron.tests import base + +from networking_odl.common import cache + + +class TestCache(base.DietTestCase): + + def test_init_with_callable(self): + + def given_fetch_method(): + pass + + cache.Cache(given_fetch_method) + + def test_init_without_callable(self): + self.assertRaises(TypeError, lambda: cache.Cache(object())) + + def test_fecth_once(self): + value = 'value' + + given_fetch_method = mock.Mock(return_value=iter([('key', value)])) + given_cache = cache.Cache(given_fetch_method) + + # When value with key is fetched + result = given_cache.fetch('key', 60.0) + + # Result is returned + self.assertIs(value, result) + + # Then fetch method is called once + given_fetch_method.assert_called_once_with(('key',)) + + def test_fecth_with_no_result(self): + given_fetch_method = mock.Mock(return_value=iter([])) + given_cache = cache.Cache(given_fetch_method) + + # When value with key is fetched + try: + given_cache.fetch('key', 60.0) + except cache.CacheFetchError as error: + given_fetch_method.assert_called_once_with(('key',)) + self.assertRaises(KeyError, error.reraise_cause) + else: + self.fail('Expecting CacheFetchError to be raised.') + + @mock.patch.object(cache, 'LOG') + def test_fecth_with_failure(self, logger): + # pylint: disable=unused-argument + + given_error = RuntimeError("It doesn't work like this!") + + def failing_function(keys): + raise given_error + + given_fetch_method = mock.Mock(side_effect=failing_function) + given_cache = cache.Cache(given_fetch_method) + + # When value with key is fetched + try: + given_cache.fetch('key', 60.0) + except cache.CacheFetchError as error: + given_fetch_method.assert_called_once_with(('key',)) + self.assertRaises(RuntimeError, error.reraise_cause) + else: + self.fail('Expecting CacheFetchError to be raised.') + logger.warning.assert_called_once_with( + 'Error fetching values for keys: %r', "'key'", + exc_info=(type(given_error), given_error, mock.ANY)) + + def test_fecth_again_after_clear(self): + value1 = 'value1' + value2 = 'value2' + given_fetch_method = mock.Mock( + side_effect=[iter([('key', value1)]), + iter([('key', value2)])]) + given_cache = cache.Cache(given_fetch_method) + + # When value with key is fetched + result1 = given_cache.fetch('key', 60.0) + + # When cache is cleared + given_cache.clear() + + # When value with same key is fetched again + result2 = given_cache.fetch('key', 0.0) + + # Then first result is returned + self.assertIs(value1, result1) + + # Then fetch method is called twice + self.assertEqual( + [mock.call(('key',)), mock.call(('key',))], + given_fetch_method.mock_calls) + + # Then second result is returned + self.assertIs(value2, result2) + + def test_fecth_again_before_timeout(self): + value1 = 'value1' + value2 = 'value2' + given_fetch_method = mock.Mock( + side_effect=[iter([('key', value1)]), + iter([('key', value2)])]) + given_cache = cache.Cache(given_fetch_method) + + # When value with key is fetched + result1 = given_cache.fetch('key', 1.0) + + # When value with same key is fetched again and cached entry is not + # expired + result2 = given_cache.fetch('key', 0.0) + + # First result is returned + self.assertIs(value1, result1) + + # Then fetch method is called once + given_fetch_method.assert_called_once_with(('key',)) + + # Then first result is returned twice + self.assertIs(value1, result2) + + def test_fecth_again_after_timeout(self): + value1 = 'value1' + value2 = 'value2' + given_fetch_method = mock.Mock( + side_effect=[iter([('key', value1)]), + iter([('key', value2)])]) + given_cache = cache.Cache(given_fetch_method) + + # When value with key is fetched + result1 = given_cache.fetch('key', 0.0) + + # When value with same key is fetched again and cached entry is + # expired + result2 = given_cache.fetch('key', 0.0) + + # Then first result is returned + self.assertIs(value1, result1) + + # Then fetch method is called twice + self.assertEqual( + [mock.call(('key',)), mock.call(('key',))], + given_fetch_method.mock_calls) + + # Then second result is returned + self.assertIs(value2, result2) + + def test_fecth_two_values_yielding_both_before_timeout(self): + value1 = 'value1' + value2 = 'value2' + given_fetch_method = mock.Mock( + return_value=iter([('key1', value1), + ('key2', value2)])) + given_cache = cache.Cache(given_fetch_method) + + # When value with key is fetched + result1 = given_cache.fetch('key1', 60.0) + + # When value with another key is fetched and cached entry is not + # expired + result2 = given_cache.fetch('key2', 60.0) + + # Then first result is returned + self.assertIs(value1, result1) + + # Then fetch method is called once + given_fetch_method.assert_called_once_with(('key1',)) + + # Then second result is returned + self.assertIs(value2, result2) + + def test_fecth_two_values_yielding_both_after_timeout(self): + value1 = 'value1' + value2 = 'value2' + given_fetch_method = mock.Mock( + return_value=[('key1', value1), ('key2', value2)]) + given_cache = cache.Cache(given_fetch_method) + + # When value with key is fetched + result1 = given_cache.fetch('key1', 0.0) + + # When value with another key is fetched and cached entry is + # expired + result2 = given_cache.fetch('key2', 0.0) + + # Then first result is returned + self.assertIs(value1, result1) + + # Then fetch method is called twice + self.assertEqual( + [mock.call(('key1',)), mock.call(('key2',))], + given_fetch_method.mock_calls) + + # Then second result is returned + self.assertIs(value2, result2) + + def test_fecth_all_with_multiple_entries(self): + given_fetch_method = mock.Mock( + return_value=iter([('key', 'value1'), + ('key', 'value2')])) + given_cache = cache.Cache(given_fetch_method) + + # When value with key is fetched + results = list(given_cache.fetch_all(['key'], 0.0)) + + # Then fetch method is once + given_fetch_method.assert_called_once_with(('key',)) + + # Then both results are yield in the right order + self.assertEqual([('key', 'value1'), ('key', 'value2')], results) + + def test_fecth_all_with_repeated_entries(self): + entry = ('key', 'value') + given_fetch_method = mock.Mock( + return_value=iter([entry, entry, entry])) + given_cache = cache.Cache(given_fetch_method) + + # When value with key is fetched + results = list(given_cache.fetch_all(['key'], 0.0)) + + # Then fetch method is once + given_fetch_method.assert_called_once_with(('key',)) + + # Then results are yield in the right order + self.assertEqual([entry, entry, entry], results) diff --git a/networking-odl/networking_odl/tests/unit/common/test_callback.py b/networking-odl/networking_odl/tests/unit/common/test_callback.py new file mode 100644 index 0000000..f5e2ee6 --- /dev/null +++ b/networking-odl/networking_odl/tests/unit/common/test_callback.py @@ -0,0 +1,83 @@ +# Copyright (c) 2013-2014 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from networking_odl.common import callback +from networking_odl.common import constants as odl_const +from networking_odl.ml2.mech_driver import OpenDaylightDriver + +import mock +import testtools + +from neutron.callbacks import events +from neutron.callbacks import resources + + +FAKE_ID = 'fakeid' + + +class ODLCallbackTestCase(testtools.TestCase): + odl_driver = OpenDaylightDriver() + sgh = callback.OdlSecurityGroupsHandler(odl_driver) + + def setUp(self): + super(ODLCallbackTestCase, self).setUp() + + @mock.patch.object(OpenDaylightDriver, 'sync_from_callback') + def _test_callback_for_sg(self, event, op, sg, sg_id, sfc): + self.sgh.sg_callback(resources.SECURITY_GROUP, + event, + None, + security_group=sg, + security_group_id=sg_id) + + expected_dict = ({resources.SECURITY_GROUP: sg} + if sg is not None else None) + sfc.assert_called_with( + op, callback._RESOURCE_MAPPING[resources.SECURITY_GROUP], sg_id, + expected_dict) + + def test_callback_sg_create(self): + self._test_callback_for_sg(events.AFTER_CREATE, odl_const.ODL_CREATE, + mock.Mock(), None) + + def test_callback_sg_update(self): + self._test_callback_for_sg(events.AFTER_UPDATE, odl_const.ODL_UPDATE, + mock.Mock(), FAKE_ID) + + def test_callback_sg_delete(self): + self._test_callback_for_sg(events.AFTER_DELETE, odl_const.ODL_DELETE, + None, FAKE_ID) + + @mock.patch.object(OpenDaylightDriver, 'sync_from_callback') + def _test_callback_for_sg_rules(self, event, op, sg_rule, sg_rule_id, sfc): + self.sgh.sg_callback(resources.SECURITY_GROUP_RULE, + event, + None, + security_group_rule=sg_rule, + security_group_rule_id=sg_rule_id) + + expected_dict = ({resources.SECURITY_GROUP_RULE: sg_rule} + if sg_rule is not None else None) + sfc.assert_called_with( + op, callback._RESOURCE_MAPPING[resources.SECURITY_GROUP_RULE], + sg_rule_id, expected_dict) + + def test_callback_sg_rules_create(self): + self._test_callback_for_sg_rules( + events.AFTER_CREATE, odl_const.ODL_CREATE, mock.Mock(), None) + + def test_callback_sg_rules_delete(self): + self._test_callback_for_sg_rules( + events.AFTER_DELETE, odl_const.ODL_DELETE, None, FAKE_ID) diff --git a/networking-odl/networking_odl/tests/unit/common/test_lightweight_testing.py b/networking-odl/networking_odl/tests/unit/common/test_lightweight_testing.py new file mode 100644 index 0000000..ea3b5a8 --- /dev/null +++ b/networking-odl/networking_odl/tests/unit/common/test_lightweight_testing.py @@ -0,0 +1,174 @@ +# Copyright (c) 2015 Intel Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock + +from networking_odl.common import lightweight_testing as lwt + +from neutron.tests import base + + +class LightweightTestingTestCase(base.DietTestCase): + + def test_create_client_with_lwt_enabled(self): + """Have to do the importation here, otherwise there will be a loop""" + from networking_odl.common import client as odl_client + odl_client.cfg.CONF.set_override('enable_lightweight_testing', + True, 'ml2_odl') + # DietTestCase does not automatically cleans configuration overrides + self.addCleanup(odl_client.cfg.CONF.reset) + + client = odl_client.OpenDaylightRestClient.create_client() + self.assertIsInstance(client, lwt.OpenDaylightLwtClient) + + def test_create_client_with_lwt_disabled(self): + """Have to do the importation here, otherwise there will be a loop""" + from networking_odl.common import client as odl_client + odl_client.cfg.CONF.set_override('enable_lightweight_testing', + False, 'ml2_odl') + # DietTestCase does not automatically cleans configuration overrides + self.addCleanup(odl_client.cfg.CONF.reset) + + client = odl_client.OpenDaylightRestClient.create_client() + self.assertIsInstance(client, odl_client.OpenDaylightRestClient) + + @mock.patch.dict(lwt.OpenDaylightLwtClient.lwt_dict, + {'networks': {}}, clear=True) + def test_post_single_resource(self): + client = lwt.OpenDaylightLwtClient.create_client() + fake_network1 = {'id': 'fakeid1', 'name': 'fake_network1'} + obj = {'networks': fake_network1} + response = client.sendjson('post', 'networks', obj) + self.assertEqual(lwt.NO_CONTENT, response.status_code) + lwt_dict = lwt.OpenDaylightLwtClient.lwt_dict + self.assertEqual(lwt_dict['networks']['fakeid1'], + fake_network1) + + @mock.patch.dict(lwt.OpenDaylightLwtClient.lwt_dict, + {'networks': {}}, clear=True) + def test_post_multiple_resources(self): + client = lwt.OpenDaylightLwtClient.create_client() + fake_network1 = {'id': 'fakeid1', 'name': 'fake_network1'} + fake_network2 = {'id': 'fakeid2', 'name': 'fake_network2'} + obj = {'networks': [fake_network1, fake_network2]} + response = client.sendjson('post', 'networks', obj) + self.assertEqual(lwt.NO_CONTENT, response.status_code) + lwt_dict = lwt.OpenDaylightLwtClient.lwt_dict + self.assertEqual(lwt_dict['networks']['fakeid1'], + fake_network1) + self.assertEqual(lwt_dict['networks']['fakeid2'], + fake_network2) + + @mock.patch.dict(lwt.OpenDaylightLwtClient.lwt_dict, + {'ports': {'fakeid1': {'id': 'fakeid1', + 'name': 'fake_port1'}}}, + clear=True) + def test_get_single_resource(self): + client = lwt.OpenDaylightLwtClient.create_client() + url_path = 'ports/fakeid1' + response = client.sendjson('get', url_path, None) + self.assertEqual(lwt.OK, response.status_code) + res = response.json() + # For single resource, the return value is a dict + self.assertEqual(res['port']['name'], 'fake_port1') + + @mock.patch.dict(lwt.OpenDaylightLwtClient.lwt_dict, + {'ports': {'fakeid1': {'id': 'fakeid1', + 'name': 'fake_port1'}, + 'fakeid2': {'id': 'fakeid2', + 'name': 'fake_port2'}}}, + clear=True) + def test_get_multiple_resources(self): + client = lwt.OpenDaylightLwtClient.create_client() + url_path = 'ports/' + response = client.sendjson('get', url_path, None) + self.assertEqual(lwt.OK, response.status_code) + res = response.json() + for port in res: + self.assertIn(port['port']['name'], + ['fake_port1', 'fake_port2']) + + @mock.patch.dict(lwt.OpenDaylightLwtClient.lwt_dict, + {'subnets': {'fakeid1': {'id': 'fakeid1', + 'name': 'fake_subnet1'}}}, + clear=True) + def test_put_single_resource(self): + client = lwt.OpenDaylightLwtClient.create_client() + changed = {'id': 'fakeid1', 'name': 'fake_subnet1_changed'} + obj = {'subnets': changed} + + url_path = 'subnets/fakeid1' + response = client.sendjson('put', url_path, obj) + self.assertEqual(lwt.NO_CONTENT, response.status_code) + lwt_dict = lwt.OpenDaylightLwtClient.lwt_dict + self.assertEqual('fake_subnet1_changed', + lwt_dict['subnets']['fakeid1']['name']) + + """Check the client does not change the parameter""" + self.assertEqual('fakeid1', changed['id']) + self.assertEqual('fake_subnet1_changed', changed['name']) + + @mock.patch.dict(lwt.OpenDaylightLwtClient.lwt_dict, + {'subnets': {'fakeid1': {'id': 'fakeid1', + 'name': 'fake_subnet1'}, + 'fakeid2': {'id': 'fakeid2', + 'name': 'fake_subnet2'}}}, + clear=True) + def test_put_multiple_resources(self): + client = lwt.OpenDaylightLwtClient.create_client() + changed1 = {'id': 'fakeid1', 'name': 'fake_subnet1_changed'} + changed2 = {'id': 'fakeid2', 'name': 'fake_subnet2_changed'} + obj = {'subnets': [changed1, changed2]} + + url_path = 'subnets/' + response = client.sendjson('put', url_path, obj) + self.assertEqual(lwt.NO_CONTENT, response.status_code) + lwt_dict = lwt.OpenDaylightLwtClient.lwt_dict + self.assertEqual('fake_subnet1_changed', + lwt_dict['subnets']['fakeid1']['name']) + self.assertEqual('fake_subnet2_changed', + lwt_dict['subnets']['fakeid2']['name']) + + @mock.patch.dict(lwt.OpenDaylightLwtClient.lwt_dict, + {'networks': {'fakeid1': {'id': 'fakeid1', + 'name': 'fake_network1'}}}, + clear=True) + def test_delete_single_resource(self): + client = lwt.OpenDaylightLwtClient.create_client() + url_path = 'networks/fakeid1' + response = client.sendjson('delete', url_path, None) + self.assertEqual(lwt.NO_CONTENT, response.status_code) + lwt_dict = lwt.OpenDaylightLwtClient.lwt_dict + network = lwt_dict['networks'].get('fakeid1') + self.assertIsNone(network) + + @mock.patch.dict(lwt.OpenDaylightLwtClient.lwt_dict, + {'networks': {'fakeid1': {'id': 'fakeid1', + 'name': 'fake_network1'}, + 'fakeid2': {'id': 'fakeid2', + 'name': 'fake_network2'}}}, + clear=True) + def test_delete_multiple_resources(self): + client = lwt.OpenDaylightLwtClient.create_client() + network1 = {'id': 'fakeid1'} + network2 = {'id': 'fakeid2'} + obj = {'networks': [network1, network2]} + response = client.sendjson('delete', 'networks/', obj) + self.assertEqual(lwt.NO_CONTENT, response.status_code) + lwt_dict = lwt.OpenDaylightLwtClient.lwt_dict + network = lwt_dict['networks'].get('fakeid1') + self.assertIsNone(network) + network = lwt_dict['networks'].get('fakeid2') + self.assertIsNone(network) diff --git a/networking-odl/networking_odl/tests/unit/common/test_utils.py b/networking-odl/networking_odl/tests/unit/common/test_utils.py new file mode 100644 index 0000000..dcfb50e --- /dev/null +++ b/networking-odl/networking_odl/tests/unit/common/test_utils.py @@ -0,0 +1,156 @@ +# Copyright (c) 2015 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock + +from neutron.tests import base + +from networking_odl.common import cache +from networking_odl.common import utils + + +class TestGetAddressesByName(base.DietTestCase): + + # pylint: disable=protected-access, unused-argument + + def setUp(self): + super(TestGetAddressesByName, self).setUp() + self.clear_cache() + self.addCleanup(self.clear_cache) + time = self.patch( + utils.cache, 'time', clock=mock.Mock(return_value=0.0)) + self.clock = time.clock + socket = self.patch(utils, 'socket') + self.getaddrinfo = socket.getaddrinfo + + def patch(self, target, name, *args, **kwargs): + context = mock.patch.object(target, name, *args, **kwargs) + mocked = context.start() + self.addCleanup(context.stop) + return mocked + + def clear_cache(self): + utils._addresses_by_name_cache.clear() + + def test_get_addresses_by_valid_name(self): + self.getaddrinfo.return_value = [ + (2, 1, 6, '', ('127.0.0.1', 0)), + (2, 2, 17, '', ('127.0.0.1', 0)), + (2, 3, 0, '', ('127.0.0.1', 0)), + (2, 1, 6, '', ('10.237.214.247', 0)), + (2, 2, 17, '', ('10.237.214.247', 0)), + (2, 3, 0, '', ('10.237.214.247', 0))] + + # When valid host name is requested + result = utils.get_addresses_by_name('some_host_name') + + # Then correct addresses are returned + self.assertEqual(('127.0.0.1', '10.237.214.247'), result) + + # Then fetched addresses are cached + self.assertEqual(result, utils.get_addresses_by_name('some_host_name')) + + # Then addresses are fetched only once + self.getaddrinfo.assert_called_once_with('some_host_name', None) + + def test_get_addresses_by_valid_name_when_cache_expires(self): + self.getaddrinfo.return_value = [ + (2, 1, 6, '', ('127.0.0.1', 0)), + (2, 2, 17, '', ('127.0.0.1', 0)), + (2, 3, 0, '', ('127.0.0.1', 0)), + (2, 1, 6, '', ('10.237.214.247', 0)), + (2, 2, 17, '', ('10.237.214.247', 0)), + (2, 3, 0, '', ('10.237.214.247', 0))] + + # When valid host name is requested + result1 = utils.get_addresses_by_name('some_host_name') + + # and after a long time + self.clock.return_value = 1.0e6 + + # When valid host name is requested + result2 = utils.get_addresses_by_name('some_host_name') + + # Then correct addresses are returned + self.assertEqual(('127.0.0.1', '10.237.214.247'), result1) + self.assertEqual(('127.0.0.1', '10.237.214.247'), result2) + + # Then addresses are fetched twice + self.getaddrinfo.assert_has_calls( + [mock.call('some_host_name', None), + mock.call('some_host_name', None)]) + + @mock.patch.object(cache, 'LOG') + def test_get_addresses_by_invalid_name(self, cache_logger): + + # Given addresses resolution is failing + given_error = RuntimeError("I don't know him!") + + def failing_getaddrinfo(name, service): + raise given_error + + self.getaddrinfo.side_effect = failing_getaddrinfo + + # When invalid name is requested + self.assertRaises( + RuntimeError, utils.get_addresses_by_name, 'some_host_name') + + # When invalid name is requested again + self.assertRaises( + RuntimeError, utils.get_addresses_by_name, 'some_host_name') + + # Then result is fetched only once + self.getaddrinfo.assert_has_calls( + [mock.call('some_host_name', None)]) + cache_logger.warning.assert_has_calls( + [mock.call( + 'Error fetching values for keys: %r', "'some_host_name'", + exc_info=(RuntimeError, given_error, mock.ANY)), + mock.call( + 'Error fetching values for keys: %r', "'some_host_name'", + exc_info=(RuntimeError, given_error, mock.ANY))]) + + @mock.patch.object(cache, 'LOG') + def test_get_addresses_failing_when_expired_in_cache(self, cache_logger): + self.getaddrinfo.return_value = [ + (2, 1, 6, '', ('127.0.0.1', 0)), + (2, 2, 17, '', ('127.0.0.1', 0)), + (2, 3, 0, '', ('127.0.0.1', 0)), + (2, 1, 6, '', ('10.237.214.247', 0)), + (2, 2, 17, '', ('10.237.214.247', 0)), + (2, 3, 0, '', ('10.237.214.247', 0))] + + # Given valid result is in chache but expired + utils.get_addresses_by_name('some_host_name') + self.clock.return_value = 1.0e6 + + # Given addresses resolution is now failing + given_error = RuntimeError("This is top secret.") + + def failing_getaddrinfo(name, service): + raise given_error + + self.getaddrinfo.side_effect = failing_getaddrinfo + + self.assertRaises( + RuntimeError, utils.get_addresses_by_name, 'some_host_name') + + # Then result is fetched more times + self.getaddrinfo.assert_has_calls( + [mock.call('some_host_name', None), + mock.call('some_host_name', None)]) + cache_logger.warning.assert_called_once_with( + 'Error fetching values for keys: %r', "'some_host_name'", + exc_info=(RuntimeError, given_error, mock.ANY)) |