# -*- coding: utf-8 -*-

# Copyright 2014 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import os.path
import tempfile

from oslo_concurrency import processutils

from os_net_config import impl_ifcfg
from os_net_config import NetConfig
from os_net_config import objects
from os_net_config.tests import base
from os_net_config import utils


# ---------------------------------------------------------------------------
# Expected ifcfg file contents.
#
# Every template below is compared with assertEqual against text produced by
# the ifcfg provider, so the strings must match exactly: one KEY=VALUE entry
# per line, in the order shown, each line terminated by a newline.
# ---------------------------------------------------------------------------

# Common prefix for the em1 templates: header comment plus the base keys.
_BASE_IFCFG = """# This file is autogenerated by os-net-config
DEVICE=em1
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
"""

# Same as _BASE_IFCFG except that NM_CONTROLLED is "yes".
_BASE_IFCFG_NETWORKMANAGER = """# This file is autogenerated by os-net-config
DEVICE=em1
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=yes
PEERDNS=no
"""

# em1 with HOTPLUG=yes and no address configuration.
_HOTPLUG = """# This file is autogenerated by os-net-config
DEVICE=em1
ONBOOT=yes
HOTPLUG=yes
NM_CONTROLLED=no
PEERDNS=no
BOOTPROTO=none
"""

# em1 with no address configuration.
_NO_IP = _BASE_IFCFG + "BOOTPROTO=none\n"

# em1 with a single static IPv4 address.
_V4_IFCFG = _BASE_IFCFG + """BOOTPROTO=static
IPADDR=192.168.1.2
NETMASK=255.255.255.0
"""

# em1 with one static IPv4 address and one IPv6 address.
_V4_V6_IFCFG = _BASE_IFCFG + """IPV6INIT=yes
BOOTPROTO=static
IPADDR=192.168.1.2
NETMASK=255.255.255.0
IPV6_AUTOCONF=no
IPV6ADDR=2001:abc:a::/64
"""

# Interface named with a dotted suffix (em1.120); carries VLAN=yes.
_IFCFG_VLAN = """# This file is autogenerated by os-net-config
DEVICE=em1.120
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
VLAN=yes
BOOTPROTO=none
"""

# _V4_IFCFG with the device renamed to nic1 and a HWADDR entry appended.
_V4_IFCFG_MAPPED = _V4_IFCFG.replace('em1', 'nic1') + "HWADDR=a1:b2:c3:d4:e5\n"

# Common prefix for the ib0 (TYPE=Infiniband) templates.
_BASE_IB_IFCFG = """# This file is autogenerated by os-net-config
DEVICE=ib0
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
TYPE=Infiniband
"""

# ib0 with a single static IPv4 address.
_V4_IB_IFCFG = _BASE_IB_IFCFG + """BOOTPROTO=static
IPADDR=192.168.1.2
NETMASK=255.255.255.0
"""

# em1 with three IPv4 addresses; the extra ones use numbered keys.
_V4_IFCFG_MULTIPLE = _V4_IFCFG + """IPADDR1=192.168.1.3
NETMASK1=255.255.255.255
IPADDR2=10.0.0.2
NETMASK2=255.0.0.0
"""

# ib0 with three IPv4 addresses; the extra ones use numbered keys.
_IB_V4_IFCFG_MULTIPLE = _V4_IB_IFCFG + """IPADDR1=192.168.1.3
NETMASK1=255.255.255.255
IPADDR2=10.0.0.2
NETMASK2=255.0.0.0
"""

# em1 with a single IPv6 address.
_V6_IFCFG = _BASE_IFCFG + """IPV6INIT=yes
IPV6_AUTOCONF=no
IPV6ADDR=2001:abc:a::/64
"""

# em1 with three IPv6 addresses; the extra ones share one quoted,
# space-separated IPV6ADDR_SECONDARIES value.
_V6_IFCFG_MULTIPLE = (_V6_IFCFG +
                      "IPV6ADDR_SECONDARIES=\"2001:abc:b::1/64 " +
                      "2001:abc:c::2/96\"\n")

# em1 marked as an OVS device, without a bridge assignment.
_OVS_IFCFG = _BASE_IFCFG + "DEVICETYPE=ovs\nBOOTPROTO=none\n"

# GRE tunnel port tun0 attached to br-ctlplane.
_OVS_IFCFG_TUNNEL = """# This file is autogenerated by os-net-config
DEVICE=tun0
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
PEERDNS=no
DEVICETYPE=ovs
TYPE=OVSTunnel
OVS_BRIDGE=br-ctlplane
OVS_TUNNEL_TYPE=gre
OVS_TUNNEL_OPTIONS="options:remote_ip=192.168.1.1"
"""

# Base em1 keys followed by DEVICETYPE=ovs only (no BOOTPROTO entry).
_OVS_BRIDGE_IFCFG = _BASE_IFCFG + "DEVICETYPE=ovs\n"

# em1 as a member of the Linux bridge br-ctlplane.
_LINUX_BRIDGE_IFCFG = _BASE_IFCFG + "BRIDGE=br-ctlplane\nBOOTPROTO=none\n"

# ---------------------------------------------------------------------------
# Expected route file contents (one route per line).
# ---------------------------------------------------------------------------

_ROUTES = """default via 192.168.1.1 dev em1 metric 10
172.19.0.0/24 via 192.168.1.1 dev em1
172.20.0.0/24 via 192.168.1.5 dev em1 metric 100
"""

_ROUTES_V6 = """default via 2001:db8::1 dev em1
2001:db8:dead:beef:cafe::/56 via fd00:fd00:2000::1 dev em1
2001:db8:dead:beff::/64 via fd00:fd00:2000::1 dev em1 metric 100
"""

# ---------------------------------------------------------------------------
# Expected bridge / bond member contents.
# ---------------------------------------------------------------------------

# em1 as an OVS port of br-ctlplane.
_OVS_INTERFACE = _BASE_IFCFG + """DEVICETYPE=ovs
TYPE=OVSPort
OVS_BRIDGE=br-ctlplane
BOOTPROTO=none
"""

# OVS bridge br-ctlplane using DHCP through em1.  Unlike the templates
# above, this one contains no PEERDNS entry.
_OVS_BRIDGE_DHCP = """# This file is autogenerated by os-net-config
DEVICE=br-ctlplane
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=no
DEVICETYPE=ovs
TYPE=OVSBridge
OVSBOOTPROTO=dhcp
OVSDHCPINTERFACES="em1"
"""

# em1 as a member of bond1, with NM_CONTROLLED=yes.
_NM_CONTROLLED_INTERFACE = _BASE_IFCFG_NETWORKMANAGER + """MASTER=bond1
SLAVE=yes
BOOTPROTO=none
"""

# bond1 itself, with NM_CONTROLLED=yes.
_NM_CONTROLLED_BOND = """# This file is autogenerated by os-net-config
DEVICE=bond1
ONBOOT=yes
HOTPLUG=no
NM_CONTROLLED=yes
PEERDNS=no
"""

# _OVS_BRIDGE_DHCP plus an OVS_EXTRA entry selecting the bridge fail_mode.
_OVS_BRIDGE_DHCP_STANDALONE = (
    _OVS_BRIDGE_DHCP +
    "OVS_EXTRA=\"set bridge br-ctlplane fail_mode=standalone\"\n")

