Diffstat (limited to 'networking-odl/networking_odl/tests/unit/common')
-rw-r--r--  networking-odl/networking_odl/tests/unit/common/__init__.py                     0
-rw-r--r--  networking-odl/networking_odl/tests/unit/common/test_cache.py                 242
-rw-r--r--  networking-odl/networking_odl/tests/unit/common/test_callback.py               83
-rw-r--r--  networking-odl/networking_odl/tests/unit/common/test_lightweight_testing.py   174
-rw-r--r--  networking-odl/networking_odl/tests/unit/common/test_utils.py                 156
5 files changed, 655 insertions, 0 deletions
diff --git a/networking-odl/networking_odl/tests/unit/common/__init__.py b/networking-odl/networking_odl/tests/unit/common/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/networking-odl/networking_odl/tests/unit/common/__init__.py
diff --git a/networking-odl/networking_odl/tests/unit/common/test_cache.py b/networking-odl/networking_odl/tests/unit/common/test_cache.py
new file mode 100644
index 0000000..b702455
--- /dev/null
+++ b/networking-odl/networking_odl/tests/unit/common/test_cache.py
@@ -0,0 +1,242 @@
+# Copyright (c) 2015 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import mock
+
+from neutron.tests import base
+
+from networking_odl.common import cache
+
+
+class TestCache(base.DietTestCase):
+
+ def test_init_with_callable(self):
+
+ def given_fetch_method():
+ pass
+
+ cache.Cache(given_fetch_method)
+
+ def test_init_without_callable(self):
+ self.assertRaises(TypeError, lambda: cache.Cache(object()))
+
+ def test_fetch_once(self):
+ value = 'value'
+
+ given_fetch_method = mock.Mock(return_value=iter([('key', value)]))
+ given_cache = cache.Cache(given_fetch_method)
+
+ # When value with key is fetched
+ result = given_cache.fetch('key', 60.0)
+
+ # Then result is returned
+ self.assertIs(value, result)
+
+ # Then fetch method is called once
+ given_fetch_method.assert_called_once_with(('key',))
+
+ def test_fetch_with_no_result(self):
+ given_fetch_method = mock.Mock(return_value=iter([]))
+ given_cache = cache.Cache(given_fetch_method)
+
+ # When value with key is fetched
+ try:
+ given_cache.fetch('key', 60.0)
+ except cache.CacheFetchError as error:
+ given_fetch_method.assert_called_once_with(('key',))
+ self.assertRaises(KeyError, error.reraise_cause)
+ else:
+ self.fail('Expecting CacheFetchError to be raised.')
+
+ @mock.patch.object(cache, 'LOG')
+ def test_fetch_with_failure(self, logger):
+ # pylint: disable=unused-argument
+
+ given_error = RuntimeError("It doesn't work like this!")
+
+ def failing_function(keys):
+ raise given_error
+
+ given_fetch_method = mock.Mock(side_effect=failing_function)
+ given_cache = cache.Cache(given_fetch_method)
+
+ # When value with key is fetched
+ try:
+ given_cache.fetch('key', 60.0)
+ except cache.CacheFetchError as error:
+ given_fetch_method.assert_called_once_with(('key',))
+ self.assertRaises(RuntimeError, error.reraise_cause)
+ else:
+ self.fail('Expecting CacheFetchError to be raised.')
+ logger.warning.assert_called_once_with(
+ 'Error fetching values for keys: %r', "'key'",
+ exc_info=(type(given_error), given_error, mock.ANY))
+
+ def test_fetch_again_after_clear(self):
+ value1 = 'value1'
+ value2 = 'value2'
+ given_fetch_method = mock.Mock(
+ side_effect=[iter([('key', value1)]),
+ iter([('key', value2)])])
+ given_cache = cache.Cache(given_fetch_method)
+
+ # When value with key is fetched
+ result1 = given_cache.fetch('key', 60.0)
+
+ # When cache is cleared
+ given_cache.clear()
+
+ # When value with same key is fetched again
+ result2 = given_cache.fetch('key', 0.0)
+
+ # Then first result is returned
+ self.assertIs(value1, result1)
+
+ # Then fetch method is called twice
+ self.assertEqual(
+ [mock.call(('key',)), mock.call(('key',))],
+ given_fetch_method.mock_calls)
+
+ # Then second result is returned
+ self.assertIs(value2, result2)
+
+ def test_fetch_again_before_timeout(self):
+ value1 = 'value1'
+ value2 = 'value2'
+ given_fetch_method = mock.Mock(
+ side_effect=[iter([('key', value1)]),
+ iter([('key', value2)])])
+ given_cache = cache.Cache(given_fetch_method)
+
+ # When value with key is fetched
+ result1 = given_cache.fetch('key', 1.0)
+
+ # When value with same key is fetched again and cached entry is not
+ # expired
+ result2 = given_cache.fetch('key', 0.0)
+
+ # Then first result is returned
+ self.assertIs(value1, result1)
+
+ # Then fetch method is called once
+ given_fetch_method.assert_called_once_with(('key',))
+
+ # Then first result is returned twice
+ self.assertIs(value1, result2)
+
+ def test_fetch_again_after_timeout(self):
+ value1 = 'value1'
+ value2 = 'value2'
+ given_fetch_method = mock.Mock(
+ side_effect=[iter([('key', value1)]),
+ iter([('key', value2)])])
+ given_cache = cache.Cache(given_fetch_method)
+
+ # When value with key is fetched
+ result1 = given_cache.fetch('key', 0.0)
+
+ # When value with same key is fetched again and cached entry is
+ # expired
+ result2 = given_cache.fetch('key', 0.0)
+
+ # Then first result is returned
+ self.assertIs(value1, result1)
+
+ # Then fetch method is called twice
+ self.assertEqual(
+ [mock.call(('key',)), mock.call(('key',))],
+ given_fetch_method.mock_calls)
+
+ # Then second result is returned
+ self.assertIs(value2, result2)
+
+ def test_fetch_two_values_yielding_both_before_timeout(self):
+ value1 = 'value1'
+ value2 = 'value2'
+ given_fetch_method = mock.Mock(
+ return_value=iter([('key1', value1),
+ ('key2', value2)]))
+ given_cache = cache.Cache(given_fetch_method)
+
+ # When value with key is fetched
+ result1 = given_cache.fetch('key1', 60.0)
+
+ # When value with another key is fetched and cached entry is not
+ # expired
+ result2 = given_cache.fetch('key2', 60.0)
+
+ # Then first result is returned
+ self.assertIs(value1, result1)
+
+ # Then fetch method is called once
+ given_fetch_method.assert_called_once_with(('key1',))
+
+ # Then second result is returned
+ self.assertIs(value2, result2)
+
+ def test_fetch_two_values_yielding_both_after_timeout(self):
+ value1 = 'value1'
+ value2 = 'value2'
+ given_fetch_method = mock.Mock(
+ return_value=[('key1', value1), ('key2', value2)])
+ given_cache = cache.Cache(given_fetch_method)
+
+ # When value with key is fetched
+ result1 = given_cache.fetch('key1', 0.0)
+
+ # When value with another key is fetched and cached entry is
+ # expired
+ result2 = given_cache.fetch('key2', 0.0)
+
+ # Then first result is returned
+ self.assertIs(value1, result1)
+
+ # Then fetch method is called twice
+ self.assertEqual(
+ [mock.call(('key1',)), mock.call(('key2',))],
+ given_fetch_method.mock_calls)
+
+ # Then second result is returned
+ self.assertIs(value2, result2)
+
+ def test_fetch_all_with_multiple_entries(self):
+ given_fetch_method = mock.Mock(
+ return_value=iter([('key', 'value1'),
+ ('key', 'value2')]))
+ given_cache = cache.Cache(given_fetch_method)
+
+ # When value with key is fetched
+ results = list(given_cache.fetch_all(['key'], 0.0))
+
+ # Then fetch method is called once
+ given_fetch_method.assert_called_once_with(('key',))
+
+ # Then both results are yielded in the right order
+ self.assertEqual([('key', 'value1'), ('key', 'value2')], results)
+
+ def test_fetch_all_with_repeated_entries(self):
+ entry = ('key', 'value')
+ given_fetch_method = mock.Mock(
+ return_value=iter([entry, entry, entry]))
+ given_cache = cache.Cache(given_fetch_method)
+
+ # When value with key is fetched
+ results = list(given_cache.fetch_all(['key'], 0.0))
+
+ # Then fetch method is called once
+ given_fetch_method.assert_called_once_with(('key',))
+
+ # Then results are yielded in the right order
+ self.assertEqual([entry, entry, entry], results)
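
For readers who do not have the module under test at hand, the following is a minimal sketch of a cache that behaves roughly as these tests expect: fetch(key, timeout) and fetch_all(keys, timeout) call the fetch method only for missing or expired keys, clear() empties the cache, and fetch failures are logged and wrapped in a CacheFetchError whose reraise_cause() re-raises the original exception. It is an illustration only, not the actual networking_odl.common.cache implementation; the clock source and internal names are assumptions (the utils tests later in this change suggest the real module keeps time via time.clock()).

import logging
import sys
import time

LOG = logging.getLogger(__name__)


class CacheFetchError(Exception):
    """Wraps the exception raised while fetching missing keys (sketch)."""

    def __init__(self, cause_exc_info):
        super(CacheFetchError, self).__init__()
        self.cause_exc_info = cause_exc_info

    def reraise_cause(self):
        # Re-raise the original error: KeyError for entries the fetch
        # method did not produce, or whatever the fetch method raised.
        raise self.cause_exc_info[1]


class Cache(object):
    """Illustrative stand-in for networking_odl.common.cache.Cache."""

    def __init__(self, fetch_method):
        if not callable(fetch_method):
            raise TypeError('fetch_method must be callable')
        self._fetch_method = fetch_method
        self._entries = {}  # key -> (expiration timestamp, value)

    def clear(self):
        self._entries.clear()

    def fetch(self, key, timeout):
        for entry_key, value in self.fetch_all([key], timeout):
            if entry_key == key:
                return value

    def fetch_all(self, keys, timeout):
        now = time.time()  # clock source assumed for this sketch
        missing = tuple(
            key for key in keys
            if key not in self._entries or self._entries[key][0] <= now)
        fetched = []
        if missing:
            try:
                for key, value in self._fetch_method(missing):
                    self._entries[key] = (now + timeout, value)
                    fetched.append((key, value))
                for key in missing:
                    if key not in self._entries:
                        raise KeyError(key)
            except Exception:
                LOG.warning('Error fetching values for keys: %r',
                            ', '.join(repr(key) for key in missing),
                            exc_info=sys.exc_info())
                raise CacheFetchError(sys.exc_info())
        # Yield every freshly fetched pair (duplicates included, in order),
        # then the still-valid cached values for the remaining keys.
        for key, value in fetched:
            yield key, value
        for key in keys:
            if key not in missing:
                yield key, self._entries[key][1]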
diff --git a/networking-odl/networking_odl/tests/unit/common/test_callback.py b/networking-odl/networking_odl/tests/unit/common/test_callback.py
new file mode 100644
index 0000000..f5e2ee6
--- /dev/null
+++ b/networking-odl/networking_odl/tests/unit/common/test_callback.py
@@ -0,0 +1,83 @@
+# Copyright (c) 2013-2014 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from networking_odl.common import callback
+from networking_odl.common import constants as odl_const
+from networking_odl.ml2.mech_driver import OpenDaylightDriver
+
+import mock
+import testtools
+
+from neutron.callbacks import events
+from neutron.callbacks import resources
+
+
+FAKE_ID = 'fakeid'
+
+
+class ODLCallbackTestCase(testtools.TestCase):
+ odl_driver = OpenDaylightDriver()
+ sgh = callback.OdlSecurityGroupsHandler(odl_driver)
+
+ def setUp(self):
+ super(ODLCallbackTestCase, self).setUp()
+
+ @mock.patch.object(OpenDaylightDriver, 'sync_from_callback')
+ def _test_callback_for_sg(self, event, op, sg, sg_id, sfc):
+ self.sgh.sg_callback(resources.SECURITY_GROUP,
+ event,
+ None,
+ security_group=sg,
+ security_group_id=sg_id)
+
+ expected_dict = ({resources.SECURITY_GROUP: sg}
+ if sg is not None else None)
+ sfc.assert_called_with(
+ op, callback._RESOURCE_MAPPING[resources.SECURITY_GROUP], sg_id,
+ expected_dict)
+
+ def test_callback_sg_create(self):
+ self._test_callback_for_sg(events.AFTER_CREATE, odl_const.ODL_CREATE,
+ mock.Mock(), None)
+
+ def test_callback_sg_update(self):
+ self._test_callback_for_sg(events.AFTER_UPDATE, odl_const.ODL_UPDATE,
+ mock.Mock(), FAKE_ID)
+
+ def test_callback_sg_delete(self):
+ self._test_callback_for_sg(events.AFTER_DELETE, odl_const.ODL_DELETE,
+ None, FAKE_ID)
+
+ @mock.patch.object(OpenDaylightDriver, 'sync_from_callback')
+ def _test_callback_for_sg_rules(self, event, op, sg_rule, sg_rule_id, sfc):
+ self.sgh.sg_callback(resources.SECURITY_GROUP_RULE,
+ event,
+ None,
+ security_group_rule=sg_rule,
+ security_group_rule_id=sg_rule_id)
+
+ expected_dict = ({resources.SECURITY_GROUP_RULE: sg_rule}
+ if sg_rule is not None else None)
+ sfc.assert_called_with(
+ op, callback._RESOURCE_MAPPING[resources.SECURITY_GROUP_RULE],
+ sg_rule_id, expected_dict)
+
+ def test_callback_sg_rules_create(self):
+ self._test_callback_for_sg_rules(
+ events.AFTER_CREATE, odl_const.ODL_CREATE, mock.Mock(), None)
+
+ def test_callback_sg_rules_delete(self):
+ self._test_callback_for_sg_rules(
+ events.AFTER_DELETE, odl_const.ODL_DELETE, None, FAKE_ID)
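
The assertions above pin down the handler's contract without showing it, so here is a rough, illustrative stand-in: the neutron callback arguments are translated into an ODL operation plus resource identifier and handed to the mechanism driver's sync_from_callback(). The operation and resource mappings shown are assumptions; the real networking_odl.common.callback module builds them from neutron's callback constants and odl_const.

class OdlSecurityGroupsHandlerSketch(object):
    """Illustrative stand-in for callback.OdlSecurityGroupsHandler."""

    # Assumed mappings; the real module derives these from neutron's
    # callback events/resources and from odl_const.
    _OPERATION_MAPPING = {
        'after_create': 'create',
        'after_update': 'update',
        'after_delete': 'delete',
    }
    _RESOURCE_MAPPING = {
        'security_group': 'security-groups',
        'security_group_rule': 'security-group-rules',
    }

    def __init__(self, odl_driver):
        self.odl_driver = odl_driver
        # The real handler also subscribes itself to the neutron callback
        # registry for these resources and events.

    def sg_callback(self, resource, event, trigger, **kwargs):
        res = kwargs.get(resource)
        res_id = kwargs.get('%s_id' % resource)
        odl_res_type = self._RESOURCE_MAPPING[resource]
        operation = self._OPERATION_MAPPING[event]
        res_dict = None if res is None else {resource: res}
        self.odl_driver.sync_from_callback(
            operation, odl_res_type, res_id, res_dict)

With a driver double in place of OpenDaylightDriver, calling sg_callback('security_group', 'after_create', None, security_group=sg, security_group_id=None) on this sketch would result in sync_from_callback('create', 'security-groups', None, {'security_group': sg}), which is the shape the assertions above check.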
diff --git a/networking-odl/networking_odl/tests/unit/common/test_lightweight_testing.py b/networking-odl/networking_odl/tests/unit/common/test_lightweight_testing.py
new file mode 100644
index 0000000..ea3b5a8
--- /dev/null
+++ b/networking-odl/networking_odl/tests/unit/common/test_lightweight_testing.py
@@ -0,0 +1,174 @@
+# Copyright (c) 2015 Intel Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import mock
+
+from networking_odl.common import lightweight_testing as lwt
+
+from neutron.tests import base
+
+
+class LightweightTestingTestCase(base.DietTestCase):
+
+ def test_create_client_with_lwt_enabled(self):
+ """Have to do the importation here, otherwise there will be a loop"""
+ from networking_odl.common import client as odl_client
+ odl_client.cfg.CONF.set_override('enable_lightweight_testing',
+ True, 'ml2_odl')
+ # DietTestCase does not automatically clean configuration overrides
+ self.addCleanup(odl_client.cfg.CONF.reset)
+
+ client = odl_client.OpenDaylightRestClient.create_client()
+ self.assertIsInstance(client, lwt.OpenDaylightLwtClient)
+
+ def test_create_client_with_lwt_disabled(self):
+ """Have to do the importation here, otherwise there will be a loop"""
+ from networking_odl.common import client as odl_client
+ odl_client.cfg.CONF.set_override('enable_lightweight_testing',
+ False, 'ml2_odl')
+ # DietTestCase does not automatically clean configuration overrides
+ self.addCleanup(odl_client.cfg.CONF.reset)
+
+ client = odl_client.OpenDaylightRestClient.create_client()
+ self.assertIsInstance(client, odl_client.OpenDaylightRestClient)
+
+ @mock.patch.dict(lwt.OpenDaylightLwtClient.lwt_dict,
+ {'networks': {}}, clear=True)
+ def test_post_single_resource(self):
+ client = lwt.OpenDaylightLwtClient.create_client()
+ fake_network1 = {'id': 'fakeid1', 'name': 'fake_network1'}
+ obj = {'networks': fake_network1}
+ response = client.sendjson('post', 'networks', obj)
+ self.assertEqual(lwt.NO_CONTENT, response.status_code)
+ lwt_dict = lwt.OpenDaylightLwtClient.lwt_dict
+ self.assertEqual(lwt_dict['networks']['fakeid1'],
+ fake_network1)
+
+ @mock.patch.dict(lwt.OpenDaylightLwtClient.lwt_dict,
+ {'networks': {}}, clear=True)
+ def test_post_multiple_resources(self):
+ client = lwt.OpenDaylightLwtClient.create_client()
+ fake_network1 = {'id': 'fakeid1', 'name': 'fake_network1'}
+ fake_network2 = {'id': 'fakeid2', 'name': 'fake_network2'}
+ obj = {'networks': [fake_network1, fake_network2]}
+ response = client.sendjson('post', 'networks', obj)
+ self.assertEqual(lwt.NO_CONTENT, response.status_code)
+ lwt_dict = lwt.OpenDaylightLwtClient.lwt_dict
+ self.assertEqual(lwt_dict['networks']['fakeid1'],
+ fake_network1)
+ self.assertEqual(lwt_dict['networks']['fakeid2'],
+ fake_network2)
+
+ @mock.patch.dict(lwt.OpenDaylightLwtClient.lwt_dict,
+ {'ports': {'fakeid1': {'id': 'fakeid1',
+ 'name': 'fake_port1'}}},
+ clear=True)
+ def test_get_single_resource(self):
+ client = lwt.OpenDaylightLwtClient.create_client()
+ url_path = 'ports/fakeid1'
+ response = client.sendjson('get', url_path, None)
+ self.assertEqual(lwt.OK, response.status_code)
+ res = response.json()
+ # For single resource, the return value is a dict
+ self.assertEqual(res['port']['name'], 'fake_port1')
+
+ @mock.patch.dict(lwt.OpenDaylightLwtClient.lwt_dict,
+ {'ports': {'fakeid1': {'id': 'fakeid1',
+ 'name': 'fake_port1'},
+ 'fakeid2': {'id': 'fakeid2',
+ 'name': 'fake_port2'}}},
+ clear=True)
+ def test_get_multiple_resources(self):
+ client = lwt.OpenDaylightLwtClient.create_client()
+ url_path = 'ports/'
+ response = client.sendjson('get', url_path, None)
+ self.assertEqual(lwt.OK, response.status_code)
+ res = response.json()
+ for port in res:
+ self.assertIn(port['port']['name'],
+ ['fake_port1', 'fake_port2'])
+
+ @mock.patch.dict(lwt.OpenDaylightLwtClient.lwt_dict,
+ {'subnets': {'fakeid1': {'id': 'fakeid1',
+ 'name': 'fake_subnet1'}}},
+ clear=True)
+ def test_put_single_resource(self):
+ client = lwt.OpenDaylightLwtClient.create_client()
+ changed = {'id': 'fakeid1', 'name': 'fake_subnet1_changed'}
+ obj = {'subnets': changed}
+
+ url_path = 'subnets/fakeid1'
+ response = client.sendjson('put', url_path, obj)
+ self.assertEqual(lwt.NO_CONTENT, response.status_code)
+ lwt_dict = lwt.OpenDaylightLwtClient.lwt_dict
+ self.assertEqual('fake_subnet1_changed',
+ lwt_dict['subnets']['fakeid1']['name'])
+
+ """Check the client does not change the parameter"""
+ self.assertEqual('fakeid1', changed['id'])
+ self.assertEqual('fake_subnet1_changed', changed['name'])
+
+ @mock.patch.dict(lwt.OpenDaylightLwtClient.lwt_dict,
+ {'subnets': {'fakeid1': {'id': 'fakeid1',
+ 'name': 'fake_subnet1'},
+ 'fakeid2': {'id': 'fakeid2',
+ 'name': 'fake_subnet2'}}},
+ clear=True)
+ def test_put_multiple_resources(self):
+ client = lwt.OpenDaylightLwtClient.create_client()
+ changed1 = {'id': 'fakeid1', 'name': 'fake_subnet1_changed'}
+ changed2 = {'id': 'fakeid2', 'name': 'fake_subnet2_changed'}
+ obj = {'subnets': [changed1, changed2]}
+
+ url_path = 'subnets/'
+ response = client.sendjson('put', url_path, obj)
+ self.assertEqual(lwt.NO_CONTENT, response.status_code)
+ lwt_dict = lwt.OpenDaylightLwtClient.lwt_dict
+ self.assertEqual('fake_subnet1_changed',
+ lwt_dict['subnets']['fakeid1']['name'])
+ self.assertEqual('fake_subnet2_changed',
+ lwt_dict['subnets']['fakeid2']['name'])
+
+ @mock.patch.dict(lwt.OpenDaylightLwtClient.lwt_dict,
+ {'networks': {'fakeid1': {'id': 'fakeid1',
+ 'name': 'fake_network1'}}},
+ clear=True)
+ def test_delete_single_resource(self):
+ client = lwt.OpenDaylightLwtClient.create_client()
+ url_path = 'networks/fakeid1'
+ response = client.sendjson('delete', url_path, None)
+ self.assertEqual(lwt.NO_CONTENT, response.status_code)
+ lwt_dict = lwt.OpenDaylightLwtClient.lwt_dict
+ network = lwt_dict['networks'].get('fakeid1')
+ self.assertIsNone(network)
+
+ @mock.patch.dict(lwt.OpenDaylightLwtClient.lwt_dict,
+ {'networks': {'fakeid1': {'id': 'fakeid1',
+ 'name': 'fake_network1'},
+ 'fakeid2': {'id': 'fakeid2',
+ 'name': 'fake_network2'}}},
+ clear=True)
+ def test_delete_multiple_resources(self):
+ client = lwt.OpenDaylightLwtClient.create_client()
+ network1 = {'id': 'fakeid1'}
+ network2 = {'id': 'fakeid2'}
+ obj = {'networks': [network1, network2]}
+ response = client.sendjson('delete', 'networks/', obj)
+ self.assertEqual(lwt.NO_CONTENT, response.status_code)
+ lwt_dict = lwt.OpenDaylightLwtClient.lwt_dict
+ network = lwt_dict['networks'].get('fakeid1')
+ self.assertIsNone(network)
+ network = lwt_dict['networks'].get('fakeid2')
+ self.assertIsNone(network)
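
Taken together, these tests describe the intended workflow for lightweight testing: flip the enable_lightweight_testing option in the ml2_odl group and create_client() returns the in-memory OpenDaylightLwtClient, whose class-level lwt_dict stands in for the OpenDaylight datastore. A short usage sketch, assuming the networking-odl package is importable and that the override is reset afterwards:

from networking_odl.common import client as odl_client
from networking_odl.common import lightweight_testing as lwt

# Route REST traffic to the in-memory client instead of a real
# OpenDaylight controller (reset the override when finished, e.g. with
# addCleanup(odl_client.cfg.CONF.reset) inside a test case).
odl_client.cfg.CONF.set_override(
    'enable_lightweight_testing', True, 'ml2_odl')
client = odl_client.OpenDaylightRestClient.create_client()
assert isinstance(client, lwt.OpenDaylightLwtClient)

# POST stores payloads keyed by their id; GET/PUT/DELETE then operate on
# the same class-level dictionary.
client.sendjson('post', 'networks',
                {'networks': {'id': 'fakeid1', 'name': 'fake_network1'}})
print(lwt.OpenDaylightLwtClient.lwt_dict['networks']['fakeid1'])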
diff --git a/networking-odl/networking_odl/tests/unit/common/test_utils.py b/networking-odl/networking_odl/tests/unit/common/test_utils.py
new file mode 100644
index 0000000..dcfb50e
--- /dev/null
+++ b/networking-odl/networking_odl/tests/unit/common/test_utils.py
@@ -0,0 +1,156 @@
+# Copyright (c) 2015 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import mock
+
+from neutron.tests import base
+
+from networking_odl.common import cache
+from networking_odl.common import utils
+
+
+class TestGetAddressesByName(base.DietTestCase):
+
+ # pylint: disable=protected-access, unused-argument
+
+ def setUp(self):
+ super(TestGetAddressesByName, self).setUp()
+ self.clear_cache()
+ self.addCleanup(self.clear_cache)
+ time = self.patch(
+ utils.cache, 'time', clock=mock.Mock(return_value=0.0))
+ self.clock = time.clock
+ socket = self.patch(utils, 'socket')
+ self.getaddrinfo = socket.getaddrinfo
+
+ def patch(self, target, name, *args, **kwargs):
+ context = mock.patch.object(target, name, *args, **kwargs)
+ mocked = context.start()
+ self.addCleanup(context.stop)
+ return mocked
+
+ def clear_cache(self):
+ utils._addresses_by_name_cache.clear()
+
+ def test_get_addresses_by_valid_name(self):
+ self.getaddrinfo.return_value = [
+ (2, 1, 6, '', ('127.0.0.1', 0)),
+ (2, 2, 17, '', ('127.0.0.1', 0)),
+ (2, 3, 0, '', ('127.0.0.1', 0)),
+ (2, 1, 6, '', ('10.237.214.247', 0)),
+ (2, 2, 17, '', ('10.237.214.247', 0)),
+ (2, 3, 0, '', ('10.237.214.247', 0))]
+
+ # When valid host name is requested
+ result = utils.get_addresses_by_name('some_host_name')
+
+ # Then correct addresses are returned
+ self.assertEqual(('127.0.0.1', '10.237.214.247'), result)
+
+ # Then fetched addresses are cached
+ self.assertEqual(result, utils.get_addresses_by_name('some_host_name'))
+
+ # Then addresses are fetched only once
+ self.getaddrinfo.assert_called_once_with('some_host_name', None)
+
+ def test_get_addresses_by_valid_name_when_cache_expires(self):
+ self.getaddrinfo.return_value = [
+ (2, 1, 6, '', ('127.0.0.1', 0)),
+ (2, 2, 17, '', ('127.0.0.1', 0)),
+ (2, 3, 0, '', ('127.0.0.1', 0)),
+ (2, 1, 6, '', ('10.237.214.247', 0)),
+ (2, 2, 17, '', ('10.237.214.247', 0)),
+ (2, 3, 0, '', ('10.237.214.247', 0))]
+
+ # When valid host name is requested
+ result1 = utils.get_addresses_by_name('some_host_name')
+
+ # and after a long time
+ self.clock.return_value = 1.0e6
+
+ # When valid host name is requested
+ result2 = utils.get_addresses_by_name('some_host_name')
+
+ # Then correct addresses are returned
+ self.assertEqual(('127.0.0.1', '10.237.214.247'), result1)
+ self.assertEqual(('127.0.0.1', '10.237.214.247'), result2)
+
+ # Then addresses are fetched twice
+ self.getaddrinfo.assert_has_calls(
+ [mock.call('some_host_name', None),
+ mock.call('some_host_name', None)])
+
+ @mock.patch.object(cache, 'LOG')
+ def test_get_addresses_by_invalid_name(self, cache_logger):
+
+ # Given address resolution is failing
+ given_error = RuntimeError("I don't know him!")
+
+ def failing_getaddrinfo(name, service):
+ raise given_error
+
+ self.getaddrinfo.side_effect = failing_getaddrinfo
+
+ # When invalid name is requested
+ self.assertRaises(
+ RuntimeError, utils.get_addresses_by_name, 'some_host_name')
+
+ # When invalid name is requested again
+ self.assertRaises(
+ RuntimeError, utils.get_addresses_by_name, 'some_host_name')
+
+ # Then resolution is attempted for each request (failures are not cached)
+ self.getaddrinfo.assert_has_calls(
+ [mock.call('some_host_name', None)])
+ cache_logger.warning.assert_has_calls(
+ [mock.call(
+ 'Error fetching values for keys: %r', "'some_host_name'",
+ exc_info=(RuntimeError, given_error, mock.ANY)),
+ mock.call(
+ 'Error fetching values for keys: %r', "'some_host_name'",
+ exc_info=(RuntimeError, given_error, mock.ANY))])
+
+ @mock.patch.object(cache, 'LOG')
+ def test_get_addresses_failing_when_expired_in_cache(self, cache_logger):
+ self.getaddrinfo.return_value = [
+ (2, 1, 6, '', ('127.0.0.1', 0)),
+ (2, 2, 17, '', ('127.0.0.1', 0)),
+ (2, 3, 0, '', ('127.0.0.1', 0)),
+ (2, 1, 6, '', ('10.237.214.247', 0)),
+ (2, 2, 17, '', ('10.237.214.247', 0)),
+ (2, 3, 0, '', ('10.237.214.247', 0))]
+
+ # Given valid result is in cache but expired
+ utils.get_addresses_by_name('some_host_name')
+ self.clock.return_value = 1.0e6
+
+ # Given address resolution is now failing
+ given_error = RuntimeError("This is top secret.")
+
+ def failing_getaddrinfo(name, service):
+ raise given_error
+
+ self.getaddrinfo.side_effect = failing_getaddrinfo
+
+ self.assertRaises(
+ RuntimeError, utils.get_addresses_by_name, 'some_host_name')
+
+ # Then addresses are fetched twice
+ self.getaddrinfo.assert_has_calls(
+ [mock.call('some_host_name', None),
+ mock.call('some_host_name', None)])
+ cache_logger.warning.assert_called_once_with(
+ 'Error fetching values for keys: %r', "'some_host_name'",
+ exc_info=(RuntimeError, given_error, mock.ANY))
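
For context, the behaviour these tests pin down can be met by a thin wrapper around the cache module: resolve the host with socket.getaddrinfo(name, None), keep the unique addresses in order, cache them for a while, and re-raise the original resolver error when a lookup fails. The sketch below is only an approximation of networking_odl.common.utils; the 60-second time-to-live and the helper name _fetch_addresses are assumptions.

import socket

from networking_odl.common import cache


def _fetch_addresses(names):
    # Hypothetical helper feeding the cache: yields (name, addresses).
    for name in names:
        addresses = []
        for info in socket.getaddrinfo(name, None):
            # getaddrinfo entries are (family, type, proto, canonname,
            # sockaddr); the address is the first element of sockaddr.
            address = info[4][0]
            if address not in addresses:
                addresses.append(address)
        yield name, tuple(addresses)


_addresses_by_name_cache = cache.Cache(_fetch_addresses)


def get_addresses_by_name(name, time_to_live=60.0):
    try:
        return _addresses_by_name_cache.fetch(name, time_to_live)
    except cache.CacheFetchError as error:
        # Surface the original error (e.g. the RuntimeError raised by the
        # patched getaddrinfo in the tests above).
        error.reraise_cause()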