From 3de63ee756f9d7c0a4524b40a89e92b918a9249f Mon Sep 17 00:00:00 2001 From: Kuralamudhan Ramakrishnan Date: Tue, 21 Apr 2020 17:19:34 +0000 Subject: Adding cnishim and cniserver - inspired from ovn-kubernetes and sdn openshift - cniserver & cnishim concepts - removed cni binary to depend on the host ovs binary installation - encapsulated all the binaries within the ovn and ovs containers - ovn4nfv-k8s cni server run along with nfn-agent - cnishim act as the httpclient and cniserver as httpservers - cnishim wrap all the cni commands to cniserver - cniserver do the actual network pumping work and send result back to cnishim - cnishim print the results as per the cni spec requirement - currently supports only debian installation for ovn daemon - support for debian kernel installation - Consolidated all yaml into single ovn4nfv-k8s-plugin Signed-off-by: Kuralamudhan Ramakrishnan Change-Id: I1e2b114d90f717baa2ee94ff379c849d73b2754e --- internal/pkg/cniserver/cni.go | 295 ++++++++++++++++++++++++++++++++++++ internal/pkg/cniserver/cniserver.go | 235 ++++++++++++++++++++++++++++ 2 files changed, 530 insertions(+) create mode 100644 internal/pkg/cniserver/cni.go create mode 100644 internal/pkg/cniserver/cniserver.go (limited to 'internal/pkg/cniserver') diff --git a/internal/pkg/cniserver/cni.go b/internal/pkg/cniserver/cni.go new file mode 100644 index 0000000..2c91f04 --- /dev/null +++ b/internal/pkg/cniserver/cni.go @@ -0,0 +1,295 @@ +package cniserver + +import ( + "encoding/json" + "k8s.io/apimachinery/pkg/util/wait" + "fmt" + "net" + "strconv" + "net/http" + "time" + "k8s.io/klog" + + "k8s.io/client-go/kubernetes" + "github.com/containernetworking/cni/pkg/types" + "github.com/containernetworking/cni/pkg/types/current" + "ovn4nfv-k8s-plugin/internal/pkg/kube" + "k8s.io/apimachinery/pkg/api/errors" + "ovn4nfv-k8s-plugin/internal/pkg/config" + "ovn4nfv-k8s-plugin/cmd/ovn4nfvk8s-cni/app" +) + +const ( + ovn4nfvAnnotationTag = "k8s.plugin.opnfv.org/ovnInterfaces" +) + +func parseOvnNetworkObject(ovnnetwork string) ([]map[string]string, error) { + var ovnNet []map[string]string + + if ovnnetwork == "" { + return nil, fmt.Errorf("parseOvnNetworkObject:error") + } + + if err := json.Unmarshal([]byte(ovnnetwork), &ovnNet); err != nil { + return nil, fmt.Errorf("parseOvnNetworkObject: failed to load ovn network err: %v | ovn network: %v", err, ovnnetwork) + } + + return ovnNet, nil +} + +func mergeWithResult(srcObj, dstObj types.Result) (types.Result, error) { + + if dstObj == nil { + return srcObj, nil + } + src, err := current.NewResultFromResult(srcObj) + if err != nil { + return nil, fmt.Errorf("Couldn't convert old result to current version: %v", err) + } + dst, err := current.NewResultFromResult(dstObj) + if err != nil { + return nil, fmt.Errorf("Couldn't convert old result to current version: %v", err) + } + + ifacesLength := len(dst.Interfaces) + + for _, iface := range src.Interfaces { + dst.Interfaces = append(dst.Interfaces, iface) + } + for _, ip := range src.IPs { + if ip.Interface != nil && *(ip.Interface) != -1 { + ip.Interface = current.Int(*(ip.Interface) + ifacesLength) + } + dst.IPs = append(dst.IPs, ip) + } + for _, route := range src.Routes { + dst.Routes = append(dst.Routes, route) + } + + for _, ns := range src.DNS.Nameservers { + dst.DNS.Nameservers = append(dst.DNS.Nameservers, ns) + } + for _, s := range src.DNS.Search { + dst.DNS.Search = append(dst.DNS.Search, s) + } + for _, opt := range src.DNS.Options { + dst.DNS.Options = append(dst.DNS.Options, opt) + } + // TODO: what about DNS.domain? + return dst, nil +} + +func prettyPrint(i interface{}) string { + s, _ := json.MarshalIndent(i, "", "\t") + return string(s) +} + +func isNotFoundError(err error) bool { + statusErr, ok := err.(*errors.StatusError) + return ok && statusErr.Status().Code == http.StatusNotFound +} + +func (cr *CNIServerRequest) addMultipleInterfaces(ovnAnnotation, namespace, podName string) types.Result { + klog.Infof("ovn4nfvk8s-cni: addMultipleInterfaces ") + var ovnAnnotatedMap []map[string]string + ovnAnnotatedMap, err := parseOvnNetworkObject(ovnAnnotation) + if err != nil { + klog.Errorf("addLogicalPort : Error Parsing Ovn Network List %v %v", ovnAnnotatedMap, err) + return nil + } + if namespace == "" || podName == "" { + klog.Errorf("required CNI variable missing") + return nil + } + var interfacesArray []*current.Interface + var index int + var result *current.Result + var dstResult types.Result + for _, ovnNet := range ovnAnnotatedMap { + ipAddress := ovnNet["ip_address"] + macAddress := ovnNet["mac_address"] + gatewayIP := ovnNet["gateway_ip"] + defaultGateway := ovnNet["defaultGateway"] + + if ipAddress == "" || macAddress == "" { + klog.Errorf("failed in pod annotation key extract") + return nil + } + + index++ + interfaceName := ovnNet["interface"] + if interfaceName == "" { + klog.Errorf("addMultipleInterfaces: interface can't be null") + return nil + } + klog.Infof("addMultipleInterfaces: ipAddress %v %v", ipAddress, interfaceName) + interfacesArray, err = app.ConfigureInterface(cr.Netns, cr.SandboxID, cr.IfName, namespace, podName, macAddress, ipAddress, gatewayIP, interfaceName, defaultGateway, index, config.Default.MTU) + if err != nil { + klog.Errorf("Failed to configure interface in pod: %v", err) + return nil + } + addr, addrNet, err := net.ParseCIDR(ipAddress) + if err != nil { + klog.Errorf("failed to parse IP address %q: %v", ipAddress, err) + return nil + } + ipVersion := "6" + if addr.To4() != nil { + ipVersion = "4" + } + var routes types.Route + if defaultGateway == "true" { + defaultAddr, defaultAddrNet, _ := net.ParseCIDR("0.0.0.0/0") + routes = types.Route{Dst: net.IPNet{IP: defaultAddr, Mask: defaultAddrNet.Mask}, GW: net.ParseIP(gatewayIP)} + + result = ¤t.Result{ + Interfaces: interfacesArray, + IPs: []*current.IPConfig{ + { + Version: ipVersion, + Interface: current.Int(1), + Address: net.IPNet{IP: addr, Mask: addrNet.Mask}, + Gateway: net.ParseIP(gatewayIP), + }, + }, + Routes: []*types.Route{&routes}, + } + } else { + result = ¤t.Result{ + Interfaces: interfacesArray, + IPs: []*current.IPConfig{ + { + Version: ipVersion, + Interface: current.Int(1), + Address: net.IPNet{IP: addr, Mask: addrNet.Mask}, + Gateway: net.ParseIP(gatewayIP), + }, + }, + } + + } + // Build the result structure to pass back to the runtime + dstResult, err = mergeWithResult(types.Result(result), dstResult) + if err != nil { + klog.Errorf("Failed to merge results: %v", err) + return nil + } + } + klog.Infof("addMultipleInterfaces: results %s", prettyPrint(dstResult)) + return dstResult +} + +func (cr *CNIServerRequest) addRoutes(ovnAnnotation string, dstResult types.Result) types.Result { + klog.Infof("ovn4nfvk8s-cni: addRoutes ") + var ovnAnnotatedMap []map[string]string + ovnAnnotatedMap, err := parseOvnNetworkObject(ovnAnnotation) + if err != nil { + klog.Errorf("addLogicalPort : Error Parsing Ovn Route List %v", err) + return nil + } + + var result types.Result + var routes []*types.Route + for _, ovnNet := range ovnAnnotatedMap { + dst := ovnNet["dst"] + gw := ovnNet["gw"] + dev := ovnNet["dev"] + if dst == "" || gw == "" || dev == "" { + klog.Errorf("failed in pod annotation key extract") + return nil + } + err = app.ConfigureRoute(cr.Netns, dst, gw, dev) + if err != nil { + klog.Errorf("Failed to configure interface in pod: %v", err) + return nil + } + dstAddr, dstAddrNet, _ := net.ParseCIDR(dst) + routes = append(routes, &types.Route{ + Dst: net.IPNet{IP: dstAddr, Mask: dstAddrNet.Mask}, + GW: net.ParseIP(gw), + }) + } + + result = ¤t.Result{ + Routes: routes, + } + // Build the result structure to pass back to the runtime + dstResult, err = mergeWithResult(result, dstResult) + if err != nil { + klog.Errorf("Failed to merge results: %v", err) + return nil + } + klog.Infof("addRoutes: results %s", prettyPrint(dstResult)) + return dstResult +} + +func (cr *CNIServerRequest) cmdAdd(kclient kubernetes.Interface) ([]byte, error) { + klog.Infof("ovn4nfvk8s-cni: cmdAdd") + namespace := cr.PodNamespace + podname := cr.PodName + if namespace == "" || podname == "" { + return nil, fmt.Errorf("required CNI variable missing") + } + klog.Infof("ovn4nfvk8s-cni: cmdAdd for pod podname:%s and namespace:%s", podname, namespace) + kubecli := &kube.Kube{KClient: kclient} + // Get the IP address and MAC address from the API server. + var annotationBackoff = wait.Backoff{Duration: 1 * time.Second, Steps: 14, Factor: 1.5, Jitter: 0.1} + var annotation map[string]string + var err error + if err = wait.ExponentialBackoff(annotationBackoff, func() (bool, error) { + annotation, err = kubecli.GetAnnotationsOnPod(namespace, podname) + if err != nil { + if isNotFoundError(err) { + return false, fmt.Errorf("Error - pod not found - %v", err) + } + klog.Infof("ovn4nfvk8s-cni: cmdAdd Warning - Error while obtaining pod annotations - %v", err) + return false,nil + } + if _, ok := annotation[ovn4nfvAnnotationTag]; ok { + return true, nil + } + return false, nil + }); err != nil { + return nil, fmt.Errorf("failed to get pod annotation - %v", err) + } + + klog.Infof("ovn4nfvk8s-cni: cmdAdd Annotation Found ") + ovnAnnotation, ok := annotation[ovn4nfvAnnotationTag] + if !ok { + return nil, fmt.Errorf("Error while obtaining pod annotations") + } + result := cr.addMultipleInterfaces(ovnAnnotation, namespace, podname) + //Add Routes to the pod if annotation found for routes + ovnRouteAnnotation, ok := annotation["ovnNetworkRoutes"] + if ok { + klog.Infof("ovn4nfvk8s-cni: ovnNetworkRoutes Annotation Found %+v", ovnRouteAnnotation) + result = cr.addRoutes(ovnRouteAnnotation, result) + } + + if result == nil { + klog.Errorf("result struct the ovn4nfv-k8s-plugin cniserver") + return nil, fmt.Errorf("result is nil from cni server response") + } + + responseBytes, err := json.Marshal(result) + if err != nil { + return nil, fmt.Errorf("failed to marshal pod request response: %v", err) + } + + return responseBytes, nil +} + +func (cr *CNIServerRequest) cmdDel() ([]byte, error) { + klog.Infof("cmdDel ") + for i := 0; i < 10; i++ { + ifaceName := cr.SandboxID[:14] + strconv.Itoa(i) + done, err := app.PlatformSpecificCleanup(ifaceName) + if err != nil { + klog.Errorf("Teardown error: %v", err) + } + if done { + break + } + } + return []byte{}, nil +} diff --git a/internal/pkg/cniserver/cniserver.go b/internal/pkg/cniserver/cniserver.go new file mode 100644 index 0000000..eaa7105 --- /dev/null +++ b/internal/pkg/cniserver/cniserver.go @@ -0,0 +1,235 @@ +package cniserver + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "strings" + "os" + "net" + "path/filepath" + "syscall" + "k8s.io/klog" + + "github.com/containernetworking/cni/pkg/types" + "github.com/gorilla/mux" + "k8s.io/client-go/kubernetes" + "ovn4nfv-k8s-plugin/internal/pkg/config" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + utilwait "k8s.io/apimachinery/pkg/util/wait" +) + +const CNIServerRunDir string = "/var/run/ovn4nfv-k8s-plugin/cniserver" +const CNIServerSocketName string = "ovn4nfv-k8s-plugin-cni-server.sock" +const CNIServerSocketPath string = CNIServerRunDir + "/" + CNIServerSocketName + + +type CNIcommand string + +const CNIAdd CNIcommand = "ADD" +const CNIUpdate CNIcommand = "UPDATE" +const CNIDel CNIcommand = "DEL" + +type CNIServerRequest struct { + Command CNIcommand + PodNamespace string + PodName string + SandboxID string + Netns string + IfName string + CNIConf *types.NetConf +} + +type cniServerRequestFunc func(request *CNIServerRequest, k8sclient kubernetes.Interface) ([]byte, error) + +type CNIEndpointRequest struct { + ArgEnv map[string]string `json:"env,omitempty"` + NetConfig []byte `json:"config,omitempty"` +} +type CNIServer struct { + http.Server + requestFunc cniServerRequestFunc + serverrundir string + k8sclient kubernetes.Interface +} + +func NewCNIServer(serverRunSir string, k8sclient kubernetes.Interface) *CNIServer { + klog.Infof("Setting up CNI server in nfn-agent") + if len(serverRunSir) == 0 { + serverRunSir = CNIServerRunDir + } + + router := mux.NewRouter() + cs := &CNIServer{ + Server: http.Server{ + Handler: router, + }, + serverrundir: serverRunSir, + k8sclient: k8sclient, + } + router.NotFoundHandler = http.HandlerFunc(http.NotFound) + router.HandleFunc("/", cs.handleCNIShimRequest).Methods("POST") + return cs +} + +func loadCNIShimArgs(env map[string]string) (map[string]string, error) { + cnishimArgs, ok := env["CNI_ARGS"] + if !ok { + return nil, fmt.Errorf("cnishim req missing CNI_ARGS: '%s'", env) + } + + mapArgs := make(map[string]string) + for _, arg := range strings.Split(cnishimArgs, ";") { + parts := strings.Split(arg, "=") + if len(parts) != 2 { + return nil, fmt.Errorf("invalid CNI_ARG from cnishim '%s'", arg) + } + mapArgs[strings.TrimSpace(parts[0])] = strings.TrimSpace(parts[1]) + } + return mapArgs, nil +} + +func loadCNIRequestToCNIServer(r *CNIEndpointRequest) (*CNIServerRequest, error) { + cmd, ok := r.ArgEnv["CNI_COMMAND"] + if !ok { + return nil, fmt.Errorf("cnishim req missing CNI_COMMAND") + } + + cnishimreq := &CNIServerRequest{ + Command: CNIcommand(cmd), + } + + cnishimreq.SandboxID, ok = r.ArgEnv["CNI_CONTAINERID"] + if !ok { + return nil, fmt.Errorf("cnishim req missing CNI_CONTAINERID") + } + + cnishimreq.Netns, ok = r.ArgEnv["CNI_NETNS"] + if !ok { + return nil, fmt.Errorf("cnishim req missing CNI_NETNS") + } + + cnishimreq.IfName, ok = r.ArgEnv["CNI_IFNAME"] + if !ok { + return nil, fmt.Errorf("cnishim req missing CNI_IFNAME") + } + + cnishimArgs, err := loadCNIShimArgs(r.ArgEnv) + if err != nil { + return nil, err + } + + cnishimreq.PodNamespace, ok = cnishimArgs["K8S_POD_NAMESPACE"] + if !ok { + return nil, fmt.Errorf("cnishim req missing K8S_POD_NAMESPACE") + } + + cnishimreq.PodName, ok = cnishimArgs["K8S_POD_NAME"] + if !ok { + return nil, fmt.Errorf("cnishim req missing K8S_POD_NAME") + } + + netconf, err := config.ConfigureNetConf(r.NetConfig) + if err != nil { + return nil, fmt.Errorf("cnishim req CNI arg configuration failed:%v",err) + } + + cnishimreq.CNIConf = netconf + return cnishimreq, nil +} + +func (cs *CNIServer) handleCNIShimRequest(w http.ResponseWriter, r *http.Request) { + var cr CNIEndpointRequest + b, _ := ioutil.ReadAll(r.Body) + if err := json.Unmarshal(b, &cr); err != nil { + http.Error(w, fmt.Sprintf("%v", err), http.StatusBadRequest) + return + } + + req, err := loadCNIRequestToCNIServer(&cr) + if err != nil { + http.Error(w, fmt.Sprintf("%v", err), http.StatusBadRequest) + return + } + + klog.Infof("Waiting for %s result for CNI server pod %s/%s", req.Command, req.PodNamespace, req.PodName) + result, err := cs.requestFunc(req, cs.k8sclient) + if err != nil { + http.Error(w, fmt.Sprintf("%v", err), http.StatusBadRequest) + } else { + w.Header().Set("Content-Type", "application/json") + if _, err := w.Write(result); err != nil { + klog.Warningf("Error writing %s HTTP response: %v", req.Command, err) + } + } +} + +func HandleCNIcommandRequest(request *CNIServerRequest, k8sclient kubernetes.Interface) ([]byte, error) { + var result []byte + var err error + klog.Infof("[PodNamespace:%s/PodName:%s] dispatching pod network request %v", request.PodNamespace, request.PodName, request) + klog.Infof("k8sclient %s", fmt.Sprintf("%v",k8sclient)) + switch request.Command { + case CNIAdd: + result, err = request.cmdAdd(k8sclient) + case CNIDel: + result, err = request.cmdDel() + default: + } + klog.Infof("[PodNamespace:%s/PodName:%s] CNI request %v, result %q, err %v", request.PodNamespace, request.PodName, request, string(result), err) + if err != nil { + return nil, fmt.Errorf("[PodNamespace:%s/PodName:%s] CNI request %v %v", request.PodNamespace, request.PodName, request, err) + } + return result, nil +} + +func (cs *CNIServer) Start(requestFunc cniServerRequestFunc) error { + if requestFunc == nil { + return fmt.Errorf("no CNI request handler") + } + cs.requestFunc = requestFunc + socketPath := filepath.Join(cs.serverrundir, CNIServerSocketName) + if err := os.RemoveAll(cs.serverrundir); err != nil && !os.IsNotExist(err) { + info, err := os.Stat(cs.serverrundir) + if err != nil { + return fmt.Errorf("failed to stat old cni server info socket directory %s: %v", cs.serverrundir, err) + } + tmp := info.Sys() + statt, ok := tmp.(*syscall.Stat_t) + if !ok { + return fmt.Errorf("failed to read CNI Server info socket directory stat info: %T", tmp) + } + if statt.Uid != 0 { + return fmt.Errorf("insecure owner of CNI Server info socket directory %s: %v", cs.serverrundir, statt.Uid) + } + + if info.Mode()&0777 != 0700 { + return fmt.Errorf("insecure permissions on CNI Server info socket directory %s: %v", cs.serverrundir, info.Mode()) + } + + if err := os.Remove(socketPath); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("failed to remove old CNI Server info socket %s: %v", socketPath, err) + } + } + if err := os.MkdirAll(cs.serverrundir, 0700); err != nil { + return fmt.Errorf("failed to create CNI Server info socket directory %s: %v", cs.serverrundir, err) + } + + unixListener, err := net.Listen("unix", socketPath) + if err != nil { + return fmt.Errorf("failed to listen on CNI Server info socket: %v", err) + } + if err := os.Chmod(socketPath, 0600); err != nil { + unixListener.Close() + return fmt.Errorf("failed to set CNI Server info socket mode: %v", err) + } + + cs.SetKeepAlivesEnabled(false) + go utilwait.Forever(func() { + if err := cs.Serve(unixListener); err != nil { + utilruntime.HandleError(fmt.Errorf("CNI server Serve() failed: %v", err)) + } + }, 0) + return nil +} -- cgit 1.2.3-korg