# Copyright (c) 2013-2014 OpenStack Foundation # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import copy import mock import socket from oslo_config import cfg from oslo_serialization import jsonutils import requests import webob.exc from neutron.db import segments_db from neutron.extensions import portbindings from neutron.plugins.common import constants from neutron.plugins.ml2 import config as config from neutron.plugins.ml2 import driver_api as api from neutron.plugins.ml2 import driver_context as driver_context from neutron.plugins.ml2 import plugin from neutron.tests import base from neutron.tests.unit.plugins.ml2 import test_plugin from neutron.tests.unit import testlib_api from neutron_lib import constants as n_constants from networking_odl.common import client from networking_odl.common import constants as odl_const from networking_odl.ml2 import legacy_port_binding from networking_odl.ml2 import mech_driver from networking_odl.ml2 import network_topology cfg.CONF.import_group('ml2_odl', 'networking_odl.common.config') HOST = 'fake-host' PLUGIN_NAME = 'neutron.plugins.ml2.plugin.Ml2Plugin' FAKE_NETWORK = {'status': 'ACTIVE', 'subnets': [], 'name': 'net1', 'provider:physical_network': None, 'admin_state_up': True, 'tenant_id': 'test-tenant', 'provider:network_type': 'local', 'router:external': False, 'shared': False, 'id': 'd897e21a-dfd6-4331-a5dd-7524fa421c3e', 'provider:segmentation_id': None} FAKE_SUBNET = {'ipv6_ra_mode': None, 'allocation_pools': [{'start': '10.0.0.2', 'end': '10.0.1.254'}], 'host_routes': [], 'ipv6_address_mode': None, 'cidr': '10.0.0.0/23', 'id': '72c56c48-e9b8-4dcf-b3a7-0813bb3bd839', 'name': '', 'enable_dhcp': True, 'network_id': 'd897e21a-dfd6-4331-a5dd-7524fa421c3e', 'tenant_id': 'test-tenant', 'dns_nameservers': [], 'gateway_ip': '10.0.0.1', 'ip_version': 4, 'shared': False} FAKE_PORT = {'status': 'DOWN', 'binding:host_id': '', 'allowed_address_pairs': [], 'device_owner': 'fake_owner', 'binding:profile': {}, 'fixed_ips': [], 'id': '72c56c48-e9b8-4dcf-b3a7-0813bb3bd839', 'security_groups': [], 'device_id': 'fake_device', 'name': '', 'admin_state_up': True, 'network_id': 'c13bba05-eb07-45ba-ace2-765706b2d701', 'tenant_id': 'bad_tenant_id', 'binding:vif_details': {}, 'binding:vnic_type': 'normal', 'binding:vif_type': 'unbound', 'mac_address': '12:34:56:78:21:b6'} FAKE_SECURITY_GROUP = {'description': 'Default security group', 'id': '6875fc07-853f-4230-9ab9-23d1af894240', 'name': 'default', 'security_group_rules': [], 'tenant_id': '04bb5f9a0fa14ad18203035c791ffae2'} FAKE_SECURITY_GROUP_RULE = {'direction': 'ingress', 'ethertype': 'IPv4', 'id': '399029df-cefe-4a7a-b6d6-223558627d23', 'port_range_max': 0, 'port_range_min': 0, 'protocol': 0, 'remote_group_id': '6875fc07-853f-4230-9ab9', 'remote_ip_prefix': 0, 'security_group_id': '6875fc07-853f-4230-9ab9', 'tenant_id': '04bb5f9a0fa14ad18203035c791ffae2'} class OpenDaylightTestCase(test_plugin.Ml2PluginV2TestCase): _mechanism_drivers = ['opendaylight'] def setUp(self): # Set URL/user/pass so init doesn't throw a cfg required error. # They are not used in these tests since sendjson is overwritten. config.cfg.CONF.set_override('url', 'http://127.0.0.1:9999', 'ml2_odl') config.cfg.CONF.set_override('username', 'someuser', 'ml2_odl') config.cfg.CONF.set_override('password', 'somepass', 'ml2_odl') super(OpenDaylightTestCase, self).setUp() self.port_create_status = 'DOWN' self.mech = mech_driver.OpenDaylightMechanismDriver() mock.patch.object( client.OpenDaylightRestClient, 'sendjson', new=self.check_sendjson).start() # Prevent test from accidentally connecting to any web service mock.patch.object( network_topology, 'NetworkTopologyClient', return_value=mock.Mock( specs=network_topology.NetworkTopologyClient, get=mock.Mock(side_effect=requests.HTTPError))).start() # Prevent hosts resolution from changing the behaviour of tests mock.patch.object( network_topology.utils, 'get_addresses_by_name', side_effect=socket.gaierror).start() def check_sendjson(self, method, urlpath, obj): self.assertFalse(urlpath.startswith("http://")) class OpenDayLightMechanismConfigTests(testlib_api.SqlTestCase): def _set_config(self, url='http://127.0.0.1:9999', username='someuser', password='somepass'): config.cfg.CONF.set_override('mechanism_drivers', ['logger', 'opendaylight'], 'ml2') config.cfg.CONF.set_override('url', url, 'ml2_odl') config.cfg.CONF.set_override('username', username, 'ml2_odl') config.cfg.CONF.set_override('password', password, 'ml2_odl') def _test_missing_config(self, **kwargs): self._set_config(**kwargs) self.assertRaises(config.cfg.RequiredOptError, plugin.Ml2Plugin) def test_valid_config(self): self._set_config() plugin.Ml2Plugin() def test_missing_url_raises_exception(self): self._test_missing_config(url=None) def test_missing_username_raises_exception(self): self._test_missing_config(username=None) def test_missing_password_raises_exception(self): self._test_missing_config(password=None) class OpenDaylightMechanismTestBasicGet(test_plugin.TestMl2BasicGet, OpenDaylightTestCase): pass class OpenDaylightMechanismTestNetworksV2(test_plugin.TestMl2NetworksV2, OpenDaylightTestCase): pass class OpenDaylightMechanismTestSubnetsV2(test_plugin.TestMl2SubnetsV2, OpenDaylightTestCase): pass class OpenDaylightMechanismTestPortsV2(test_plugin.TestMl2PortsV2, OpenDaylightTestCase): def setUp(self): mock.patch.object( mech_driver.OpenDaylightDriver, 'out_of_sync', new_callable=mock.PropertyMock(return_value=False)).start() super(OpenDaylightMechanismTestPortsV2, self).setUp() def test_update_port_mac(self): self.check_update_port_mac( host_arg={portbindings.HOST_ID: HOST}, arg_list=(portbindings.HOST_ID,), expected_status=webob.exc.HTTPConflict.code, expected_error='PortBound') class DataMatcher(object): def __init__(self, operation, object_type, context): self._data = context.current.copy() self._object_type = object_type filter_cls = mech_driver.OpenDaylightDriver.FILTER_MAP[ '%ss' % object_type] attr_filter = getattr(filter_cls, 'filter_%s_attributes' % operation) attr_filter(self._data, context) def __eq__(self, s): data = jsonutils.loads(s) return self._data == data[self._object_type] def __ne__(self, s): return not self.__eq__(s) class OpenDaylightSyncTestCase(OpenDaylightTestCase): def setUp(self): super(OpenDaylightSyncTestCase, self).setUp() self.given_back_end = mech_driver.OpenDaylightDriver() def test_simple_sync_all_with_HTTPError_not_found(self): self.given_back_end.out_of_sync = True ml2_plugin = plugin.Ml2Plugin() response = mock.Mock(status_code=requests.codes.not_found) fake_exception = requests.exceptions.HTTPError('Test', response=response) def side_eff(*args, **kwargs): # HTTP ERROR exception with 404 status code will be raised when use # sendjson to get the object in ODL DB if args[0] == 'get': raise fake_exception with mock.patch.object(client.OpenDaylightRestClient, 'sendjson', side_effect=side_eff), \ mock.patch.object(plugin.Ml2Plugin, 'get_networks', return_value=[FAKE_NETWORK.copy()]), \ mock.patch.object(plugin.Ml2Plugin, 'get_network', return_value=FAKE_NETWORK.copy()), \ mock.patch.object(plugin.Ml2Plugin, 'get_subnets', return_value=[FAKE_SUBNET.copy()]), \ mock.patch.object(plugin.Ml2Plugin, 'get_ports', return_value=[FAKE_PORT.copy()]), \ mock.patch.object(plugin.Ml2Plugin, 'get_security_groups', return_value=[FAKE_SECURITY_GROUP.copy()]), \ mock.patch.object(plugin.Ml2Plugin, 'get_security_group_rules', return_value=[FAKE_SECURITY_GROUP_RULE.copy()]): self.given_back_end.sync_full(ml2_plugin) sync_id_list = [FAKE_NETWORK['id'], FAKE_SUBNET['id'], FAKE_PORT['id'], FAKE_SECURITY_GROUP['id'], FAKE_SECURITY_GROUP_RULE['id']] act = [] for args, kwargs in \ client.OpenDaylightRestClient.sendjson.call_args_list: if args[0] == 'post': for key in args[2]: act.append(args[2][key][0]['id']) self.assertEqual(act, sync_id_list) def test_simple_sync_all_with_all_synced(self): self.given_back_end.out_of_sync = True ml2_plugin = plugin.Ml2Plugin() with mock.patch.object(client.OpenDaylightRestClient, 'sendjson', return_value=None), \ mock.patch.object(plugin.Ml2Plugin, 'get_networks', return_value=[FAKE_NETWORK.copy()]), \ mock.patch.object(plugin.Ml2Plugin, 'get_subnets', return_value=[FAKE_SUBNET.copy()]), \ mock.patch.object(plugin.Ml2Plugin, 'get_ports', return_value=[FAKE_PORT.copy()]), \ mock.patch.object(plugin.Ml2Plugin, 'get_security_groups', return_value=[FAKE_SECURITY_GROUP.copy()]), \ mock.patch.object(plugin.Ml2Plugin, 'get_security_group_rules', return_value=[FAKE_SECURITY_GROUP_RULE.copy()]): self.given_back_end.sync_full(ml2_plugin) # it's only called for GET, there is no call for PUT # 5 = network, subnet, port, security_group, security_group_rule self.assertEqual(5, client.OpenDaylightRestClient.sendjson.call_count) class OpenDaylightMechanismDriverTestCase(base.BaseTestCase): def setUp(self): super(OpenDaylightMechanismDriverTestCase, self).setUp() config.cfg.CONF.set_override('mechanism_drivers', ['logger', 'opendaylight'], 'ml2') config.cfg.CONF.set_override('url', 'http://127.0.0.1:9999', 'ml2_odl') config.cfg.CONF.set_override('username', 'someuser', 'ml2_odl') config.cfg.CONF.set_override('password', 'somepass', 'ml2_odl') self.mech = mech_driver.OpenDaylightMechanismDriver() self.mech.initialize() @staticmethod def _get_mock_network_operation_context(): context = mock.Mock(current=FAKE_NETWORK.copy()) return context @staticmethod def _get_mock_subnet_operation_context(): context = mock.Mock(current=FAKE_SUBNET.copy()) return context @staticmethod def _get_mock_port_operation_context(): context = mock.Mock(current=FAKE_PORT.copy()) context._plugin.get_security_group = mock.Mock(return_value={}) return context @classmethod def _get_mock_operation_context(cls, object_type): getter = getattr(cls, '_get_mock_%s_operation_context' % object_type) return getter() _status_code_msgs = { 200: '', 201: '', 204: '', 400: '400 Client Error: Bad Request', 401: '401 Client Error: Unauthorized', 403: '403 Client Error: Forbidden', 404: '404 Client Error: Not Found', 409: '409 Client Error: Conflict', 501: '501 Server Error: Not Implemented', 503: '503 Server Error: Service Unavailable', } @classmethod def _get_mock_request_response(cls, status_code): response = mock.Mock(status_code=status_code) response.raise_for_status = mock.Mock() if status_code < 400 else ( mock.Mock(side_effect=requests.exceptions.HTTPError( cls._status_code_msgs[status_code], response=response))) return response def _test_single_operation(self, method, context, status_code, exc_class=None, *args, **kwargs): self.mech.odl_drv.out_of_sync = False request_response = self._get_mock_request_response(status_code) with mock.patch('requests.request', return_value=request_response) as mock_method: if exc_class is not None: self.assertRaises(exc_class, method, context) else: method(context) mock_method.assert_called_once_with( headers={'Content-Type': 'application/json'}, auth=(config.cfg.CONF.ml2_odl.username, config.cfg.CONF.ml2_odl.password), timeout=config.cfg.CONF.ml2_odl.timeout, *args, **kwargs) def _test_create_resource_postcommit(self, object_type, status_code, exc_class=None): method = getattr(self.mech, 'create_%s_postcommit' % object_type) context = self._get_mock_operation_context(object_type) url = '%s/%ss' % (config.cfg.CONF.ml2_odl.url, object_type) kwargs = {'url': url, 'data': DataMatcher(odl_const.ODL_CREATE, object_type, context)} self._test_single_operation(method, context, status_code, exc_class, 'post', **kwargs) def _test_update_resource_postcommit(self, object_type, status_code, exc_class=None): method = getattr(self.mech, 'update_%s_postcommit' % object_type) context = self._get_mock_operation_context(object_type) url = '%s/%ss/%s' % (config.cfg.CONF.ml2_odl.url, object_type, context.current['id']) kwargs = {'url': url, 'data': DataMatcher(odl_const.ODL_UPDATE, object_type, context)} self._test_single_operation(method, context, status_code, exc_class, 'put', **kwargs) def _test_delete_resource_postcommit(self, object_type, status_code, exc_class=None): method = getattr(self.mech, 'delete_%s_postcommit' % object_type) context = self._get_mock_operation_context(object_type) url = '%s/%ss/%s' % (config.cfg.CONF.ml2_odl.url, object_type, context.current['id']) kwargs = {'url': url, 'data': None} self._test_single_operation(method, context, status_code, exc_class, odl_const.ODL_DELETE, **kwargs) def test_create_network_postcommit(self): self._test_create_resource_postcommit(odl_const.ODL_NETWORK, requests.codes.created) for status_code in (requests.codes.bad_request, requests.codes.unauthorized): self._test_create_resource_postcommit( odl_const.ODL_NETWORK, status_code, requests.exceptions.HTTPError) def test_create_subnet_postcommit(self): self._test_create_resource_postcommit(odl_const.ODL_SUBNET, requests.codes.created) for status_code in (requests.codes.bad_request, requests.codes.unauthorized, requests.codes.forbidden, requests.codes.not_found, requests.codes.conflict, requests.codes.not_implemented): self._test_create_resource_postcommit( odl_const.ODL_SUBNET, status_code, requests.exceptions.HTTPError) def test_create_port_postcommit(self): self._test_create_resource_postcommit(odl_const.ODL_PORT, requests.codes.created) for status_code in (requests.codes.bad_request, requests.codes.unauthorized, requests.codes.forbidden, requests.codes.not_found, requests.codes.conflict, requests.codes.not_implemented, requests.codes.service_unavailable): self._test_create_resource_postcommit( odl_const.ODL_PORT, status_code, requests.exceptions.HTTPError) def test_update_network_postcommit(self): self._test_update_resource_postcommit(odl_const.ODL_NETWORK, requests.codes.ok) for status_code in (requests.codes.bad_request, requests.codes.forbidden, requests.codes.not_found): self._test_update_resource_postcommit( odl_const.ODL_NETWORK, status_code, requests.exceptions.HTTPError) def test_update_subnet_postcommit(self): self._test_update_resource_postcommit(odl_const.ODL_SUBNET, requests.codes.ok) for status_code in (requests.codes.bad_request, requests.codes.unauthorized, requests.codes.forbidden, requests.codes.not_found, requests.codes.not_implemented): self._test_update_resource_postcommit( odl_const.ODL_SUBNET, status_code, requests.exceptions.HTTPError) def test_update_port_postcommit(self): self._test_update_resource_postcommit(odl_const.ODL_PORT, requests.codes.ok) for status_code in (requests.codes.bad_request, requests.codes.unauthorized, requests.codes.forbidden, requests.codes.not_found, requests.codes.conflict, requests.codes.not_implemented): self._test_update_resource_postcommit( odl_const.ODL_PORT, status_code, requests.exceptions.HTTPError) def test_delete_network_postcommit(self): self._test_delete_resource_postcommit(odl_const.ODL_NETWORK, requests.codes.no_content) self._test_delete_resource_postcommit(odl_const.ODL_NETWORK, requests.codes.not_found) for status_code in (requests.codes.unauthorized, requests.codes.conflict): self._test_delete_resource_postcommit( odl_const.ODL_NETWORK, status_code, requests.exceptions.HTTPError) def test_delete_subnet_postcommit(self): self._test_delete_resource_postcommit(odl_const.ODL_SUBNET, requests.codes.no_content) self._test_delete_resource_postcommit(odl_const.ODL_SUBNET, requests.codes.not_found) for status_code in (requests.codes.unauthorized, requests.codes.conflict, requests.codes.not_implemented): self._test_delete_resource_postcommit( odl_const.ODL_SUBNET, status_code, requests.exceptions.HTTPError) def test_delete_port_postcommit(self): self._test_delete_resource_postcommit(odl_const.ODL_PORT, requests.codes.no_content) self._test_delete_resource_postcommit(odl_const.ODL_PORT, requests.codes.not_found) for status_code in (requests.codes.unauthorized, requests.codes.forbidden, requests.codes.not_implemented): self._test_delete_resource_postcommit( odl_const.ODL_PORT, status_code, requests.exceptions.HTTPError) def test_port_emtpy_tenant_id_work_around(self): """Validate the work around code of port creation""" plugin = mock.Mock() plugin_context = mock.Mock() network = self._get_mock_operation_context( odl_const.ODL_NETWORK).current port = self._get_mock_operation_context(odl_const.ODL_PORT).current tenant_id = network['tenant_id'] port['tenant_id'] = '' with mock.patch.object(segments_db, 'get_network_segments'): context = driver_context.PortContext( plugin, plugin_context, port, network, {}, 0, None) self.mech.odl_drv.FILTER_MAP[ odl_const.ODL_PORTS].filter_create_attributes(port, context) self.assertEqual(tenant_id, port['tenant_id']) def test_update_port_filter(self): """Validate the filter code on update port operation""" items_to_filter = ['network_id', 'id', 'status', 'tenant_id'] plugin_context = mock.Mock() network = self._get_mock_operation_context( odl_const.ODL_NETWORK).current subnet = self._get_mock_operation_context(odl_const.ODL_SUBNET).current port = self._get_mock_operation_context(odl_const.ODL_PORT).current port['fixed_ips'] = [{'subnet_id': subnet['id'], 'ip_address': '10.0.0.10'}] port['mac_address'] = port['mac_address'].upper() orig_port = copy.deepcopy(port) with mock.patch.object(segments_db, 'get_network_segments'): context = driver_context.PortContext( plugin, plugin_context, port, network, {}, 0, None) self.mech.odl_drv.FILTER_MAP[ odl_const.ODL_PORTS].filter_update_attributes(port, context) for key, value in port.items(): if key not in items_to_filter: self.assertEqual(orig_port[key], value) class TestOpenDaylightMechanismDriver(base.DietTestCase): # given valid and invalid segments valid_segment = { api.ID: 'API_ID', api.NETWORK_TYPE: constants.TYPE_LOCAL, api.SEGMENTATION_ID: 'API_SEGMENTATION_ID', api.PHYSICAL_NETWORK: 'API_PHYSICAL_NETWORK'} invalid_segment = { api.ID: 'API_ID', api.NETWORK_TYPE: constants.TYPE_NONE, api.SEGMENTATION_ID: 'API_SEGMENTATION_ID', api.PHYSICAL_NETWORK: 'API_PHYSICAL_NETWORK'} def test_bind_port_front_end(self): given_front_end = mech_driver.OpenDaylightMechanismDriver() given_port_context = self.given_port_context() given_back_end = mech_driver.OpenDaylightDriver() given_front_end.odl_drv = given_back_end given_back_end.port_binding_controller = \ legacy_port_binding.LegacyPortBindingManager() # when port is bound given_front_end.bind_port(given_port_context) # then context binding is setup with returned vif_type and valid # segment API ID given_port_context.set_binding.assert_called_once_with( self.valid_segment[api.ID], portbindings.VIF_TYPE_OVS, given_back_end.port_binding_controller.vif_details, status=n_constants.PORT_STATUS_ACTIVE) def given_port_context(self): from neutron.plugins.ml2 import driver_context as ctx # given NetworkContext network = mock.MagicMock(spec=api.NetworkContext) # given port context return mock.MagicMock( spec=ctx.PortContext, current={'id': 'CURRENT_CONTEXT_ID'}, segments_to_bind=[self.valid_segment, self.invalid_segment], network=network, _new_bound_segment=self.valid_segment)