_OVS_BRIDGE_DHCP_SECURE = (
    _OVS_BRIDGE_DHCP +
    "OVS_EXTRA=\"set bridge br-ctlplane fail_mode=secure\"\n")
_LINUX_BRIDGE_DHCP = """# This file is autogenerated by os-net-config DEVICE=br-ctlplane ONBOOT=yes HOTPLUG=no NM_CONTROLLED=no TYPE=Bridge DELAY=0 BOOTPROTO=dhcp """ _OVS_BRIDGE_STATIC = """# This file is autogenerated by os-net-config DEVICE=br-ctlplane ONBOOT=yes HOTPLUG=no NM_CONTROLLED=no PEERDNS=no DEVICETYPE=ovs TYPE=OVSBridge BOOTPROTO=static IPADDR=192.168.1.2 NETMASK=255.255.255.0 """ _LINUX_BRIDGE_STATIC = """# This file is autogenerated by os-net-config DEVICE=br-ctlplane ONBOOT=yes HOTPLUG=no NM_CONTROLLED=no PEERDNS=no TYPE=Bridge DELAY=0 BOOTPROTO=static IPADDR=192.168.1.2 NETMASK=255.255.255.0 """ _OVS_BRIDGE_DHCP_PRIMARY_INTERFACE = _OVS_BRIDGE_DHCP + \ "OVS_EXTRA=\"set bridge br-ctlplane other-config:hwaddr=a1:b2:c3:d4:e5\"\n" _OVS_BRIDGE_DHCP_OVS_EXTRA = _OVS_BRIDGE_DHCP + \ "OVS_EXTRA=\"set bridge br-ctlplane other-config:hwaddr=a1:b2:c3:d4:e5" + \ " -- br-set-external-id br-ctlplane bridge-id br-ctlplane\"\n" _BASE_VLAN = """# This file is autogenerated by os-net-config DEVICE=vlan5 ONBOOT=yes HOTPLUG=no NM_CONTROLLED=no PEERDNS=no VLAN=yes PHYSDEV=em1 """ # vlans on an OVS bridge do not set VLAN=yes or PHYSDEV _BASE_VLAN_OVS = """# This file is autogenerated by os-net-config DEVICE=vlan5 ONBOOT=yes HOTPLUG=no NM_CONTROLLED=no PEERDNS=no """ _VLAN_NO_IP = _BASE_VLAN + "BOOTPROTO=none\n" _VLAN_OVS = _BASE_VLAN_OVS + "DEVICETYPE=ovs\nBOOTPROTO=none\n" _VLAN_OVS_BRIDGE = _BASE_VLAN_OVS + """DEVICETYPE=ovs TYPE=OVSIntPort OVS_BRIDGE=br-ctlplane OVS_OPTIONS="tag=5" BOOTPROTO=none """ _VLAN_LINUX_BRIDGE = _BASE_VLAN_OVS + """VLAN=yes PHYSDEV=em1 BRIDGE=br-ctlplane BOOTPROTO=none """ _OVS_BOND_DHCP = """# This file is autogenerated by os-net-config DEVICE=bond0 ONBOOT=yes HOTPLUG=no NM_CONTROLLED=no DEVICETYPE=ovs TYPE=OVSBond OVSBOOTPROTO=dhcp BOND_IFACES="em1 em2" """ _LINUX_BOND_DHCP = """# This file is autogenerated by os-net-config DEVICE=bond0 ONBOOT=yes HOTPLUG=no NM_CONTROLLED=no BOOTPROTO=dhcp """ _LINUX_TEAM_DHCP = """# This file is 
autogenerated by os-net-config DEVICE=team0 ONBOOT=yes HOTPLUG=no NM_CONTROLLED=no BOOTPROTO=dhcp DEVICETYPE=Team """ _LINUX_BOND_INTERFACE = _BASE_IFCFG + """MASTER=bond0 SLAVE=yes BOOTPROTO=none """ _LINUX_TEAM_INTERFACE = _BASE_IFCFG + """TEAM_MASTER=team0 BOOTPROTO=none """ _IVS_UPLINK = """# This file is autogenerated by os-net-config DEVICE=em1 ONBOOT=yes HOTPLUG=no NM_CONTROLLED=no PEERDNS=no DEVICETYPE=ivs IVS_BRIDGE=ivs BOOTPROTO=none """ _IVS_INTERFACE = """# This file is autogenerated by os-net-config DEVICE=storage5 ONBOOT=yes HOTPLUG=no NM_CONTROLLED=no PEERDNS=no TYPE=IVSIntPort DEVICETYPE=ivs IVS_BRIDGE=ivs MTU=1500 BOOTPROTO=static IPADDR=172.16.2.7 NETMASK=255.255.255.0 """ _IVS_CONFIG = ('DAEMON_ARGS=\"--hitless --certificate /etc/ivs ' '--inband-vlan 4092 -u em1 --internal-port=storage5\"') _NFVSWITCH_INTERFACE = """# This file is autogenerated by os-net-config DEVICE=em1 ONBOOT=yes HOTPLUG=no NM_CONTROLLED=no PEERDNS=no DEVICETYPE=nfvswitch NFVSWITCH_BRIDGE=nfvswitch BOOTPROTO=none """ _NFVSWITCH_INTERNAL = """# This file is autogenerated by os-net-config DEVICE=storage5 ONBOOT=yes HOTPLUG=no NM_CONTROLLED=no PEERDNS=no TYPE=NFVSWITCHIntPort DEVICETYPE=nfvswitch NFVSWITCH_BRIDGE=nfvswitch MTU=1500 BOOTPROTO=static IPADDR=172.16.2.7 NETMASK=255.255.255.0 """ _NFVSWITCH_CONFIG = ('SETUP_ARGS=\"-c 2,3,4,5 -u em1 -m storage5\"') _OVS_IFCFG_PATCH_PORT = """# This file is autogenerated by os-net-config DEVICE=br-pub-patch ONBOOT=yes HOTPLUG=no NM_CONTROLLED=no PEERDNS=no DEVICETYPE=ovs TYPE=OVSPatchPort OVS_BRIDGE=br-ex OVS_PATCH_PEER=br-ex-patch """ _LINUX_TEAM_PRIMARY_IFACE = """# This file is autogenerated by os-net-config DEVICE=em1 ONBOOT=yes HOTPLUG=no NM_CONTROLLED=no PEERDNS=no TEAM_MASTER=team1 TEAM_PORT_CONFIG='{"prio": 100}' BOOTPROTO=none """ class TestIfcfgNetConfig(base.TestCase): def setUp(self): super(TestIfcfgNetConfig, self).setUp() self.provider = impl_ifcfg.IfcfgNetConfig() def tearDown(self): super(TestIfcfgNetConfig, 
self).tearDown() def get_interface_config(self, name='em1'): return self.provider.interface_data[name] def get_vlan_config(self, name='vlan1'): return self.provider.vlan_data[name] def get_linux_bond_config(self, name='bond0'): return self.provider.linuxbond_data[name] def get_linux_team_config(self, name='team0'): return self.provider.linuxteam_data[name] def get_route_config(self, name='em1'): return self.provider.route_data.get(name, '') def get_route6_config(self, name='em1'): return self.provider.route6_data.get(name, '') def test_add_base_interface(self): interface = objects.Interface('em1') self.provider.add_interface(interface) self.assertEqual(_NO_IP, self.get_interface_config()) def test_add_interface_with_hotplug(self): interface = objects.Interface('em1', hotplug=True) self.provider.add_interface(interface) self.assertEqual(_HOTPLUG, self.get_interface_config()) def test_add_base_interface_vlan(self): interface = objects.Interface('em1.120') self.provider.add_interface(interface) self.assertEqual(_IFCFG_VLAN, self.get_interface_config('em1.120')) def test_add_ovs_interface(self): interface = objects.Interface('em1') interface.ovs_port = True self.provider.add_interface(interface) self.assertEqual(_OVS_IFCFG, self.get_interface_config()) def test_add_ovs_tunnel(self): interface = objects.OvsTunnel('tun0') interface.type = 'ovs_tunnel' interface.tunnel_type = 'gre' interface.ovs_options = ['options:remote_ip=192.168.1.1'] interface.bridge_name = 'br-ctlplane' self.provider.add_interface(interface) self.assertEqual(_OVS_IFCFG_TUNNEL, self.get_interface_config('tun0')) def test_add_ovs_patch_port(self): patch_port = objects.OvsPatchPort("br-pub-patch") patch_port.type = 'ovs_patch_port' patch_port.bridge_name = 'br-ex' patch_port.peer = 'br-ex-patch' self.provider.add_interface(patch_port) self.assertEqual(_OVS_IFCFG_PATCH_PORT, self.get_interface_config('br-pub-patch')) def test_add_interface_with_v4(self): v4_addr = objects.Address('192.168.1.2/24') 
interface = objects.Interface('em1', addresses=[v4_addr]) self.provider.add_interface(interface) self.assertEqual(_V4_IFCFG, self.get_interface_config()) self.assertEqual('', self.get_route_config()) def test_add_interface_with_v4_multiple(self): addresses = [objects.Address('192.168.1.2/24'), objects.Address('192.168.1.3/32'), objects.Address('10.0.0.2/8')] interface = objects.Interface('em1', addresses=addresses) self.provider.add_interface(interface) self.assertEqual(_V4_IFCFG_MULTIPLE, self.get_interface_config()) self.assertEqual('', self.get_route_config()) def test_add_interface_map_persisted(self): def test_interface_mac(name): macs = {'em1': 'a1:b2:c3:d4:e5'} return macs[name] self.stubs.Set(utils, 'interface_mac', test_interface_mac) nic_mapping = {'nic1': 'em1'} self.stubbed_mapped_nics = nic_mapping v4_addr = objects.Address('192.168.1.2/24') interface = objects.Interface('nic1', addresses=[v4_addr], nic_mapping=nic_mapping, persist_mapping=True) self.assertEqual('a1:b2:c3:d4:e5', interface.hwaddr) self.provider.add_interface(interface) self.assertEqual(_V4_IFCFG_MAPPED, self.get_interface_config('nic1')) self.assertEqual('', self.get_route_config('nic1')) def test_add_interface_with_v6(self): v6_addr = objects.Address('2001:abc:a::/64') interface = objects.Interface('em1', addresses=[v6_addr]) self.provider.add_interface(interface) self.assertEqual(_V6_IFCFG, self.get_interface_config()) def test_add_interface_with_v6_multiple(self): addresses = [objects.Address('2001:abc:a::/64'), objects.Address('2001:abc:b::1/64'), objects.Address('2001:abc:c::2/96')] interface = objects.Interface('em1', addresses=addresses) self.provider.add_interface(interface) self.assertEqual(_V6_IFCFG_MULTIPLE, self.get_interface_config()) def test_network_with_routes(self): route1 = objects.Route('192.168.1.1', default=True, route_options="metric 10") route2 = objects.Route('192.168.1.1', '172.19.0.0/24') route3 = objects.Route('192.168.1.5', '172.20.0.0/24', 
route_options="metric 100") v4_addr = objects.Address('192.168.1.2/24') interface = objects.Interface('em1', addresses=[v4_addr], routes=[route1, route2, route3]) self.provider.add_interface(interface) self.assertEqual(_V4_IFCFG, self.get_interface_config()) self.assertEqual(_ROUTES, self.get_route_config()) def test_network_with_ipv6_routes(self): route1 = objects.Route('192.168.1.1', default=True, route_options="metric 10") route2 = objects.Route('192.168.1.1', '172.19.0.0/24') route3 = objects.Route('192.168.1.5', '172.20.0.0/24', route_options="metric 100") route4 = objects.Route('2001:db8::1', default=True) route5 = objects.Route('fd00:fd00:2000::1', '2001:db8:dead:beef:cafe::/56') route6 = objects.Route('fd00:fd00:2000::1', '2001:db8:dead:beff::/64', route_options="metric 100") v4_addr = objects.Address('192.168.1.2/24') v6_addr = objects.Address('2001:abc:a::/64') interface = objects.Interface('em1', addresses=[v4_addr, v6_addr], routes=[route1, route2, route3, route4, route5, route6]) self.provider.add_interface(interface) self.assertEqual(_V4_V6_IFCFG, self.get_interface_config()) self.assertEqual(_ROUTES_V6, self.get_route6_config()) def test_network_ovs_bridge_with_dhcp(self): interface = objects.Interface('em1') bridge = objects.OvsBridge('br-ctlplane', use_dhcp=True, members=[interface]) self.provider.add_interface(interface) self.provider.add_bridge(bridge) self.assertEqual(_OVS_INTERFACE, self.get_interface_config()) self.assertEqual(_OVS_BRIDGE_DHCP, self.provider.bridge_data['br-ctlplane']) def test_network_ovs_bridge_with_standalone_fail_mode(self): interface = objects.Interface('em1') bridge = objects.OvsBridge('br-ctlplane', use_dhcp=True, members=[interface], fail_mode='standalone') self.provider.add_interface(interface) self.provider.add_bridge(bridge) self.assertEqual(_OVS_INTERFACE, self.get_interface_config()) self.assertEqual(_OVS_BRIDGE_DHCP_STANDALONE, self.provider.bridge_data['br-ctlplane']) def 
test_network_ovs_bridge_with_secure_fail_mode(self): interface = objects.Interface('em1') bridge = objects.OvsBridge('br-ctlplane', use_dhcp=True, members=[interface], fail_mode='secure') self.provider.add_interface(interface) self.provider.add_bridge(bridge) self.assertEqual(_OVS_INTERFACE, self.get_interface_config()) self.assertEqual(_OVS_BRIDGE_DHCP_SECURE, self.provider.bridge_data['br-ctlplane']) def test_network_linux_bridge_with_dhcp(self): interface = objects.Interface('em1') bridge = objects.LinuxBridge('br-ctlplane', use_dhcp=True, members=[interface]) self.provider.add_linux_bridge(bridge) self.provider.add_interface(interface) self.assertEqual(_LINUX_BRIDGE_IFCFG, self.get_interface_config()) self.assertEqual(_LINUX_BRIDGE_DHCP, self.provider.linuxbridge_data['br-ctlplane']) def test_network_ovs_bridge_static(self): v4_addr = objects.Address('192.168.1.2/24') interface = objects.Interface('em1') bridge = objects.OvsBridge('br-ctlplane', members=[interface], addresses=[v4_addr]) self.provider.add_interface(interface) self.provider.add_bridge(bridge) self.assertEqual(_OVS_INTERFACE, self.get_interface_config()) self.assertEqual(_OVS_BRIDGE_STATIC, self.provider.bridge_data['br-ctlplane']) def test_network_ovs_bridge_with_tunnel(self): interface = objects.OvsTunnel('tun0') interface.type = 'ovs_tunnel' interface.tunnel_type = 'gre' interface.ovs_options = ['options:remote_ip=192.168.1.1'] interface.bridge_name = 'br-ctlplane' self.provider.add_interface(interface) v4_addr = objects.Address('192.168.1.2/24') bridge = objects.OvsBridge('br-ctlplane', members=[interface], addresses=[v4_addr]) self.provider.add_bridge(bridge) self.provider.add_interface(interface) self.assertEqual(_OVS_IFCFG_TUNNEL, self.get_interface_config('tun0')) self.assertEqual(_OVS_BRIDGE_STATIC, self.provider.bridge_data['br-ctlplane']) def test_network_linux_bridge_static(self): v4_addr = objects.Address('192.168.1.2/24') interface = objects.Interface('em1') bridge = 
objects.LinuxBridge('br-ctlplane', members=[interface], addresses=[v4_addr]) self.provider.add_interface(interface) self.provider.add_bridge(bridge) self.assertEqual(_LINUX_BRIDGE_IFCFG, self.get_interface_config()) self.assertEqual(_LINUX_BRIDGE_STATIC, self.provider.bridge_data['br-ctlplane']) def test_network_ovs_bridge_with_dhcp_primary_interface(self): def test_interface_mac(name): return "a1:b2:c3:d4:e5" self.stubs.Set(utils, 'interface_mac', test_interface_mac) interface = objects.Interface('em1', primary=True) bridge = objects.OvsBridge('br-ctlplane', use_dhcp=True, members=[interface]) self.provider.add_interface(interface) self.provider.add_bridge(bridge) self.assertEqual(_OVS_INTERFACE, self.get_interface_config()) self.assertEqual(_OVS_BRIDGE_DHCP_PRIMARY_INTERFACE, self.provider.bridge_data['br-ctlplane']) def test_network_ovs_bridge_with_dhcp_primary_interface_with_extra(self): def test_interface_mac(name): return "a1:b2:c3:d4:e5" self.stubs.Set(utils, 'interface_mac', test_interface_mac) interface = objects.Interface('em1', primary=True) ovs_extra = "br-set-external-id br-ctlplane bridge-id br-ctlplane" bridge = objects.OvsBridge('br-ctlplane', use_dhcp=True, members=[interface], ovs_extra=[ovs_extra]) self.provider.add_interface(interface) self.provider.add_bridge(bridge) self.assertEqual(_OVS_INTERFACE, self.get_interface_config()) self.assertEqual(_OVS_BRIDGE_DHCP_OVS_EXTRA, self.provider.bridge_data['br-ctlplane']) def test_network_ovs_bridge_with_dhcp_primary_interface_with_format(self): def test_interface_mac(name): return "a1:b2:c3:d4:e5" self.stubs.Set(utils, 'interface_mac', test_interface_mac) interface = objects.Interface('em1', primary=True) ovs_extra = "br-set-external-id {name} bridge-id {name}" bridge = objects.OvsBridge('br-ctlplane', use_dhcp=True, members=[interface], ovs_extra=[ovs_extra]) self.provider.add_interface(interface) self.provider.add_bridge(bridge) self.assertEqual(_OVS_INTERFACE, self.get_interface_config()) 
self.assertEqual(_OVS_BRIDGE_DHCP_OVS_EXTRA, self.provider.bridge_data['br-ctlplane']) def test_network_ivs_with_uplink_and_interface(self): interface = objects.Interface('em1') v4_addr = objects.Address('172.16.2.7/24') ivs_interface = objects.IvsInterface(vlan_id=5, name='storage', addresses=[v4_addr]) bridge = objects.IvsBridge(members=[interface, ivs_interface]) self.provider.add_interface(interface) self.provider.add_ivs_interface(ivs_interface) self.provider.add_bridge(bridge) self.assertEqual(_IVS_UPLINK, self.get_interface_config()) self.assertEqual(_IVS_INTERFACE, self.provider.ivsinterface_data[ivs_interface.name]) data = self.provider.generate_ivs_config(['em1'], ['storage5']) self.assertEqual(_IVS_CONFIG, data) def test_network_nfvswitch_with_interfaces_and_internal_interfaces(self): interface = objects.Interface('em1') v4_addr = objects.Address('172.16.2.7/24') nfvswitch_internal = objects.NfvswitchInternal(vlan_id=5, name='storage', addresses=[v4_addr]) iface_name = nfvswitch_internal.name bridge = objects.NfvswitchBridge(members=[interface, nfvswitch_internal], options="-c 2,3,4,5") self.provider.add_interface(interface) self.provider.add_nfvswitch_internal(nfvswitch_internal) self.provider.add_nfvswitch_bridge(bridge) self.assertEqual(_NFVSWITCH_INTERFACE, self.get_interface_config()) self.assertEqual(_NFVSWITCH_INTERNAL, self.provider.nfvswitch_intiface_data[iface_name]) data = self.provider.generate_nfvswitch_config(['em1'], ['storage5']) self.assertEqual(_NFVSWITCH_CONFIG, data) def test_add_ib_interface_with_v4_multiple(self): addresses = [objects.Address('192.168.1.2/24'), objects.Address('192.168.1.3/32'), objects.Address('10.0.0.2/8')] ib_interface = objects.IbInterface('ib0', addresses=addresses) self.provider.add_interface(ib_interface) self.assertEqual(_IB_V4_IFCFG_MULTIPLE, self.get_interface_config('ib0')) self.assertEqual('', self.get_route_config()) def test_add_vlan(self): vlan = objects.Vlan('em1', 5) self.provider.add_vlan(vlan) 
self.assertEqual(_VLAN_NO_IP, self.get_vlan_config('vlan5')) def test_add_vlan_ovs(self): vlan = objects.Vlan('em1', 5) vlan.ovs_port = True self.provider.add_vlan(vlan) self.assertEqual(_VLAN_OVS, self.get_vlan_config('vlan5')) def test_add_vlan_mtu_1500(self): vlan = objects.Vlan('em1', 5, mtu=1500) self.provider.add_vlan(vlan) expected = _VLAN_NO_IP + 'MTU=1500\n' self.assertEqual(expected, self.get_vlan_config('vlan5')) def test_add_ovs_bridge_with_vlan(self): vlan = objects.Vlan('em1', 5) bridge = objects.OvsBridge('br-ctlplane', use_dhcp=True, members=[vlan]) self.provider.add_vlan(vlan) self.provider.add_bridge(bridge) self.assertEqual(_VLAN_OVS_BRIDGE, self.get_vlan_config('vlan5')) def test_add_linux_bridge_with_vlan(self): vlan = objects.Vlan('em1', 5) bridge = objects.LinuxBridge('br-ctlplane', use_dhcp=True, members=[vlan]) self.provider.add_vlan(vlan) self.provider.add_bridge(bridge) self.assertEqual(_VLAN_LINUX_BRIDGE, self.get_vlan_config('vlan5')) def test_ovs_bond(self): interface1 = objects.Interface('em1') interface2 = objects.Interface('em2') bond = objects.OvsBond('bond0', use_dhcp=True, members=[interface1, interface2]) self.provider.add_interface(interface1) self.provider.add_interface(interface2) self.provider.add_bond(bond) self.assertEqual(_NO_IP, self.get_interface_config('em1')) em2_config = """# This file is autogenerated by os-net-config DEVICE=em2 ONBOOT=yes HOTPLUG=no NM_CONTROLLED=no PEERDNS=no BOOTPROTO=none """ self.assertEqual(em2_config, self.get_interface_config('em2')) self.assertEqual(_OVS_BOND_DHCP, self.get_interface_config('bond0')) def test_linux_bond(self): interface1 = objects.Interface('em1') interface2 = objects.Interface('em2') bond = objects.LinuxBond('bond0', use_dhcp=True, members=[interface1, interface2]) self.provider.add_linux_bond(bond) self.provider.add_interface(interface1) self.provider.add_interface(interface2) self.assertEqual(_LINUX_BOND_DHCP, self.get_linux_bond_config('bond0')) 
self.assertEqual(_LINUX_BOND_INTERFACE, self.get_interface_config('em1')) def test_linux_team(self): interface1 = objects.Interface('em1') interface2 = objects.Interface('em2') team = objects.LinuxTeam('team0', use_dhcp=True, members=[interface1, interface2]) self.provider.add_linux_team(team) self.provider.add_interface(interface1) self.provider.add_interface(interface2) self.assertEqual(_LINUX_TEAM_DHCP, self.get_linux_team_config('team0')) self.assertEqual(_LINUX_TEAM_INTERFACE, self.get_interface_config('em1')) def test_interface_defroute(self): interface1 = objects.Interface('em1') interface2 = objects.Interface('em2', defroute=False) self.provider.add_interface(interface1) self.provider.add_interface(interface2) em1_config = """# This file is autogenerated by os-net-config DEVICE=em1 ONBOOT=yes HOTPLUG=no NM_CONTROLLED=no PEERDNS=no BOOTPROTO=none """ em2_config = """# This file is autogenerated by os-net-config DEVICE=em2 ONBOOT=yes HOTPLUG=no NM_CONTROLLED=no PEERDNS=no BOOTPROTO=none DEFROUTE=no """ self.assertEqual(em1_config, self.get_interface_config('em1')) self.assertEqual(em2_config, self.get_interface_config('em2')) def test_interface_dhclient_opts(self): interface1 = objects.Interface('em1', dhclient_args='--foobar') self.provider.add_interface(interface1) em1_config = """# This file is autogenerated by os-net-config DEVICE=em1 ONBOOT=yes HOTPLUG=no NM_CONTROLLED=no PEERDNS=no BOOTPROTO=none DHCLIENTARGS=--foobar """ self.assertEqual(em1_config, self.get_interface_config('em1')) def test_interface_ethtool_opts(self): interface1 = objects.Interface('em1', ethtool_opts='speed 1000 duplex full') self.provider.add_interface(interface1) em1_config = """# This file is autogenerated by os-net-config DEVICE=em1 ONBOOT=yes HOTPLUG=no NM_CONTROLLED=no PEERDNS=no BOOTPROTO=none ETHTOOL_OPTS=\"speed 1000 duplex full\" """ self.assertEqual(em1_config, self.get_interface_config('em1')) def test_interface_single_dns_server(self): interface1 = 
objects.Interface('em1', dns_servers=['1.2.3.4']) self.provider.add_interface(interface1) em1_config = """# This file is autogenerated by os-net-config DEVICE=em1 ONBOOT=yes HOTPLUG=no NM_CONTROLLED=no BOOTPROTO=none DNS1=1.2.3.4 """ self.assertEqual(em1_config, self.get_interface_config('em1')) def test_interface_dns_servers(self): interface1 = objects.Interface('em1', dns_servers=['1.2.3.4', '5.6.7.8']) self.provider.add_interface(interface1) em1_config = """# This file is autogenerated by os-net-config DEVICE=em1 ONBOOT=yes HOTPLUG=no NM_CONTROLLED=no BOOTPROTO=none DNS1=1.2.3.4 DNS2=5.6.7.8 """ self.assertEqual(em1_config, self.get_interface_config('em1')) def test_nm_controlled(self): interface1 = objects.Interface('em1', nm_controlled=True) interface2 = objects.Interface('em2', nm_controlled=True) bond = objects.LinuxBond('bond1', nm_controlled=True, members=[interface1, interface2]) self.provider.add_linux_bond(bond) self.provider.add_interface(interface1) self.provider.add_interface(interface2) ifcfg_data = self.get_interface_config('em1') self.assertEqual(_NM_CONTROLLED_INTERFACE, ifcfg_data) bond_data = self.get_linux_bond_config('bond1') self.assertEqual(_NM_CONTROLLED_BOND, bond_data) def test_network_ovs_dpdk_bridge_and_port(self): nic_mapping = {'nic1': 'eth0', 'nic2': 'eth1', 'nic3': 'eth2'} self.stubbed_mapped_nics = nic_mapping interface = objects.Interface(name='nic3') dpdk_port = objects.OvsDpdkPort(name='dpdk0', members=[interface]) bridge = objects.OvsUserBridge('br-link', members=[dpdk_port]) def test_bind_dpdk_interfaces(ifname, driver, noop): self.assertEqual(ifname, 'eth2') self.assertEqual(driver, 'vfio-pci') self.stubs.Set(utils, 'bind_dpdk_interfaces', test_bind_dpdk_interfaces) self.provider.add_ovs_dpdk_port(dpdk_port) self.provider.add_ovs_user_bridge(bridge) br_link_config = """# This file is autogenerated by os-net-config DEVICE=br-link ONBOOT=yes HOTPLUG=no NM_CONTROLLED=no PEERDNS=no DEVICETYPE=ovs TYPE=OVSUserBridge """ 
dpdk0_config = """# This file is autogenerated by os-net-config DEVICE=dpdk0 ONBOOT=yes HOTPLUG=no NM_CONTROLLED=no PEERDNS=no DEVICETYPE=ovs TYPE=OVSDPDKPort OVS_BRIDGE=br-link """ self.assertEqual(br_link_config, self.provider.bridge_data['br-link']) self.assertEqual(dpdk0_config, self.get_interface_config('dpdk0')) def test_network_ovs_dpdk_bridge_and_port_with_mtu_rxqueue(self): nic_mapping = {'nic1': 'eth0', 'nic2': 'eth1', 'nic3': 'eth2'} self.stubbed_mapped_nics = nic_mapping interface = objects.Interface(name='nic3') dpdk_port = objects.OvsDpdkPort(name='dpdk0', members=[interface], mtu=9000, rx_queue=4) bridge = objects.OvsUserBridge('br-link', members=[dpdk_port]) def test_bind_dpdk_interfaces(ifname, driver, noop): self.assertEqual(ifname, 'eth2') self.assertEqual(driver, 'vfio-pci') self.stubs.Set(utils, 'bind_dpdk_interfaces', test_bind_dpdk_interfaces) self.provider.add_ovs_dpdk_port(dpdk_port) self.provider.add_ovs_user_bridge(bridge) br_link_config = """# This file is autogenerated by os-net-config DEVICE=br-link ONBOOT=yes HOTPLUG=no NM_CONTROLLED=no PEERDNS=no DEVICETYPE=ovs TYPE=OVSUserBridge """ dpdk0_config = """# This file is autogenerated by os-net-config DEVICE=dpdk0 ONBOOT=yes HOTPLUG=no NM_CONTROLLED=no PEERDNS=no DEVICETYPE=ovs TYPE=OVSDPDKPort OVS_BRIDGE=br-link RX_QUEUE=4 MTU=9000 OVS_EXTRA="set Interface $DEVICE mtu_request=$MTU \ -- set Interface $DEVICE options:n_rxq=$RX_QUEUE" """ self.assertEqual(br_link_config, self.provider.bridge_data['br-link']) self.assertEqual(dpdk0_config, self.get_interface_config('dpdk0')) def test_network_ovs_dpdk_bond(self): nic_mapping = {'nic1': 'eth0', 'nic2': 'eth1', 'nic3': 'eth2'} self.stubbed_mapped_nics = nic_mapping iface0 = objects.Interface(name='nic2') dpdk0 = objects.OvsDpdkPort(name='dpdk0', members=[iface0]) iface1 = objects.Interface(name='nic3') dpdk1 = objects.OvsDpdkPort(name='dpdk1', members=[iface1]) bond = objects.OvsDpdkBond('dpdkbond0', members=[dpdk0, dpdk1]) bridge = 
objects.OvsUserBridge('br-link', members=[bond]) def test_bind_dpdk_interfaces(ifname, driver, noop): self.assertIn(ifname, ['eth1', 'eth2']) self.assertEqual(driver, 'vfio-pci') self.stubs.Set(utils, 'bind_dpdk_interfaces', test_bind_dpdk_interfaces) self.provider.add_ovs_dpdk_bond(bond) self.provider.add_ovs_user_bridge(bridge) dpdk_bond_config = """# This file is autogenerated by os-net-config DEVICE=dpdkbond0 ONBOOT=yes HOTPLUG=no NM_CONTROLLED=no PEERDNS=no DEVICETYPE=ovs TYPE=OVSDPDKBond OVS_BRIDGE=br-link BOND_IFACES="dpdk0 dpdk1" """ self.assertEqual(dpdk_bond_config, self.get_interface_config('dpdkbond0')) def test_network_ovs_dpdk_bond_with_mtu(self): nic_mapping = {'nic1': 'eth0', 'nic2': 'eth1', 'nic3': 'eth2'} self.stubbed_mapped_nics = nic_mapping iface0 = objects.Interface(name='nic2') dpdk0 = objects.OvsDpdkPort(name='dpdk0', members=[iface0]) iface1 = objects.Interface(name='nic3') dpdk1 = objects.OvsDpdkPort(name='dpdk1', members=[iface1]) bond = objects.OvsDpdkBond('dpdkbond0', mtu=9000, members=[dpdk0, dpdk1]) bridge = objects.OvsUserBridge('br-link', members=[bond]) def test_bind_dpdk_interfaces(ifname, driver, noop): self.assertIn(ifname, ['eth1', 'eth2']) self.assertEqual(driver, 'vfio-pci') self.stubs.Set(utils, 'bind_dpdk_interfaces', test_bind_dpdk_interfaces) self.provider.add_ovs_dpdk_bond(bond) self.provider.add_ovs_user_bridge(bridge) dpdk_bond_config = """# This file is autogenerated by os-net-config DEVICE=dpdkbond0 ONBOOT=yes HOTPLUG=no NM_CONTROLLED=no PEERDNS=no DEVICETYPE=ovs TYPE=OVSDPDKBond OVS_BRIDGE=br-link BOND_IFACES="dpdk0 dpdk1" MTU=9000 OVS_EXTRA="set Interface dpdk0 mtu_request=$MTU \ -- set Interface dpdk1 mtu_request=$MTU" """ self.assertEqual(dpdk_bond_config, self.get_interface_config('dpdkbond0')) def test_network_ovs_dpdk_bond_with_rx_queue(self): nic_mapping = {'nic1': 'eth0', 'nic2': 'eth1', 'nic3': 'eth2'} self.stubbed_mapped_nics = nic_mapping iface0 = objects.Interface(name='nic2') dpdk0 = 
objects.OvsDpdkPort(name='dpdk0', members=[iface0]) iface1 = objects.Interface(name='nic3') dpdk1 = objects.OvsDpdkPort(name='dpdk1', members=[iface1]) bond = objects.OvsDpdkBond('dpdkbond0', rx_queue=4, members=[dpdk0, dpdk1]) bridge = objects.OvsUserBridge('br-link', members=[bond]) def test_bind_dpdk_interfaces(ifname, driver, noop): self.assertIn(ifname, ['eth1', 'eth2']) self.assertEqual(driver, 'vfio-pci') self.stubs.Set(utils, 'bind_dpdk_interfaces', test_bind_dpdk_interfaces) self.provider.add_ovs_dpdk_bond(bond) self.provider.add_ovs_user_bridge(bridge) dpdk_bond_config = """# This file is autogenerated by os-net-config DEVICE=dpdkbond0 ONBOOT=yes HOTPLUG=no NM_CONTROLLED=no PEERDNS=no DEVICETYPE=ovs TYPE=OVSDPDKBond OVS_BRIDGE=br-link BOND_IFACES="dpdk0 dpdk1" RX_QUEUE=4 OVS_EXTRA="set Interface dpdk0 options:n_rxq=$RX_QUEUE \ -- set Interface dpdk1 options:n_rxq=$RX_QUEUE" """ self.assertEqual(dpdk_bond_config, self.get_interface_config('dpdkbond0')) def test_network_ovs_dpdk_bond_with_mtu_and_rx_queue(self): nic_mapping = {'nic1': 'eth0', 'nic2': 'eth1', 'nic3': 'eth2'} self.stubbed_mapped_nics = nic_mapping iface0 = objects.Interface(name='nic2') dpdk0 = objects.OvsDpdkPort(name='dpdk0', members=[iface0]) iface1 = objects.Interface(name='nic3') dpdk1 = objects.OvsDpdkPort(name='dpdk1', members=[iface1]) bond = objects.OvsDpdkBond('dpdkbond0', rx_queue=4, mtu=9000, members=[dpdk0, dpdk1]) bridge = objects.OvsUserBridge('br-link', members=[bond]) def test_bind_dpdk_interfaces(ifname, driver, noop): self.assertIn(ifname, ['eth1', 'eth2']) self.assertEqual(driver, 'vfio-pci') self.stubs.Set(utils, 'bind_dpdk_interfaces', test_bind_dpdk_interfaces) self.provider.add_ovs_dpdk_bond(bond) self.provider.add_ovs_user_bridge(bridge) dpdk_bond_config = """# This file is autogenerated by os-net-config DEVICE=dpdkbond0 ONBOOT=yes HOTPLUG=no NM_CONTROLLED=no PEERDNS=no DEVICETYPE=ovs TYPE=OVSDPDKBond OVS_BRIDGE=br-link BOND_IFACES="dpdk0 dpdk1" RX_QUEUE=4 MTU=9000 
class TestIfcfgNetConfigApply(base.TestCase):
    """Tests for IfcfgNetConfig.apply() against stubbed paths and commands.

    Every config path helper in impl_ifcfg is stubbed to point at a
    temporary file, and processutils.execute is replaced by a recorder, so
    the tests only inspect the generated file contents and the commands
    that would have been run.
    """

    def setUp(self):
        """Create the temporary files and install the impl_ifcfg stubs."""
        super(TestIfcfgNetConfigApply, self).setUp()
        self.temp_ifcfg_file = tempfile.NamedTemporaryFile()
        self.temp_bond_file = tempfile.NamedTemporaryFile()
        self.temp_route_file = tempfile.NamedTemporaryFile()
        self.temp_route6_file = tempfile.NamedTemporaryFile()
        self.temp_bridge_file = tempfile.NamedTemporaryFile()
        # delete=False: the cleanup tests expect the code under test to
        # remove this file itself; tearDown removes it otherwise.
        self.temp_cleanup_file = tempfile.NamedTemporaryFile(delete=False)
        # Recorders filled in by the stubs below.
        self.ifup_interface_names = []
        self.ovs_appctl_cmds = []
        self.stop_dhclient_interfaces = []

        def test_ifcfg_path(name):
            return self.temp_ifcfg_file.name
        self.stubs.Set(impl_ifcfg, 'ifcfg_config_path', test_ifcfg_path)

        def test_remove_ifcfg_config(name):
            ifcfg_file = self.temp_ifcfg_file.name
            if os.path.exists(ifcfg_file):
                os.remove(ifcfg_file)
        self.stubs.Set(impl_ifcfg, 'remove_ifcfg_config',
                       test_remove_ifcfg_config)

        def test_routes_path(name):
            return self.temp_route_file.name
        self.stubs.Set(impl_ifcfg, 'route_config_path', test_routes_path)

        def test_routes6_path(name):
            return self.temp_route6_file.name
        self.stubs.Set(impl_ifcfg, 'route6_config_path', test_routes6_path)

        def test_bridge_path(name):
            return self.temp_bridge_file.name
        self.stubs.Set(impl_ifcfg, 'bridge_config_path', test_bridge_path)

        def test_cleanup_pattern():
            return self.temp_cleanup_file.name
        self.stubs.Set(impl_ifcfg, 'cleanup_pattern', test_cleanup_pattern)

        def test_stop_dhclient_process(interface):
            self.stop_dhclient_interfaces.append(interface)
        self.stubs.Set(impl_ifcfg, 'stop_dhclient_process',
                       test_stop_dhclient_process)

        def test_execute(*args, **kwargs):
            # Record only the commands the tests assert on; everything
            # else is silently ignored.
            if args[0] == '/sbin/ifup':
                self.ifup_interface_names.append(args[1])
            elif args[0] == '/bin/ovs-appctl':
                self.ovs_appctl_cmds.append(' '.join(args))
        self.stubs.Set(processutils, 'execute', test_execute)

        self.provider = impl_ifcfg.IfcfgNetConfig()

    def tearDown(self):
        """Release every temporary file created in setUp."""
        self.temp_ifcfg_file.close()
        self.temp_bond_file.close()
        self.temp_route_file.close()
        self.temp_route6_file.close()
        self.temp_bridge_file.close()
        # The cleanup file was created with delete=False, so closing it
        # does not unlink it. Always close the handle, then remove the
        # file unless a cleanup test already caused it to be deleted.
        self.temp_cleanup_file.close()
        if os.path.exists(self.temp_cleanup_file.name):
            os.remove(self.temp_cleanup_file.name)
        super(TestIfcfgNetConfigApply, self).tearDown()

    def test_network_apply(self):
        """A static IPv4 interface with routes writes ifcfg and route data."""
        route1 = objects.Route('192.168.1.1', default=True,
                               route_options="metric 10")
        route2 = objects.Route('192.168.1.1', '172.19.0.0/24')
        route3 = objects.Route('192.168.1.5', '172.20.0.0/24',
                               route_options="metric 100")
        v4_addr = objects.Address('192.168.1.2/24')
        interface = objects.Interface('em1', addresses=[v4_addr],
                                      routes=[route1, route2, route3])
        self.provider.add_interface(interface)

        self.provider.apply()

        ifcfg_data = utils.get_file_data(self.temp_ifcfg_file.name)
        self.assertEqual(_V4_IFCFG, ifcfg_data)
        route_data = utils.get_file_data(self.temp_route_file.name)
        self.assertEqual(_ROUTES, route_data)

    def test_dhcp_ovs_bridge_network_apply(self):
        """A DHCP OVS bridge writes port and bridge config but no routes."""
        interface = objects.Interface('em1')
        bridge = objects.OvsBridge('br-ctlplane', use_dhcp=True,
                                   members=[interface])
        self.provider.add_interface(interface)
        self.provider.add_bridge(bridge)

        self.provider.apply()

        ifcfg_data = utils.get_file_data(self.temp_ifcfg_file.name)
        self.assertEqual(_OVS_INTERFACE, ifcfg_data)
        bridge_data = utils.get_file_data(self.temp_bridge_file.name)
        self.assertEqual(_OVS_BRIDGE_DHCP, bridge_data)
        route_data = utils.get_file_data(self.temp_route_file.name)
        self.assertEqual("", route_data)

    def test_dhclient_stop_on_iface_activate(self):
        """dhclient is stopped only for interfaces that do not use DHCP."""
        self.stop_dhclient_interfaces = []
        v4_addr = objects.Address('192.168.1.2/24')
        interface = objects.Interface('em1', addresses=[v4_addr])
        interface2 = objects.Interface('em2', use_dhcp=True)
        interface3 = objects.Interface('em3', use_dhcp=False)
        self.provider.add_interface(interface)
        self.provider.add_interface(interface2)
        self.provider.add_interface(interface3)

        self.provider.apply()

        # stop dhclient on em1 due to static IP and em3 due to no IP
        self.assertIn('em1', self.stop_dhclient_interfaces)
        self.assertIn('em3', self.stop_dhclient_interfaces)
        self.assertNotIn('em2', self.stop_dhclient_interfaces)

    def test_apply_noactivate(self):
        """apply(activate=False) must not run ifup on any interface."""
        interface = objects.Interface('em1')
        bridge = objects.OvsBridge('br-ctlplane', use_dhcp=True,
                                   members=[interface])
        self.provider.add_interface(interface)
        self.provider.add_bridge(bridge)

        self.provider.apply(activate=False)

        self.assertEqual([], self.ifup_interface_names)

    def test_bond_active_slave(self):
        """The member flagged primary is set as the bond's active slave."""
        # setup and apply a bond
        interface1 = objects.Interface('em1')
        interface2 = objects.Interface('em2', primary=True)
        bond = objects.OvsBond('bond1', use_dhcp=True,
                               members=[interface1, interface2])
        self.provider.add_interface(interface1)
        self.provider.add_interface(interface2)
        self.provider.add_bond(bond)

        self.provider.apply()

        ovs_appctl_cmds = '/bin/ovs-appctl bond/set-active-slave bond1 em2'
        self.assertIn(ovs_appctl_cmds, self.ovs_appctl_cmds)

    def test_bond_active_ordering(self):
        """Without a primary member, em1 is set as the active slave."""
        # setup and apply a bond
        interface1 = objects.Interface('em1')
        interface2 = objects.Interface('em2')
        bond = objects.OvsBond('bond1', use_dhcp=True,
                               members=[interface1, interface2])
        self.provider.add_interface(interface1)
        self.provider.add_interface(interface2)
        self.provider.add_bond(bond)

        self.provider.apply()

        ovs_appctl_cmds = '/bin/ovs-appctl bond/set-active-slave bond1 em1'
        self.assertIn(ovs_appctl_cmds, self.ovs_appctl_cmds)

    def test_restart_children_on_change(self):
        """Changing a bridge restarts its member interfaces and bonds.

        The same provider instance is reused across several apply() calls
        so that each second apply sees a modified bridge definition.
        """
        # setup and apply a bridge
        interface = objects.Interface('em1')
        bridge = objects.OvsBridge('br-ctlplane', use_dhcp=True,
                                   members=[interface])
        self.provider.add_interface(interface)
        self.provider.add_bridge(bridge)
        self.provider.apply()
        self.assertIn('em1', self.ifup_interface_names)
        self.assertIn('br-ctlplane', self.ifup_interface_names)

        # changing the bridge should restart the interface too
        self.ifup_interface_names = []
        bridge = objects.OvsBridge('br-ctlplane', use_dhcp=False,
                                   members=[interface])
        self.provider.add_interface(interface)
        self.provider.add_bridge(bridge)
        self.provider.apply()
        self.assertIn('em1', self.ifup_interface_names)

        # test infiniband interfaces act as proper bridge members
        ib_interface = objects.IbInterface('ib0')
        bridge = objects.OvsBridge('br-ctlplane', use_dhcp=True,
                                   members=[ib_interface])
        self.provider.add_interface(ib_interface)
        self.provider.add_bridge(bridge)
        self.provider.apply()
        self.assertIn('ib0', self.ifup_interface_names)
        self.assertIn('br-ctlplane', self.ifup_interface_names)

        # changing the bridge should restart the interface too
        self.ifup_interface_names = []
        bridge = objects.OvsBridge('br-ctlplane', use_dhcp=False,
                                   members=[ib_interface])
        # Re-add the infiniband member itself (not the earlier em1
        # interface) so this step mirrors the ethernet scenario above.
        self.provider.add_interface(ib_interface)
        self.provider.add_bridge(bridge)
        self.provider.apply()
        self.assertIn('ib0', self.ifup_interface_names)

        # setup and apply a bond on a bridge
        self.ifup_interface_names = []
        interface1 = objects.Interface('em1')
        interface2 = objects.Interface('em2')
        bond = objects.OvsBond('bond0',
                               members=[interface1, interface2])
        bridge = objects.OvsBridge('br-ctlplane', use_dhcp=True,
                                   members=[bond])
        self.provider.add_interface(interface1)
        self.provider.add_interface(interface2)
        self.provider.add_bond(bond)
        self.provider.add_bridge(bridge)
        self.provider.apply()

        # changing the bridge should restart everything
        self.ifup_interface_names = []
        bridge = objects.OvsBridge('br-ctlplane', use_dhcp=False,
                                   members=[bond])
        self.provider.add_interface(interface1)
        self.provider.add_interface(interface2)
        self.provider.add_bond(bond)
        self.provider.add_bridge(bridge)
        self.provider.apply()
        self.assertIn('br-ctlplane', self.ifup_interface_names)
        self.assertIn('bond0', self.ifup_interface_names)
        self.assertIn('em1', self.ifup_interface_names)
        self.assertIn('em2', self.ifup_interface_names)

    def test_restart_interface_counts(self):
        """Each configured interface is brought up exactly once."""
        interface = objects.Interface('em1')
        self.provider.add_interface(interface)
        interface2 = objects.Interface('em2')
        self.provider.add_interface(interface2)

        self.provider.apply()

        self.assertEqual(1, self.ifup_interface_names.count("em1"))
        self.assertEqual(1, self.ifup_interface_names.count("em2"))

    def test_vlan_apply(self):
        """A VLAN without addresses produces the _VLAN_NO_IP config."""
        vlan = objects.Vlan('em1', 5)
        self.provider.add_vlan(vlan)

        self.provider.apply()

        ifcfg_data = utils.get_file_data(self.temp_ifcfg_file.name)
        self.assertEqual(_VLAN_NO_IP, ifcfg_data)

    def test_cleanup(self):
        """apply(cleanup=True) deletes the file matching cleanup_pattern."""
        self.provider.apply(cleanup=True)
        self.assertFalse(os.path.exists(self.temp_cleanup_file.name))

    def test_cleanup_not_loopback(self):
        """apply(cleanup=True) leaves a config file ending in '-lo' alone."""
        tmp_lo_file = '%s-lo' % self.temp_cleanup_file.name
        utils.write_config(tmp_lo_file, 'foo')

        def test_cleanup_pattern():
            return '%s-*' % self.temp_cleanup_file.name
        self.stubs.Set(impl_ifcfg, 'cleanup_pattern', test_cleanup_pattern)

        self.provider.apply(cleanup=True)

        self.assertTrue(os.path.exists(tmp_lo_file))
        os.remove(tmp_lo_file)

    def test_ovs_restart_called(self):
        """Adding an OVS DPDK port makes apply() request an OVS restart."""
        interface = objects.Interface('em1')
        dpdk_port = objects.OvsDpdkPort('dpdk0', members=[interface])
        execute_strings = []

        def test_execute(*args, **kwargs):
            # Stubbed onto the NetConfig class; records the second
            # positional argument of every call.
            execute_strings.append(args[1])
        self.stubs.Set(NetConfig, 'execute', test_execute)

        self.provider.noop = True
        self.provider.add_ovs_dpdk_port(dpdk_port)
        self.provider.apply()

        self.assertIn('Restart openvswitch', execute_strings)

    def test_ovs_restart_not_called(self):
        """A plain interface must not make apply() request an OVS restart."""
        interface = objects.Interface('em1')
        execute_strings = []

        def test_execute(*args, **kwargs):
            # Stubbed onto the NetConfig class; records the second
            # positional argument of every call.
            execute_strings.append(args[1])
        self.stubs.Set(NetConfig, 'execute', test_execute)

        self.provider.noop = True
        self.provider.add_interface(interface)
        self.provider.apply()

        self.assertNotIn('Restart openvswitch', execute_strings)