30 files changed, 3337 insertions, 165 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e881817 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +ovn4nfvk8s* +.tox/
diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..8534dae --- /dev/null +++ b/Makefile @@ -0,0 +1,28 @@ +# SPDX-license-identifier: Apache-2.0 +############################################################################## +# Copyright (c) 2018 Intel Corporation +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +GOPATH := $(shell realpath "$(PWD)/../../") + +export GOPATH ... +export GO111MODULE=on + +.PHONY: all +all: clean ovn4nfvk8s ovn4nfvk8s-cni + +ovn4nfvk8s: + @go build ./cmd/ovn4nfvk8s + +ovn4nfvk8s-cni: + @go build ./cmd/ovn4nfvk8s-cni + +test: + @go test -v ./... + +clean: + @rm -f ovn4nfvk8s* +
diff --git a/README.rst b/README.rst new file mode 100644 index 0000000..16d19d5 --- /dev/null +++ b/README.rst @@ -0,0 +1,198 @@ +.. Copyright 2018 Intel Corporation. + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +================= +OVN4NFVK8s Plugin +================= + +Problem statement +----------------- + +Networking applications are of three types: management applications, +control plane applications and data plane applications. Management +and control plane applications are similar to enterprise applications, +but data plane applications differ in the following aspects: + +- Multiple virtual network interfaces +- Multiple IP addresses +- SRIOV networking support +- Programmable virtual switch (for service function chaining, to tap + the traffic for visibility, etc.) + +Kubernetes (K8S for short) is the most popular container orchestrator. +K8S is supported by GCE, Azure and AWS and will be supported by the +Akraino Edge stack that enables edge clouds. + +K8S is being enhanced to support VM workload types; this helps +cloud providers that need to migrate legacy workloads to a microservices +architecture. Cloud providers may continue to support VM workload +types for security reasons, and hence there is a need for a VIM that +supports both VMs and containers. Since the same K8S instance can +orchestrate both VM and container workload types, the same compute nodes +can be leveraged for both VMs and containers. Telcos and CSPs see a +similar need to deploy networking applications as containers. + +Since both VM and container workloads are used for networking +applications, there is a need for: + +- Sharing the networks across VMs and containers. +- Sharing the volumes across VMs and containers.
+ +**Network Function Virtualization Requirements** + +NFV workloads can be: + +- Management plane workloads +- Control plane workloads +- User plane (data plane) workloads +- User plane workloads normally have multiple interfaces, multiple + subnets and multiple virtual networks +- NFV workloads typically have their own management network. +- Some data plane workloads require SR-IOV NIC support for data + interfaces and virtual NICs for other interfaces (for performance + reasons) +- Need for multiple CNIs. +- NFV workloads require dynamic creation of virtual networks and + dynamic configuration of subnets. + +New Proposal +------------ + +A new plugin addressing the below requirements: + +- For networking workloads as well as typical application workloads +- Multi-interface support +- Multi-IP address support +- Dynamic creation of virtual networks +- Co-existing with SRIOV and other CNIs +- Route management across virtual networks and external networks + +**OVN Background** + +OVN, the Open Virtual Network, is a system to support virtual network +abstraction. OVN complements the existing capabilities of OVS to add +native support for virtual network abstractions, such as virtual L2 +and L3 overlays and security groups. Services such as DHCP are also +desirable features. Just like OVS, OVN’s design goal is to have a +production quality implementation that can operate at significant +scale. + +**K8S-OVN4NFV Plugin development** + +Some code and ideas are taken from the ovn-kubernetes_ plugin +work that was done as part of the OVN project. Due to the large number +of changes, it is a new plugin with its own code base. This plugin +assumes that the first interface in a Pod is provided by some other +plugin/CNI such as Flannel or even OVN-Kubernetes, and this plugin is +only responsible for adding multiple interfaces based on the Pod +annotations. This plugin is currently tested to work with Multus as the +CNI and Flannel providing the first interface. + +Its functionality is divided into the following: + +- Initialization: + + - Registers itself as a watcher with the K8S API server to receive POD + and service events. + - Creates a distributed router + - Creates a gateway + - Creates a logical switch to connect the distributed router with the + gateway. + - Creates a subnet between the distributed router and the gateway. + - Assigns the first two IP addresses of the subnet to the router and + the gateway. + - Creates the router port and gateway port as part of assigning the IP + and MAC addresses. + +- Watcher: + + - Upon a POD bring-up event + + - Checks the annotations specific to OVN. + - For each network on which the POD is going to be brought up: + - Validates whether the logical switch is already present. If not, + it is considered an error. + - If the IP and MAC addresses are not static, it asks OVN to + assign them. + - Collects all assigned IP/MAC addresses and puts them as + annotations (dynamic information) on that POD. + + - Upon a POD deletion event + + - Returns the IP and MAC addresses back to the OVN pool. + +- OVN CNI + +This is present on every minion node. The CNI is expected to be called +once for all OVN networks, either by Kubelet directly or via Multus. + + - Add: + + - Waits for the annotations to be filled in by the watcher. From the + annotations, it knows the set of IP addresses, MAC addresses and + routes to be added. + - Using network APIs, for each element in the set: + - Creates a veth pair. + - Assigns the IP and MAC address to one end of the veth pair. The + other end of the veth pair is attached to br-int.
+ - Creates routes based on the route list provided in annotations. + + - If isDefaultRoute is set in annotations, it creates default route + using this veth. + - Delete + + - Removes veth pair. + - Removes routes. + +**Figure** + +.. code-block:: raw + + +-----------------+ + | | + | | Program OVN Switch + |ovn4nfvk8s Plugin| +------------------+ + | +--------------------->| | + | | | OVN Switch | + | | | | + | | +------------------+ + +----+----------+-+ + ^ | + | | + |On Event |Annotate Pod + | | + | v + +----+--------------+ +------------------+ +-----------+ + | | | | | Pod | + | Kube API +--------> Kube Scheduler +---------->| | + | | | | +--------+--+ + | | +--------+---------+ | + +-------------------+ | | + | | + | |Assign IP & MAC + +--------v-----------+ | + | | | + | ovn4nfvk8s-cni | | + | +------------------+ + +--------------------+ + + + Complete Architecture can be found in ovn-kubernetes documentation at github + + +**References** + +.. _ovn-kubernetes: https://wiki.opnfv.org/display/OV/K8S+OVN+NFV+Plugin + +**Authors/Contributors** + +Addepalli, Srinivasa R <srinivasa.r.addepalli@intel.com> +Sood, Ritu <ritu.sood@intel.com> diff --git a/Readme.rst b/Readme.rst deleted file mode 100644 index b7da2c1..0000000 --- a/Readme.rst +++ /dev/null @@ -1,165 +0,0 @@ -================= -OVN4NFVK8s Plugin -================= - -**Kubernetes Background** - -Kubernetes (Simply K8S) is one of the popular container orchestrators. K8S is supported by GCE, AZURE and AWS and will be supported by Akraino Edge stack that enable edge clouds. K8S is also being enhanced to support VM workload types too. This helps cloud-regions that need to support VMs while moving some workloads to containers. For security reason, cloud-regions may continue to support VM types for security reasons and hence there is need for VIM that support both VMs and containers. Since same K8S instance can orchestrate both VM and container workload types, same compute nodes can be leveraged for both VMs and containers. Telco and CSPs are seeing similar need to deploy networking applications as containers. - -There are few differences between containers for Enterprise applications and networking applications. -Networking applications are of three types - Management applications, Control plane applications and data plane applications. Management and control plane applications are similar to Enterprise applications, but data plane applications different in following aspects: - - - Multiple virtual network interfaces - - Multiple IP addresses - - SRIOV networking support - - Programmable virtual switch (for service function chaining, to tap the traffic for visibility etc..) - -Since, both VMs and container workloads are used for networking applications, there would be need for - - - Sharing the networks across VMs and containers. - - Sharing the volumes across VMs and containers. - -**OVN Background** - -OVN, the Open Virtual Network, is a system to support virtual network abstraction. OVN complements the existing capabilities of OVS to add native support for virtual network abstractions, such as virtual L2 and L3 overlays and security groups. Services such as DHCP are also desirable features. Just like OVS, OVN’s design goal is to have a production quality implementation that can operate at significant scale. -An OVN deployment consists of several components: - - - A Cloud Management System (CMS), which is OVN’s ultimate client (via its users and administrators). 
OVN integration requires installing a CMS-specific plugin and related software (see below). OVN initially targets Open‐ Stack as CMS. - - An OVN Database physical or virtual node (or, eventually, cluster) installed in a central location. - - One or more (usually many) hypervisors. Hypervisors must run Open vSwitch. Any hypervisor platform supported by Open vSwitch is acceptable. - - Zero or more gateways. A gateway extends a tunnel-based logical network into a physical network by bidirectionally forwarding packets between tunnels and a physical Ethernet port. This allows non-virtualized machines to participate in logical networks. A gateway may be a physical host, a virtual machine, or an ASIC-based hardware switch that supports the vtep(5) schema. Hypervisors and gateways are together called transport node or chassis. - -**NFV Requirements** -NFV workloads can be, - - → Management plane workloads - - → Control plane work loads - - → User plane (data plane workloads) - - → User plane workloads normally have - - → Multiple interfaces, Multiple subnets, Multiple virtual networks - - → NFV workloads typically have its own management network. - - → Some data plane workloads require SR-IOV NIC support for data interfaces and virtual NIC for other interfaces (for performance reasons) - - → Need for multiple CNIs. - - → NFV workloads require dynamic creation of virtual networks. Dynamic configuration of subnets. - -**New Proposal** - -A New plugin addressing the below requirements, - -- For networking workloads as well typical application workloads -- Multi-interface support -- Multi-IP address support -- Dynamic creation of virtual networks -- Co-existing with SRIOV and other CNIs. -- Route management across virtual networks and external networks - -**Functionality** - -**K8S-OVN4NFV Plugin development** - -Some code and ideas are being taken from ovn-kubernetes [1] plugin work that was done as part of OVN project. Due to good number of changes, it is a new plugin with its own code base. This Plugin assumes that the first interface in a Pod is provided by some other Plugin/CNI like Flannel or even OVN-Kubernetes and this plugin is only responsible to add multiple interfaces based on the Pod annotations. This plugin is currently tested to work with Multus as CNI and Flannel as first interface. - -Its functionality is divided into to following: - -- Initialization: - - - Register itself as watcher to K8S API Server to receive POD events and service events. - - Creates a distributed router - - Create gateway - - Creates a logical switch to connect distributed router with Gateway. - - Creates a subnet between distributed router & Gateway. - - Assigns first two IP addresses of the subnet to router and Gateway. - - Created router port and gateway port as part of assigning IP address and MAC addresses. - -- Watcher: - - - Upon POD bring up event - - Checks the annotations specific to OVN. - - For each network on which POD is going to be brought up - - Validates whether the logical switch is already present. If not, it is considered as error. - - If IP address and MAC addresses are not static, it asks OVN to assign IP and MAC address. - - Collects all IP addresses/MAC addresses assigned. Puts them as annotations (dynamic information) for that POD. - - - Upon POD deletion event - - Returns the IP address and MAC address back to OVN pool. - -- OVN CNI - - This is present in every minion node. CNI is expected to be called once for all OVN networks either Kubelet directly or via Multus. 
- - - Add: - - - Wait for annotations to be filled up by the watcher. From annotations, it knows set of IP Address, MAC address and Routes to be added. - - - Using network APIs for each element in the set: - - - Creates veth pair. - - - Assigns the IP address and MAC address to one end of veth pair. Other end veth pair is assigned to br-int. - - - Creates routes based on the route list provided in annotations. - - - If isDefaultRoute is set in annotations, it creates default route using this veth. - - - Delete - - - Removes veth pair. - - - Removes routes. - - - -**Figure** - - +-----------------+ - | | - | | Program OVN Switch - |ovn4nfvk8s Plugin| +------------------+ - | +--------------------->| | - | | | OVN Switch | - | | | | - | | +------------------+ - +-+ -----+---+----+ - ^ | - | | - +On Event |Annotate Pod - | | - | v - ++------ -------+ +------------------+ +-----------+ - | | | | | Pod | - |Kube API +--------> Kube Schedular +---------->| | - | | | | +--------+--+ - | | +--------+---------+ | - +---------------+ | | - | | - | |Assign IP & MAC - +--------v-----------+ | - | | | - | ovn4nfvk8s-cni + | - | +------------------+ - +--------------------+ - - - Complete Architecture can be found in ovn-kubernetes documenatation at github - - -**References** - -[1] https://wiki.opnfv.org/display/OV/K8S+OVN+NFV+Plugin -[2] https://github.com/openvswitch/ovn-kubernetes -[3] https://github.com/intel/multus-cni -[4] https://github.com/Huawei-PaaS/CNI-Genie - -**Authors/Contributors** - -Addepalli, Srinivasa R <srinivasa.r.addepalli@intel.com> -Sood, Ritu <ritu.sood@intel.com> - diff --git a/cmd/ovn4nfvk8s-cni/.gitkeep b/cmd/ovn4nfvk8s-cni/.gitkeep deleted file mode 100644 index e69de29..0000000 --- a/cmd/ovn4nfvk8s-cni/.gitkeep +++ /dev/null diff --git a/cmd/ovn4nfvk8s-cni/app/helper_linux.go b/cmd/ovn4nfvk8s-cni/app/helper_linux.go new file mode 100644 index 0000000..1a98a61 --- /dev/null +++ b/cmd/ovn4nfvk8s-cni/app/helper_linux.go @@ -0,0 +1,160 @@ +// +build linux + +package app + +import ( + "fmt" + "net" + "os/exec" + "regexp" + "strings" + + "github.com/sirupsen/logrus" + + "github.com/containernetworking/cni/pkg/skel" + "github.com/containernetworking/cni/pkg/types/current" + "github.com/containernetworking/plugins/pkg/ip" + "github.com/containernetworking/plugins/pkg/ns" + "github.com/vishvananda/netlink" +) + +func renameLink(curName, newName string) error { + link, err := netlink.LinkByName(curName) + if err != nil { + return err + } + + if err := netlink.LinkSetDown(link); err != nil { + return err + } + if err := netlink.LinkSetName(link, newName); err != nil { + return err + } + if err := netlink.LinkSetUp(link); err != nil { + return err + } + + return nil +} + +func setupInterface(netns ns.NetNS, containerID, ifName, macAddress, ipAddress, gatewayIP, defaultGateway string, mtu int) (*current.Interface, *current.Interface, error) { + hostIface := ¤t.Interface{} + contIface := ¤t.Interface{} + + var oldHostVethName string + err := netns.Do(func(hostNS ns.NetNS) error { + // create the veth pair in the container and move host end into host netns + hostVeth, containerVeth, err := ip.SetupVeth(ifName, mtu, hostNS) + if err != nil { + return fmt.Errorf("failed to setup veth %s: %v", ifName, err) + //return err + } + hostIface.Mac = hostVeth.HardwareAddr.String() + contIface.Name = containerVeth.Name + + link, err := netlink.LinkByName(contIface.Name) + if err != nil { + return fmt.Errorf("failed to lookup %s: %v", contIface.Name, err) + } + + hwAddr, err := 
net.ParseMAC(macAddress) + if err != nil { + return fmt.Errorf("failed to parse mac address for %s: %v", contIface.Name, err) + } + err = netlink.LinkSetHardwareAddr(link, hwAddr) + if err != nil { + return fmt.Errorf("failed to add mac address %s to %s: %v", macAddress, contIface.Name, err) + } + contIface.Mac = macAddress + contIface.Sandbox = netns.Path() + + addr, err := netlink.ParseAddr(ipAddress) + if err != nil { + return err + } + err = netlink.AddrAdd(link, addr) + if err != nil { + return fmt.Errorf("failed to add IP addr %s to %s: %v", ipAddress, contIface.Name, err) + } + + if defaultGateway == "true" { + gw := net.ParseIP(gatewayIP) + if gw == nil { + return fmt.Errorf("parse ip of gateway failed") + } + err = ip.AddRoute(nil, gw, link) + if err != nil { + logrus.Errorf("ip.AddRoute failed %v gw %v link %v", err, gw, link) + return err + } + } + oldHostVethName = hostVeth.Name + + return nil + }) + if err != nil { + return nil, nil, err + } + + // rename the host end of veth pair + re := regexp.MustCompile("(\\d+)\\D*\\z") + index := re.FindAllString(ifName, -1) + hostIface.Name = containerID[:14] + index[0] + if err := renameLink(oldHostVethName, hostIface.Name); err != nil { + return nil, nil, fmt.Errorf("failed to rename %s to %s: %v", oldHostVethName, hostIface.Name, err) + } + + return hostIface, contIface, nil +} + +// ConfigureInterface sets up the container interface +var ConfigureInterface = func(args *skel.CmdArgs, namespace, podName, macAddress, ipAddress, gatewayIP, interfaceName, defaultGateway string, mtu int) ([]*current.Interface, error) { + netns, err := ns.GetNS(args.Netns) + if err != nil { + return nil, fmt.Errorf("failed to open netns %q: %v", args.Netns, err) + } + defer netns.Close() + hostIface, contIface, err := setupInterface(netns, args.ContainerID, interfaceName, macAddress, ipAddress, gatewayIP, defaultGateway, mtu) + if err != nil { + return nil, err + } + var ifaceID string + if interfaceName != "" { + ifaceID = fmt.Sprintf("%s_%s_%s", namespace, podName, interfaceName) + } else { + ifaceID = fmt.Sprintf("%s_%s", namespace, podName) + } + + ovsArgs := []string{ + "add-port", "br-int", hostIface.Name, "--", "set", + "interface", hostIface.Name, + fmt.Sprintf("external_ids:attached_mac=%s", macAddress), + fmt.Sprintf("external_ids:iface-id=%s", ifaceID), + fmt.Sprintf("external_ids:ip_address=%s", ipAddress), + fmt.Sprintf("external_ids:sandbox=%s", args.ContainerID), + } + + var out []byte + out, err = exec.Command("ovs-vsctl", ovsArgs...).CombinedOutput() + if err != nil { + return nil, fmt.Errorf("failure in plugging pod interface: %v\n %q", err, string(out)) + } + + return []*current.Interface{hostIface, contIface}, nil +} + +// PlatformSpecificCleanup deletes the OVS port +func PlatformSpecificCleanup(ifaceName string) (bool, error) { + done := false + ovsArgs := []string{ + "del-port", "br-int", ifaceName, + } + out, err := exec.Command("ovs-vsctl", ovsArgs...).CombinedOutput() + if err != nil && !strings.Contains(string(out), "no port named") { + // DEL should be idempotent; don't return an error just log it + logrus.Warningf("failed to delete OVS port %s: %v\n %q", ifaceName, err, string(out)) + done = true + } + + return done, nil +} diff --git a/cmd/ovn4nfvk8s-cni/ovn4nfvk8s-cni.go b/cmd/ovn4nfvk8s-cni/ovn4nfvk8s-cni.go new file mode 100644 index 0000000..923363b --- /dev/null +++ b/cmd/ovn4nfvk8s-cni/ovn4nfvk8s-cni.go @@ -0,0 +1,290 @@ +// +build linux + +package main + +import ( + "encoding/json" + "fmt" + "net" + "os" + "strconv" + 
"strings" + "time" + + "github.com/sirupsen/logrus" + "github.com/urfave/cli" + + "github.com/containernetworking/cni/pkg/skel" + "github.com/containernetworking/cni/pkg/types" + "github.com/containernetworking/cni/pkg/types/current" + "github.com/containernetworking/cni/pkg/version" + "k8s.io/apimachinery/pkg/util/wait" + + kexec "k8s.io/utils/exec" + "ovn4nfv-k8s-plugin/internal/pkg/kube" + + "ovn4nfv-k8s-plugin/cmd/ovn4nfvk8s-cni/app" + "ovn4nfv-k8s-plugin/internal/pkg/config" +) + +func argString2Map(args string) (map[string]string, error) { + argsMap := make(map[string]string) + + pairs := strings.Split(args, ";") + for _, pair := range pairs { + kv := strings.Split(pair, "=") + if len(kv) != 2 { + return nil, fmt.Errorf("ARGS: invalid pair %q", pair) + } + keyString := kv[0] + valueString := kv[1] + argsMap[keyString] = valueString + } + + return argsMap, nil +} + +func parseOvnNetworkObject(ovnnetwork string) ([]map[string]string, error) { + var ovnNet []map[string]string + + if ovnnetwork == "" { + return nil, fmt.Errorf("parseOvnNetworkObject:error") + } + + if err := json.Unmarshal([]byte(ovnnetwork), &ovnNet); err != nil { + return nil, fmt.Errorf("parseOvnNetworkObject: failed to load ovn network err: %v | ovn network: %v", err, ovnnetwork) + } + + return ovnNet, nil +} + +func mergeWithResult(srcObj, dstObj types.Result) (types.Result, error) { + + if dstObj == nil { + return srcObj, nil + } + src, err := current.NewResultFromResult(srcObj) + if err != nil { + return nil, fmt.Errorf("Couldn't convert old result to current version: %v", err) + } + dst, err := current.NewResultFromResult(dstObj) + if err != nil { + return nil, fmt.Errorf("Couldn't convert old result to current version: %v", err) + } + + ifacesLength := len(dst.Interfaces) + + for _, iface := range src.Interfaces { + dst.Interfaces = append(dst.Interfaces, iface) + } + for _, ip := range src.IPs { + if ip.Interface != nil && *(ip.Interface) != -1 { + ip.Interface = current.Int(*(ip.Interface) + ifacesLength) + } + dst.IPs = append(dst.IPs, ip) + } + for _, route := range src.Routes { + dst.Routes = append(dst.Routes, route) + } + + for _, ns := range src.DNS.Nameservers { + dst.DNS.Nameservers = append(dst.DNS.Nameservers, ns) + } + for _, s := range src.DNS.Search { + dst.DNS.Search = append(dst.DNS.Search, s) + } + for _, opt := range src.DNS.Options { + dst.DNS.Options = append(dst.DNS.Options, opt) + } + // TODO: what about DNS.domain? 
+ return dst, nil +} + +func prettyPrint(i interface{}) string { + s, _ := json.MarshalIndent(i, "", "\t") + return string(s) +} + +func addMultipleInterfaces(args *skel.CmdArgs, ovnAnnotation, namespace, podName string) types.Result { + logrus.Infof("ovn4nfvk8s-cni: addMultipleInterfaces ") + + var ovnAnnotatedMap []map[string]string + ovnAnnotatedMap, err := parseOvnNetworkObject(ovnAnnotation) + if err != nil { + logrus.Errorf("addLogicalPort : Error Parsing Ovn Network List %v", ovnAnnotatedMap) + return nil + } + if namespace == "" || podName == "" { + logrus.Errorf("required CNI variable missing") + return nil + } + var interfacesArray []*current.Interface + var index int + var result *current.Result + var dstResult types.Result + for _, ovnNet := range ovnAnnotatedMap { + ipAddress := ovnNet["ip_address"] + macAddress := ovnNet["mac_address"] + gatewayIP := ovnNet["gateway_ip"] + defaultGateway := ovnNet["defaultGateway"] + + if ipAddress == "" || macAddress == "" || gatewayIP == "" { + logrus.Errorf("failed in pod annotation key extract") + return nil + } + + index++ + interfaceName := ovnNet["interface"] + if interfaceName == "" { + logrus.Errorf("addMultipleInterfaces: interface can't be null") + return nil + } + logrus.Debugf("addMultipleInterfaces: ipAddress %v %v", ipAddress, interfaceName) + interfacesArray, err = app.ConfigureInterface(args, namespace, podName, macAddress, ipAddress, gatewayIP, interfaceName, defaultGateway, config.Default.MTU) + if err != nil { + logrus.Errorf("Failed to configure interface in pod: %v", err) + return nil + } + addr, addrNet, err := net.ParseCIDR(ipAddress) + if err != nil { + logrus.Errorf("failed to parse IP address %q: %v", ipAddress, err) + return nil + } + ipVersion := "6" + if addr.To4() != nil { + ipVersion = "4" + } + var routes types.Route + if defaultGateway == "true" { + defaultAddr, defaultAddrNet, _ := net.ParseCIDR("0.0.0.0/0") + routes = types.Route{Dst: net.IPNet{IP: defaultAddr, Mask: defaultAddrNet.Mask}, GW: net.ParseIP(gatewayIP)} + + result = ¤t.Result{ + Interfaces: interfacesArray, + IPs: []*current.IPConfig{ + { + Version: ipVersion, + Interface: current.Int(1), + Address: net.IPNet{IP: addr, Mask: addrNet.Mask}, + Gateway: net.ParseIP(gatewayIP), + }, + }, + Routes: []*types.Route{&routes}, + } + } else { + result = ¤t.Result{ + Interfaces: interfacesArray, + IPs: []*current.IPConfig{ + { + Version: ipVersion, + Interface: current.Int(1), + Address: net.IPNet{IP: addr, Mask: addrNet.Mask}, + Gateway: net.ParseIP(gatewayIP), + }, + }, + } + + } + // Build the result structure to pass back to the runtime + dstResult, err = mergeWithResult(types.Result(result), dstResult) + if err != nil { + logrus.Errorf("Failed to merge results: %v", err) + return nil + } + } + logrus.Infof("addMultipleInterfaces: %s", prettyPrint(dstResult)) + return dstResult +} + +func cmdAdd(args *skel.CmdArgs) error { + logrus.Infof("ovn4nfvk8s-cni: cmdAdd ") + conf := &types.NetConf{} + if err := json.Unmarshal(args.StdinData, conf); err != nil { + return fmt.Errorf("failed to load netconf: %v", err) + } + + argsMap, err := argString2Map(args.Args) + if err != nil { + return err + } + + namespace := argsMap["K8S_POD_NAMESPACE"] + podName := argsMap["K8S_POD_NAME"] + if namespace == "" || podName == "" { + return fmt.Errorf("required CNI variable missing") + } + + clientset, err := config.NewClientset(&config.Kubernetes) + if err != nil { + return fmt.Errorf("Could not create clientset for kubernetes: %v", err) + } + kubecli := 
&kube.Kube{KClient: clientset} + + // Get the IP address and MAC address from the API server. + var annotationBackoff = wait.Backoff{Duration: 1 * time.Second, Steps: 14, Factor: 1.5, Jitter: 0.1} + var annotation map[string]string + if err := wait.ExponentialBackoff(annotationBackoff, func() (bool, error) { + annotation, err = kubecli.GetAnnotationsOnPod(namespace, podName) + if err != nil { + // TODO: check if err is non recoverable + logrus.Warningf("Error while obtaining pod annotations - %v", err) + return false, nil + } + if _, ok := annotation["ovnIfaceList"]; ok { + return true, nil + } + return false, nil + }); err != nil { + return fmt.Errorf("failed to get pod annotation - %v", err) + } + logrus.Infof("ovn4nfvk8s-cni: Annotation Found ") + ovnAnnotation, ok := annotation["ovnIfaceList"] + if !ok { + return fmt.Errorf("Error while obtaining pod annotations") + } + result := addMultipleInterfaces(args, ovnAnnotation, namespace, podName) + return result.Print() +} + +func cmdDel(args *skel.CmdArgs) error { + logrus.Infof("ovn4nfvk8s-cni: cmdDel ") + for i := 0; i < 10; i++ { + ifaceName := args.ContainerID[:14] + strconv.Itoa(i) + done, err := app.PlatformSpecificCleanup(ifaceName) + if err != nil { + logrus.Errorf("Teardown error: %v", err) + } + if done { + break + } + } + return nil +} + +func main() { + logrus.Infof("ovn4nfvk8s-cni CNI Invoked by Multus") + c := cli.NewApp() + c.Name = "ovn4nfvk8s-cni" + c.Usage = "a CNI plugin to set up or tear down a additional interfaces with OVN" + c.Version = "0.0.2" + c.Flags = config.Flags + + exec := kexec.New() + c.Action = func(ctx *cli.Context) error { + if _, err := config.InitConfig(ctx, exec, nil); err != nil { + return err + } + + skel.PluginMain(cmdAdd, cmdDel, version.All) + return nil + } + + if err := c.Run(os.Args); err != nil { + // Print the error to stdout in conformance with the CNI spec + e, ok := err.(*types.Error) + if !ok { + e = &types.Error{Code: 100, Msg: err.Error()} + } + e.Print() + } +} diff --git a/cmd/ovn4nfvk8s-cni/ovn4nfvk8s-cni_test.go b/cmd/ovn4nfvk8s-cni/ovn4nfvk8s-cni_test.go new file mode 100644 index 0000000..d5b7b6b --- /dev/null +++ b/cmd/ovn4nfvk8s-cni/ovn4nfvk8s-cni_test.go @@ -0,0 +1,56 @@ +// +build linux + +package main + +import ( + "github.com/containernetworking/cni/pkg/skel" + "github.com/containernetworking/cni/pkg/types/current" + "ovn4nfv-k8s-plugin/cmd/ovn4nfvk8s-cni/app" + "testing" +) + +func TestAddMultipleInterfaces(t *testing.T) { + oldConfigureInterface := app.ConfigureInterface + // as we are exiting, revert ConfigureInterface back at end of function + defer func() { app.ConfigureInterface = oldConfigureInterface }() + app.ConfigureInterface = func(args *skel.CmdArgs, namespace, podName, macAddress, ipAddress, gatewayIP, interfaceName, defaultGateway string, mtu int) ([]*current.Interface, error) { + return []*current.Interface{ + { + Name: "pod", + Mac: "0a:00:00:00:00:0c", + Sandbox: "102103104", + }}, nil + } + args := &skel.CmdArgs{"102103104", "default", "eth0", "", "", nil} + + ovnAnnotation := "[{\"ip_address\":\"172.16.24.2/24\", \"mac_address\":\"0a:00:00:00:00:0c\", \"gateway_ip\": \"172.16.24.1\",\"interface\":\"net0\"}] " + result := addMultipleInterfaces(args, ovnAnnotation, "default", "pod") + if result == nil { + t.Errorf("Failed addMultipleInterfaces %+v", ovnAnnotation) + } + ovnAnnotation = "[{\"ip_address\":\"172.16.24.2/24\", \"mac_address\":\"0a:00:00:00:00:0c\", \"gateway_ip\": \"172.16.24.1\",\"defaultGateway\":\"true\",\"interface\":\"net0\"}] " + result 
= addMultipleInterfaces(args, ovnAnnotation, "default", "pod") + if result == nil { + t.Errorf("Failed addMultipleInterfaces %+v", ovnAnnotation) + } + ovnAnnotation = "[{\"ip_address\":\"172.16.24.2/24\", \"mac_address\":\"0a:00:00:00:00:0c\", \"gateway_ip\": \"172.16.24.1\"}] " + result = addMultipleInterfaces(args, ovnAnnotation, "default", "pod") + if result != nil { + t.Errorf("Failed addMultipleInterfaces %+v", ovnAnnotation) + } + ovnAnnotation = "[{\"mac_address\":\"0a:00:00:00:00:0c\", \"gateway_ip\": \"172.16.24.1\",\"interface\":\"net0\"}] " + result = addMultipleInterfaces(args, ovnAnnotation, "default", "pod") + if result != nil { + t.Errorf("Failed addMultipleInterfaces %+v", ovnAnnotation) + } + ovnAnnotation = "[{\"ip_address\":\"172.16.24.2/24\", \"mac_address\":\"0a:00:00:00:00:0c\", \"gateway_ip\": \"172.16.24.1\",\"interface\":\"net0\"}, {\"ip_address\":\"172.16.25.2/24\", \"mac_address\":\"0a:00:00:00:00:0d\", \"gateway_ip\": \"172.16.25.1\",\"interface\":\"net1\"}]" + result = addMultipleInterfaces(args, ovnAnnotation, "default", "pod") + if result == nil { + t.Errorf("Failed addMultipleInterfaces %+v", ovnAnnotation) + } + ovnAnnotation = "[{\"ip_address\":\"172.16.24.2/24\", \"mac_address\":\"0a:00:00:00:00:0c\", \"gateway_ip\": \"172.16.24.1\",\"interface\":\"net0\", \"defaultGateway\":\"true\"}, {\"ip_address\":\"172.16.25.2/24\", \"mac_address\":\"0a:00:00:00:00:0d\", \"gateway_ip\": \"172.16.25.1\",\"interface\":\"net1\"}]" + result = addMultipleInterfaces(args, ovnAnnotation, "default", "pod") + if result == nil { + t.Errorf("Failed addMultipleInterfaces %+v", ovnAnnotation) + } +} diff --git a/cmd/ovn4nfvk8s/.gitkeep b/cmd/ovn4nfvk8s/.gitkeep deleted file mode 100644 index e69de29..0000000 --- a/cmd/ovn4nfvk8s/.gitkeep +++ /dev/null diff --git a/cmd/ovn4nfvk8s/ovn4nfvk8s.go b/cmd/ovn4nfvk8s/ovn4nfvk8s.go new file mode 100644 index 0000000..d097558 --- /dev/null +++ b/cmd/ovn4nfvk8s/ovn4nfvk8s.go @@ -0,0 +1,132 @@ +package main + +import ( + "fmt" + "io/ioutil" + "os" + "os/signal" + "syscall" + + "github.com/sirupsen/logrus" + "github.com/urfave/cli" + + kexec "k8s.io/utils/exec" + + "ovn4nfv-k8s-plugin/internal/pkg/config" + "ovn4nfv-k8s-plugin/internal/pkg/factory" + "ovn4nfv-k8s-plugin/internal/pkg/ovn" + "ovn4nfv-k8s-plugin/internal/pkg/util" +) + +func main() { + c := cli.NewApp() + c.Name = "ovn4nfvk8s" + c.Usage = "run ovn4nfvk8s to start pod watchers" + c.Version = config.Version + c.Flags = append([]cli.Flag{ + // Daemon file + cli.StringFlag{ + Name: "pidfile", + Usage: "Name of file that will hold the ovn4nfvk8s pid (optional)", + }, + }, config.Flags...) 
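+ // config.Flags (internal/pkg/config) contributes the shared --config-file, --mtu, logging, CNI and Kubernetes flags; only --pidfile is specific to this daemon.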
+ c.Action = func(c *cli.Context) error { + return runOvnKube(c) + } + + if err := c.Run(os.Args); err != nil { + logrus.Fatal(err) + } +} + +func delPidfile(pidfile string) { + if pidfile != "" { + if _, err := os.Stat(pidfile); err == nil { + if err := os.Remove(pidfile); err != nil { + logrus.Errorf("%s delete failed: %v", pidfile, err) + } + } + } +} + +func runOvnKube(ctx *cli.Context) error { + fmt.Println("ovn4nfvk8s started") + exec := kexec.New() + _, err := config.InitConfig(ctx, exec, nil) + if err != nil { + return err + } + pidfile := ctx.String("pidfile") + + c := make(chan os.Signal, 2) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + go func() { + <-c + delPidfile(pidfile) + os.Exit(1) + }() + + defer delPidfile(pidfile) + + if pidfile != "" { + // need to test if already there + _, err := os.Stat(pidfile) + + // Create if it doesn't exist, else exit with error + if os.IsNotExist(err) { + if err := ioutil.WriteFile(pidfile, []byte(fmt.Sprintf("%d", os.Getpid())), 0644); err != nil { + logrus.Errorf("failed to write pidfile %s (%v). Ignoring..", pidfile, err) + } + } else { + // get the pid and see if it exists + pid, err := ioutil.ReadFile(pidfile) + if err != nil { + logrus.Errorf("pidfile %s exists but can't be read", pidfile) + return err + } + _, err1 := os.Stat("/proc/" + string(pid[:]) + "/cmdline") + if os.IsNotExist(err1) { + // Left over pid from dead process + if err := ioutil.WriteFile(pidfile, []byte(fmt.Sprintf("%d", os.Getpid())), 0644); err != nil { + logrus.Errorf("failed to write pidfile %s (%v). Ignoring..", pidfile, err) + } + } else { + logrus.Errorf("pidfile %s exists and ovn4nfvk8s is running", pidfile) + os.Exit(1) + } + } + } + + if err = util.SetExec(exec); err != nil { + logrus.Errorf("Failed to initialize exec helper: %v", err) + return err + } + + clientset, err := config.NewClientset(&config.Kubernetes) + if err != nil { + panic(err.Error()) + } + + // Create distributed router and gateway for the deployment + err = ovn.SetupMaster("ovn4nfv-master") + if err != nil { + logrus.Errorf(err.Error()) + panic(err.Error()) + } + // create factory and start the ovn controller + stopChan := make(chan struct{}) + factory, err := factory.NewWatchFactory(clientset, stopChan) + if err != nil { + panic(err.Error) + } + + ovnController := ovn.NewOvnController(clientset, factory) + if err := ovnController.Run(); err != nil { + logrus.Errorf(err.Error()) + panic(err.Error()) + } + // run forever + select {} + + return nil +} @@ -0,0 +1,38 @@ +module ovn4nfv-k8s-plugin + +require ( + github.com/containernetworking/cni v0.6.0 + github.com/containernetworking/plugins v0.7.4 + github.com/coreos/go-iptables v0.4.0 // indirect + github.com/gogo/protobuf v1.1.1 // indirect + github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b // indirect + github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c // indirect + github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf // indirect + github.com/googleapis/gnostic v0.2.0 // indirect + github.com/gregjones/httpcache v0.0.0-20181110185634-c63ab54fda8f // indirect + github.com/hashicorp/golang-lru v0.5.0 // indirect + github.com/imdario/mergo v0.3.6 // indirect + github.com/json-iterator/go v1.1.5 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.1 // indirect + github.com/onsi/ginkgo v1.6.0 + github.com/onsi/gomega v1.4.2 + github.com/peterbourgon/diskv v2.0.1+incompatible // indirect + github.com/sirupsen/logrus v1.2.0 + 
github.com/spf13/pflag v1.0.3 // indirect + github.com/urfave/cli v1.20.0 + github.com/vishvananda/netlink v1.0.0 + github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc // indirect + golang.org/x/oauth2 v0.0.0-20181106182150-f42d05182288 // indirect + golang.org/x/time v0.0.0-20181108054448-85acf8d2951c // indirect + gopkg.in/gcfg.v1 v1.2.3 + gopkg.in/inf.v0 v0.9.1 // indirect + gopkg.in/warnings.v0 v0.1.2 // indirect + k8s.io/api v0.0.0-20181117111259-46ad728b8d13 + k8s.io/apimachinery v0.0.0-20181116115711-1b0702fe2927 + k8s.io/client-go v9.0.0+incompatible + k8s.io/klog v0.1.0 // indirect + k8s.io/kube-openapi v0.0.0-20181114233023-0317810137be // indirect + k8s.io/utils v0.0.0-20181115163542-0d26856f57b3 + sigs.k8s.io/yaml v1.1.0 // indirect +) @@ -0,0 +1,96 @@ +github.com/containernetworking/cni v0.6.0 h1:FXICGBZNMtdHlW65trpoHviHctQD3seWhRRcqp2hMOU= +github.com/containernetworking/cni v0.6.0/go.mod h1:LGwApLUm2FpoOfxTDEeq8T9ipbpZ61X79hmU3w8FmsY= +github.com/containernetworking/plugins v0.7.4 h1:ugkuXfg1Pdzm54U5DGMzreYIkZPSCmSq4rm5TIXVICA= +github.com/containernetworking/plugins v0.7.4/go.mod h1:dagHaAhNjXjT9QYOklkKJDGaQPTg4pf//FrUcJeb7FU= +github.com/coreos/go-iptables v0.4.0 h1:wh4UbVs8DhLUbpyq97GLJDKrQMjEDD63T1xE4CrsKzQ= +github.com/coreos/go-iptables v0.4.0/go.mod h1:/mVI274lEDI2ns62jHCDnCyBF9Iwsmekav8Dbxlm1MU= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/gogo/protobuf v1.1.1 h1:72R+M5VuhED/KujmZVcIquuo8mBgX4oVda//DQb3PXo= +github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c h1:964Od4U6p2jUkFxvCydnIczKteheJEzHRToSGK3Bnlw= +github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= +github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf h1:+RRA9JqSOZFfKrOeqr2z77+8R2RKyh8PG66dcu1V0ck= +github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf/go.mod h1:HP5RmnzzSNb993RKQDq4+1A4ia9nllfqcQFTQJedwGI= +github.com/googleapis/gnostic v0.2.0 h1:l6N3VoaVzTncYYW+9yOz2LJJammFZGBO13sqgEhpy9g= +github.com/googleapis/gnostic v0.2.0/go.mod h1:sJBsCZ4ayReDTBIg8b9dl28c5xFWyhBTVRp3pOg5EKY= +github.com/gregjones/httpcache v0.0.0-20181110185634-c63ab54fda8f h1:ShTPMJQes6tubcjzGMODIVG5hlrCeImaBnZzKF2N8SM= +github.com/gregjones/httpcache v0.0.0-20181110185634-c63ab54fda8f/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= +github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo= +github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28= +github.com/imdario/mergo v0.3.6/go.mod 
h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= +github.com/json-iterator/go v1.1.5 h1:gL2yXlmiIo4+t+y32d4WGwOjKGYcGOuyrg46vadswDE= +github.com/json-iterator/go v1.1.5/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI= +github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/onsi/ginkgo v1.6.0 h1:Ix8l273rp3QzYgXSR+c8d1fTG7UPgYkOSELPhiY/YGw= +github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/gomega v1.4.2 h1:3mYCb7aPxS/RU7TI1y4rkEn1oKmPRjNJLNEXgw7MH2I= +github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/peterbourgon/diskv v2.0.1+incompatible h1:UBdAOUP5p4RWqPBg048CAvpKN+vxiaj6gdUUzhl4XmI= +github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo= +github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg= +github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/urfave/cli v1.20.0 h1:fDqGv3UG/4jbVl/QkFwEdddtEDjh/5Ov6X+0B/3bPaw= +github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= +github.com/vishvananda/netlink v1.0.0 h1:bqNY2lgheFIu1meHUFSH3d7vG93AFyqg3oGbJCOJgSM= +github.com/vishvananda/netlink v1.0.0/go.mod h1:+SR5DhBJrl6ZM7CoCKvpw5BKroDKQ+PJqOg65H/2ktk= +github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc h1:R83G5ikgLMxrBvLh22JhdfI8K6YXEPHx5P03Uu3DRs4= +github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc/go.mod h1:ZjcWmFBXmLKZu9Nxj3WKYEafiSqer2rnvPr0en9UNpI= +golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 h1:u+LnwYTOOW7Ukr/fppxEb1Nwz0AtPflrblfvUudpo+I= +golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd h1:nTDtHvHSdCn1m6ITfMRqtOd/9+7a3s8RBNOZ3eYZzJA= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/oauth2 v0.0.0-20181106182150-f42d05182288 h1:JIqe8uIcRBHXDQVvZtHwp80ai3Lw3IJAeJEs55Dc1W0= +golang.org/x/oauth2 v0.0.0-20181106182150-f42d05182288/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e h1:o3PsSEY8E4eXWkXrIP9YJALUkVZqzHJT5DOasTyn8Vs= +golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= 
+golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/time v0.0.0-20181108054448-85acf8d2951c h1:fqgJT0MGcGpPgpWU7VRdRjuArfcOvC4AoJmILihzhDg= +golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/gcfg.v1 v1.2.3 h1:m8OOJ4ccYHnx2f4gQwpno8nAX5OGOh7RLaaz0pj3Ogs= +gopkg.in/gcfg.v1 v1.2.3/go.mod h1:yesOnuUOFQAhST5vPY4nbZsb/huCgGGXlipJsBn0b3o= +gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= +gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/warnings.v0 v0.1.2 h1:wFXVbFY8DY5/xOe1ECiWdKCzZlxgshcYVNkBHstARME= +gopkg.in/warnings.v0 v0.1.2/go.mod h1:jksf8JmL6Qr/oQM2OXTHunEvvTAsrWBLb6OOjuVWRNI= +gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +k8s.io/api v0.0.0-20181117111259-46ad728b8d13 h1:kScMdtyRni4/487ib8PTPnHNcgWWiRRH94iyicChmS0= +k8s.io/api v0.0.0-20181117111259-46ad728b8d13/go.mod h1:iuAfoD4hCxJ8Onx9kaTIt30j7jUFS00AXQi6QMi99vA= +k8s.io/apimachinery v0.0.0-20181116115711-1b0702fe2927 h1:RkGqNDA3mKVqAQbCHoB+QeHshksgEzhAlFqY4HhlSu0= +k8s.io/apimachinery v0.0.0-20181116115711-1b0702fe2927/go.mod h1:ccL7Eh7zubPUSh9A3USN90/OzHNSVN6zxzde07TDCL0= +k8s.io/client-go v9.0.0+incompatible h1:/PdJjifJTjMFe0G4ESclZDcwF1+bFePTJ2xf+MXjcvs= +k8s.io/client-go v9.0.0+incompatible/go.mod h1:7vJpHMYJwNQCWgzmNV+VYUl1zCObLyodBc8nIyt8L5s= +k8s.io/klog v0.1.0 h1:I5HMfc/DtuVaGR1KPwUrTc476K8NCqNBldC7H4dYEzk= +k8s.io/klog v0.1.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk= +k8s.io/kube-openapi v0.0.0-20181114233023-0317810137be h1:aWEq4nbj7HRJ0mtKYjNSk/7X28Tl6TI6FeG8gKF+r7Q= +k8s.io/kube-openapi v0.0.0-20181114233023-0317810137be/go.mod h1:BXM9ceUBTj2QnfH2MK1odQs778ajze1RxcmP6S8RVVc= +k8s.io/utils v0.0.0-20181115163542-0d26856f57b3 h1:S3/Kq185JnolOEemhmDXXd23l2t4bX5hPQPQPADlF1E= +k8s.io/utils v0.0.0-20181115163542-0d26856f57b3/go.mod h1:8k8uAuAQ0rXslZKaEWd0c3oVhZz7sSzSiPnVZayjIX0= +sigs.k8s.io/yaml v1.1.0 h1:4A07+ZFc2wgJwo8YNlQpr1rVlgUDlxXHhPJciaPY5gs= +sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o= diff --git a/internal/pkg/config/.gitkeep b/internal/pkg/config/.gitkeep deleted file mode 100644 index e69de29..0000000 --- a/internal/pkg/config/.gitkeep +++ /dev/null diff --git a/internal/pkg/config/config.go b/internal/pkg/config/config.go new file mode 100644 index 0000000..045a0ac --- /dev/null +++ b/internal/pkg/config/config.go @@ -0,0 +1,359 @@ +package config + +import ( + "fmt" + "net/url" + "os" + "path/filepath" + "reflect" + "strings" + + "github.com/sirupsen/logrus" + "github.com/urfave/cli" + gcfg "gopkg.in/gcfg.v1" + + kexec "k8s.io/utils/exec" + + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/util/cert" +) + +// The following are global config parameters that other modules may access directly 
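+// (for example, config.Default.MTU is handed to the CNI when it plumbs pod interfaces, and +// config.Kubernetes is used to build the clientset that talks to the API server)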
+var ( + // ovn-kubernetes version, to be changed with every release + Version = "0.3.0" + + // Default holds parsed config file parameters and command-line overrides + Default = DefaultConfig{ + MTU: 1400, + } + + // Logging holds logging-related parsed config file parameters and command-line overrides + Logging = LoggingConfig{ + File: "", // do not log to a file by default + Level: 4, + } + + // CNI holds CNI-related parsed config file parameters and command-line overrides + CNI = CNIConfig{ + ConfDir: "/etc/cni/net.d", + Plugin: "ovn4nfvk8s-cni", + } + + // Kubernetes holds Kubernetes-related parsed config file parameters and command-line overrides + Kubernetes = KubernetesConfig{ + APIServer: "http://localhost:8080", + } +) + +// DefaultConfig holds parsed config file parameters and command-line overrides +type DefaultConfig struct { + // MTU value used for the overlay networks. + MTU int `gcfg:"mtu"` +} + +// LoggingConfig holds logging-related parsed config file parameters and command-line overrides +type LoggingConfig struct { + // File is the path of the file to log to + File string `gcfg:"logfile"` + // Level is the logging verbosity level + Level int `gcfg:"loglevel"` +} + +// CNIConfig holds CNI-related parsed config file parameters and command-line overrides +type CNIConfig struct { + // ConfDir specifies the CNI config directory in which to write the overlay CNI config file + ConfDir string `gcfg:"conf-dir"` + // Plugin specifies the name of the CNI plugin + Plugin string `gcfg:"plugin"` +} + +// KubernetesConfig holds Kubernetes-related parsed config file parameters and command-line overrides +type KubernetesConfig struct { + Kubeconfig string `gcfg:"kubeconfig"` + CACert string `gcfg:"cacert"` + APIServer string `gcfg:"apiserver"` + Token string `gcfg:"token"` +} + +// Config is used to read the structured config file and to cache config in testcases +type config struct { + Default DefaultConfig + Logging LoggingConfig + CNI CNIConfig + Kubernetes KubernetesConfig +} + +// copy members of struct 'src' into the corresponding field in struct 'dst' +// if the field in 'src' is a non-zero int or a non-zero-length string. This +// function should be called with pointers to structs. +func overrideFields(dst, src interface{}) { + dstStruct := reflect.ValueOf(dst).Elem() + srcStruct := reflect.ValueOf(src).Elem() + if dstStruct.Kind() != srcStruct.Kind() || dstStruct.Kind() != reflect.Struct { + panic("mismatched value types") + } + if dstStruct.NumField() != srcStruct.NumField() { + panic("mismatched struct types") + } + + for i := 0; i < dstStruct.NumField(); i++ { + dstField := dstStruct.Field(i) + srcField := srcStruct.Field(i) + if dstField.Kind() != srcField.Kind() { + panic("mismatched struct fields") + } + switch srcField.Kind() { + case reflect.String: + if srcField.String() != "" { + dstField.Set(srcField) + } + case reflect.Int: + if srcField.Int() != 0 { + dstField.Set(srcField) + } + default: + panic(fmt.Sprintf("unhandled struct field type: %v", srcField.Kind())) + } + } +} + +var cliConfig config + +// Flags are general command-line flags. Apps should add these flags to their +// own urfave/cli flags and call InitConfig() early in the application. 
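+// An illustrative invocation (flag values are examples only): +// ovn4nfvk8s-cni --k8s-apiserver=http://localhost:8080 --loglevel=5 --logfile=<path-to-logfile>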
+var Flags = []cli.Flag{ + cli.StringFlag{ + Name: "config-file", + Usage: "configuration file path (default: /etc/openvswitch/ovn4nfv_k8s.conf)", + }, + + // Generic options + cli.IntFlag{ + Name: "mtu", + Usage: "MTU value used for the overlay networks (default: 1400)", + Destination: &cliConfig.Default.MTU, + }, + + // Logging options + cli.IntFlag{ + Name: "loglevel", + Usage: "log verbosity and level: 5=debug, 4=info, 3=warn, 2=error, 1=fatal (default: 4)", + Destination: &cliConfig.Logging.Level, + }, + cli.StringFlag{ + Name: "logfile", + Usage: "path of a file to direct log output to", + Destination: &cliConfig.Logging.File, + }, + + // CNI options + cli.StringFlag{ + Name: "cni-conf-dir", + Usage: "the CNI config directory in which to write the overlay CNI config file (default: /etc/cni/net.d)", + Destination: &cliConfig.CNI.ConfDir, + }, + cli.StringFlag{ + Name: "cni-plugin", + Usage: "the name of the CNI plugin (default: ovn4nfvk8s-cni)", + Destination: &cliConfig.CNI.Plugin, + }, + + // Kubernetes-related options + cli.StringFlag{ + Name: "k8s-kubeconfig", + Usage: "absolute path to the Kubernetes kubeconfig file (not required if the --k8s-apiserver, --k8s-ca-cert, and --k8s-token are given)", + Destination: &cliConfig.Kubernetes.Kubeconfig, + }, + cli.StringFlag{ + Name: "k8s-apiserver", + Usage: "URL of the Kubernetes API server (not required if --k8s-kubeconfig is given) (default: http://localhost:8443)", + Destination: &cliConfig.Kubernetes.APIServer, + }, + cli.StringFlag{ + Name: "k8s-cacert", + Usage: "the absolute path to the Kubernetes API CA certificate (not required if --k8s-kubeconfig is given)", + Destination: &cliConfig.Kubernetes.CACert, + }, + cli.StringFlag{ + Name: "k8s-token", + Usage: "the Kubernetes API authentication token (not required if --k8s-kubeconfig is given)", + Destination: &cliConfig.Kubernetes.Token, + }, +} + +type Defaults struct { + K8sAPIServer bool + K8sToken bool + K8sCert bool +} + +const ( + ovsVsctlCommand = "ovs-vsctl" +) + +func buildKubernetesConfig(exec kexec.Interface, cli, file *config, defaults *Defaults) error { + + // Copy config file values over default values + overrideFields(&Kubernetes, &file.Kubernetes) + // And CLI overrides over config file and default values + overrideFields(&Kubernetes, &cli.Kubernetes) + + if Kubernetes.Kubeconfig != "" && !pathExists(Kubernetes.Kubeconfig) { + return fmt.Errorf("kubernetes kubeconfig file %q not found", Kubernetes.Kubeconfig) + } + if Kubernetes.CACert != "" && !pathExists(Kubernetes.CACert) { + return fmt.Errorf("kubernetes CA certificate file %q not found", Kubernetes.CACert) + } + + url, err := url.Parse(Kubernetes.APIServer) + if err != nil { + return fmt.Errorf("kubernetes API server address %q invalid: %v", Kubernetes.APIServer, err) + } else if url.Scheme != "https" && url.Scheme != "http" { + return fmt.Errorf("kubernetes API server URL scheme %q invalid", url.Scheme) + } + + return nil +} + +// getConfigFilePath returns config file path and 'true' if the config file is +// the fallback path (eg not given by the user), 'false' if given explicitly +// by the user +func getConfigFilePath(ctx *cli.Context) (string, bool) { + configFile := ctx.String("config-file") + if configFile != "" { + return configFile, false + } + + // default + return filepath.Join("/etc", "openvswitch", "ovn4nfv_k8s.conf"), true + +} + +// InitConfig reads the config file and common command-line options and +// constructs the global config object from them. 
It returns the config file +// path (if explicitly specified) or an error +func InitConfig(ctx *cli.Context, exec kexec.Interface, defaults *Defaults) (string, error) { + return InitConfigWithPath(ctx, exec, "", defaults) +} + +// InitConfigWithPath reads the given config file (or if empty, reads the config file +// specified by command-line arguments, or empty, the default config file) and +// common command-line options and constructs the global config object from +// them. It returns the config file path (if explicitly specified) or an error +func InitConfigWithPath(ctx *cli.Context, exec kexec.Interface, configFile string, defaults *Defaults) (string, error) { + var cfg config + var retConfigFile string + var configFileIsDefault bool + + // If no specific config file was given, try to find one from command-line + // arguments, or the platform-specific default config file path + if configFile == "" { + configFile, configFileIsDefault = getConfigFilePath(ctx) + } + + logrus.SetOutput(os.Stderr) + + if !configFileIsDefault { + // Only return explicitly specified config file + retConfigFile = configFile + } + + f, err := os.Open(configFile) + // Failure to find a default config file is not a hard error + if err != nil && !configFileIsDefault { + return "", fmt.Errorf("failed to open config file %s: %v", configFile, err) + } + if f != nil { + defer f.Close() + + // Parse ovn4nfvk8s config file. + if err = gcfg.ReadInto(&cfg, f); err != nil { + return "", fmt.Errorf("failed to parse config file %s: %v", f.Name(), err) + } + logrus.Infof("Parsed config file %s", f.Name()) + logrus.Infof("Parsed config: %+v", cfg) + } + + if defaults == nil { + defaults = &Defaults{} + } + + // Build config that needs no special processing + overrideFields(&Default, &cfg.Default) + overrideFields(&Default, &cliConfig.Default) + overrideFields(&CNI, &cfg.CNI) + overrideFields(&CNI, &cliConfig.CNI) + + // Logging setup + overrideFields(&Logging, &cfg.Logging) + overrideFields(&Logging, &cliConfig.Logging) + logrus.SetLevel(logrus.Level(Logging.Level)) + if Logging.File != "" { + var file *os.File + file, err = os.OpenFile(Logging.File, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0660) + if err != nil { + logrus.Errorf("failed to open logfile %s (%v). 
Ignoring..", Logging.File, err) + } else { + logrus.SetOutput(file) + } + } + + if err = buildKubernetesConfig(exec, &cliConfig, &cfg, defaults); err != nil { + return "", err + } + logrus.Debugf("Default config: %+v", Default) + logrus.Debugf("Logging config: %+v", Logging) + logrus.Debugf("CNI config: %+v", CNI) + logrus.Debugf("Kubernetes config: %+v", Kubernetes) + + return retConfigFile, nil +} + +func pathExists(path string) bool { + _, err := os.Stat(path) + if err != nil && os.IsNotExist(err) { + return false + } + return true +} + +// NewClientset creates a Kubernetes clientset from either a kubeconfig, +// TLS properties, or an apiserver URL +func NewClientset(conf *KubernetesConfig) (*kubernetes.Clientset, error) { + var kconfig *rest.Config + var err error + + if conf.Kubeconfig != "" { + // uses the current context in kubeconfig + kconfig, err = clientcmd.BuildConfigFromFlags("", conf.Kubeconfig) + } else if strings.HasPrefix(conf.APIServer, "https") { + if conf.APIServer == "" || conf.Token == "" { + return nil, fmt.Errorf("TLS-secured apiservers require token and CA certificate") + } + kconfig = &rest.Config{ + Host: conf.APIServer, + BearerToken: conf.Token, + } + if conf.CACert != "" { + if _, err := cert.NewPool(conf.CACert); err != nil { + return nil, err + } + kconfig.TLSClientConfig = rest.TLSClientConfig{CAFile: conf.CACert} + } + } else if strings.HasPrefix(conf.APIServer, "http") { + kconfig, err = clientcmd.BuildConfigFromFlags(conf.APIServer, "") + } else { + // Assume we are running from a container managed by kubernetes + // and read the apiserver address and tokens from the + // container's environment. + kconfig, err = rest.InClusterConfig() + } + if err != nil { + return nil, err + } + + return kubernetes.NewForConfig(kconfig) +} diff --git a/internal/pkg/factory/.gitkeep b/internal/pkg/factory/.gitkeep deleted file mode 100644 index e69de29..0000000 --- a/internal/pkg/factory/.gitkeep +++ /dev/null diff --git a/internal/pkg/factory/factory.go b/internal/pkg/factory/factory.go new file mode 100644 index 0000000..a635e3f --- /dev/null +++ b/internal/pkg/factory/factory.go @@ -0,0 +1,318 @@ +package factory + +import ( + "fmt" + "reflect" + "sync" + "sync/atomic" + "time" + + "github.com/sirupsen/logrus" + + kapi "k8s.io/api/core/v1" + knet "k8s.io/api/networking/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + informerfactory "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" +) + +type informer struct { + sync.Mutex + oType reflect.Type + inf cache.SharedIndexInformer + handlers map[uint64]cache.ResourceEventHandler +} + +func (i *informer) forEachHandler(obj interface{}, f func(id uint64, handler cache.ResourceEventHandler)) { + i.Lock() + defer i.Unlock() + + objType := reflect.TypeOf(obj) + if objType != i.oType { + logrus.Errorf("object type %v did not match expected %v", objType, i.oType) + return + } + + for id, handler := range i.handlers { + f(id, handler) + } +} + +// WatchFactory initializes and manages common kube watches +type WatchFactory struct { + iFactory informerfactory.SharedInformerFactory + informers map[reflect.Type]*informer + handlerCounter uint64 +} + +const ( + resyncInterval = 12 * time.Hour +) + +func newInformer(oType reflect.Type, inf cache.SharedIndexInformer) *informer { + return &informer{ + oType: oType, + inf: inf, + handlers: make(map[uint64]cache.ResourceEventHandler), + } +} + +var ( + podType reflect.Type = reflect.TypeOf(&kapi.Pod{}) + 
serviceType reflect.Type = reflect.TypeOf(&kapi.Service{}) + endpointsType reflect.Type = reflect.TypeOf(&kapi.Endpoints{}) + policyType reflect.Type = reflect.TypeOf(&knet.NetworkPolicy{}) + namespaceType reflect.Type = reflect.TypeOf(&kapi.Namespace{}) + nodeType reflect.Type = reflect.TypeOf(&kapi.Node{}) +) + +// NewWatchFactory initializes a new watch factory +func NewWatchFactory(c kubernetes.Interface, stopChan <-chan struct{}) (*WatchFactory, error) { + // resync time is 12 hours, none of the resources being watched in ovn-kubernetes have + // any race condition where a resync may be required e.g. cni executable on node watching for + // events on pods and assuming that an 'ADD' event will contain the annotations put in by + // ovnkube master (currently, it is just a 'get' loop) + // the downside of making it tight (like 10 minutes) is needless spinning on all resources + wf := &WatchFactory{ + iFactory: informerfactory.NewSharedInformerFactory(c, resyncInterval), + informers: make(map[reflect.Type]*informer), + } + + // Create shared informers we know we'll use + wf.informers[podType] = newInformer(podType, wf.iFactory.Core().V1().Pods().Informer()) + wf.informers[serviceType] = newInformer(serviceType, wf.iFactory.Core().V1().Services().Informer()) + wf.informers[endpointsType] = newInformer(endpointsType, wf.iFactory.Core().V1().Endpoints().Informer()) + wf.informers[policyType] = newInformer(policyType, wf.iFactory.Networking().V1().NetworkPolicies().Informer()) + wf.informers[namespaceType] = newInformer(namespaceType, wf.iFactory.Core().V1().Namespaces().Informer()) + wf.informers[nodeType] = newInformer(nodeType, wf.iFactory.Core().V1().Nodes().Informer()) + + wf.iFactory.Start(stopChan) + res := wf.iFactory.WaitForCacheSync(stopChan) + for oType, synced := range res { + if !synced { + return nil, fmt.Errorf("error in syncing cache for %v informer", oType) + } + informer := wf.informers[oType] + informer.inf.AddEventHandler(wf.newFederatedHandler(informer)) + } + + return wf, nil +} + +func (wf *WatchFactory) newFederatedHandler(inf *informer) cache.ResourceEventHandlerFuncs { + return cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + inf.forEachHandler(obj, func(id uint64, handler cache.ResourceEventHandler) { + logrus.Debugf("running %v ADD event for handler %d", inf.oType, id) + handler.OnAdd(obj) + }) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + inf.forEachHandler(newObj, func(id uint64, handler cache.ResourceEventHandler) { + logrus.Debugf("running %v UPDATE event for handler %d", inf.oType, id) + handler.OnUpdate(oldObj, newObj) + }) + }, + DeleteFunc: func(obj interface{}) { + if inf.oType != reflect.TypeOf(obj) { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + logrus.Errorf("couldn't get object from tombstone: %+v", obj) + return + } + obj = tombstone.Obj + objType := reflect.TypeOf(obj) + if inf.oType != objType { + logrus.Errorf("expected tombstone object resource type %v but got %v", inf.oType, objType) + return + } + } + inf.forEachHandler(obj, func(id uint64, handler cache.ResourceEventHandler) { + logrus.Debugf("running %v DELETE event for handler %d", inf.oType, id) + handler.OnDelete(obj) + }) + }, + } +} + +func getObjectMeta(objType reflect.Type, obj interface{}) (*metav1.ObjectMeta, error) { + switch objType { + case podType: + if pod, ok := obj.(*kapi.Pod); ok { + return &pod.ObjectMeta, nil + } + case serviceType: + if service, ok := obj.(*kapi.Service); ok { + return &service.ObjectMeta, nil + } + case 
endpointsType: + if endpoints, ok := obj.(*kapi.Endpoints); ok { + return &endpoints.ObjectMeta, nil + } + case policyType: + if policy, ok := obj.(*knet.NetworkPolicy); ok { + return &policy.ObjectMeta, nil + } + case namespaceType: + if namespace, ok := obj.(*kapi.Namespace); ok { + return &namespace.ObjectMeta, nil + } + case nodeType: + if node, ok := obj.(*kapi.Node); ok { + return &node.ObjectMeta, nil + } + } + return nil, fmt.Errorf("cannot get ObjectMeta from type %v", objType) +} + +func (wf *WatchFactory) addHandler(objType reflect.Type, namespace string, lsel *metav1.LabelSelector, funcs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) { + inf, ok := wf.informers[objType] + if !ok { + return 0, fmt.Errorf("unknown object type %v", objType) + } + + sel, err := metav1.LabelSelectorAsSelector(lsel) + if err != nil { + return 0, fmt.Errorf("error creating label selector: %v", err) + } + + filterFunc := func(obj interface{}) bool { + if namespace == "" && lsel == nil { + // Unfiltered handler + return true + } + meta, err := getObjectMeta(objType, obj) + if err != nil { + logrus.Errorf("watch handler filter error: %v", err) + return false + } + if namespace != "" && meta.Namespace != namespace { + return false + } + if lsel != nil && !sel.Matches(labels.Set(meta.Labels)) { + return false + } + return true + } + + // Process existing items as a set so the caller can clean up + // after a restart or whatever + existingItems := inf.inf.GetStore().List() + if processExisting != nil { + items := make([]interface{}, 0) + for _, obj := range existingItems { + if filterFunc(obj) { + items = append(items, obj) + } + } + processExisting(items) + } + + handlerID := atomic.AddUint64(&wf.handlerCounter, 1) + + inf.Lock() + defer inf.Unlock() + + inf.handlers[handlerID] = cache.FilteringResourceEventHandler{ + FilterFunc: filterFunc, + Handler: funcs, + } + logrus.Debugf("added %v event handler %d", objType, handlerID) + + // Send existing items to the handler's add function; informers usually + // do this but since we share informers, it's long-since happened so + // we must emulate that here + for _, obj := range existingItems { + inf.handlers[handlerID].OnAdd(obj) + } + + return handlerID, nil +} + +func (wf *WatchFactory) removeHandler(objType reflect.Type, handlerID uint64) error { + inf, ok := wf.informers[objType] + if !ok { + return fmt.Errorf("tried to remove unknown object type %v event handler", objType) + } + + inf.Lock() + defer inf.Unlock() + if _, ok := inf.handlers[handlerID]; !ok { + return fmt.Errorf("tried to remove unknown object type %v event handler %d", objType, handlerID) + } + delete(inf.handlers, handlerID) + logrus.Debugf("removed %v event handler %d", objType, handlerID) + return nil +} + +// AddPodHandler adds a handler function that will be executed on Pod object changes +func (wf *WatchFactory) AddPodHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) { + return wf.addHandler(podType, "", nil, handlerFuncs, processExisting) +} + +// AddFilteredPodHandler adds a handler function that will be executed when Pod objects that match the given filters change +func (wf *WatchFactory) AddFilteredPodHandler(namespace string, lsel *metav1.LabelSelector, handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) { + return wf.addHandler(podType, namespace, lsel, handlerFuncs, processExisting) +} + +// RemovePodHandler removes a Pod object event handler function 
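Because the shared informers are started once in NewWatchFactory, addHandler above replays every matching object already in the informer store through the new handler's OnAdd. A hypothetical caller-side sketch of the filtered Pod helpers, assuming wf is the *WatchFactory returned by NewWatchFactory and the imports used by factory.go (label values and handler bodies are illustrative, not part of this change):

    sel := &metav1.LabelSelector{MatchLabels: map[string]string{"app": "vfw"}}
    handlerID, err := wf.AddFilteredPodHandler("default", sel,
        cache.ResourceEventHandlerFuncs{
            AddFunc:    func(obj interface{}) { logrus.Infof("pod added: %s", obj.(*kapi.Pod).Name) },
            UpdateFunc: func(old, new interface{}) {},
            DeleteFunc: func(obj interface{}) {},
        },
        func(existing []interface{}) {
            // reconcile Pods that were created before this handler was registered
        })
    if err == nil {
        defer wf.RemovePodHandler(handlerID)
    }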
+func (wf *WatchFactory) RemovePodHandler(handlerID uint64) error { + return wf.removeHandler(podType, handlerID) +} + +// AddServiceHandler adds a handler function that will be executed on Service object changes +func (wf *WatchFactory) AddServiceHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) { + return wf.addHandler(serviceType, "", nil, handlerFuncs, processExisting) +} + +// RemoveServiceHandler removes a Service object event handler function +func (wf *WatchFactory) RemoveServiceHandler(handlerID uint64) error { + return wf.removeHandler(serviceType, handlerID) +} + +// AddEndpointsHandler adds a handler function that will be executed on Endpoints object changes +func (wf *WatchFactory) AddEndpointsHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) { + return wf.addHandler(endpointsType, "", nil, handlerFuncs, processExisting) +} + +// RemoveEndpointsHandler removes a Endpoints object event handler function +func (wf *WatchFactory) RemoveEndpointsHandler(handlerID uint64) error { + return wf.removeHandler(endpointsType, handlerID) +} + +// AddPolicyHandler adds a handler function that will be executed on NetworkPolicy object changes +func (wf *WatchFactory) AddPolicyHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) { + return wf.addHandler(policyType, "", nil, handlerFuncs, processExisting) +} + +// RemovePolicyHandler removes a NetworkPolicy object event handler function +func (wf *WatchFactory) RemovePolicyHandler(handlerID uint64) error { + return wf.removeHandler(policyType, handlerID) +} + +// AddNamespaceHandler adds a handler function that will be executed on Namespace object changes +func (wf *WatchFactory) AddNamespaceHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) { + return wf.addHandler(namespaceType, "", nil, handlerFuncs, processExisting) +} + +// AddFilteredNamespaceHandler adds a handler function that will be executed when Namespace objects that match the given filters change +func (wf *WatchFactory) AddFilteredNamespaceHandler(namespace string, lsel *metav1.LabelSelector, handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) { + return wf.addHandler(namespaceType, namespace, lsel, handlerFuncs, processExisting) +} + +// RemoveNamespaceHandler removes a Namespace object event handler function +func (wf *WatchFactory) RemoveNamespaceHandler(handlerID uint64) error { + return wf.removeHandler(namespaceType, handlerID) +} + +// AddNodeHandler adds a handler function that will be executed on Node object changes +func (wf *WatchFactory) AddNodeHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) { + return wf.addHandler(nodeType, "", nil, handlerFuncs, processExisting) +} + +// RemoveNodeHandler removes a Node object event handler function +func (wf *WatchFactory) RemoveNodeHandler(handlerID uint64) error { + return wf.removeHandler(nodeType, handlerID) +} diff --git a/internal/pkg/factory/factory_test.go b/internal/pkg/factory/factory_test.go new file mode 100644 index 0000000..38fb77e --- /dev/null +++ b/internal/pkg/factory/factory_test.go @@ -0,0 +1,686 @@ +package factory + +import ( + "reflect" + "testing" + + "k8s.io/api/core/v1" + knet "k8s.io/api/networking/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + 
"k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes/fake" + core "k8s.io/client-go/testing" + "k8s.io/client-go/tools/cache" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +func TestFactory(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Watch Factory Suite") +} + +func newObjectMeta(name, namespace string) metav1.ObjectMeta { + return metav1.ObjectMeta{ + Name: name, + UID: types.UID(name), + Namespace: namespace, + Labels: map[string]string{ + "name": name, + }, + } +} + +func newPod(name, namespace string) *v1.Pod { + return &v1.Pod{ + Status: v1.PodStatus{ + Phase: v1.PodRunning, + }, + ObjectMeta: newObjectMeta(name, namespace), + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "containerName", + Image: "containerImage", + }, + }, + NodeName: "mynode", + }, + } +} + +func newNamespace(name string) *v1.Namespace { + return &v1.Namespace{ + Status: v1.NamespaceStatus{ + Phase: v1.NamespaceActive, + }, + ObjectMeta: newObjectMeta(name, name), + } +} + +func newNode(name string) *v1.Node { + return &v1.Node{ + Status: v1.NodeStatus{ + Phase: v1.NodeRunning, + }, + ObjectMeta: newObjectMeta(name, ""), + } +} + +func newPolicy(name, namespace string) *knet.NetworkPolicy { + return &knet.NetworkPolicy{ + ObjectMeta: newObjectMeta(name, namespace), + } +} + +func newEndpoints(name, namespace string) *v1.Endpoints { + return &v1.Endpoints{ + ObjectMeta: newObjectMeta(name, namespace), + } +} + +func newService(name, namespace string) *v1.Service { + return &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: types.UID(name), + Namespace: namespace, + Labels: map[string]string{ + "name": name, + }, + }, + } +} + +func objSetup(c *fake.Clientset, objType string, listFn func(core.Action) (bool, runtime.Object, error)) *watch.FakeWatcher { + w := watch.NewFake() + c.AddWatchReactor(objType, core.DefaultWatchReactor(w, nil)) + c.AddReactor("list", objType, listFn) + return w +} + +var _ = Describe("Watch Factory Operations", func() { + var ( + fakeClient *fake.Clientset + podWatch, namespaceWatch, nodeWatch *watch.FakeWatcher + policyWatch, endpointsWatch, serviceWatch *watch.FakeWatcher + pods []*v1.Pod + namespaces []*v1.Namespace + nodes []*v1.Node + policies []*knet.NetworkPolicy + endpoints []*v1.Endpoints + services []*v1.Service + stop chan struct{} + numAdded, numUpdated, numDeleted int + ) + + BeforeEach(func() { + fakeClient = &fake.Clientset{} + stop = make(chan struct{}) + + pods = make([]*v1.Pod, 0) + podWatch = objSetup(fakeClient, "pods", func(core.Action) (bool, runtime.Object, error) { + obj := &v1.PodList{} + for _, p := range pods { + obj.Items = append(obj.Items, *p) + } + return true, obj, nil + }) + + namespaces = make([]*v1.Namespace, 0) + namespaceWatch = objSetup(fakeClient, "namespaces", func(core.Action) (bool, runtime.Object, error) { + obj := &v1.NamespaceList{} + for _, p := range namespaces { + obj.Items = append(obj.Items, *p) + } + return true, obj, nil + }) + + nodes = make([]*v1.Node, 0) + nodeWatch = objSetup(fakeClient, "nodes", func(core.Action) (bool, runtime.Object, error) { + obj := &v1.NodeList{} + for _, p := range nodes { + obj.Items = append(obj.Items, *p) + } + return true, obj, nil + }) + + policies = make([]*knet.NetworkPolicy, 0) + policyWatch = objSetup(fakeClient, "networkpolicies", func(core.Action) (bool, runtime.Object, error) { + obj := &knet.NetworkPolicyList{} + for _, p := range policies { + obj.Items = append(obj.Items, *p) + } + return true, 
obj, nil + }) + + endpoints = make([]*v1.Endpoints, 0) + endpointsWatch = objSetup(fakeClient, "endpoints", func(core.Action) (bool, runtime.Object, error) { + obj := &v1.EndpointsList{} + for _, p := range endpoints { + obj.Items = append(obj.Items, *p) + } + return true, obj, nil + }) + + services = make([]*v1.Service, 0) + serviceWatch = objSetup(fakeClient, "services", func(core.Action) (bool, runtime.Object, error) { + obj := &v1.ServiceList{} + for _, p := range services { + obj.Items = append(obj.Items, *p) + } + return true, obj, nil + }) + + numAdded = 0 + numUpdated = 0 + numDeleted = 0 + }) + + Context("when a processExisting is given", func() { + testExisting := func(objType reflect.Type, namespace string, lsel *metav1.LabelSelector) { + wf, err := NewWatchFactory(fakeClient, stop) + Expect(err).NotTo(HaveOccurred()) + id, err := wf.addHandler(objType, namespace, lsel, + cache.ResourceEventHandlerFuncs{}, + func(objs []interface{}) { + Expect(len(objs)).To(Equal(1)) + }) + Expect(err).NotTo(HaveOccurred()) + Expect(id).To(BeNumerically(">", uint64(0))) + wf.removeHandler(objType, id) + close(stop) + } + + It("is called for each existing pod", func() { + pods = append(pods, newPod("pod1", "default")) + testExisting(podType, "", nil) + }) + + It("is called for each existing namespace", func() { + namespaces = append(namespaces, newNamespace("default")) + testExisting(namespaceType, "", nil) + }) + + It("is called for each existing node", func() { + nodes = append(nodes, newNode("default")) + testExisting(nodeType, "", nil) + }) + + It("is called for each existing policy", func() { + policies = append(policies, newPolicy("denyall", "default")) + testExisting(policyType, "", nil) + }) + + It("is called for each existing endpoints", func() { + endpoints = append(endpoints, newEndpoints("myendpoint", "default")) + testExisting(endpointsType, "", nil) + }) + + It("is called for each existing service", func() { + services = append(services, newService("myservice", "default")) + testExisting(serviceType, "", nil) + }) + + It("is called for each existing pod that matches a given namespace and label", func() { + pod := newPod("pod1", "default") + pod.ObjectMeta.Labels["blah"] = "foobar" + pods = append(pods, pod) + testExisting(podType, "default", &metav1.LabelSelector{ + MatchLabels: map[string]string{"blah": "foobar"}, + }) + }) + }) + + Context("when existing items are known to the informer", func() { + testExisting := func(objType reflect.Type) { + wf, err := NewWatchFactory(fakeClient, stop) + Expect(err).NotTo(HaveOccurred()) + id, err := wf.addHandler(objType, "", nil, + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + numAdded++ + }, + UpdateFunc: func(old, new interface{}) {}, + DeleteFunc: func(obj interface{}) {}, + }, nil) + Expect(err).NotTo(HaveOccurred()) + Expect(numAdded).To(Equal(2)) + wf.removeHandler(objType, id) + close(stop) + } + + It("calls ADD for each existing pod", func() { + pods = append(pods, newPod("pod1", "default")) + pods = append(pods, newPod("pod2", "default")) + testExisting(podType) + }) + + It("calls ADD for each existing namespace", func() { + namespaces = append(namespaces, newNamespace("default")) + namespaces = append(namespaces, newNamespace("default2")) + testExisting(namespaceType) + }) + + It("calls ADD for each existing node", func() { + nodes = append(nodes, newNode("default")) + nodes = append(nodes, newNode("default2")) + testExisting(nodeType) + }) + + It("calls ADD for each existing policy", func() { + policies = 
append(policies, newPolicy("denyall", "default")) + policies = append(policies, newPolicy("denyall2", "default")) + testExisting(policyType) + }) + + It("calls ADD for each existing endpoints", func() { + endpoints = append(endpoints, newEndpoints("myendpoint", "default")) + endpoints = append(endpoints, newEndpoints("myendpoint2", "default")) + testExisting(endpointsType) + }) + + It("calls ADD for each existing service", func() { + services = append(services, newService("myservice", "default")) + services = append(services, newService("myservice2", "default")) + testExisting(serviceType) + }) + }) + + addFilteredHandler := func(wf *WatchFactory, objType reflect.Type, namespace string, lsel *metav1.LabelSelector, funcs cache.ResourceEventHandlerFuncs) uint64 { + id, err := wf.addHandler(objType, namespace, lsel, cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + defer GinkgoRecover() + numAdded++ + funcs.AddFunc(obj) + }, + UpdateFunc: func(old, new interface{}) { + defer GinkgoRecover() + numUpdated++ + funcs.UpdateFunc(old, new) + }, + DeleteFunc: func(obj interface{}) { + defer GinkgoRecover() + numDeleted++ + funcs.DeleteFunc(obj) + }, + }, nil) + Expect(err).NotTo(HaveOccurred()) + Expect(id).To(BeNumerically(">", uint64(0))) + return id + } + + addHandler := func(wf *WatchFactory, objType reflect.Type, funcs cache.ResourceEventHandlerFuncs) uint64 { + return addFilteredHandler(wf, objType, "", nil, funcs) + } + + It("responds to pod add/update/delete events", func() { + wf, err := NewWatchFactory(fakeClient, stop) + Expect(err).NotTo(HaveOccurred()) + + added := newPod("pod1", "default") + id := addHandler(wf, podType, cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + pod := obj.(*v1.Pod) + Expect(reflect.DeepEqual(pod, added)).To(BeTrue()) + }, + UpdateFunc: func(old, new interface{}) { + newPod := new.(*v1.Pod) + Expect(reflect.DeepEqual(newPod, added)).To(BeTrue()) + Expect(newPod.Spec.NodeName).To(Equal("foobar")) + }, + DeleteFunc: func(obj interface{}) { + pod := obj.(*v1.Pod) + Expect(reflect.DeepEqual(pod, added)).To(BeTrue()) + }, + }) + + pods = append(pods, added) + podWatch.Add(added) + Eventually(func() int { return numAdded }, 2).Should(Equal(1)) + added.Spec.NodeName = "foobar" + podWatch.Modify(added) + Eventually(func() int { return numUpdated }, 2).Should(Equal(1)) + pods = pods[:0] + podWatch.Delete(added) + Eventually(func() int { return numDeleted }, 2).Should(Equal(1)) + + wf.RemovePodHandler(id) + close(stop) + }) + + It("responds to namespace add/update/delete events", func() { + wf, err := NewWatchFactory(fakeClient, stop) + Expect(err).NotTo(HaveOccurred()) + + added := newNamespace("default") + id := addHandler(wf, namespaceType, cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + ns := obj.(*v1.Namespace) + Expect(reflect.DeepEqual(ns, added)).To(BeTrue()) + }, + UpdateFunc: func(old, new interface{}) { + newNS := new.(*v1.Namespace) + Expect(reflect.DeepEqual(newNS, added)).To(BeTrue()) + Expect(newNS.Status.Phase).To(Equal(v1.NamespaceTerminating)) + }, + DeleteFunc: func(obj interface{}) { + ns := obj.(*v1.Namespace) + Expect(reflect.DeepEqual(ns, added)).To(BeTrue()) + }, + }) + + namespaces = append(namespaces, added) + namespaceWatch.Add(added) + Eventually(func() int { return numAdded }, 2).Should(Equal(1)) + added.Status.Phase = v1.NamespaceTerminating + namespaceWatch.Modify(added) + Eventually(func() int { return numUpdated }, 2).Should(Equal(1)) + namespaces = namespaces[:0] + 
namespaceWatch.Delete(added) + Eventually(func() int { return numDeleted }, 2).Should(Equal(1)) + + wf.RemoveNamespaceHandler(id) + close(stop) + }) + + It("responds to node add/update/delete events", func() { + wf, err := NewWatchFactory(fakeClient, stop) + Expect(err).NotTo(HaveOccurred()) + + added := newNode("mynode") + id := addHandler(wf, nodeType, cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + node := obj.(*v1.Node) + Expect(reflect.DeepEqual(node, added)).To(BeTrue()) + }, + UpdateFunc: func(old, new interface{}) { + newNode := new.(*v1.Node) + Expect(reflect.DeepEqual(newNode, added)).To(BeTrue()) + Expect(newNode.Status.Phase).To(Equal(v1.NodeTerminated)) + }, + DeleteFunc: func(obj interface{}) { + node := obj.(*v1.Node) + Expect(reflect.DeepEqual(node, added)).To(BeTrue()) + }, + }) + + nodes = append(nodes, added) + nodeWatch.Add(added) + Eventually(func() int { return numAdded }, 2).Should(Equal(1)) + added.Status.Phase = v1.NodeTerminated + nodeWatch.Modify(added) + Eventually(func() int { return numUpdated }, 2).Should(Equal(1)) + nodes = nodes[:0] + nodeWatch.Delete(added) + Eventually(func() int { return numDeleted }, 2).Should(Equal(1)) + + wf.removeHandler(nodeType, id) + close(stop) + }) + + It("responds to policy add/update/delete events", func() { + wf, err := NewWatchFactory(fakeClient, stop) + Expect(err).NotTo(HaveOccurred()) + + added := newPolicy("mypolicy", "default") + id := addHandler(wf, policyType, cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + np := obj.(*knet.NetworkPolicy) + Expect(reflect.DeepEqual(np, added)).To(BeTrue()) + }, + UpdateFunc: func(old, new interface{}) { + newNP := new.(*knet.NetworkPolicy) + Expect(reflect.DeepEqual(newNP, added)).To(BeTrue()) + Expect(newNP.Spec.PolicyTypes).To(Equal([]knet.PolicyType{knet.PolicyTypeIngress})) + }, + DeleteFunc: func(obj interface{}) { + np := obj.(*knet.NetworkPolicy) + Expect(reflect.DeepEqual(np, added)).To(BeTrue()) + }, + }) + + policies = append(policies, added) + policyWatch.Add(added) + Eventually(func() int { return numAdded }, 2).Should(Equal(1)) + added.Spec.PolicyTypes = []knet.PolicyType{knet.PolicyTypeIngress} + policyWatch.Modify(added) + Eventually(func() int { return numUpdated }, 2).Should(Equal(1)) + policies = policies[:0] + policyWatch.Delete(added) + Eventually(func() int { return numDeleted }, 2).Should(Equal(1)) + + wf.removeHandler(policyType, id) + close(stop) + }) + + It("responds to endpoints add/update/delete events", func() { + wf, err := NewWatchFactory(fakeClient, stop) + Expect(err).NotTo(HaveOccurred()) + + added := newEndpoints("myendpoints", "default") + id := addHandler(wf, endpointsType, cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + eps := obj.(*v1.Endpoints) + Expect(reflect.DeepEqual(eps, added)).To(BeTrue()) + }, + UpdateFunc: func(old, new interface{}) { + newEPs := new.(*v1.Endpoints) + Expect(reflect.DeepEqual(newEPs, added)).To(BeTrue()) + Expect(len(newEPs.Subsets)).To(Equal(1)) + }, + DeleteFunc: func(obj interface{}) { + eps := obj.(*v1.Endpoints) + Expect(reflect.DeepEqual(eps, added)).To(BeTrue()) + }, + }) + + endpoints = append(endpoints, added) + endpointsWatch.Add(added) + Eventually(func() int { return numAdded }, 2).Should(Equal(1)) + added.Subsets = append(added.Subsets, v1.EndpointSubset{ + Ports: []v1.EndpointPort{ + { + Name: "foobar", + Port: 1234, + }, + }, + }) + endpointsWatch.Modify(added) + Eventually(func() int { return numUpdated }, 2).Should(Equal(1)) + endpoints = 
endpoints[:0] + endpointsWatch.Delete(added) + Eventually(func() int { return numDeleted }, 2).Should(Equal(1)) + + wf.removeHandler(endpointsType, id) + close(stop) + }) + + It("responds to service add/update/delete events", func() { + wf, err := NewWatchFactory(fakeClient, stop) + Expect(err).NotTo(HaveOccurred()) + + added := newService("myservice", "default") + id := addHandler(wf, serviceType, cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + service := obj.(*v1.Service) + Expect(reflect.DeepEqual(service, added)).To(BeTrue()) + }, + UpdateFunc: func(old, new interface{}) { + newService := new.(*v1.Service) + Expect(reflect.DeepEqual(newService, added)).To(BeTrue()) + Expect(newService.Spec.ClusterIP).To(Equal("1.1.1.1")) + }, + DeleteFunc: func(obj interface{}) { + service := obj.(*v1.Service) + Expect(reflect.DeepEqual(service, added)).To(BeTrue()) + }, + }) + + services = append(services, added) + serviceWatch.Add(added) + Eventually(func() int { return numAdded }, 2).Should(Equal(1)) + added.Spec.ClusterIP = "1.1.1.1" + serviceWatch.Modify(added) + Eventually(func() int { return numUpdated }, 2).Should(Equal(1)) + services = services[:0] + serviceWatch.Delete(added) + Eventually(func() int { return numDeleted }, 2).Should(Equal(1)) + + wf.removeHandler(serviceType, id) + close(stop) + }) + + It("stops processing events after the handler is removed", func() { + wf, err := NewWatchFactory(fakeClient, stop) + Expect(err).NotTo(HaveOccurred()) + + added := newNamespace("default") + id := addHandler(wf, namespaceType, cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) {}, + UpdateFunc: func(old, new interface{}) {}, + DeleteFunc: func(obj interface{}) {}, + }) + + namespaces = append(namespaces, added) + namespaceWatch.Add(added) + Eventually(func() int { return numAdded }, 2).Should(Equal(1)) + wf.RemoveNamespaceHandler(id) + + added2 := newNamespace("other") + namespaces = append(namespaces, added2) + namespaceWatch.Add(added2) + Consistently(func() int { return numAdded }, 2).Should(Equal(1)) + + added2.Status.Phase = v1.NamespaceTerminating + namespaceWatch.Modify(added2) + Consistently(func() int { return numUpdated }, 2).Should(Equal(0)) + namespaces = []*v1.Namespace{added} + namespaceWatch.Delete(added2) + Consistently(func() int { return numDeleted }, 2).Should(Equal(0)) + + close(stop) + }) + + It("filters correctly by label and namespace", func() { + wf, err := NewWatchFactory(fakeClient, stop) + Expect(err).NotTo(HaveOccurred()) + + passesFilter := newPod("pod1", "default") + passesFilter.ObjectMeta.Labels["blah"] = "foobar" + failsFilter := newPod("pod2", "default") + failsFilter.ObjectMeta.Labels["blah"] = "baz" + failsFilter2 := newPod("pod3", "otherns") + failsFilter2.ObjectMeta.Labels["blah"] = "foobar" + + addFilteredHandler(wf, + podType, + "default", + &metav1.LabelSelector{ + MatchLabels: map[string]string{"blah": "foobar"}, + }, + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + pod := obj.(*v1.Pod) + Expect(reflect.DeepEqual(pod, passesFilter)).To(BeTrue()) + }, + UpdateFunc: func(old, new interface{}) { + newPod := new.(*v1.Pod) + Expect(reflect.DeepEqual(newPod, passesFilter)).To(BeTrue()) + }, + DeleteFunc: func(obj interface{}) { + pod := obj.(*v1.Pod) + Expect(reflect.DeepEqual(pod, passesFilter)).To(BeTrue()) + }, + }) + + pods = append(pods, passesFilter) + podWatch.Add(passesFilter) + Eventually(func() int { return numAdded }, 2).Should(Equal(1)) + + // numAdded should remain 1 + pods = append(pods, 
failsFilter) + podWatch.Add(failsFilter) + Consistently(func() int { return numAdded }, 2).Should(Equal(1)) + + // numAdded should remain 1 + pods = append(pods, failsFilter2) + podWatch.Add(failsFilter2) + Consistently(func() int { return numAdded }, 2).Should(Equal(1)) + + passesFilter.Status.Phase = v1.PodFailed + podWatch.Modify(passesFilter) + Eventually(func() int { return numUpdated }, 2).Should(Equal(1)) + + // numAdded should remain 1 + failsFilter.Status.Phase = v1.PodFailed + podWatch.Modify(failsFilter) + Consistently(func() int { return numUpdated }, 2).Should(Equal(1)) + + failsFilter2.Status.Phase = v1.PodFailed + podWatch.Modify(failsFilter2) + Consistently(func() int { return numUpdated }, 2).Should(Equal(1)) + + pods = []*v1.Pod{failsFilter, failsFilter2} + podWatch.Delete(passesFilter) + Eventually(func() int { return numDeleted }, 2).Should(Equal(1)) + + close(stop) + }) + + It("correctly handles object updates that cause filter changes", func() { + wf, err := NewWatchFactory(fakeClient, stop) + Expect(err).NotTo(HaveOccurred()) + + pod := newPod("pod1", "default") + pod.ObjectMeta.Labels["blah"] = "baz" + + equalPod := pod + id := addFilteredHandler(wf, + podType, + "default", + &metav1.LabelSelector{ + MatchLabels: map[string]string{"blah": "foobar"}, + }, + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + p := obj.(*v1.Pod) + Expect(reflect.DeepEqual(p, equalPod)).To(BeTrue()) + }, + UpdateFunc: func(old, new interface{}) {}, + DeleteFunc: func(obj interface{}) { + p := obj.(*v1.Pod) + Expect(reflect.DeepEqual(p, equalPod)).To(BeTrue()) + }, + }) + + pods = append(pods, pod) + + // Pod doesn't pass filter; shouldn't be added + podWatch.Add(pod) + Consistently(func() int { return numAdded }, 2).Should(Equal(0)) + + // Update pod to pass filter; should be treated as add. 
Need + // to deep-copy pod when modifying because it's a pointer all + // the way through when using FakeClient + podCopy := pod.DeepCopy() + podCopy.ObjectMeta.Labels["blah"] = "foobar" + pods = []*v1.Pod{podCopy} + equalPod = podCopy + podWatch.Modify(podCopy) + Eventually(func() int { return numAdded }, 2).Should(Equal(1)) + + // Update pod to fail filter; should be treated as delete + pod.ObjectMeta.Labels["blah"] = "baz" + podWatch.Modify(pod) + Eventually(func() int { return numDeleted }, 2).Should(Equal(1)) + Consistently(func() int { return numAdded }, 2).Should(Equal(1)) + Consistently(func() int { return numUpdated }, 2).Should(Equal(0)) + + wf.RemovePodHandler(id) + close(stop) + }) +}) diff --git a/internal/pkg/kube/.gitkeep b/internal/pkg/kube/.gitkeep deleted file mode 100644 index e69de29..0000000 --- a/internal/pkg/kube/.gitkeep +++ /dev/null diff --git a/internal/pkg/kube/kube.go b/internal/pkg/kube/kube.go new file mode 100644 index 0000000..cc7c29b --- /dev/null +++ b/internal/pkg/kube/kube.go @@ -0,0 +1,141 @@ +package kube + +import ( + "fmt" + + "github.com/sirupsen/logrus" + + kapi "k8s.io/api/core/v1" + kapisnetworking "k8s.io/api/networking/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" +) + +// Interface represents the exported methods for dealing with getting/setting +// kubernetes resources +type Interface interface { + SetAnnotationOnPod(pod *kapi.Pod, key, value string) error + SetAnnotationOnNode(node *kapi.Node, key, value string) error + SetAnnotationOnNamespace(ns *kapi.Namespace, key, value string) error + GetAnnotationsOnPod(namespace, name string) (map[string]string, error) + GetPod(namespace, name string) (*kapi.Pod, error) + GetPods(namespace string) (*kapi.PodList, error) + GetPodsByLabels(namespace string, selector labels.Selector) (*kapi.PodList, error) + GetNodes() (*kapi.NodeList, error) + GetNode(name string) (*kapi.Node, error) + GetService(namespace, name string) (*kapi.Service, error) + GetEndpoints(namespace string) (*kapi.EndpointsList, error) + GetNamespace(name string) (*kapi.Namespace, error) + GetNamespaces() (*kapi.NamespaceList, error) + GetNetworkPolicies(namespace string) (*kapisnetworking.NetworkPolicyList, error) +} + +// Kube is the structure object upon which the Interface is implemented +type Kube struct { + KClient kubernetes.Interface +} + +// SetAnnotationOnPod takes the pod object and key/value string pair to set it as an annotation +func (k *Kube) SetAnnotationOnPod(pod *kapi.Pod, key, value string) error { + logrus.Infof("Setting annotations %s=%s on pod %s", key, value, pod.Name) + patchData := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, key, value) + _, err := k.KClient.Core().Pods(pod.Namespace).Patch(pod.Name, types.MergePatchType, []byte(patchData)) + if err != nil { + logrus.Errorf("Error in setting annotation on pod %s/%s: %v", pod.Name, pod.Namespace, err) + } + return err +} + +// SetAnnotationOnNode takes the node object and key/value string pair to set it as an annotation +func (k *Kube) SetAnnotationOnNode(node *kapi.Node, key, value string) error { + logrus.Infof("Setting annotations %s=%s on node %s", key, value, node.Name) + patchData := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, key, value) + _, err := k.KClient.Core().Nodes().Patch(node.Name, types.MergePatchType, []byte(patchData)) + if err != nil { + logrus.Errorf("Error in setting annotation on node %s: %v", node.Name, err) 
+ } + return err +} + +// SetAnnotationOnNamespace takes the Namespace object and key/value pair +// to set it as an annotation +func (k *Kube) SetAnnotationOnNamespace(ns *kapi.Namespace, key, + value string) error { + logrus.Infof("Setting annotations %s=%s on namespace %s", key, value, + ns.Name) + patchData := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, key, + value) + _, err := k.KClient.Core().Namespaces().Patch(ns.Name, + types.MergePatchType, []byte(patchData)) + if err != nil { + logrus.Errorf("Error in setting annotation on namespace %s: %v", + ns.Name, err) + } + return err +} + +// GetAnnotationsOnPod obtains the pod annotations from kubernetes apiserver, given the name and namespace +func (k *Kube) GetAnnotationsOnPod(namespace, name string) (map[string]string, error) { + pod, err := k.KClient.Core().Pods(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + return pod.ObjectMeta.Annotations, nil +} + +// GetPod obtains the Pod resource from kubernetes apiserver, given the name and namespace +func (k *Kube) GetPod(namespace, name string) (*kapi.Pod, error) { + return k.KClient.Core().Pods(namespace).Get(name, metav1.GetOptions{}) +} + +// GetPods obtains the Pod resource from kubernetes apiserver, given the name and namespace +func (k *Kube) GetPods(namespace string) (*kapi.PodList, error) { + return k.KClient.Core().Pods(namespace).List(metav1.ListOptions{}) +} + +// GetPodsByLabels obtains the Pod resources from kubernetes apiserver, +// given the namespace and label +func (k *Kube) GetPodsByLabels(namespace string, selector labels.Selector) (*kapi.PodList, error) { + options := metav1.ListOptions{} + options.LabelSelector = selector.String() + return k.KClient.Core().Pods(namespace).List(options) +} + +// GetNodes returns the list of all Node objects from kubernetes +func (k *Kube) GetNodes() (*kapi.NodeList, error) { + return k.KClient.Core().Nodes().List(metav1.ListOptions{}) +} + +// GetNode returns the Node resource from kubernetes apiserver, given its name +func (k *Kube) GetNode(name string) (*kapi.Node, error) { + return k.KClient.Core().Nodes().Get(name, metav1.GetOptions{}) +} + +// GetService returns the Service resource from kubernetes apiserver, given its name and namespace +func (k *Kube) GetService(namespace, name string) (*kapi.Service, error) { + return k.KClient.Core().Services(namespace).Get(name, metav1.GetOptions{}) +} + +// GetEndpoints returns all the Endpoint resources from kubernetes +// apiserver, given namespace +func (k *Kube) GetEndpoints(namespace string) (*kapi.EndpointsList, error) { + return k.KClient.Core().Endpoints(namespace).List(metav1.ListOptions{}) +} + +// GetNamespace returns the Namespace resource from kubernetes apiserver, +// given its name +func (k *Kube) GetNamespace(name string) (*kapi.Namespace, error) { + return k.KClient.Core().Namespaces().Get(name, metav1.GetOptions{}) +} + +// GetNamespaces returns all Namespace resource from kubernetes apiserver +func (k *Kube) GetNamespaces() (*kapi.NamespaceList, error) { + return k.KClient.Core().Namespaces().List(metav1.ListOptions{}) +} + +// GetNetworkPolicies returns all network policy objects from kubernetes +func (k *Kube) GetNetworkPolicies(namespace string) (*kapisnetworking.NetworkPolicyList, error) { + return k.KClient.Networking().NetworkPolicies(namespace).List(metav1.ListOptions{}) +} diff --git a/internal/pkg/ovn/.gitkeep b/internal/pkg/ovn/.gitkeep deleted file mode 100644 index e69de29..0000000 --- a/internal/pkg/ovn/.gitkeep 
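All of the SetAnnotationOn* helpers above send a JSON merge patch whose body is built with Sprintf, so the value string is interpolated verbatim; that is why pods.go (below) stores the ovnIfaceList value with pre-escaped quotes. A sketch of the generated payload for a hypothetical key/value pair, where kClient stands in for any kubernetes.Interface and mirrors the Patch call used by the helpers above:

    patch := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, "ovn4nfv-status", "configured")
    // patch == {"metadata":{"annotations":{"ovn4nfv-status":"configured"}}}
    _, err := kClient.Core().Pods("default").Patch("mypod", types.MergePatchType, []byte(patch))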
+++ /dev/null diff --git a/internal/pkg/ovn/common.go b/internal/pkg/ovn/common.go new file mode 100644 index 0000000..16923ea --- /dev/null +++ b/internal/pkg/ovn/common.go @@ -0,0 +1,89 @@ +package ovn + +import ( + "encoding/json" + "fmt" + "github.com/sirupsen/logrus" + "strings" +) + +func (oc *Controller) getIPFromOvnAnnotation(ovnAnnotation string) string { + if ovnAnnotation == "" { + return "" + } + + var ovnAnnotationMap map[string]string + err := json.Unmarshal([]byte(ovnAnnotation), &ovnAnnotationMap) + if err != nil { + logrus.Errorf("Error in json unmarshaling ovn annotation "+ + "(%v)", err) + return "" + } + + ipAddressMask := strings.Split(ovnAnnotationMap["ip_address"], "/") + if len(ipAddressMask) != 2 { + logrus.Errorf("Error in splitting ip address") + return "" + } + + return ipAddressMask[0] +} + +func (oc *Controller) getMacFromOvnAnnotation(ovnAnnotation string) string { + if ovnAnnotation == "" { + return "" + } + + var ovnAnnotationMap map[string]string + err := json.Unmarshal([]byte(ovnAnnotation), &ovnAnnotationMap) + if err != nil { + logrus.Errorf("Error in json unmarshaling ovn annotation "+ + "(%v)", err) + return "" + } + + return ovnAnnotationMap["mac_address"] +} + +func stringSliceMembership(slice []string, key string) bool { + for _, val := range slice { + if val == key { + return true + } + } + return false +} + +func (oc *Controller) getNetworkFromOvnAnnotation(ovnAnnotation string) string { + if ovnAnnotation == "" { + logrus.Errorf("getNetworkFromOvnAnnotation ovnAnnotation: %s", ovnAnnotation) + return "" + } + logrus.Infof("getNetworkFromOvnAnnotation ovnAnnotation: %s", ovnAnnotation) + + var ovnAnnotationMap map[string]string + err := json.Unmarshal([]byte(ovnAnnotation), &ovnAnnotationMap) + if err != nil { + logrus.Errorf("Error in json unmarshaling ovn annotation "+ + "(%v)", err) + return "" + } + for key, value := range ovnAnnotationMap { + logrus.Infof("getNetworkFromOvnAnnotation %s: %s", key, value) + } + return ovnAnnotationMap["name"] +} + +func (oc *Controller) parseOvnNetworkObject(ovnnetwork string) ([]map[string]interface{}, error) { + var ovnNet []map[string]interface{} + + if ovnnetwork == "" { + return nil, fmt.Errorf("parseOvnNetworkObject:error") + } + + if err := json.Unmarshal([]byte(ovnnetwork), &ovnNet); err != nil { + return nil, fmt.Errorf("parseOvnNetworkObject: failed to load ovn network err: %v | ovn network: %v", err, ovnnetwork) + } + + return ovnNet, nil +} diff --git a/internal/pkg/ovn/ovn.go b/internal/pkg/ovn/ovn.go new file mode 100644 index 0000000..ec2ccbd --- /dev/null +++ b/internal/pkg/ovn/ovn.go @@ -0,0 +1,71 @@ +package ovn + +import ( + "fmt" + kapi "k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "ovn4nfv-k8s-plugin/internal/pkg/factory" + "ovn4nfv-k8s-plugin/internal/pkg/kube" +) + +// Controller structure is the object which holds the controls for starting +// and reacting upon the watched resources (e.g. 
pods, endpoints) +type Controller struct { + kube kube.Interface + watchFactory *factory.WatchFactory + + gatewayCache map[string]string + // A cache of all logical switches seen by the watcher + logicalSwitchCache map[string]bool + // A cache of all logical ports seen by the watcher and + // its corresponding logical switch + logicalPortCache map[string]string +} + +// NewOvnController creates a new OVN controller for creating logical network +// infrastructure and policy +func NewOvnController(kubeClient kubernetes.Interface, wf *factory.WatchFactory) *Controller { + return &Controller{ + kube: &kube.Kube{KClient: kubeClient}, + watchFactory: wf, + logicalSwitchCache: make(map[string]bool), + logicalPortCache: make(map[string]string), + gatewayCache: make(map[string]string), + } +} + +// Run starts the actual watching. Also initializes any local structures needed. +func (oc *Controller) Run() error { + fmt.Println("ovn4nfvk8s watching Pods") + for _, f := range []func() error{oc.WatchPods} { + if err := f(); err != nil { + return err + } + } + return nil +} + +// WatchPods starts the watching of Pod resource and calls back the appropriate handler logic +func (oc *Controller) WatchPods() error { + _, err := oc.watchFactory.AddPodHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + pod := obj.(*kapi.Pod) + if pod.Spec.NodeName != "" { + oc.addLogicalPort(pod) + } + }, + UpdateFunc: func(old, newer interface{}) { + podNew := newer.(*kapi.Pod) + podOld := old.(*kapi.Pod) + if podOld.Spec.NodeName == "" && podNew.Spec.NodeName != "" { + oc.addLogicalPort(podNew) + } + }, + DeleteFunc: func(obj interface{}) { + pod := obj.(*kapi.Pod) + oc.deleteLogicalPort(pod) + }, + }, oc.syncPods) + return err +} diff --git a/internal/pkg/ovn/ovn_test.go b/internal/pkg/ovn/ovn_test.go new file mode 100644 index 0000000..2e558a6 --- /dev/null +++ b/internal/pkg/ovn/ovn_test.go @@ -0,0 +1,116 @@ +package ovn + +import ( + "fmt" + "testing" + + "github.com/urfave/cli" + fakeexec "k8s.io/utils/exec/testing" + + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" + "ovn4nfv-k8s-plugin/internal/pkg/config" + "ovn4nfv-k8s-plugin/internal/pkg/factory" + ovntest "ovn4nfv-k8s-plugin/internal/pkg/testing" + "ovn4nfv-k8s-plugin/internal/pkg/util" + + . "github.com/onsi/ginkgo" + . 
"github.com/onsi/gomega" +) + +func TestOvn(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "OVN/Pod Test Suite") +} + +var _ = AfterSuite(func() { +}) + +var _ = Describe("Add logical Port", func() { + var app *cli.App + + BeforeEach(func() { + // Restore global default values before each testcase + //config.RestoreDefaultConfig() + + app = cli.NewApp() + app.Name = "test" + app.Flags = config.Flags + }) + + It("tests Pod", func() { + app.Action = func(ctx *cli.Context) error { + const ( + gwIP string = "10.1.1.1" + gwCIDR string = gwIP + "/24" + netName string = "ovn-prot-net" + portName string = "_ok_net0" + macIPAddress string = "0a:00:00:00:00:01 192.168.1.3" + ) + fakeCmds := ovntest.AddFakeCmd(nil, &ovntest.ExpectedCmd{ + Cmd: "ovn-nbctl --timeout=15 --data=bare --no-heading --columns=name find logical_switch " + "name=" + netName, + Output: netName, + }) + fakeCmds = ovntest.AddFakeCmdsNoOutputNoError(fakeCmds, []string{ + "ovn-nbctl --timeout=15 --wait=sb -- --may-exist lsp-add " + netName + " " + portName + " -- lsp-set-addresses " + portName + " dynamic -- set logical_switch_port " + portName + " external-ids:namespace= external-ids:logical_switch=" + netName + " external-ids:pod=true", + }) + + fakeCmds = ovntest.AddFakeCmd(fakeCmds, &ovntest.ExpectedCmd{ + Cmd: "ovn-nbctl --timeout=15 --if-exists get logical_switch " + netName + " external_ids:gateway_ip", + Output: gwCIDR, + }) + fakeCmds = ovntest.AddFakeCmd(fakeCmds, &ovntest.ExpectedCmd{ + Cmd: "ovn-nbctl --timeout=15 get logical_switch_port " + portName + " dynamic_addresses", + Output: macIPAddress, + }) + + fexec := &fakeexec.FakeExec{ + CommandScript: fakeCmds, + LookPathFunc: func(file string) (string, error) { + return fmt.Sprintf("/fake-bin/%s", file), nil + }, + } + + err := util.SetExec(fexec) + Expect(err).NotTo(HaveOccurred()) + + _, err = config.InitConfig(ctx, fexec, nil) + Expect(err).NotTo(HaveOccurred()) + + fakeClient := &fake.Clientset{} + var fakeWatchFactory factory.WatchFactory + + ovnController := NewOvnController(fakeClient, &fakeWatchFactory) + Expect(err).NotTo(HaveOccurred()) + var ( + okPod = v1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "ok", + Annotations: map[string]string{"ovnNetwork": "[{ \"name\": \"ovn-prot-net\", \"interface\": \"net0\" , \"defaultGateway\": \"true\"}]"}, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "by-name", + }, + {}, + }, + }, + } + ) + + ovnController.addLogicalPort(&okPod) + _, _ = ovnController.kube.GetAnnotationsOnPod("", "ok") + + return nil + } + + err := app.Run([]string{app.Name}) + Expect(err).NotTo(HaveOccurred()) + }) +}) diff --git a/internal/pkg/ovn/pods.go b/internal/pkg/ovn/pods.go new file mode 100644 index 0000000..cc3d459 --- /dev/null +++ b/internal/pkg/ovn/pods.go @@ -0,0 +1,267 @@ +package ovn + +import ( + "fmt" + "strings" + "time" + + "github.com/sirupsen/logrus" + kapi "k8s.io/api/core/v1" + "ovn4nfv-k8s-plugin/internal/pkg/util" +) + +func (oc *Controller) syncPods(pods []interface{}) { +} +func (oc *Controller) getGatewayFromSwitch(logicalSwitch string) (string, string, error) { + var gatewayIPMaskStr, stderr string + var ok bool + var err error + logrus.Infof("getGatewayFromSwitch: %s", logicalSwitch) + if gatewayIPMaskStr, ok = oc.gatewayCache[logicalSwitch]; !ok { + gatewayIPMaskStr, stderr, err = util.RunOVNNbctlUnix("--if-exists", + "get", "logical_switch", logicalSwitch, + "external_ids:gateway_ip") + if err != nil { + 
logrus.Errorf("Failed to get gateway IP: %s, stderr: %q, %v", + gatewayIPMaskStr, stderr, err) + return "", "", err + } + if gatewayIPMaskStr == "" { + return "", "", fmt.Errorf("Empty gateway IP in logical switch %s", + logicalSwitch) + } + oc.gatewayCache[logicalSwitch] = gatewayIPMaskStr + } + gatewayIPMask := strings.Split(gatewayIPMaskStr, "/") + if len(gatewayIPMask) != 2 { + return "", "", fmt.Errorf("Failed to get IP and Mask from gateway CIDR: %s", + gatewayIPMaskStr) + } + gatewayIP := gatewayIPMask[0] + mask := gatewayIPMask[1] + return gatewayIP, mask, nil +} + +func (oc *Controller) deleteLogicalPort(pod *kapi.Pod) { + + if pod.Spec.HostNetwork { + return + } + + logrus.Infof("Deleting pod: %s", pod.Name) + logicalPort := fmt.Sprintf("%s_%s", pod.Namespace, pod.Name) + + // get the list of logical ports from OVN + output, stderr, err := util.RunOVNNbctlUnix("--data=bare", "--no-heading", + "--columns=name", "find", "logical_switch_port", "external_ids:pod=true") + if err != nil { + logrus.Errorf("Error in obtaining list of logical ports, "+ + "stderr: %q, err: %v", + stderr, err) + return + } + logrus.Infof("Exising Ports : %s. ", output) + existingLogicalPorts := strings.Fields(output) + for _, existingPort := range existingLogicalPorts { + if strings.Contains(existingPort, logicalPort) { + // found, delete this logical port + logrus.Infof("Deleting: %s. ", existingPort) + out, stderr, err := util.RunOVNNbctlUnix("--if-exists", "lsp-del", + existingPort) + if err != nil { + logrus.Errorf("Error in deleting pod's logical port "+ + "stdout: %q, stderr: %q err: %v", + out, stderr, err) + } else { + delete(oc.logicalPortCache, existingPort) + } + } + } + return +} + +func (oc *Controller) addLogicalPortWithSwitch(pod *kapi.Pod, logicalSwitch, ipAddress, macAddress, interfaceName string) (annotation string) { + var out, stderr string + var err error + var isStaticIP bool + if pod.Spec.HostNetwork { + return + } + + if !oc.logicalSwitchCache[logicalSwitch] { + oc.logicalSwitchCache[logicalSwitch] = true + } + var portName string + if interfaceName != "" { + portName = fmt.Sprintf("%s_%s_%s", pod.Namespace, pod.Name, interfaceName) + } else { + return + } + + logrus.Infof("Creating logical port for %s on switch %s", portName, logicalSwitch) + + if ipAddress != "" && macAddress != "" { + isStaticIP = true + } + if ipAddress != "" && macAddress == "" { + macAddress = util.GenerateMac() + isStaticIP = true + } + + if isStaticIP { + out, stderr, err = util.RunOVNNbctlUnix("--may-exist", "lsp-add", + logicalSwitch, portName, "--", "lsp-set-addresses", portName, + fmt.Sprintf("%s %s", macAddress, ipAddress), "--", "--if-exists", + "clear", "logical_switch_port", portName, "dynamic_addresses") + if err != nil { + logrus.Errorf("Failed to add logical port to switch "+ + "stdout: %q, stderr: %q (%v)", + out, stderr, err) + return + } + } else { + out, stderr, err = util.RunOVNNbctlUnix("--wait=sb", "--", + "--may-exist", "lsp-add", logicalSwitch, portName, + "--", "lsp-set-addresses", + portName, "dynamic", "--", "set", + "logical_switch_port", portName, + "external-ids:namespace="+pod.Namespace, + "external-ids:logical_switch="+logicalSwitch, + "external-ids:pod=true") + if err != nil { + logrus.Errorf("Error while creating logical port %s "+ + "stdout: %q, stderr: %q (%v)", + portName, out, stderr, err) + return + } + } + oc.logicalPortCache[portName] = logicalSwitch + gatewayIP, mask, err := oc.getGatewayFromSwitch(logicalSwitch) + if err != nil { + logrus.Errorf("Error obtaining gateway 
address for switch %s: %s", logicalSwitch, err) + return + } + + count := 30 + for count > 0 { + if isStaticIP { + out, stderr, err = util.RunOVNNbctlUnix("get", + "logical_switch_port", portName, "addresses") + } else { + out, stderr, err = util.RunOVNNbctlUnix("get", + "logical_switch_port", portName, "dynamic_addresses") + } + if err == nil && out != "[]" { + break + } + if err != nil { + logrus.Errorf("Error while obtaining addresses for %s - %v", portName, + err) + return + } + time.Sleep(time.Second) + count-- + } + if count == 0 { + logrus.Errorf("Error while obtaining addresses for %s "+ + "stdout: %q, stderr: %q, (%v)", portName, out, stderr, err) + return + } + + // static addresses have format ["0a:00:00:00:00:01 192.168.1.3"], while + // dynamic addresses have format "0a:00:00:00:00:01 192.168.1.3". + outStr := strings.TrimLeft(out, `[`) + outStr = strings.TrimRight(outStr, `]`) + outStr = strings.Trim(outStr, `"`) + addresses := strings.Split(outStr, " ") + if len(addresses) != 2 { + logrus.Errorf("Error while obtaining addresses for %s", portName) + return + } + annotation = fmt.Sprintf(`{\"ip_address\":\"%s/%s\", \"mac_address\":\"%s\", \"gateway_ip\": \"%s\"}`, addresses[1], mask, addresses[0], gatewayIP) + return annotation +} + +func findLogicalSwitch(name string) bool { + // get logical switch from OVN + output, stderr, err := util.RunOVNNbctlUnix("--data=bare", "--no-heading", + "--columns=name", "find", "logical_switch", "name="+name) + if err != nil { + logrus.Errorf("Error in obtaining list of logical switch, "+ + "stderr: %q, err: %v", + stderr, err) + return false + } + + if strings.Compare(name, output) == 0 { + return true + } else { + logrus.Errorf("Error finding Switch %v", name) + return false + } +} + +func (oc *Controller) addLogicalPort(pod *kapi.Pod) { + var logicalSwitch string + var ipAddress, macAddress, interfaceName, defaultGateway string + + annotation := pod.Annotations["ovnNetwork"] + + if annotation != "" { + ovnNetObjs, err := oc.parseOvnNetworkObject(annotation) + if err != nil { + logrus.Errorf("addLogicalPort : Error Parsing OvnNetwork List") + return + } + var ovnString, outStr string + ovnString = "[" + for _, net := range ovnNetObjs { + logicalSwitch = net["name"].(string) + if _, ok := net["interface"]; ok { + interfaceName = net["interface"].(string) + } else { + interfaceName = "" + } + if _, ok := net["ipAddress"]; ok { + ipAddress = net["ipAddress"].(string) + } else { + ipAddress = "" + } + if _, ok := net["macAddress"]; ok { + macAddress = net["macAddress"].(string) + } else { + macAddress = "" + } + if _, ok := net["defaultGateway"]; ok { + defaultGateway = net["defaultGateway"].(string) + } else { + defaultGateway = "false" + } + if !findLogicalSwitch(logicalSwitch) { + return + } + if interfaceName == "" { + logrus.Errorf("Interface name must be provided") + return + } + outStr = oc.addLogicalPortWithSwitch(pod, logicalSwitch, ipAddress, macAddress, interfaceName) + if outStr == "" { + return + } + last := len(outStr) - 1 + tmpString := outStr[:last] + tmpString += "," + "\\\"defaultGateway\\\":" + "\\\"" + defaultGateway + "\\\"" + tmpString += "," + "\\\"interface\\\":" + "\\\"" + interfaceName + "\\\"}" + ovnString += tmpString + ovnString += "," + } + last := len(ovnString) - 1 + ovnString = ovnString[:last] + ovnString += "]" + logrus.Debugf("ovnIfaceList - %v", ovnString) + err = oc.kube.SetAnnotationOnPod(pod, "ovnIfaceList", ovnString) + if err != nil { + logrus.Errorf("Failed to set annotation on pod %s - %v", pod.Name, 
err) + } + } +} diff --git a/internal/pkg/ovn/router.go b/internal/pkg/ovn/router.go new file mode 100644 index 0000000..d98c463 --- /dev/null +++ b/internal/pkg/ovn/router.go @@ -0,0 +1,50 @@ +package ovn + +import ( + "github.com/sirupsen/logrus" + "ovn4nfv-k8s-plugin/internal/pkg/util" +) + +func SetupMaster(name string) error { + + // Make sure br-int is created. + stdout, stderr, err := util.RunOVSVsctl("--", "--may-exist", "add-br", "br-int") + if err != nil { + logrus.Errorf("Failed to create br-int, stdout: %q, stderr: %q, error: %v", stdout, stderr, err) + return err + } + // Create a single common distributed router for the cluster. + stdout, stderr, err = util.RunOVNNbctlUnix("--", "--may-exist", "lr-add", name, "--", "set", "logical_router", name, "external_ids:ovn4nfv-cluster-router=yes") + if err != nil { + logrus.Errorf("Failed to create a single common distributed router for the cluster, stdout: %q, stderr: %q, error: %v", stdout, stderr, err) + return err + } + // Create a logical switch called "ovn4nfv-join" that will be used to connect gateway routers to the distributed router. + // The "ovn4nfv-join" will be allocated IP addresses in the range 100.64.1.0/24. + stdout, stderr, err = util.RunOVNNbctlUnix("--may-exist", "ls-add", "ovn4nfv-join") + if err != nil { + logrus.Errorf("Failed to create logical switch called \"ovn4nfv-join\", stdout: %q, stderr: %q, error: %v", stdout, stderr, err) + return err + } + // Connect the distributed router to "ovn4nfv-join". + routerMac, stderr, err := util.RunOVNNbctlUnix("--if-exist", "get", "logical_router_port", "rtoj-"+name, "mac") + if err != nil { + logrus.Errorf("Failed to get logical router port rtoj-%v, stderr: %q, error: %v", name, stderr, err) + return err + } + if routerMac == "" { + routerMac = util.GenerateMac() + stdout, stderr, err = util.RunOVNNbctlUnix("--", "--may-exist", "lrp-add", name, "rtoj-"+name, routerMac, "100.64.1.1/24", "--", "set", "logical_router_port", "rtoj-"+name, "external_ids:connect_to_ovn4nfvjoin=yes") + if err != nil { + logrus.Errorf("Failed to add logical router port rtoj-%v, stdout: %q, stderr: %q, error: %v", name, stdout, stderr, err) + return err + } + } + // Connect the switch "ovn4nfv-join" to the router. 
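For reference, with the inputs exercised in ovn_test.go above (logical switch ovn-prot-net, dynamic address "0a:00:00:00:00:01 192.168.1.3", gateway 10.1.1.1/24, interface net0, defaultGateway true), the addLogicalPort/addLogicalPortWithSwitch path assembles an ovnIfaceList annotation like the one below. This is reconstructed from the string concatenation in the code above, not output captured from a run; the \" escapes are what keep the value valid JSON once kube.SetAnnotationOnPod embeds it in its merge patch:

    // Input (Pod annotation "ovnNetwork", as in the ovn_test.go fixture, unescaped):
    //   [{ "name": "ovn-prot-net", "interface": "net0" , "defaultGateway": "true"}]
    // Output (value stored under "ovnIfaceList"), reconstructed from the code above:
    const exampleIfaceList = `[{\"ip_address\":\"192.168.1.3/24\", \"mac_address\":\"0a:00:00:00:00:01\", \"gateway_ip\": \"10.1.1.1\",\"defaultGateway\":\"true\",\"interface\":\"net0\"}]`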
+ stdout, stderr, err = util.RunOVNNbctlUnix("--", "--may-exist", "lsp-add", "ovn4nfv-join", "jtor-"+name, "--", "set", "logical_switch_port", "jtor-"+name, "type=router", "options:router-port=rtoj-"+name, "addresses="+"\""+routerMac+"\"") + if err != nil { + logrus.Errorf("Failed to add logical switch port to logical router, stdout: %q, stderr: %q, error: %v", stdout, stderr, err) + return err + } + return nil +} diff --git a/internal/pkg/testing/testing.go b/internal/pkg/testing/testing.go new file mode 100644 index 0000000..4c2afad --- /dev/null +++ b/internal/pkg/testing/testing.go @@ -0,0 +1,55 @@ +package testing + +import ( + "strings" + + kexec "k8s.io/utils/exec" + fakeexec "k8s.io/utils/exec/testing" + + "github.com/onsi/gomega" +) + +// ExpectedCmd contains properties that the testcase expects a called command +// to have as well as the output that the fake command should return +type ExpectedCmd struct { + // Cmd should be the command-line string of the executable name and all arguments it is expected to be called with + Cmd string + // Output is any stdout output which Cmd should produce + Output string + // Stderr is any stderr output which Cmd should produce + Stderr string + // Err is any error that should be returned for the invocation of Cmd + Err error +} + +// AddFakeCmd takes the ExpectedCmd and appends its runner function to +// a fake command action list +func AddFakeCmd(fakeCmds []fakeexec.FakeCommandAction, expected *ExpectedCmd) []fakeexec.FakeCommandAction { + return append(fakeCmds, func(cmd string, args ...string) kexec.Cmd { + parts := strings.Split(expected.Cmd, " ") + gomega.Expect(cmd).To(gomega.Equal("/fake-bin/" + parts[0])) + gomega.Expect(strings.Join(args, " ")).To(gomega.Equal(strings.Join(parts[1:], " "))) + return &fakeexec.FakeCmd{ + Argv: parts[1:], + CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{ + func() ([]byte, error) { + return []byte(expected.Output), expected.Err + }, + }, + RunScript: []fakeexec.FakeRunAction{ + func() ([]byte, []byte, error) { + return []byte(expected.Output), []byte(expected.Stderr), expected.Err + }, + }, + } + }) +} + +// AddFakeCmdsNoOutputNoError takes a list of commands and appends those commands +// to the expected command set. The command cannot return any output or error. +func AddFakeCmdsNoOutputNoError(fakeCmds []fakeexec.FakeCommandAction, commands []string) []fakeexec.FakeCommandAction { + for _, cmd := range commands { + fakeCmds = AddFakeCmd(fakeCmds, &ExpectedCmd{Cmd: cmd}) + } + return fakeCmds +} diff --git a/internal/pkg/util/.gitkeep b/internal/pkg/util/.gitkeep deleted file mode 100644 index e69de29..0000000 --- a/internal/pkg/util/.gitkeep +++ /dev/null diff --git a/internal/pkg/util/net.go b/internal/pkg/util/net.go new file mode 100644 index 0000000..1ff7fbb --- /dev/null +++ b/internal/pkg/util/net.go @@ -0,0 +1,34 @@ +package util + +import ( + "fmt" + "math/big" + "math/rand" + "net" + "time" +) + +// GenerateMac generates mac address. 
+func GenerateMac() string {
+ prefix := "00:00:00"
+ newRand := rand.New(rand.NewSource(time.Now().UnixNano()))
+ mac := fmt.Sprintf("%s:%02x:%02x:%02x", prefix, newRand.Intn(255), newRand.Intn(255), newRand.Intn(255))
+ return mac
+}
+
+// NextIP returns the given IP address incremented by 1.
+func NextIP(ip net.IP) net.IP {
+ i := ipToInt(ip)
+ return intToIP(i.Add(i, big.NewInt(1)))
+}
+
+func ipToInt(ip net.IP) *big.Int {
+ if v := ip.To4(); v != nil {
+ return big.NewInt(0).SetBytes(v)
+ }
+ return big.NewInt(0).SetBytes(ip.To16())
+}
+
+func intToIP(i *big.Int) net.IP {
+ return net.IP(i.Bytes())
+}
diff --git a/internal/pkg/util/ovs.go b/internal/pkg/util/ovs.go
new file mode 100644
index 0000000..22b42b9
--- /dev/null
+++ b/internal/pkg/util/ovs.go
@@ -0,0 +1,126 @@
+package util
+
+import (
+ "bytes"
+ "fmt"
+ "strings"
+ "time"
+ "unicode"
+
+ "github.com/sirupsen/logrus"
+ kexec "k8s.io/utils/exec"
+)
+
+const (
+ ovsCommandTimeout = 15
+ ovsVsctlCommand = "ovs-vsctl"
+ ovsOfctlCommand = "ovs-ofctl"
+ ovnNbctlCommand = "ovn-nbctl"
+ ipCommand = "ip"
+)
+
+// execHelper is used to run the various OVN and OVS utilities
+type execHelper struct {
+ exec kexec.Interface
+ ofctlPath string
+ vsctlPath string
+ nbctlPath string
+ ipPath string
+}
+
+var runner *execHelper
+
+// SetExec validates executable paths and saves the given exec interface
+// to be used for running various OVS and OVN utilities
+func SetExec(exec kexec.Interface) error {
+ var err error
+
+ runner = &execHelper{exec: exec}
+ runner.ofctlPath, err = exec.LookPath(ovsOfctlCommand)
+ if err != nil {
+ return err
+ }
+ runner.vsctlPath, err = exec.LookPath(ovsVsctlCommand)
+ if err != nil {
+ return err
+ }
+ runner.nbctlPath, err = exec.LookPath(ovnNbctlCommand)
+ if err != nil {
+ return err
+ }
+ runner.ipPath, err = exec.LookPath(ipCommand)
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+// runOVNretry runs the given OVN command and retries if it fails with
+// "Connection refused", polling until the service becomes available.
+func runOVNretry(cmdPath string, args ...string) (*bytes.Buffer, *bytes.Buffer, error) {
+
+ retriesLeft := 200
+ for {
+ stdout, stderr, err := run(cmdPath, args...)
+ if err == nil {
+ return stdout, stderr, err
+ }
+
+ // Connection refused
+ // Master may not be up so keep trying
+ if strings.Contains(stderr.String(), "Connection refused") {
+ if retriesLeft == 0 {
+ return stdout, stderr, err
+ }
+ retriesLeft--
+ time.Sleep(2 * time.Second)
+ } else {
+ // Some other problem for caller to handle
+ return stdout, stderr, err
+ }
+ }
+}
+
+func run(cmdPath string, args ...string) (*bytes.Buffer, *bytes.Buffer, error) {
+ stdout := &bytes.Buffer{}
+ stderr := &bytes.Buffer{}
+ cmd := runner.exec.Command(cmdPath, args...)
+ cmd.SetStdout(stdout)
+ cmd.SetStderr(stderr)
+ logrus.Debugf("exec: %s %s", cmdPath, strings.Join(args, " "))
+ err := cmd.Run()
+ if err != nil {
+ logrus.Debugf("exec: %s %s => %v", cmdPath, strings.Join(args, " "), err)
+ }
+ return stdout, stderr, err
+}
+
+// RunOVSOfctl runs a command via ovs-ofctl.
+func RunOVSOfctl(args ...string) (string, string, error) {
+ stdout, stderr, err := run(runner.ofctlPath, args...)
+ return strings.Trim(stdout.String(), "\" \n"), stderr.String(), err
+}
+
+// RunOVSVsctl runs a command via ovs-vsctl.
+func RunOVSVsctl(args ...string) (string, string, error) {
+ cmdArgs := []string{fmt.Sprintf("--timeout=%d", ovsCommandTimeout)}
+ cmdArgs = append(cmdArgs, args...)
+ stdout, stderr, err := run(runner.vsctlPath, cmdArgs...)
+ return strings.Trim(strings.TrimSpace(stdout.String()), "\""), stderr.String(), err
+}
+
+// RunOVNNbctlUnix runs a command via ovn-nbctl, with ovn-nbctl using the unix
+// domain socket to connect to the ovsdb-server backing the OVN NB database.
+func RunOVNNbctlUnix(args ...string) (string, string, error) {
+ cmdArgs := []string{fmt.Sprintf("--timeout=%d", ovsCommandTimeout)}
+ cmdArgs = append(cmdArgs, args...)
+ stdout, stderr, err := runOVNretry(runner.nbctlPath, cmdArgs...)
+ return strings.Trim(strings.TrimFunc(stdout.String(), unicode.IsSpace), "\""),
+ stderr.String(), err
+}
+
+// RunIP runs a command via the iproute2 "ip" utility.
+func RunIP(args ...string) (string, string, error) {
+ stdout, stderr, err := run(runner.ipPath, args...)
+ return strings.TrimSpace(stdout.String()), stderr.String(), err
+}
@@ -0,0 +1,25 @@
+# SPDX-license-identifier: Apache-2.0
+##############################################################################
+# Copyright (c) 2018
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+[tox]
+minversion = 1.6
+skipsdist = True
+envlist = checker
+
+[testenv]
+passenv = http_proxy HTTP_PROXY https_proxy HTTPS_PROXY no_proxy NO_PROXY
+usedevelop = False
+install_command = pip install {opts} {packages}
+
+[testenv:checker]
+deps =
+ rstcheck
+whitelist_externals = bash
+commands = bash -c "find {toxinidir}/ \
+ -name \*.rst -type f -print0 | xargs -0 rstcheck --report warning"
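The fake-exec helpers in internal/pkg/testing are intended to be wired into util.SetExec so that code such as SetupMaster can be exercised without a running OVS/OVN. The following is a minimal test sketch of that wiring; the router name "ovn4nfv-master", the canned MAC returned for the rtoj- port, the ovn_test package placement, and the final assertion are illustrative assumptions, not part of this change.

package ovn_test

import (
	"fmt"
	"testing"

	"github.com/onsi/gomega"
	fakeexec "k8s.io/utils/exec/testing"

	"ovn4nfv-k8s-plugin/internal/pkg/ovn"
	ovntest "ovn4nfv-k8s-plugin/internal/pkg/testing"
	"ovn4nfv-k8s-plugin/internal/pkg/util"
)

func TestSetupMaster(t *testing.T) {
	gomega.RegisterTestingT(t)

	// Expected command lines, in the order SetupMaster issues them. The
	// "get ... mac" call returns a canned MAC so the lrp-add branch is
	// skipped and the run stays deterministic.
	fakeCmds := ovntest.AddFakeCmdsNoOutputNoError(nil, []string{
		"ovs-vsctl --timeout=15 -- --may-exist add-br br-int",
		"ovn-nbctl --timeout=15 -- --may-exist lr-add ovn4nfv-master -- set logical_router ovn4nfv-master external_ids:ovn4nfv-cluster-router=yes",
		"ovn-nbctl --timeout=15 --may-exist ls-add ovn4nfv-join",
	})
	fakeCmds = ovntest.AddFakeCmd(fakeCmds, &ovntest.ExpectedCmd{
		Cmd:    "ovn-nbctl --timeout=15 --if-exist get logical_router_port rtoj-ovn4nfv-master mac",
		Output: "00:00:00:01:02:03",
	})
	fakeCmds = ovntest.AddFakeCmdsNoOutputNoError(fakeCmds, []string{
		"ovn-nbctl --timeout=15 -- --may-exist lsp-add ovn4nfv-join jtor-ovn4nfv-master -- set logical_switch_port jtor-ovn4nfv-master type=router options:router-port=rtoj-ovn4nfv-master addresses=\"00:00:00:01:02:03\"",
	})

	fexec := &fakeexec.FakeExec{
		CommandScript: fakeCmds,
		LookPathFunc: func(file string) (string, error) {
			// AddFakeCmd expects every binary to resolve under /fake-bin.
			return fmt.Sprintf("/fake-bin/%s", file), nil
		},
	}

	// Point the util runners at the fake exec implementation, then run the
	// master setup; every expected command must be consumed without error.
	gomega.Expect(util.SetExec(fexec)).To(gomega.Succeed())
	gomega.Expect(ovn.SetupMaster("ovn4nfv-master")).To(gomega.Succeed())
	gomega.Expect(fexec.CommandCalls).To(gomega.Equal(len(fakeCmds)))
}

Because AddFakeCmd asserts on the exact argv of each invocation, any drift in the ovs-vsctl or ovn-nbctl calls made by SetupMaster surfaces as a test failure rather than a silent mismatch.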