diff options
Diffstat (limited to 'internal')
-rw-r--r-- | internal/pkg/cniserver/cni.go | 76 | ||||
-rw-r--r-- | internal/pkg/cniserver/cniserver.go | 101 | ||||
-rw-r--r-- | internal/pkg/config/config.go | 30 | ||||
-rw-r--r-- | internal/pkg/network/iptables.go | 124 | ||||
-rw-r--r-- | internal/pkg/nfnNotify/proto/nfn.pb.go | 75 | ||||
-rw-r--r-- | internal/pkg/nfnNotify/proto/nfn.proto | 2 | ||||
-rw-r--r-- | internal/pkg/nfnNotify/server.go | 13 | ||||
-rw-r--r-- | internal/pkg/node/node.go | 31 | ||||
-rw-r--r-- | internal/pkg/ovn/ovn.go | 142 |
9 files changed, 460 insertions, 134 deletions
diff --git a/internal/pkg/cniserver/cni.go b/internal/pkg/cniserver/cni.go index 2c91f04..95a41d5 100644 --- a/internal/pkg/cniserver/cni.go +++ b/internal/pkg/cniserver/cni.go @@ -1,26 +1,28 @@ package cniserver import ( - "encoding/json" - "k8s.io/apimachinery/pkg/util/wait" - "fmt" - "net" - "strconv" - "net/http" - "time" - "k8s.io/klog" + "encoding/json" + "fmt" + "net" + "net/http" + "strconv" + "time" - "k8s.io/client-go/kubernetes" - "github.com/containernetworking/cni/pkg/types" - "github.com/containernetworking/cni/pkg/types/current" - "ovn4nfv-k8s-plugin/internal/pkg/kube" - "k8s.io/apimachinery/pkg/api/errors" - "ovn4nfv-k8s-plugin/internal/pkg/config" - "ovn4nfv-k8s-plugin/cmd/ovn4nfvk8s-cni/app" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/klog" + + "ovn4nfv-k8s-plugin/cmd/ovn4nfvk8s-cni/app" + "ovn4nfv-k8s-plugin/internal/pkg/config" + "ovn4nfv-k8s-plugin/internal/pkg/kube" + + "github.com/containernetworking/cni/pkg/types" + "github.com/containernetworking/cni/pkg/types/current" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/client-go/kubernetes" ) const ( - ovn4nfvAnnotationTag = "k8s.plugin.opnfv.org/ovnInterfaces" + ovn4nfvAnnotationTag = "k8s.plugin.opnfv.org/ovnInterfaces" ) func parseOvnNetworkObject(ovnnetwork string) ([]map[string]string, error) { @@ -85,12 +87,12 @@ func prettyPrint(i interface{}) string { } func isNotFoundError(err error) bool { - statusErr, ok := err.(*errors.StatusError) - return ok && statusErr.Status().Code == http.StatusNotFound + statusErr, ok := err.(*errors.StatusError) + return ok && statusErr.Status().Code == http.StatusNotFound } func (cr *CNIServerRequest) addMultipleInterfaces(ovnAnnotation, namespace, podName string) types.Result { - klog.Infof("ovn4nfvk8s-cni: addMultipleInterfaces ") + klog.Infof("ovn4nfvk8s-cni: addMultipleInterfaces ") var ovnAnnotatedMap []map[string]string ovnAnnotatedMap, err := parseOvnNetworkObject(ovnAnnotation) if err != nil { @@ -224,26 +226,26 @@ func (cr *CNIServerRequest) addRoutes(ovnAnnotation string, dstResult types.Resu } func (cr *CNIServerRequest) cmdAdd(kclient kubernetes.Interface) ([]byte, error) { - klog.Infof("ovn4nfvk8s-cni: cmdAdd") + klog.Infof("ovn4nfvk8s-cni: cmdAdd") namespace := cr.PodNamespace - podname := cr.PodName + podname := cr.PodName if namespace == "" || podname == "" { return nil, fmt.Errorf("required CNI variable missing") } - klog.Infof("ovn4nfvk8s-cni: cmdAdd for pod podname:%s and namespace:%s", podname, namespace) + klog.Infof("ovn4nfvk8s-cni: cmdAdd for pod podname:%s and namespace:%s", podname, namespace) kubecli := &kube.Kube{KClient: kclient} // Get the IP address and MAC address from the API server. var annotationBackoff = wait.Backoff{Duration: 1 * time.Second, Steps: 14, Factor: 1.5, Jitter: 0.1} var annotation map[string]string - var err error + var err error if err = wait.ExponentialBackoff(annotationBackoff, func() (bool, error) { annotation, err = kubecli.GetAnnotationsOnPod(namespace, podname) if err != nil { - if isNotFoundError(err) { - return false, fmt.Errorf("Error - pod not found - %v", err) - } - klog.Infof("ovn4nfvk8s-cni: cmdAdd Warning - Error while obtaining pod annotations - %v", err) - return false,nil + if isNotFoundError(err) { + return false, fmt.Errorf("Error - pod not found - %v", err) + } + klog.Infof("ovn4nfvk8s-cni: cmdAdd Warning - Error while obtaining pod annotations - %v", err) + return false, nil } if _, ok := annotation[ovn4nfvAnnotationTag]; ok { return true, nil @@ -258,7 +260,7 @@ func (cr *CNIServerRequest) cmdAdd(kclient kubernetes.Interface) ([]byte, error) if !ok { return nil, fmt.Errorf("Error while obtaining pod annotations") } - result := cr.addMultipleInterfaces(ovnAnnotation, namespace, podname) + result := cr.addMultipleInterfaces(ovnAnnotation, namespace, podname) //Add Routes to the pod if annotation found for routes ovnRouteAnnotation, ok := annotation["ovnNetworkRoutes"] if ok { @@ -266,12 +268,12 @@ func (cr *CNIServerRequest) cmdAdd(kclient kubernetes.Interface) ([]byte, error) result = cr.addRoutes(ovnRouteAnnotation, result) } - if result == nil { - klog.Errorf("result struct the ovn4nfv-k8s-plugin cniserver") - return nil, fmt.Errorf("result is nil from cni server response") - } + if result == nil { + klog.Errorf("result struct the ovn4nfv-k8s-plugin cniserver") + return nil, fmt.Errorf("result is nil from cni server response") + } - responseBytes, err := json.Marshal(result) + responseBytes, err := json.Marshal(result) if err != nil { return nil, fmt.Errorf("failed to marshal pod request response: %v", err) } @@ -280,8 +282,8 @@ func (cr *CNIServerRequest) cmdAdd(kclient kubernetes.Interface) ([]byte, error) } func (cr *CNIServerRequest) cmdDel() ([]byte, error) { - klog.Infof("cmdDel ") - for i := 0; i < 10; i++ { + klog.Infof("cmdDel ") + for i := 0; i < 10; i++ { ifaceName := cr.SandboxID[:14] + strconv.Itoa(i) done, err := app.PlatformSpecificCleanup(ifaceName) if err != nil { @@ -291,5 +293,5 @@ func (cr *CNIServerRequest) cmdDel() ([]byte, error) { break } } - return []byte{}, nil + return []byte{}, nil } diff --git a/internal/pkg/cniserver/cniserver.go b/internal/pkg/cniserver/cniserver.go index eaa7105..7e55767 100644 --- a/internal/pkg/cniserver/cniserver.go +++ b/internal/pkg/cniserver/cniserver.go @@ -4,27 +4,28 @@ import ( "encoding/json" "fmt" "io/ioutil" + "net" "net/http" - "strings" - "os" - "net" - "path/filepath" - "syscall" - "k8s.io/klog" + "os" + "path/filepath" + "strings" + "syscall" + + "k8s.io/klog" + + "ovn4nfv-k8s-plugin/internal/pkg/config" "github.com/containernetworking/cni/pkg/types" "github.com/gorilla/mux" - "k8s.io/client-go/kubernetes" - "ovn4nfv-k8s-plugin/internal/pkg/config" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilwait "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" ) const CNIServerRunDir string = "/var/run/ovn4nfv-k8s-plugin/cniserver" const CNIServerSocketName string = "ovn4nfv-k8s-plugin-cni-server.sock" const CNIServerSocketPath string = CNIServerRunDir + "/" + CNIServerSocketName - type CNIcommand string const CNIAdd CNIcommand = "ADD" @@ -32,13 +33,13 @@ const CNIUpdate CNIcommand = "UPDATE" const CNIDel CNIcommand = "DEL" type CNIServerRequest struct { - Command CNIcommand + Command CNIcommand PodNamespace string - PodName string - SandboxID string - Netns string - IfName string - CNIConf *types.NetConf + PodName string + SandboxID string + Netns string + IfName string + CNIConf *types.NetConf } type cniServerRequestFunc func(request *CNIServerRequest, k8sclient kubernetes.Interface) ([]byte, error) @@ -51,7 +52,7 @@ type CNIServer struct { http.Server requestFunc cniServerRequestFunc serverrundir string - k8sclient kubernetes.Interface + k8sclient kubernetes.Interface } func NewCNIServer(serverRunSir string, k8sclient kubernetes.Interface) *CNIServer { @@ -66,7 +67,7 @@ func NewCNIServer(serverRunSir string, k8sclient kubernetes.Interface) *CNIServe Handler: router, }, serverrundir: serverRunSir, - k8sclient: k8sclient, + k8sclient: k8sclient, } router.NotFoundHandler = http.HandlerFunc(http.NotFound) router.HandleFunc("/", cs.handleCNIShimRequest).Methods("POST") @@ -100,42 +101,42 @@ func loadCNIRequestToCNIServer(r *CNIEndpointRequest) (*CNIServerRequest, error) Command: CNIcommand(cmd), } - cnishimreq.SandboxID, ok = r.ArgEnv["CNI_CONTAINERID"] - if !ok { - return nil, fmt.Errorf("cnishim req missing CNI_CONTAINERID") - } + cnishimreq.SandboxID, ok = r.ArgEnv["CNI_CONTAINERID"] + if !ok { + return nil, fmt.Errorf("cnishim req missing CNI_CONTAINERID") + } - cnishimreq.Netns, ok = r.ArgEnv["CNI_NETNS"] - if !ok { - return nil, fmt.Errorf("cnishim req missing CNI_NETNS") - } + cnishimreq.Netns, ok = r.ArgEnv["CNI_NETNS"] + if !ok { + return nil, fmt.Errorf("cnishim req missing CNI_NETNS") + } - cnishimreq.IfName, ok = r.ArgEnv["CNI_IFNAME"] - if !ok { - return nil, fmt.Errorf("cnishim req missing CNI_IFNAME") - } + cnishimreq.IfName, ok = r.ArgEnv["CNI_IFNAME"] + if !ok { + return nil, fmt.Errorf("cnishim req missing CNI_IFNAME") + } - cnishimArgs, err := loadCNIShimArgs(r.ArgEnv) - if err != nil { - return nil, err - } + cnishimArgs, err := loadCNIShimArgs(r.ArgEnv) + if err != nil { + return nil, err + } - cnishimreq.PodNamespace, ok = cnishimArgs["K8S_POD_NAMESPACE"] - if !ok { - return nil, fmt.Errorf("cnishim req missing K8S_POD_NAMESPACE") - } + cnishimreq.PodNamespace, ok = cnishimArgs["K8S_POD_NAMESPACE"] + if !ok { + return nil, fmt.Errorf("cnishim req missing K8S_POD_NAMESPACE") + } - cnishimreq.PodName, ok = cnishimArgs["K8S_POD_NAME"] + cnishimreq.PodName, ok = cnishimArgs["K8S_POD_NAME"] if !ok { return nil, fmt.Errorf("cnishim req missing K8S_POD_NAME") } - netconf, err := config.ConfigureNetConf(r.NetConfig) - if err != nil { - return nil, fmt.Errorf("cnishim req CNI arg configuration failed:%v",err) - } + netconf, err := config.ConfigureNetConf(r.NetConfig) + if err != nil { + return nil, fmt.Errorf("cnishim req CNI arg configuration failed:%v", err) + } - cnishimreq.CNIConf = netconf + cnishimreq.CNIConf = netconf return cnishimreq, nil } @@ -160,16 +161,16 @@ func (cs *CNIServer) handleCNIShimRequest(w http.ResponseWriter, r *http.Request } else { w.Header().Set("Content-Type", "application/json") if _, err := w.Write(result); err != nil { - klog.Warningf("Error writing %s HTTP response: %v", req.Command, err) + klog.Warningf("Error writing %s HTTP response: %v", req.Command, err) } - } + } } func HandleCNIcommandRequest(request *CNIServerRequest, k8sclient kubernetes.Interface) ([]byte, error) { - var result []byte + var result []byte var err error - klog.Infof("[PodNamespace:%s/PodName:%s] dispatching pod network request %v", request.PodNamespace, request.PodName, request) - klog.Infof("k8sclient %s", fmt.Sprintf("%v",k8sclient)) + klog.Infof("[PodNamespace:%s/PodName:%s] dispatching pod network request %v", request.PodNamespace, request.PodName, request) + klog.Infof("k8sclient %s", fmt.Sprintf("%v", k8sclient)) switch request.Command { case CNIAdd: result, err = request.cmdAdd(k8sclient) @@ -231,5 +232,5 @@ func (cs *CNIServer) Start(requestFunc cniServerRequestFunc) error { utilruntime.HandleError(fmt.Errorf("CNI server Serve() failed: %v", err)) } }, 0) - return nil + return nil } diff --git a/internal/pkg/config/config.go b/internal/pkg/config/config.go index e9ad3e1..b8ab825 100644 --- a/internal/pkg/config/config.go +++ b/internal/pkg/config/config.go @@ -1,16 +1,18 @@ package config import ( - "encoding/json" + "crypto/sha1" + "encoding/hex" + "encoding/json" "fmt" "os" "path/filepath" "reflect" + "github.com/containernetworking/cni/pkg/types" + "github.com/containernetworking/cni/pkg/version" "github.com/sirupsen/logrus" "github.com/urfave/cli" - "github.com/containernetworking/cni/pkg/types" - "github.com/containernetworking/cni/pkg/version" gcfg "gopkg.in/gcfg.v1" "k8s.io/client-go/kubernetes" @@ -288,15 +290,23 @@ func NewClientset(conf *KubernetesConfig) (*kubernetes.Clientset, error) { } func ConfigureNetConf(bytes []byte) (*types.NetConf, error) { - conf := &types.NetConf{} + conf := &types.NetConf{} if err := json.Unmarshal(bytes, conf); err != nil { return nil, fmt.Errorf("failed to load netconf: %v", err) } - if conf.RawPrevResult != nil { - if err := version.ParsePrevResult(conf); err != nil { - return nil, err - } - } - return conf, nil + if conf.RawPrevResult != nil { + if err := version.ParsePrevResult(conf); err != nil { + return nil, err + } + } + return conf, nil +} + +func GetNodeIntfName(node string) string { + h := sha1.New() + h.Write([]byte(node)) + bs := h.Sum(nil) + encodednodeStr := hex.EncodeToString(bs) + return fmt.Sprintf("ovn4nfv0-%s", encodednodeStr[:6]) } diff --git a/internal/pkg/network/iptables.go b/internal/pkg/network/iptables.go new file mode 100644 index 0000000..6e71b3f --- /dev/null +++ b/internal/pkg/network/iptables.go @@ -0,0 +1,124 @@ +package network + +import ( + "fmt" + "strings" + + logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" + + "github.com/coreos/go-iptables/iptables" +) + +var log = logf.Log.WithName("iptables") + +type IPTables interface { + AppendUnique(table string, chain string, rulespec ...string) error + Delete(table string, chain string, rulespec ...string) error + Exists(table string, chain string, rulespec ...string) (bool, error) +} + +type IPTablesRule struct { + table string + chain string + rulespec []string +} + +func MasqRules(ifname string) []IPTablesRule { + return []IPTablesRule{ + // This rule makes sure ifname is SNAT + {"nat", "POSTROUTING", []string{"-o", ifname, "-j", "MASQUERADE"}}, + } +} + +func ForwardRules(ovnNetwork string) []IPTablesRule { + return []IPTablesRule{ + // These rules allow traffic to be forwarded if it is to or from the ovn network range. + {"filter", "FORWARD", []string{"-s", ovnNetwork, "-j", "ACCEPT"}}, + {"filter", "FORWARD", []string{"-d", ovnNetwork, "-j", "ACCEPT"}}, + } +} + +func ipTablesRulesExist(ipt IPTables, rules []IPTablesRule) (bool, error) { + for _, rule := range rules { + exists, err := ipt.Exists(rule.table, rule.chain, rule.rulespec...) + if err != nil { + // this shouldn't ever happen + return false, fmt.Errorf("failed to check rule existence: %v", err) + } + if !exists { + return false, nil + } + } + + return true, nil +} + +func SetupAndEnsureIPTables(rules []IPTablesRule) error { + ipt, err := iptables.New() + if err != nil { + // if we can't find iptables, give up and return + log.Error(err, "Failed to setup IPTables. iptables binary was not found") + return err + } + + // Ensure that all the iptables rules exist every 5 seconds + if err := ensureIPTables(ipt, rules); err != nil { + log.Error(err, "Failed to ensure iptables rules") + return err + } + + return nil + +} + +// DeleteIPTables delete specified iptables rules +func DeleteIPTables(rules []IPTablesRule) error { + ipt, err := iptables.New() + if err != nil { + // if we can't find iptables, give up and return + log.Error(err, "Failed to setup IPTables. iptables binary was not found") + return err + } + teardownIPTables(ipt, rules) + return nil +} + +func ensureIPTables(ipt IPTables, rules []IPTablesRule) error { + exists, err := ipTablesRulesExist(ipt, rules) + if err != nil { + return fmt.Errorf("Error checking rule existence: %v", err) + } + if exists { + // if all the rules already exist, no need to do anything + return nil + } + // Otherwise, teardown all the rules and set them up again + // We do this because the order of the rules is important + log.Info("Some iptables rules are missing; deleting and recreating rules") + teardownIPTables(ipt, rules) + if err = setupIPTables(ipt, rules); err != nil { + return fmt.Errorf("Error setting up rules: %v", err) + } + return nil +} + +func setupIPTables(ipt IPTables, rules []IPTablesRule) error { + for _, rule := range rules { + log.Info("Adding iptables rule: ", "rule", strings.Join(rule.rulespec, " ")) + err := ipt.AppendUnique(rule.table, rule.chain, rule.rulespec...) + if err != nil { + return fmt.Errorf("failed to insert IPTables rule: %v", err) + } + } + + return nil +} + +func teardownIPTables(ipt IPTables, rules []IPTablesRule) { + for _, rule := range rules { + log.Info("Deleting iptables rule: ", "rule", strings.Join(rule.rulespec, " ")) + // We ignore errors here because if there's an error it's almost certainly because the rule + // doesn't exist, which is fine (we don't need to delete rules that don't exist) + ipt.Delete(rule.table, rule.chain, rule.rulespec...) + } +} diff --git a/internal/pkg/nfnNotify/proto/nfn.pb.go b/internal/pkg/nfnNotify/proto/nfn.pb.go index d419af8..750d55b 100644 --- a/internal/pkg/nfnNotify/proto/nfn.pb.go +++ b/internal/pkg/nfnNotify/proto/nfn.pb.go @@ -371,6 +371,8 @@ func (m *DirectInfo) GetProviderIntf() string { } type InSync struct { + NodeIntfIpAddress string `protobuf:"bytes,1,opt,name=node_intf_ip_address,json=nodeIntfIpAddress,proto3" json:"node_intf_ip_address,omitempty"` + NodeIntfMacAddress string `protobuf:"bytes,2,opt,name=node_intf_mac_address,json=nodeIntfMacAddress,proto3" json:"node_intf_mac_address,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -401,6 +403,20 @@ func (m *InSync) XXX_DiscardUnknown() { var xxx_messageInfo_InSync proto.InternalMessageInfo +func (m *InSync) GetNodeIntfIpAddress() string { + if m != nil { + return m.NodeIntfIpAddress + } + return "" +} + +func (m *InSync) GetNodeIntfMacAddress() string { + if m != nil { + return m.NodeIntfMacAddress + } + return "" +} + func init() { proto.RegisterType((*SubscribeContext)(nil), "SubscribeContext") proto.RegisterType((*Notification)(nil), "Notification") @@ -416,34 +432,37 @@ func init() { } var fileDescriptor_5b809db4a7814953 = []byte{ - // 431 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x53, 0x5d, 0x6f, 0xd3, 0x30, - 0x14, 0x5d, 0xd8, 0x94, 0x34, 0xb7, 0x1d, 0x74, 0x16, 0x1f, 0x05, 0x84, 0x34, 0xbc, 0x97, 0x8a, - 0x87, 0x6c, 0x8c, 0x57, 0x9e, 0x18, 0x42, 0x8b, 0x84, 0x22, 0x94, 0x21, 0x5e, 0x2d, 0xd7, 0x71, - 0x90, 0xb5, 0xf4, 0x3a, 0xf2, 0x4c, 0x4b, 0x7e, 0x00, 0xbf, 0x83, 0xbf, 0xc7, 0xcf, 0x40, 0xb1, - 0x93, 0x35, 0x5d, 0xf7, 0xb2, 0x37, 0xfb, 0x9e, 0xeb, 0x73, 0xce, 0xbd, 0x3a, 0x86, 0x18, 0x4b, - 0x4c, 0x6a, 0xa3, 0xad, 0xa6, 0xa7, 0x30, 0xbd, 0xfa, 0xb5, 0xb8, 0x11, 0x46, 0x2d, 0xe4, 0x85, - 0x46, 0x2b, 0x7f, 0x5b, 0xf2, 0x1a, 0x62, 0xd4, 0x85, 0x64, 0xc8, 0x97, 0x72, 0x16, 0x1c, 0x07, - 0xf3, 0x38, 0x1f, 0xb5, 0x85, 0x8c, 0x2f, 0x25, 0xfd, 0x17, 0xc0, 0x24, 0xd3, 0x56, 0x95, 0x4a, - 0x70, 0xab, 0x34, 0x92, 0x97, 0x30, 0x12, 0xa8, 0x98, 0x6d, 0xea, 0xbe, 0x39, 0x12, 0xa8, 0xbe, - 0x37, 0xb5, 0x24, 0x14, 0x22, 0x85, 0xec, 0xa6, 0x41, 0x31, 0x7b, 0x74, 0x1c, 0xcc, 0xc7, 0xe7, - 0x51, 0x92, 0xe2, 0x55, 0x83, 0xe2, 0x72, 0x2f, 0x0f, 0x95, 0x3b, 0x91, 0x2f, 0x40, 0x6a, 0xa3, - 0x57, 0xaa, 0x90, 0x86, 0xe1, 0x9a, 0x09, 0x23, 0xb9, 0x95, 0xb3, 0x7d, 0xd7, 0xfe, 0x3c, 0xf9, - 0xd6, 0x41, 0x99, 0xb4, 0x6b, 0x6d, 0xae, 0x2f, 0x1c, 0x7a, 0xb9, 0x97, 0x4f, 0xfb, 0x37, 0xd9, - 0xda, 0xd7, 0xee, 0xf2, 0x18, 0xb9, 0xd4, 0x2b, 0x39, 0x3b, 0xb8, 0x9f, 0x27, 0x77, 0xe8, 0x36, - 0x8f, 0xaf, 0x7d, 0x8a, 0x21, 0xaa, 0x79, 0x53, 0x69, 0x5e, 0xd0, 0x3f, 0x01, 0x3c, 0xbb, 0xd7, - 0x00, 0x99, 0xc3, 0x74, 0x28, 0x36, 0x58, 0xd4, 0xe3, 0x0d, 0x61, 0xbb, 0x2e, 0xf2, 0x06, 0x0e, - 0x56, 0x15, 0xc7, 0x6e, 0xfe, 0x38, 0xf9, 0x51, 0x71, 0x4c, 0xb1, 0xd4, 0xb9, 0x2b, 0x93, 0x13, - 0x08, 0x0b, 0x65, 0xa4, 0xb0, 0xdd, 0xc4, 0xe3, 0xe4, 0xb3, 0xbb, 0xba, 0x96, 0x0e, 0xa2, 0x7f, - 0x77, 0x7d, 0x78, 0xb3, 0x0f, 0xf0, 0xf1, 0x0e, 0x8e, 0x5a, 0x41, 0x56, 0xe9, 0x9f, 0x4a, 0xf0, - 0x8a, 0x29, 0xb4, 0xa5, 0x33, 0x15, 0xe7, 0x4f, 0x5a, 0xe0, 0xab, 0xaf, 0xa7, 0x68, 0x4b, 0x72, - 0x06, 0x4f, 0xbd, 0x32, 0xbb, 0x25, 0x77, 0xed, 0xfb, 0xae, 0x9d, 0x78, 0xac, 0x37, 0xd4, 0xbe, - 0xa0, 0xd7, 0x30, 0xea, 0x07, 0x23, 0x2f, 0x20, 0x72, 0x4a, 0xaa, 0xe8, 0xac, 0x84, 0xed, 0x35, - 0x2d, 0xc8, 0x09, 0x1c, 0x6e, 0xf3, 0x79, 0xf9, 0x49, 0x3d, 0x60, 0x22, 0x6f, 0x61, 0xb2, 0x65, - 0xd1, 0x6b, 0x8e, 0xab, 0x8d, 0x3d, 0xfa, 0x1e, 0x60, 0xb3, 0xa4, 0x5d, 0xd6, 0x60, 0x97, 0x95, - 0x8e, 0x20, 0xf4, 0xc1, 0x3b, 0xff, 0xe8, 0xc2, 0xef, 0x02, 0xdc, 0x90, 0x53, 0x88, 0x6f, 0xc3, - 0x4f, 0x8e, 0x92, 0xbb, 0x1f, 0xe1, 0xd5, 0x61, 0x32, 0x4c, 0xfa, 0x59, 0xb0, 0x08, 0xdd, 0xa7, - 0xf9, 0xf0, 0x3f, 0x00, 0x00, 0xff, 0xff, 0x24, 0xa2, 0x9f, 0x85, 0x41, 0x03, 0x00, 0x00, + // 472 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x53, 0xdd, 0x6e, 0xd3, 0x30, + 0x14, 0x5e, 0xd8, 0xd4, 0x34, 0xa7, 0x1d, 0xb4, 0x47, 0x1b, 0x14, 0x10, 0xd2, 0xc8, 0x6e, 0x2a, + 0x2e, 0xd2, 0x6d, 0xdc, 0x72, 0x03, 0x43, 0x68, 0x91, 0xa0, 0x42, 0x19, 0xe2, 0xd6, 0x72, 0x1d, + 0x07, 0x59, 0x4b, 0x8f, 0xa3, 0xd4, 0xb4, 0xe4, 0x01, 0x78, 0x0e, 0x5e, 0x8f, 0xc7, 0x40, 0xb1, + 0x93, 0xfe, 0xac, 0xbb, 0xe1, 0x2e, 0x39, 0xdf, 0xf9, 0x7e, 0x6c, 0xf9, 0x83, 0x80, 0x32, 0x8a, + 0x8a, 0x52, 0x1b, 0x1d, 0x4e, 0x60, 0x70, 0xfb, 0x73, 0xb6, 0x10, 0xa5, 0x9a, 0xc9, 0x6b, 0x4d, + 0x46, 0xfe, 0x32, 0xf8, 0x12, 0x02, 0xd2, 0xa9, 0x64, 0xc4, 0xe7, 0x72, 0xe4, 0x9d, 0x79, 0xe3, + 0x20, 0xe9, 0xd6, 0x83, 0x29, 0x9f, 0xcb, 0xf0, 0xaf, 0x07, 0xfd, 0xa9, 0x36, 0x2a, 0x53, 0x82, + 0x1b, 0xa5, 0x09, 0x9f, 0x43, 0x57, 0x90, 0x62, 0xa6, 0x2a, 0xda, 0x65, 0x5f, 0x90, 0xfa, 0x56, + 0x15, 0x12, 0x43, 0xf0, 0x15, 0xb1, 0x45, 0x45, 0x62, 0xf4, 0xe8, 0xcc, 0x1b, 0xf7, 0xae, 0xfc, + 0x28, 0xa6, 0xdb, 0x8a, 0xc4, 0xcd, 0x41, 0xd2, 0x51, 0xf6, 0x0b, 0x3f, 0x01, 0x16, 0xa5, 0x5e, + 0xaa, 0x54, 0x96, 0x8c, 0x56, 0x4c, 0x94, 0x92, 0x1b, 0x39, 0x3a, 0xb4, 0xeb, 0x4f, 0xa3, 0xaf, + 0x0d, 0x34, 0x95, 0x66, 0xa5, 0xcb, 0xbb, 0x6b, 0x8b, 0xde, 0x1c, 0x24, 0x83, 0x96, 0x33, 0x5d, + 0xb9, 0xd9, 0x7d, 0x9d, 0x52, 0xce, 0xf5, 0x52, 0x8e, 0x8e, 0x1e, 0xd6, 0x49, 0x2c, 0xba, 0xab, + 0xe3, 0x66, 0x1f, 0x02, 0xf0, 0x0b, 0x5e, 0xe5, 0x9a, 0xa7, 0xe1, 0x6f, 0x0f, 0x4e, 0x1f, 0x0c, + 0x80, 0x63, 0x18, 0x6c, 0x9b, 0x6d, 0x5d, 0xd4, 0xe3, 0x8d, 0x60, 0x7d, 0x5d, 0xf8, 0x0a, 0x8e, + 0x96, 0x39, 0xa7, 0xe6, 0xfc, 0x41, 0xf4, 0x3d, 0xe7, 0x14, 0x53, 0xa6, 0x13, 0x3b, 0xc6, 0x73, + 0xe8, 0xa4, 0xaa, 0x94, 0xc2, 0x34, 0x27, 0xee, 0x45, 0x1f, 0xed, 0xaf, 0x5d, 0x69, 0xa0, 0xf0, + 0xcf, 0x7e, 0x0e, 0x17, 0xf6, 0x3f, 0x72, 0xbc, 0x81, 0x61, 0x6d, 0xc8, 0x72, 0xfd, 0x43, 0x09, + 0x9e, 0x33, 0x45, 0x26, 0xb3, 0xa1, 0x82, 0xe4, 0x49, 0x0d, 0x7c, 0x76, 0xf3, 0x98, 0x4c, 0x86, + 0x17, 0x70, 0xe2, 0x9c, 0xd9, 0x5a, 0xdc, 0xae, 0x1f, 0xda, 0x75, 0x74, 0x58, 0x1b, 0xa8, 0x66, + 0x84, 0x77, 0xd0, 0x6d, 0x0f, 0x86, 0xcf, 0xc0, 0xb7, 0x4e, 0x2a, 0x6d, 0xa2, 0x74, 0xea, 0xdf, + 0x38, 0xc5, 0x73, 0x38, 0xde, 0xd5, 0x73, 0xf6, 0xfd, 0x62, 0x4b, 0x09, 0x5f, 0x43, 0x7f, 0x27, + 0xa2, 0xf3, 0xec, 0xe5, 0x9b, 0x78, 0xe1, 0x25, 0xc0, 0xe6, 0x92, 0xf6, 0x55, 0xbd, 0x7d, 0xd5, + 0x30, 0x87, 0x8e, 0x7b, 0x78, 0x38, 0x81, 0x13, 0xfb, 0xb6, 0xeb, 0x55, 0xa6, 0x0a, 0xc6, 0xd3, + 0xb4, 0x94, 0x8b, 0x45, 0xc3, 0x1a, 0xd6, 0x58, 0xcd, 0x88, 0x8b, 0xf7, 0x0e, 0xc0, 0x4b, 0x38, + 0xdd, 0x10, 0xe6, 0x5c, 0xac, 0x19, 0x2e, 0x3d, 0xb6, 0x8c, 0x2f, 0x5c, 0x34, 0x94, 0xab, 0x77, + 0xb6, 0x60, 0xb6, 0x24, 0x15, 0x4e, 0x20, 0x58, 0x17, 0x0c, 0x87, 0xd1, 0xfd, 0xb2, 0xbd, 0x38, + 0x8e, 0xb6, 0xdb, 0x74, 0xe1, 0xcd, 0x3a, 0xb6, 0x98, 0x6f, 0xff, 0x05, 0x00, 0x00, 0xff, 0xff, + 0xad, 0x54, 0x7e, 0xa2, 0xa5, 0x03, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. diff --git a/internal/pkg/nfnNotify/proto/nfn.proto b/internal/pkg/nfnNotify/proto/nfn.proto index 02855e7..567df29 100644 --- a/internal/pkg/nfnNotify/proto/nfn.proto +++ b/internal/pkg/nfnNotify/proto/nfn.proto @@ -47,4 +47,6 @@ message DirectInfo { } message InSync { + string node_intf_ip_address = 1; + string node_intf_mac_address = 2; } diff --git a/internal/pkg/nfnNotify/server.go b/internal/pkg/nfnNotify/server.go index ac22d68..a201618 100644 --- a/internal/pkg/nfnNotify/server.go +++ b/internal/pkg/nfnNotify/server.go @@ -4,13 +4,14 @@ import ( "fmt" "net" pb "ovn4nfv-k8s-plugin/internal/pkg/nfnNotify/proto" + "ovn4nfv-k8s-plugin/internal/pkg/node" v1alpha1 "ovn4nfv-k8s-plugin/pkg/apis/k8s/v1alpha1" clientset "ovn4nfv-k8s-plugin/pkg/generated/clientset/versioned" "strings" "google.golang.org/grpc" "google.golang.org/grpc/reflection" - "k8s.io/apimachinery/pkg/apis/meta/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" @@ -45,6 +46,11 @@ func (s *serverDB) Subscribe(sc *pb.SubscribeContext, ss pb.NfnNotify_SubscribeS if nodeName == "" { return fmt.Errorf("Node name can't be empty") } + + nodeIntfIPAddr, nodeIntfMacAddr, err := node.AddNodeLogicalPorts(nodeName) + if err != nil { + return fmt.Errorf("Error in creating node logical port for node- %s: %v", nodeName, err) + } cp := client{ context: sc, stream: ss, @@ -61,7 +67,10 @@ func (s *serverDB) Subscribe(sc *pb.SubscribeContext, ss pb.NfnNotify_SubscribeS inSyncMsg := pb.Notification{ CniType: "ovn4nfv", Payload: &pb.Notification_InSync{ - InSync: &pb.InSync{}, + InSync: &pb.InSync{ + NodeIntfIpAddress: nodeIntfIPAddr, + NodeIntfMacAddress: nodeIntfMacAddr, + }, }, } log.Info("Send Insync") diff --git a/internal/pkg/node/node.go b/internal/pkg/node/node.go new file mode 100644 index 0000000..e989e07 --- /dev/null +++ b/internal/pkg/node/node.go @@ -0,0 +1,31 @@ +package node + +import ( + "ovn4nfv-k8s-plugin/internal/pkg/ovn" + + logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" +) + +var log = logf.Log.WithName("node") + +//AddNodeLogicalPorts return nodeIntfMacAddr and nodeIntfIPAddr +func AddNodeLogicalPorts(node string) (nodeIntfMacAddr, nodeIntfIPAddr string, err error) { + ovnCtl, err := ovn.GetOvnController() + if err != nil { + return "", "", err + } + + log.Info("Calling CreateNodeLogicalPorts") + nodeIntfMacAddr, nodeIntfIPAddr, err = ovnCtl.AddNodeLogicalPorts(node) + if err != nil { + return "", "", err + } + return nodeIntfMacAddr, nodeIntfIPAddr, nil +} + +//DeleteNodeLogicalPorts return nil +func DeleteNodeLogicalPorts(name, namesapce string) error { + // Run delete for all controllers; + // Todo + return nil +} diff --git a/internal/pkg/ovn/ovn.go b/internal/pkg/ovn/ovn.go index 6f7951a..97dc99d 100644 --- a/internal/pkg/ovn/ovn.go +++ b/internal/pkg/ovn/ovn.go @@ -2,14 +2,16 @@ package ovn import ( "fmt" - "github.com/mitchellh/mapstructure" - kapi "k8s.io/api/core/v1" - kexec "k8s.io/utils/exec" "math/rand" "os" + "ovn4nfv-k8s-plugin/internal/pkg/config" k8sv1alpha1 "ovn4nfv-k8s-plugin/pkg/apis/k8s/v1alpha1" "strings" "time" + + "github.com/mitchellh/mapstructure" + kapi "k8s.io/api/core/v1" + kexec "k8s.io/utils/exec" ) type Controller struct { @@ -32,22 +34,23 @@ const ( var ovnConf *OVNNetworkConf +//GetOvnNetConf return error func GetOvnNetConf() error { ovnConf = &OVNNetworkConf{} ovnConf.Subnet = os.Getenv("OVN_SUBNET") if ovnConf.Subnet == "" { - fmt.Errorf("OVN subnet is not set in nfn-operator configmap env") + return fmt.Errorf("OVN subnet is not set in nfn-operator configmap env") } ovnConf.GatewayIP = os.Getenv("OVN_GATEWAYIP") if ovnConf.GatewayIP == "" { - fmt.Errorf("OVN gatewayIP is not set in nfn-operator configmap env") + log.Info("No Gateway IP address provided - 1st IP address of the subnet range will be used as Gateway", "Subnet", ovnConf.Subnet) } ovnConf.ExcludeIPs = os.Getenv("OVN_EXCLUDEIPS") if ovnConf.ExcludeIPs == "" { - fmt.Errorf("OVN excludeIPs is not set in nfn-operator configmap env") + log.Info("No IP addresses are excluded in the subnet range", "Subnet", ovnConf.Subnet) } return nil @@ -98,6 +101,20 @@ func GetOvnController() (*Controller, error) { return nil, fmt.Errorf("OVN Controller not initialized") } +func (oc *Controller) AddNodeLogicalPorts(node string) (ipAddr, macAddr string, err error) { + nodeName := strings.ToLower(node) + portName := config.GetNodeIntfName(nodeName) + + log.V(1).Info("Creating Node logical port", "node", nodeName, "portName", portName) + + ipAddr, macmacAddr, err := oc.addNodeLogicalPortWithSwitch(Ovn4nfvDefaultNw, portName) + if err != nil { + return "", "", err + } + + return ipAddr, macmacAddr, nil +} + // AddLogicalPorts adds ports to the Pod func (oc *Controller) AddLogicalPorts(pod *kapi.Pod, ovnNetObjs []map[string]interface{}) (key, value string) { @@ -344,6 +361,110 @@ func (oc *Controller) getGatewayFromSwitch(logicalSwitch string) (string, string return gatewayIP, mask, nil } +func (oc *Controller) addNodeLogicalPortWithSwitch(logicalSwitch, portName string) (ipAddr, macAddr string, r error) { + var out, stderr string + var err error + + log.V(1).Info("Creating Node logical port for on switch", "portName", portName, "logicalSwitch", logicalSwitch) + + out, stderr, err = RunOVNNbctl("--wait=sb", "--", + "--may-exist", "lsp-add", logicalSwitch, portName, + "--", "lsp-set-addresses", + portName, "dynamic") + if err != nil { + log.Error(err, "Error while creating logical port %s ", "portName", portName, "stdout", out, "stderr", stderr) + return "", "", err + } + + count := 30 + for count > 0 { + out, stderr, err = RunOVNNbctl("get", + "logical_switch_port", portName, "dynamic_addresses") + + if err == nil && out != "[]" { + break + } + if err != nil { + log.Error(err, "Error while obtaining addresses for", "portName", portName) + return "", "", err + } + time.Sleep(time.Second) + count-- + } + if count == 0 { + log.Error(err, "Error while obtaining addresses for", "portName", portName, "stdout", out, "stderr", stderr) + return "", "", err + } + + // static addresses have format ["0a:00:00:00:00:01 192.168.1.3"], while + // dynamic addresses have format "0a:00:00:00:00:01 192.168.1.3". + outStr := strings.TrimLeft(out, `[`) + outStr = strings.TrimRight(outStr, `]`) + outStr = strings.Trim(outStr, `"`) + addresses := strings.Split(outStr, " ") + if len(addresses) != 2 { + log.Info("Error while obtaining addresses for", "portName", portName) + return "", "", err + } + + _, mask, err := oc.getGatewayFromSwitch(logicalSwitch) + if err != nil { + log.Error(err, "Error obtaining gateway address for switch", "logicalSwitch", logicalSwitch) + return "", "", err + } + + ipAddr = fmt.Sprintf("%s/%s", addresses[1], mask) + macAddr = fmt.Sprintf("%s", addresses[0]) + + return ipAddr, macAddr, nil +} + +func (oc *Controller) getNodeLogicalPortIPAddr(pod *kapi.Pod) (ipAddress string, r error) { + var out, stderr, nodeName, portName string + var err error + + nodeName = strings.ToLower(pod.Spec.NodeName) + portName = config.GetNodeIntfName(nodeName) + + log.V(1).Info("Get Node logical port", "pod", pod.GetName(), "node", nodeName, "portName", portName) + + count := 30 + for count > 0 { + out, stderr, err = RunOVNNbctl("get", + "logical_switch_port", portName, "dynamic_addresses") + + if err == nil && out != "[]" { + break + } + if err != nil { + log.Error(err, "Error while obtaining addresses for", "portName", portName) + return "", err + } + time.Sleep(time.Second) + count-- + } + if count == 0 { + log.Error(err, "Error while obtaining addresses for", "portName", portName, "stdout", out, "stderr", stderr) + return "", err + } + + // static addresses have format ["0a:00:00:00:00:01 192.168.1.3"], while + // dynamic addresses have format "0a:00:00:00:00:01 192.168.1.3". + outStr := strings.TrimLeft(out, `[`) + outStr = strings.TrimRight(outStr, `]`) + outStr = strings.Trim(outStr, `"`) + addresses := strings.Split(outStr, " ") + if len(addresses) != 2 { + log.Info("Error while obtaining addresses for", "portName", portName) + return "", err + } + + ipAddr := fmt.Sprintf("%s", addresses[1]) + log.V(1).Info("Get Node logical port", "pod", pod.GetName(), "node", nodeName, "portName", portName, "Node port IP", ipAddr) + + return ipAddr, nil +} + func (oc *Controller) addLogicalPortWithSwitch(pod *kapi.Pod, logicalSwitch, ipAddress, macAddress, portName string) (annotation string) { var out, stderr string var err error @@ -425,11 +546,18 @@ func (oc *Controller) addLogicalPortWithSwitch(pod *kapi.Pod, logicalSwitch, ipA return } - gatewayIP, mask, err := oc.getGatewayFromSwitch(logicalSwitch) + _, mask, err := oc.getGatewayFromSwitch(logicalSwitch) if err != nil { log.Error(err, "Error obtaining gateway address for switch", "logicalSwitch", logicalSwitch) return } + + gatewayIP, err := oc.getNodeLogicalPortIPAddr(pod) + if err != nil { + log.Error(err, "Error obtaining gateway address for switch", "logicalSwitch", logicalSwitch) + return + } + annotation = fmt.Sprintf(`{\"ip_address\":\"%s/%s\", \"mac_address\":\"%s\", \"gateway_ip\": \"%s\"}`, addresses[1], mask, addresses[0], gatewayIP) return annotation |