diff options
author | Ritu Sood <ritu.sood@intel.com> | 2018-11-10 09:56:52 -0800 |
---|---|---|
committer | Victor Morales <victor.morales@intel.com> | 2018-11-20 01:50:58 -0800 |
commit | 5026d1d89b05eac5e004279b742df6745a73d93a (patch) | |
tree | 8f9aed1e476706e008b746debda6d616bd0ac7a5 /internal/pkg | |
parent | 9506ae48eb545d502cc3685a99862740d28e7afb (diff) |
Seed code for the Plugin
The code includes ovn4nfvk8s Plugin & CNI. It implements multiple OVN
interfaces for Pods and assumes Multus (or similar CNI) calls its CNI
not as first CNI.
Change-Id: I524c1d18752eb6dbc8d34addd3b60d5bbaa06ff4
Signed-off-by: Ritu Sood <ritu.sood@intel.com>
Signed-off-by: Victor Morales <victor.morales@intel.com>
Diffstat (limited to 'internal/pkg')
-rw-r--r-- | internal/pkg/config/.gitkeep | 0 | ||||
-rw-r--r-- | internal/pkg/config/config.go | 359 | ||||
-rw-r--r-- | internal/pkg/factory/.gitkeep | 0 | ||||
-rw-r--r-- | internal/pkg/factory/factory.go | 318 | ||||
-rw-r--r-- | internal/pkg/factory/factory_test.go | 686 | ||||
-rw-r--r-- | internal/pkg/kube/.gitkeep | 0 | ||||
-rw-r--r-- | internal/pkg/kube/kube.go | 141 | ||||
-rw-r--r-- | internal/pkg/ovn/.gitkeep | 0 | ||||
-rw-r--r-- | internal/pkg/ovn/common.go | 89 | ||||
-rw-r--r-- | internal/pkg/ovn/ovn.go | 71 | ||||
-rw-r--r-- | internal/pkg/ovn/ovn_test.go | 116 | ||||
-rw-r--r-- | internal/pkg/ovn/pods.go | 267 | ||||
-rw-r--r-- | internal/pkg/ovn/router.go | 50 | ||||
-rw-r--r-- | internal/pkg/testing/testing.go | 55 | ||||
-rw-r--r-- | internal/pkg/util/.gitkeep | 0 | ||||
-rw-r--r-- | internal/pkg/util/net.go | 34 | ||||
-rw-r--r-- | internal/pkg/util/ovs.go | 126 |
17 files changed, 2312 insertions, 0 deletions
diff --git a/internal/pkg/config/.gitkeep b/internal/pkg/config/.gitkeep deleted file mode 100644 index e69de29..0000000 --- a/internal/pkg/config/.gitkeep +++ /dev/null diff --git a/internal/pkg/config/config.go b/internal/pkg/config/config.go new file mode 100644 index 0000000..045a0ac --- /dev/null +++ b/internal/pkg/config/config.go @@ -0,0 +1,359 @@ +package config + +import ( + "fmt" + "net/url" + "os" + "path/filepath" + "reflect" + "strings" + + "github.com/sirupsen/logrus" + "github.com/urfave/cli" + gcfg "gopkg.in/gcfg.v1" + + kexec "k8s.io/utils/exec" + + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/util/cert" +) + +// The following are global config parameters that other modules may access directly +var ( + // ovn-kubernetes version, to be changed with every release + Version = "0.3.0" + + // Default holds parsed config file parameters and command-line overrides + Default = DefaultConfig{ + MTU: 1400, + } + + // Logging holds logging-related parsed config file parameters and command-line overrides + Logging = LoggingConfig{ + File: "", // do not log to a file by default + Level: 4, + } + + // CNI holds CNI-related parsed config file parameters and command-line overrides + CNI = CNIConfig{ + ConfDir: "/etc/cni/net.d", + Plugin: "ovn4nfvk8s-cni", + } + + // Kubernetes holds Kubernetes-related parsed config file parameters and command-line overrides + Kubernetes = KubernetesConfig{ + APIServer: "http://localhost:8080", + } +) + +// DefaultConfig holds parsed config file parameters and command-line overrides +type DefaultConfig struct { + // MTU value used for the overlay networks. + MTU int `gcfg:"mtu"` +} + +// LoggingConfig holds logging-related parsed config file parameters and command-line overrides +type LoggingConfig struct { + // File is the path of the file to log to + File string `gcfg:"logfile"` + // Level is the logging verbosity level + Level int `gcfg:"loglevel"` +} + +// CNIConfig holds CNI-related parsed config file parameters and command-line overrides +type CNIConfig struct { + // ConfDir specifies the CNI config directory in which to write the overlay CNI config file + ConfDir string `gcfg:"conf-dir"` + // Plugin specifies the name of the CNI plugin + Plugin string `gcfg:"plugin"` +} + +// KubernetesConfig holds Kubernetes-related parsed config file parameters and command-line overrides +type KubernetesConfig struct { + Kubeconfig string `gcfg:"kubeconfig"` + CACert string `gcfg:"cacert"` + APIServer string `gcfg:"apiserver"` + Token string `gcfg:"token"` +} + +// Config is used to read the structured config file and to cache config in testcases +type config struct { + Default DefaultConfig + Logging LoggingConfig + CNI CNIConfig + Kubernetes KubernetesConfig +} + +// copy members of struct 'src' into the corresponding field in struct 'dst' +// if the field in 'src' is a non-zero int or a non-zero-length string. This +// function should be called with pointers to structs. +func overrideFields(dst, src interface{}) { + dstStruct := reflect.ValueOf(dst).Elem() + srcStruct := reflect.ValueOf(src).Elem() + if dstStruct.Kind() != srcStruct.Kind() || dstStruct.Kind() != reflect.Struct { + panic("mismatched value types") + } + if dstStruct.NumField() != srcStruct.NumField() { + panic("mismatched struct types") + } + + for i := 0; i < dstStruct.NumField(); i++ { + dstField := dstStruct.Field(i) + srcField := srcStruct.Field(i) + if dstField.Kind() != srcField.Kind() { + panic("mismatched struct fields") + } + switch srcField.Kind() { + case reflect.String: + if srcField.String() != "" { + dstField.Set(srcField) + } + case reflect.Int: + if srcField.Int() != 0 { + dstField.Set(srcField) + } + default: + panic(fmt.Sprintf("unhandled struct field type: %v", srcField.Kind())) + } + } +} + +var cliConfig config + +// Flags are general command-line flags. Apps should add these flags to their +// own urfave/cli flags and call InitConfig() early in the application. +var Flags = []cli.Flag{ + cli.StringFlag{ + Name: "config-file", + Usage: "configuration file path (default: /etc/openvswitch/ovn4nfv_k8s.conf)", + }, + + // Generic options + cli.IntFlag{ + Name: "mtu", + Usage: "MTU value used for the overlay networks (default: 1400)", + Destination: &cliConfig.Default.MTU, + }, + + // Logging options + cli.IntFlag{ + Name: "loglevel", + Usage: "log verbosity and level: 5=debug, 4=info, 3=warn, 2=error, 1=fatal (default: 4)", + Destination: &cliConfig.Logging.Level, + }, + cli.StringFlag{ + Name: "logfile", + Usage: "path of a file to direct log output to", + Destination: &cliConfig.Logging.File, + }, + + // CNI options + cli.StringFlag{ + Name: "cni-conf-dir", + Usage: "the CNI config directory in which to write the overlay CNI config file (default: /etc/cni/net.d)", + Destination: &cliConfig.CNI.ConfDir, + }, + cli.StringFlag{ + Name: "cni-plugin", + Usage: "the name of the CNI plugin (default: ovn4nfvk8s-cni)", + Destination: &cliConfig.CNI.Plugin, + }, + + // Kubernetes-related options + cli.StringFlag{ + Name: "k8s-kubeconfig", + Usage: "absolute path to the Kubernetes kubeconfig file (not required if the --k8s-apiserver, --k8s-ca-cert, and --k8s-token are given)", + Destination: &cliConfig.Kubernetes.Kubeconfig, + }, + cli.StringFlag{ + Name: "k8s-apiserver", + Usage: "URL of the Kubernetes API server (not required if --k8s-kubeconfig is given) (default: http://localhost:8443)", + Destination: &cliConfig.Kubernetes.APIServer, + }, + cli.StringFlag{ + Name: "k8s-cacert", + Usage: "the absolute path to the Kubernetes API CA certificate (not required if --k8s-kubeconfig is given)", + Destination: &cliConfig.Kubernetes.CACert, + }, + cli.StringFlag{ + Name: "k8s-token", + Usage: "the Kubernetes API authentication token (not required if --k8s-kubeconfig is given)", + Destination: &cliConfig.Kubernetes.Token, + }, +} + +type Defaults struct { + K8sAPIServer bool + K8sToken bool + K8sCert bool +} + +const ( + ovsVsctlCommand = "ovs-vsctl" +) + +func buildKubernetesConfig(exec kexec.Interface, cli, file *config, defaults *Defaults) error { + + // Copy config file values over default values + overrideFields(&Kubernetes, &file.Kubernetes) + // And CLI overrides over config file and default values + overrideFields(&Kubernetes, &cli.Kubernetes) + + if Kubernetes.Kubeconfig != "" && !pathExists(Kubernetes.Kubeconfig) { + return fmt.Errorf("kubernetes kubeconfig file %q not found", Kubernetes.Kubeconfig) + } + if Kubernetes.CACert != "" && !pathExists(Kubernetes.CACert) { + return fmt.Errorf("kubernetes CA certificate file %q not found", Kubernetes.CACert) + } + + url, err := url.Parse(Kubernetes.APIServer) + if err != nil { + return fmt.Errorf("kubernetes API server address %q invalid: %v", Kubernetes.APIServer, err) + } else if url.Scheme != "https" && url.Scheme != "http" { + return fmt.Errorf("kubernetes API server URL scheme %q invalid", url.Scheme) + } + + return nil +} + +// getConfigFilePath returns config file path and 'true' if the config file is +// the fallback path (eg not given by the user), 'false' if given explicitly +// by the user +func getConfigFilePath(ctx *cli.Context) (string, bool) { + configFile := ctx.String("config-file") + if configFile != "" { + return configFile, false + } + + // default + return filepath.Join("/etc", "openvswitch", "ovn4nfv_k8s.conf"), true + +} + +// InitConfig reads the config file and common command-line options and +// constructs the global config object from them. It returns the config file +// path (if explicitly specified) or an error +func InitConfig(ctx *cli.Context, exec kexec.Interface, defaults *Defaults) (string, error) { + return InitConfigWithPath(ctx, exec, "", defaults) +} + +// InitConfigWithPath reads the given config file (or if empty, reads the config file +// specified by command-line arguments, or empty, the default config file) and +// common command-line options and constructs the global config object from +// them. It returns the config file path (if explicitly specified) or an error +func InitConfigWithPath(ctx *cli.Context, exec kexec.Interface, configFile string, defaults *Defaults) (string, error) { + var cfg config + var retConfigFile string + var configFileIsDefault bool + + // If no specific config file was given, try to find one from command-line + // arguments, or the platform-specific default config file path + if configFile == "" { + configFile, configFileIsDefault = getConfigFilePath(ctx) + } + + logrus.SetOutput(os.Stderr) + + if !configFileIsDefault { + // Only return explicitly specified config file + retConfigFile = configFile + } + + f, err := os.Open(configFile) + // Failure to find a default config file is not a hard error + if err != nil && !configFileIsDefault { + return "", fmt.Errorf("failed to open config file %s: %v", configFile, err) + } + if f != nil { + defer f.Close() + + // Parse ovn4nfvk8s config file. + if err = gcfg.ReadInto(&cfg, f); err != nil { + return "", fmt.Errorf("failed to parse config file %s: %v", f.Name(), err) + } + logrus.Infof("Parsed config file %s", f.Name()) + logrus.Infof("Parsed config: %+v", cfg) + } + + if defaults == nil { + defaults = &Defaults{} + } + + // Build config that needs no special processing + overrideFields(&Default, &cfg.Default) + overrideFields(&Default, &cliConfig.Default) + overrideFields(&CNI, &cfg.CNI) + overrideFields(&CNI, &cliConfig.CNI) + + // Logging setup + overrideFields(&Logging, &cfg.Logging) + overrideFields(&Logging, &cliConfig.Logging) + logrus.SetLevel(logrus.Level(Logging.Level)) + if Logging.File != "" { + var file *os.File + file, err = os.OpenFile(Logging.File, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0660) + if err != nil { + logrus.Errorf("failed to open logfile %s (%v). Ignoring..", Logging.File, err) + } else { + logrus.SetOutput(file) + } + } + + if err = buildKubernetesConfig(exec, &cliConfig, &cfg, defaults); err != nil { + return "", err + } + logrus.Debugf("Default config: %+v", Default) + logrus.Debugf("Logging config: %+v", Logging) + logrus.Debugf("CNI config: %+v", CNI) + logrus.Debugf("Kubernetes config: %+v", Kubernetes) + + return retConfigFile, nil +} + +func pathExists(path string) bool { + _, err := os.Stat(path) + if err != nil && os.IsNotExist(err) { + return false + } + return true +} + +// NewClientset creates a Kubernetes clientset from either a kubeconfig, +// TLS properties, or an apiserver URL +func NewClientset(conf *KubernetesConfig) (*kubernetes.Clientset, error) { + var kconfig *rest.Config + var err error + + if conf.Kubeconfig != "" { + // uses the current context in kubeconfig + kconfig, err = clientcmd.BuildConfigFromFlags("", conf.Kubeconfig) + } else if strings.HasPrefix(conf.APIServer, "https") { + if conf.APIServer == "" || conf.Token == "" { + return nil, fmt.Errorf("TLS-secured apiservers require token and CA certificate") + } + kconfig = &rest.Config{ + Host: conf.APIServer, + BearerToken: conf.Token, + } + if conf.CACert != "" { + if _, err := cert.NewPool(conf.CACert); err != nil { + return nil, err + } + kconfig.TLSClientConfig = rest.TLSClientConfig{CAFile: conf.CACert} + } + } else if strings.HasPrefix(conf.APIServer, "http") { + kconfig, err = clientcmd.BuildConfigFromFlags(conf.APIServer, "") + } else { + // Assume we are running from a container managed by kubernetes + // and read the apiserver address and tokens from the + // container's environment. + kconfig, err = rest.InClusterConfig() + } + if err != nil { + return nil, err + } + + return kubernetes.NewForConfig(kconfig) +} diff --git a/internal/pkg/factory/.gitkeep b/internal/pkg/factory/.gitkeep deleted file mode 100644 index e69de29..0000000 --- a/internal/pkg/factory/.gitkeep +++ /dev/null diff --git a/internal/pkg/factory/factory.go b/internal/pkg/factory/factory.go new file mode 100644 index 0000000..a635e3f --- /dev/null +++ b/internal/pkg/factory/factory.go @@ -0,0 +1,318 @@ +package factory + +import ( + "fmt" + "reflect" + "sync" + "sync/atomic" + "time" + + "github.com/sirupsen/logrus" + + kapi "k8s.io/api/core/v1" + knet "k8s.io/api/networking/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + informerfactory "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" +) + +type informer struct { + sync.Mutex + oType reflect.Type + inf cache.SharedIndexInformer + handlers map[uint64]cache.ResourceEventHandler +} + +func (i *informer) forEachHandler(obj interface{}, f func(id uint64, handler cache.ResourceEventHandler)) { + i.Lock() + defer i.Unlock() + + objType := reflect.TypeOf(obj) + if objType != i.oType { + logrus.Errorf("object type %v did not match expected %v", objType, i.oType) + return + } + + for id, handler := range i.handlers { + f(id, handler) + } +} + +// WatchFactory initializes and manages common kube watches +type WatchFactory struct { + iFactory informerfactory.SharedInformerFactory + informers map[reflect.Type]*informer + handlerCounter uint64 +} + +const ( + resyncInterval = 12 * time.Hour +) + +func newInformer(oType reflect.Type, inf cache.SharedIndexInformer) *informer { + return &informer{ + oType: oType, + inf: inf, + handlers: make(map[uint64]cache.ResourceEventHandler), + } +} + +var ( + podType reflect.Type = reflect.TypeOf(&kapi.Pod{}) + serviceType reflect.Type = reflect.TypeOf(&kapi.Service{}) + endpointsType reflect.Type = reflect.TypeOf(&kapi.Endpoints{}) + policyType reflect.Type = reflect.TypeOf(&knet.NetworkPolicy{}) + namespaceType reflect.Type = reflect.TypeOf(&kapi.Namespace{}) + nodeType reflect.Type = reflect.TypeOf(&kapi.Node{}) +) + +// NewWatchFactory initializes a new watch factory +func NewWatchFactory(c kubernetes.Interface, stopChan <-chan struct{}) (*WatchFactory, error) { + // resync time is 12 hours, none of the resources being watched in ovn-kubernetes have + // any race condition where a resync may be required e.g. cni executable on node watching for + // events on pods and assuming that an 'ADD' event will contain the annotations put in by + // ovnkube master (currently, it is just a 'get' loop) + // the downside of making it tight (like 10 minutes) is needless spinning on all resources + wf := &WatchFactory{ + iFactory: informerfactory.NewSharedInformerFactory(c, resyncInterval), + informers: make(map[reflect.Type]*informer), + } + + // Create shared informers we know we'll use + wf.informers[podType] = newInformer(podType, wf.iFactory.Core().V1().Pods().Informer()) + wf.informers[serviceType] = newInformer(serviceType, wf.iFactory.Core().V1().Services().Informer()) + wf.informers[endpointsType] = newInformer(endpointsType, wf.iFactory.Core().V1().Endpoints().Informer()) + wf.informers[policyType] = newInformer(policyType, wf.iFactory.Networking().V1().NetworkPolicies().Informer()) + wf.informers[namespaceType] = newInformer(namespaceType, wf.iFactory.Core().V1().Namespaces().Informer()) + wf.informers[nodeType] = newInformer(nodeType, wf.iFactory.Core().V1().Nodes().Informer()) + + wf.iFactory.Start(stopChan) + res := wf.iFactory.WaitForCacheSync(stopChan) + for oType, synced := range res { + if !synced { + return nil, fmt.Errorf("error in syncing cache for %v informer", oType) + } + informer := wf.informers[oType] + informer.inf.AddEventHandler(wf.newFederatedHandler(informer)) + } + + return wf, nil +} + +func (wf *WatchFactory) newFederatedHandler(inf *informer) cache.ResourceEventHandlerFuncs { + return cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + inf.forEachHandler(obj, func(id uint64, handler cache.ResourceEventHandler) { + logrus.Debugf("running %v ADD event for handler %d", inf.oType, id) + handler.OnAdd(obj) + }) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + inf.forEachHandler(newObj, func(id uint64, handler cache.ResourceEventHandler) { + logrus.Debugf("running %v UPDATE event for handler %d", inf.oType, id) + handler.OnUpdate(oldObj, newObj) + }) + }, + DeleteFunc: func(obj interface{}) { + if inf.oType != reflect.TypeOf(obj) { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + logrus.Errorf("couldn't get object from tombstone: %+v", obj) + return + } + obj = tombstone.Obj + objType := reflect.TypeOf(obj) + if inf.oType != objType { + logrus.Errorf("expected tombstone object resource type %v but got %v", inf.oType, objType) + return + } + } + inf.forEachHandler(obj, func(id uint64, handler cache.ResourceEventHandler) { + logrus.Debugf("running %v DELETE event for handler %d", inf.oType, id) + handler.OnDelete(obj) + }) + }, + } +} + +func getObjectMeta(objType reflect.Type, obj interface{}) (*metav1.ObjectMeta, error) { + switch objType { + case podType: + if pod, ok := obj.(*kapi.Pod); ok { + return &pod.ObjectMeta, nil + } + case serviceType: + if service, ok := obj.(*kapi.Service); ok { + return &service.ObjectMeta, nil + } + case endpointsType: + if endpoints, ok := obj.(*kapi.Endpoints); ok { + return &endpoints.ObjectMeta, nil + } + case policyType: + if policy, ok := obj.(*knet.NetworkPolicy); ok { + return &policy.ObjectMeta, nil + } + case namespaceType: + if namespace, ok := obj.(*kapi.Namespace); ok { + return &namespace.ObjectMeta, nil + } + case nodeType: + if node, ok := obj.(*kapi.Node); ok { + return &node.ObjectMeta, nil + } + } + return nil, fmt.Errorf("cannot get ObjectMeta from type %v", objType) +} + +func (wf *WatchFactory) addHandler(objType reflect.Type, namespace string, lsel *metav1.LabelSelector, funcs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) { + inf, ok := wf.informers[objType] + if !ok { + return 0, fmt.Errorf("unknown object type %v", objType) + } + + sel, err := metav1.LabelSelectorAsSelector(lsel) + if err != nil { + return 0, fmt.Errorf("error creating label selector: %v", err) + } + + filterFunc := func(obj interface{}) bool { + if namespace == "" && lsel == nil { + // Unfiltered handler + return true + } + meta, err := getObjectMeta(objType, obj) + if err != nil { + logrus.Errorf("watch handler filter error: %v", err) + return false + } + if namespace != "" && meta.Namespace != namespace { + return false + } + if lsel != nil && !sel.Matches(labels.Set(meta.Labels)) { + return false + } + return true + } + + // Process existing items as a set so the caller can clean up + // after a restart or whatever + existingItems := inf.inf.GetStore().List() + if processExisting != nil { + items := make([]interface{}, 0) + for _, obj := range existingItems { + if filterFunc(obj) { + items = append(items, obj) + } + } + processExisting(items) + } + + handlerID := atomic.AddUint64(&wf.handlerCounter, 1) + + inf.Lock() + defer inf.Unlock() + + inf.handlers[handlerID] = cache.FilteringResourceEventHandler{ + FilterFunc: filterFunc, + Handler: funcs, + } + logrus.Debugf("added %v event handler %d", objType, handlerID) + + // Send existing items to the handler's add function; informers usually + // do this but since we share informers, it's long-since happened so + // we must emulate that here + for _, obj := range existingItems { + inf.handlers[handlerID].OnAdd(obj) + } + + return handlerID, nil +} + +func (wf *WatchFactory) removeHandler(objType reflect.Type, handlerID uint64) error { + inf, ok := wf.informers[objType] + if !ok { + return fmt.Errorf("tried to remove unknown object type %v event handler", objType) + } + + inf.Lock() + defer inf.Unlock() + if _, ok := inf.handlers[handlerID]; !ok { + return fmt.Errorf("tried to remove unknown object type %v event handler %d", objType, handlerID) + } + delete(inf.handlers, handlerID) + logrus.Debugf("removed %v event handler %d", objType, handlerID) + return nil +} + +// AddPodHandler adds a handler function that will be executed on Pod object changes +func (wf *WatchFactory) AddPodHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) { + return wf.addHandler(podType, "", nil, handlerFuncs, processExisting) +} + +// AddFilteredPodHandler adds a handler function that will be executed when Pod objects that match the given filters change +func (wf *WatchFactory) AddFilteredPodHandler(namespace string, lsel *metav1.LabelSelector, handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) { + return wf.addHandler(podType, namespace, lsel, handlerFuncs, processExisting) +} + +// RemovePodHandler removes a Pod object event handler function +func (wf *WatchFactory) RemovePodHandler(handlerID uint64) error { + return wf.removeHandler(podType, handlerID) +} + +// AddServiceHandler adds a handler function that will be executed on Service object changes +func (wf *WatchFactory) AddServiceHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) { + return wf.addHandler(serviceType, "", nil, handlerFuncs, processExisting) +} + +// RemoveServiceHandler removes a Service object event handler function +func (wf *WatchFactory) RemoveServiceHandler(handlerID uint64) error { + return wf.removeHandler(serviceType, handlerID) +} + +// AddEndpointsHandler adds a handler function that will be executed on Endpoints object changes +func (wf *WatchFactory) AddEndpointsHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) { + return wf.addHandler(endpointsType, "", nil, handlerFuncs, processExisting) +} + +// RemoveEndpointsHandler removes a Endpoints object event handler function +func (wf *WatchFactory) RemoveEndpointsHandler(handlerID uint64) error { + return wf.removeHandler(endpointsType, handlerID) +} + +// AddPolicyHandler adds a handler function that will be executed on NetworkPolicy object changes +func (wf *WatchFactory) AddPolicyHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) { + return wf.addHandler(policyType, "", nil, handlerFuncs, processExisting) +} + +// RemovePolicyHandler removes a NetworkPolicy object event handler function +func (wf *WatchFactory) RemovePolicyHandler(handlerID uint64) error { + return wf.removeHandler(policyType, handlerID) +} + +// AddNamespaceHandler adds a handler function that will be executed on Namespace object changes +func (wf *WatchFactory) AddNamespaceHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) { + return wf.addHandler(namespaceType, "", nil, handlerFuncs, processExisting) +} + +// AddFilteredNamespaceHandler adds a handler function that will be executed when Namespace objects that match the given filters change +func (wf *WatchFactory) AddFilteredNamespaceHandler(namespace string, lsel *metav1.LabelSelector, handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) { + return wf.addHandler(namespaceType, namespace, lsel, handlerFuncs, processExisting) +} + +// RemoveNamespaceHandler removes a Namespace object event handler function +func (wf *WatchFactory) RemoveNamespaceHandler(handlerID uint64) error { + return wf.removeHandler(namespaceType, handlerID) +} + +// AddNodeHandler adds a handler function that will be executed on Node object changes +func (wf *WatchFactory) AddNodeHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) { + return wf.addHandler(nodeType, "", nil, handlerFuncs, processExisting) +} + +// RemoveNodeHandler removes a Node object event handler function +func (wf *WatchFactory) RemoveNodeHandler(handlerID uint64) error { + return wf.removeHandler(nodeType, handlerID) +} diff --git a/internal/pkg/factory/factory_test.go b/internal/pkg/factory/factory_test.go new file mode 100644 index 0000000..38fb77e --- /dev/null +++ b/internal/pkg/factory/factory_test.go @@ -0,0 +1,686 @@ +package factory + +import ( + "reflect" + "testing" + + "k8s.io/api/core/v1" + knet "k8s.io/api/networking/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes/fake" + core "k8s.io/client-go/testing" + "k8s.io/client-go/tools/cache" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +func TestFactory(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Watch Factory Suite") +} + +func newObjectMeta(name, namespace string) metav1.ObjectMeta { + return metav1.ObjectMeta{ + Name: name, + UID: types.UID(name), + Namespace: namespace, + Labels: map[string]string{ + "name": name, + }, + } +} + +func newPod(name, namespace string) *v1.Pod { + return &v1.Pod{ + Status: v1.PodStatus{ + Phase: v1.PodRunning, + }, + ObjectMeta: newObjectMeta(name, namespace), + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "containerName", + Image: "containerImage", + }, + }, + NodeName: "mynode", + }, + } +} + +func newNamespace(name string) *v1.Namespace { + return &v1.Namespace{ + Status: v1.NamespaceStatus{ + Phase: v1.NamespaceActive, + }, + ObjectMeta: newObjectMeta(name, name), + } +} + +func newNode(name string) *v1.Node { + return &v1.Node{ + Status: v1.NodeStatus{ + Phase: v1.NodeRunning, + }, + ObjectMeta: newObjectMeta(name, ""), + } +} + +func newPolicy(name, namespace string) *knet.NetworkPolicy { + return &knet.NetworkPolicy{ + ObjectMeta: newObjectMeta(name, namespace), + } +} + +func newEndpoints(name, namespace string) *v1.Endpoints { + return &v1.Endpoints{ + ObjectMeta: newObjectMeta(name, namespace), + } +} + +func newService(name, namespace string) *v1.Service { + return &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: types.UID(name), + Namespace: namespace, + Labels: map[string]string{ + "name": name, + }, + }, + } +} + +func objSetup(c *fake.Clientset, objType string, listFn func(core.Action) (bool, runtime.Object, error)) *watch.FakeWatcher { + w := watch.NewFake() + c.AddWatchReactor(objType, core.DefaultWatchReactor(w, nil)) + c.AddReactor("list", objType, listFn) + return w +} + +var _ = Describe("Watch Factory Operations", func() { + var ( + fakeClient *fake.Clientset + podWatch, namespaceWatch, nodeWatch *watch.FakeWatcher + policyWatch, endpointsWatch, serviceWatch *watch.FakeWatcher + pods []*v1.Pod + namespaces []*v1.Namespace + nodes []*v1.Node + policies []*knet.NetworkPolicy + endpoints []*v1.Endpoints + services []*v1.Service + stop chan struct{} + numAdded, numUpdated, numDeleted int + ) + + BeforeEach(func() { + fakeClient = &fake.Clientset{} + stop = make(chan struct{}) + + pods = make([]*v1.Pod, 0) + podWatch = objSetup(fakeClient, "pods", func(core.Action) (bool, runtime.Object, error) { + obj := &v1.PodList{} + for _, p := range pods { + obj.Items = append(obj.Items, *p) + } + return true, obj, nil + }) + + namespaces = make([]*v1.Namespace, 0) + namespaceWatch = objSetup(fakeClient, "namespaces", func(core.Action) (bool, runtime.Object, error) { + obj := &v1.NamespaceList{} + for _, p := range namespaces { + obj.Items = append(obj.Items, *p) + } + return true, obj, nil + }) + + nodes = make([]*v1.Node, 0) + nodeWatch = objSetup(fakeClient, "nodes", func(core.Action) (bool, runtime.Object, error) { + obj := &v1.NodeList{} + for _, p := range nodes { + obj.Items = append(obj.Items, *p) + } + return true, obj, nil + }) + + policies = make([]*knet.NetworkPolicy, 0) + policyWatch = objSetup(fakeClient, "networkpolicies", func(core.Action) (bool, runtime.Object, error) { + obj := &knet.NetworkPolicyList{} + for _, p := range policies { + obj.Items = append(obj.Items, *p) + } + return true, obj, nil + }) + + endpoints = make([]*v1.Endpoints, 0) + endpointsWatch = objSetup(fakeClient, "endpoints", func(core.Action) (bool, runtime.Object, error) { + obj := &v1.EndpointsList{} + for _, p := range endpoints { + obj.Items = append(obj.Items, *p) + } + return true, obj, nil + }) + + services = make([]*v1.Service, 0) + serviceWatch = objSetup(fakeClient, "services", func(core.Action) (bool, runtime.Object, error) { + obj := &v1.ServiceList{} + for _, p := range services { + obj.Items = append(obj.Items, *p) + } + return true, obj, nil + }) + + numAdded = 0 + numUpdated = 0 + numDeleted = 0 + }) + + Context("when a processExisting is given", func() { + testExisting := func(objType reflect.Type, namespace string, lsel *metav1.LabelSelector) { + wf, err := NewWatchFactory(fakeClient, stop) + Expect(err).NotTo(HaveOccurred()) + id, err := wf.addHandler(objType, namespace, lsel, + cache.ResourceEventHandlerFuncs{}, + func(objs []interface{}) { + Expect(len(objs)).To(Equal(1)) + }) + Expect(err).NotTo(HaveOccurred()) + Expect(id).To(BeNumerically(">", uint64(0))) + wf.removeHandler(objType, id) + close(stop) + } + + It("is called for each existing pod", func() { + pods = append(pods, newPod("pod1", "default")) + testExisting(podType, "", nil) + }) + + It("is called for each existing namespace", func() { + namespaces = append(namespaces, newNamespace("default")) + testExisting(namespaceType, "", nil) + }) + + It("is called for each existing node", func() { + nodes = append(nodes, newNode("default")) + testExisting(nodeType, "", nil) + }) + + It("is called for each existing policy", func() { + policies = append(policies, newPolicy("denyall", "default")) + testExisting(policyType, "", nil) + }) + + It("is called for each existing endpoints", func() { + endpoints = append(endpoints, newEndpoints("myendpoint", "default")) + testExisting(endpointsType, "", nil) + }) + + It("is called for each existing service", func() { + services = append(services, newService("myservice", "default")) + testExisting(serviceType, "", nil) + }) + + It("is called for each existing pod that matches a given namespace and label", func() { + pod := newPod("pod1", "default") + pod.ObjectMeta.Labels["blah"] = "foobar" + pods = append(pods, pod) + testExisting(podType, "default", &metav1.LabelSelector{ + MatchLabels: map[string]string{"blah": "foobar"}, + }) + }) + }) + + Context("when existing items are known to the informer", func() { + testExisting := func(objType reflect.Type) { + wf, err := NewWatchFactory(fakeClient, stop) + Expect(err).NotTo(HaveOccurred()) + id, err := wf.addHandler(objType, "", nil, + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + numAdded++ + }, + UpdateFunc: func(old, new interface{}) {}, + DeleteFunc: func(obj interface{}) {}, + }, nil) + Expect(err).NotTo(HaveOccurred()) + Expect(numAdded).To(Equal(2)) + wf.removeHandler(objType, id) + close(stop) + } + + It("calls ADD for each existing pod", func() { + pods = append(pods, newPod("pod1", "default")) + pods = append(pods, newPod("pod2", "default")) + testExisting(podType) + }) + + It("calls ADD for each existing namespace", func() { + namespaces = append(namespaces, newNamespace("default")) + namespaces = append(namespaces, newNamespace("default2")) + testExisting(namespaceType) + }) + + It("calls ADD for each existing node", func() { + nodes = append(nodes, newNode("default")) + nodes = append(nodes, newNode("default2")) + testExisting(nodeType) + }) + + It("calls ADD for each existing policy", func() { + policies = append(policies, newPolicy("denyall", "default")) + policies = append(policies, newPolicy("denyall2", "default")) + testExisting(policyType) + }) + + It("calls ADD for each existing endpoints", func() { + endpoints = append(endpoints, newEndpoints("myendpoint", "default")) + endpoints = append(endpoints, newEndpoints("myendpoint2", "default")) + testExisting(endpointsType) + }) + + It("calls ADD for each existing service", func() { + services = append(services, newService("myservice", "default")) + services = append(services, newService("myservice2", "default")) + testExisting(serviceType) + }) + }) + + addFilteredHandler := func(wf *WatchFactory, objType reflect.Type, namespace string, lsel *metav1.LabelSelector, funcs cache.ResourceEventHandlerFuncs) uint64 { + id, err := wf.addHandler(objType, namespace, lsel, cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + defer GinkgoRecover() + numAdded++ + funcs.AddFunc(obj) + }, + UpdateFunc: func(old, new interface{}) { + defer GinkgoRecover() + numUpdated++ + funcs.UpdateFunc(old, new) + }, + DeleteFunc: func(obj interface{}) { + defer GinkgoRecover() + numDeleted++ + funcs.DeleteFunc(obj) + }, + }, nil) + Expect(err).NotTo(HaveOccurred()) + Expect(id).To(BeNumerically(">", uint64(0))) + return id + } + + addHandler := func(wf *WatchFactory, objType reflect.Type, funcs cache.ResourceEventHandlerFuncs) uint64 { + return addFilteredHandler(wf, objType, "", nil, funcs) + } + + It("responds to pod add/update/delete events", func() { + wf, err := NewWatchFactory(fakeClient, stop) + Expect(err).NotTo(HaveOccurred()) + + added := newPod("pod1", "default") + id := addHandler(wf, podType, cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + pod := obj.(*v1.Pod) + Expect(reflect.DeepEqual(pod, added)).To(BeTrue()) + }, + UpdateFunc: func(old, new interface{}) { + newPod := new.(*v1.Pod) + Expect(reflect.DeepEqual(newPod, added)).To(BeTrue()) + Expect(newPod.Spec.NodeName).To(Equal("foobar")) + }, + DeleteFunc: func(obj interface{}) { + pod := obj.(*v1.Pod) + Expect(reflect.DeepEqual(pod, added)).To(BeTrue()) + }, + }) + + pods = append(pods, added) + podWatch.Add(added) + Eventually(func() int { return numAdded }, 2).Should(Equal(1)) + added.Spec.NodeName = "foobar" + podWatch.Modify(added) + Eventually(func() int { return numUpdated }, 2).Should(Equal(1)) + pods = pods[:0] + podWatch.Delete(added) + Eventually(func() int { return numDeleted }, 2).Should(Equal(1)) + + wf.RemovePodHandler(id) + close(stop) + }) + + It("responds to namespace add/update/delete events", func() { + wf, err := NewWatchFactory(fakeClient, stop) + Expect(err).NotTo(HaveOccurred()) + + added := newNamespace("default") + id := addHandler(wf, namespaceType, cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + ns := obj.(*v1.Namespace) + Expect(reflect.DeepEqual(ns, added)).To(BeTrue()) + }, + UpdateFunc: func(old, new interface{}) { + newNS := new.(*v1.Namespace) + Expect(reflect.DeepEqual(newNS, added)).To(BeTrue()) + Expect(newNS.Status.Phase).To(Equal(v1.NamespaceTerminating)) + }, + DeleteFunc: func(obj interface{}) { + ns := obj.(*v1.Namespace) + Expect(reflect.DeepEqual(ns, added)).To(BeTrue()) + }, + }) + + namespaces = append(namespaces, added) + namespaceWatch.Add(added) + Eventually(func() int { return numAdded }, 2).Should(Equal(1)) + added.Status.Phase = v1.NamespaceTerminating + namespaceWatch.Modify(added) + Eventually(func() int { return numUpdated }, 2).Should(Equal(1)) + namespaces = namespaces[:0] + namespaceWatch.Delete(added) + Eventually(func() int { return numDeleted }, 2).Should(Equal(1)) + + wf.RemoveNamespaceHandler(id) + close(stop) + }) + + It("responds to node add/update/delete events", func() { + wf, err := NewWatchFactory(fakeClient, stop) + Expect(err).NotTo(HaveOccurred()) + + added := newNode("mynode") + id := addHandler(wf, nodeType, cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + node := obj.(*v1.Node) + Expect(reflect.DeepEqual(node, added)).To(BeTrue()) + }, + UpdateFunc: func(old, new interface{}) { + newNode := new.(*v1.Node) + Expect(reflect.DeepEqual(newNode, added)).To(BeTrue()) + Expect(newNode.Status.Phase).To(Equal(v1.NodeTerminated)) + }, + DeleteFunc: func(obj interface{}) { + node := obj.(*v1.Node) + Expect(reflect.DeepEqual(node, added)).To(BeTrue()) + }, + }) + + nodes = append(nodes, added) + nodeWatch.Add(added) + Eventually(func() int { return numAdded }, 2).Should(Equal(1)) + added.Status.Phase = v1.NodeTerminated + nodeWatch.Modify(added) + Eventually(func() int { return numUpdated }, 2).Should(Equal(1)) + nodes = nodes[:0] + nodeWatch.Delete(added) + Eventually(func() int { return numDeleted }, 2).Should(Equal(1)) + + wf.removeHandler(nodeType, id) + close(stop) + }) + + It("responds to policy add/update/delete events", func() { + wf, err := NewWatchFactory(fakeClient, stop) + Expect(err).NotTo(HaveOccurred()) + + added := newPolicy("mypolicy", "default") + id := addHandler(wf, policyType, cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + np := obj.(*knet.NetworkPolicy) + Expect(reflect.DeepEqual(np, added)).To(BeTrue()) + }, + UpdateFunc: func(old, new interface{}) { + newNP := new.(*knet.NetworkPolicy) + Expect(reflect.DeepEqual(newNP, added)).To(BeTrue()) + Expect(newNP.Spec.PolicyTypes).To(Equal([]knet.PolicyType{knet.PolicyTypeIngress})) + }, + DeleteFunc: func(obj interface{}) { + np := obj.(*knet.NetworkPolicy) + Expect(reflect.DeepEqual(np, added)).To(BeTrue()) + }, + }) + + policies = append(policies, added) + policyWatch.Add(added) + Eventually(func() int { return numAdded }, 2).Should(Equal(1)) + added.Spec.PolicyTypes = []knet.PolicyType{knet.PolicyTypeIngress} + policyWatch.Modify(added) + Eventually(func() int { return numUpdated }, 2).Should(Equal(1)) + policies = policies[:0] + policyWatch.Delete(added) + Eventually(func() int { return numDeleted }, 2).Should(Equal(1)) + + wf.removeHandler(policyType, id) + close(stop) + }) + + It("responds to endpoints add/update/delete events", func() { + wf, err := NewWatchFactory(fakeClient, stop) + Expect(err).NotTo(HaveOccurred()) + + added := newEndpoints("myendpoints", "default") + id := addHandler(wf, endpointsType, cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + eps := obj.(*v1.Endpoints) + Expect(reflect.DeepEqual(eps, added)).To(BeTrue()) + }, + UpdateFunc: func(old, new interface{}) { + newEPs := new.(*v1.Endpoints) + Expect(reflect.DeepEqual(newEPs, added)).To(BeTrue()) + Expect(len(newEPs.Subsets)).To(Equal(1)) + }, + DeleteFunc: func(obj interface{}) { + eps := obj.(*v1.Endpoints) + Expect(reflect.DeepEqual(eps, added)).To(BeTrue()) + }, + }) + + endpoints = append(endpoints, added) + endpointsWatch.Add(added) + Eventually(func() int { return numAdded }, 2).Should(Equal(1)) + added.Subsets = append(added.Subsets, v1.EndpointSubset{ + Ports: []v1.EndpointPort{ + { + Name: "foobar", + Port: 1234, + }, + }, + }) + endpointsWatch.Modify(added) + Eventually(func() int { return numUpdated }, 2).Should(Equal(1)) + endpoints = endpoints[:0] + endpointsWatch.Delete(added) + Eventually(func() int { return numDeleted }, 2).Should(Equal(1)) + + wf.removeHandler(endpointsType, id) + close(stop) + }) + + It("responds to service add/update/delete events", func() { + wf, err := NewWatchFactory(fakeClient, stop) + Expect(err).NotTo(HaveOccurred()) + + added := newService("myservice", "default") + id := addHandler(wf, serviceType, cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + service := obj.(*v1.Service) + Expect(reflect.DeepEqual(service, added)).To(BeTrue()) + }, + UpdateFunc: func(old, new interface{}) { + newService := new.(*v1.Service) + Expect(reflect.DeepEqual(newService, added)).To(BeTrue()) + Expect(newService.Spec.ClusterIP).To(Equal("1.1.1.1")) + }, + DeleteFunc: func(obj interface{}) { + service := obj.(*v1.Service) + Expect(reflect.DeepEqual(service, added)).To(BeTrue()) + }, + }) + + services = append(services, added) + serviceWatch.Add(added) + Eventually(func() int { return numAdded }, 2).Should(Equal(1)) + added.Spec.ClusterIP = "1.1.1.1" + serviceWatch.Modify(added) + Eventually(func() int { return numUpdated }, 2).Should(Equal(1)) + services = services[:0] + serviceWatch.Delete(added) + Eventually(func() int { return numDeleted }, 2).Should(Equal(1)) + + wf.removeHandler(serviceType, id) + close(stop) + }) + + It("stops processing events after the handler is removed", func() { + wf, err := NewWatchFactory(fakeClient, stop) + Expect(err).NotTo(HaveOccurred()) + + added := newNamespace("default") + id := addHandler(wf, namespaceType, cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) {}, + UpdateFunc: func(old, new interface{}) {}, + DeleteFunc: func(obj interface{}) {}, + }) + + namespaces = append(namespaces, added) + namespaceWatch.Add(added) + Eventually(func() int { return numAdded }, 2).Should(Equal(1)) + wf.RemoveNamespaceHandler(id) + + added2 := newNamespace("other") + namespaces = append(namespaces, added2) + namespaceWatch.Add(added2) + Consistently(func() int { return numAdded }, 2).Should(Equal(1)) + + added2.Status.Phase = v1.NamespaceTerminating + namespaceWatch.Modify(added2) + Consistently(func() int { return numUpdated }, 2).Should(Equal(0)) + namespaces = []*v1.Namespace{added} + namespaceWatch.Delete(added2) + Consistently(func() int { return numDeleted }, 2).Should(Equal(0)) + + close(stop) + }) + + It("filters correctly by label and namespace", func() { + wf, err := NewWatchFactory(fakeClient, stop) + Expect(err).NotTo(HaveOccurred()) + + passesFilter := newPod("pod1", "default") + passesFilter.ObjectMeta.Labels["blah"] = "foobar" + failsFilter := newPod("pod2", "default") + failsFilter.ObjectMeta.Labels["blah"] = "baz" + failsFilter2 := newPod("pod3", "otherns") + failsFilter2.ObjectMeta.Labels["blah"] = "foobar" + + addFilteredHandler(wf, + podType, + "default", + &metav1.LabelSelector{ + MatchLabels: map[string]string{"blah": "foobar"}, + }, + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + pod := obj.(*v1.Pod) + Expect(reflect.DeepEqual(pod, passesFilter)).To(BeTrue()) + }, + UpdateFunc: func(old, new interface{}) { + newPod := new.(*v1.Pod) + Expect(reflect.DeepEqual(newPod, passesFilter)).To(BeTrue()) + }, + DeleteFunc: func(obj interface{}) { + pod := obj.(*v1.Pod) + Expect(reflect.DeepEqual(pod, passesFilter)).To(BeTrue()) + }, + }) + + pods = append(pods, passesFilter) + podWatch.Add(passesFilter) + Eventually(func() int { return numAdded }, 2).Should(Equal(1)) + + // numAdded should remain 1 + pods = append(pods, failsFilter) + podWatch.Add(failsFilter) + Consistently(func() int { return numAdded }, 2).Should(Equal(1)) + + // numAdded should remain 1 + pods = append(pods, failsFilter2) + podWatch.Add(failsFilter2) + Consistently(func() int { return numAdded }, 2).Should(Equal(1)) + + passesFilter.Status.Phase = v1.PodFailed + podWatch.Modify(passesFilter) + Eventually(func() int { return numUpdated }, 2).Should(Equal(1)) + + // numAdded should remain 1 + failsFilter.Status.Phase = v1.PodFailed + podWatch.Modify(failsFilter) + Consistently(func() int { return numUpdated }, 2).Should(Equal(1)) + + failsFilter2.Status.Phase = v1.PodFailed + podWatch.Modify(failsFilter2) + Consistently(func() int { return numUpdated }, 2).Should(Equal(1)) + + pods = []*v1.Pod{failsFilter, failsFilter2} + podWatch.Delete(passesFilter) + Eventually(func() int { return numDeleted }, 2).Should(Equal(1)) + + close(stop) + }) + + It("correctly handles object updates that cause filter changes", func() { + wf, err := NewWatchFactory(fakeClient, stop) + Expect(err).NotTo(HaveOccurred()) + + pod := newPod("pod1", "default") + pod.ObjectMeta.Labels["blah"] = "baz" + + equalPod := pod + id := addFilteredHandler(wf, + podType, + "default", + &metav1.LabelSelector{ + MatchLabels: map[string]string{"blah": "foobar"}, + }, + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + p := obj.(*v1.Pod) + Expect(reflect.DeepEqual(p, equalPod)).To(BeTrue()) + }, + UpdateFunc: func(old, new interface{}) {}, + DeleteFunc: func(obj interface{}) { + p := obj.(*v1.Pod) + Expect(reflect.DeepEqual(p, equalPod)).To(BeTrue()) + }, + }) + + pods = append(pods, pod) + + // Pod doesn't pass filter; shouldn't be added + podWatch.Add(pod) + Consistently(func() int { return numAdded }, 2).Should(Equal(0)) + + // Update pod to pass filter; should be treated as add. Need + // to deep-copy pod when modifying because it's a pointer all + // the way through when using FakeClient + podCopy := pod.DeepCopy() + podCopy.ObjectMeta.Labels["blah"] = "foobar" + pods = []*v1.Pod{podCopy} + equalPod = podCopy + podWatch.Modify(podCopy) + Eventually(func() int { return numAdded }, 2).Should(Equal(1)) + + // Update pod to fail filter; should be treated as delete + pod.ObjectMeta.Labels["blah"] = "baz" + podWatch.Modify(pod) + Eventually(func() int { return numDeleted }, 2).Should(Equal(1)) + Consistently(func() int { return numAdded }, 2).Should(Equal(1)) + Consistently(func() int { return numUpdated }, 2).Should(Equal(0)) + + wf.RemovePodHandler(id) + close(stop) + }) +}) diff --git a/internal/pkg/kube/.gitkeep b/internal/pkg/kube/.gitkeep deleted file mode 100644 index e69de29..0000000 --- a/internal/pkg/kube/.gitkeep +++ /dev/null diff --git a/internal/pkg/kube/kube.go b/internal/pkg/kube/kube.go new file mode 100644 index 0000000..cc7c29b --- /dev/null +++ b/internal/pkg/kube/kube.go @@ -0,0 +1,141 @@ +package kube + +import ( + "fmt" + + "github.com/sirupsen/logrus" + + kapi "k8s.io/api/core/v1" + kapisnetworking "k8s.io/api/networking/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" +) + +// Interface represents the exported methods for dealing with getting/setting +// kubernetes resources +type Interface interface { + SetAnnotationOnPod(pod *kapi.Pod, key, value string) error + SetAnnotationOnNode(node *kapi.Node, key, value string) error + SetAnnotationOnNamespace(ns *kapi.Namespace, key, value string) error + GetAnnotationsOnPod(namespace, name string) (map[string]string, error) + GetPod(namespace, name string) (*kapi.Pod, error) + GetPods(namespace string) (*kapi.PodList, error) + GetPodsByLabels(namespace string, selector labels.Selector) (*kapi.PodList, error) + GetNodes() (*kapi.NodeList, error) + GetNode(name string) (*kapi.Node, error) + GetService(namespace, name string) (*kapi.Service, error) + GetEndpoints(namespace string) (*kapi.EndpointsList, error) + GetNamespace(name string) (*kapi.Namespace, error) + GetNamespaces() (*kapi.NamespaceList, error) + GetNetworkPolicies(namespace string) (*kapisnetworking.NetworkPolicyList, error) +} + +// Kube is the structure object upon which the Interface is implemented +type Kube struct { + KClient kubernetes.Interface +} + +// SetAnnotationOnPod takes the pod object and key/value string pair to set it as an annotation +func (k *Kube) SetAnnotationOnPod(pod *kapi.Pod, key, value string) error { + logrus.Infof("Setting annotations %s=%s on pod %s", key, value, pod.Name) + patchData := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, key, value) + _, err := k.KClient.Core().Pods(pod.Namespace).Patch(pod.Name, types.MergePatchType, []byte(patchData)) + if err != nil { + logrus.Errorf("Error in setting annotation on pod %s/%s: %v", pod.Name, pod.Namespace, err) + } + return err +} + +// SetAnnotationOnNode takes the node object and key/value string pair to set it as an annotation +func (k *Kube) SetAnnotationOnNode(node *kapi.Node, key, value string) error { + logrus.Infof("Setting annotations %s=%s on node %s", key, value, node.Name) + patchData := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, key, value) + _, err := k.KClient.Core().Nodes().Patch(node.Name, types.MergePatchType, []byte(patchData)) + if err != nil { + logrus.Errorf("Error in setting annotation on node %s: %v", node.Name, err) + } + return err +} + +// SetAnnotationOnNamespace takes the Namespace object and key/value pair +// to set it as an annotation +func (k *Kube) SetAnnotationOnNamespace(ns *kapi.Namespace, key, + value string) error { + logrus.Infof("Setting annotations %s=%s on namespace %s", key, value, + ns.Name) + patchData := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, key, + value) + _, err := k.KClient.Core().Namespaces().Patch(ns.Name, + types.MergePatchType, []byte(patchData)) + if err != nil { + logrus.Errorf("Error in setting annotation on namespace %s: %v", + ns.Name, err) + } + return err +} + +// GetAnnotationsOnPod obtains the pod annotations from kubernetes apiserver, given the name and namespace +func (k *Kube) GetAnnotationsOnPod(namespace, name string) (map[string]string, error) { + pod, err := k.KClient.Core().Pods(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + return pod.ObjectMeta.Annotations, nil +} + +// GetPod obtains the Pod resource from kubernetes apiserver, given the name and namespace +func (k *Kube) GetPod(namespace, name string) (*kapi.Pod, error) { + return k.KClient.Core().Pods(namespace).Get(name, metav1.GetOptions{}) +} + +// GetPods obtains the Pod resource from kubernetes apiserver, given the name and namespace +func (k *Kube) GetPods(namespace string) (*kapi.PodList, error) { + return k.KClient.Core().Pods(namespace).List(metav1.ListOptions{}) +} + +// GetPodsByLabels obtains the Pod resources from kubernetes apiserver, +// given the namespace and label +func (k *Kube) GetPodsByLabels(namespace string, selector labels.Selector) (*kapi.PodList, error) { + options := metav1.ListOptions{} + options.LabelSelector = selector.String() + return k.KClient.Core().Pods(namespace).List(options) +} + +// GetNodes returns the list of all Node objects from kubernetes +func (k *Kube) GetNodes() (*kapi.NodeList, error) { + return k.KClient.Core().Nodes().List(metav1.ListOptions{}) +} + +// GetNode returns the Node resource from kubernetes apiserver, given its name +func (k *Kube) GetNode(name string) (*kapi.Node, error) { + return k.KClient.Core().Nodes().Get(name, metav1.GetOptions{}) +} + +// GetService returns the Service resource from kubernetes apiserver, given its name and namespace +func (k *Kube) GetService(namespace, name string) (*kapi.Service, error) { + return k.KClient.Core().Services(namespace).Get(name, metav1.GetOptions{}) +} + +// GetEndpoints returns all the Endpoint resources from kubernetes +// apiserver, given namespace +func (k *Kube) GetEndpoints(namespace string) (*kapi.EndpointsList, error) { + return k.KClient.Core().Endpoints(namespace).List(metav1.ListOptions{}) +} + +// GetNamespace returns the Namespace resource from kubernetes apiserver, +// given its name +func (k *Kube) GetNamespace(name string) (*kapi.Namespace, error) { + return k.KClient.Core().Namespaces().Get(name, metav1.GetOptions{}) +} + +// GetNamespaces returns all Namespace resource from kubernetes apiserver +func (k *Kube) GetNamespaces() (*kapi.NamespaceList, error) { + return k.KClient.Core().Namespaces().List(metav1.ListOptions{}) +} + +// GetNetworkPolicies returns all network policy objects from kubernetes +func (k *Kube) GetNetworkPolicies(namespace string) (*kapisnetworking.NetworkPolicyList, error) { + return k.KClient.Networking().NetworkPolicies(namespace).List(metav1.ListOptions{}) +} diff --git a/internal/pkg/ovn/.gitkeep b/internal/pkg/ovn/.gitkeep deleted file mode 100644 index e69de29..0000000 --- a/internal/pkg/ovn/.gitkeep +++ /dev/null diff --git a/internal/pkg/ovn/common.go b/internal/pkg/ovn/common.go new file mode 100644 index 0000000..16923ea --- /dev/null +++ b/internal/pkg/ovn/common.go @@ -0,0 +1,89 @@ +package ovn + +import ( + "encoding/json" + "fmt" + "github.com/sirupsen/logrus" + "strings" +) + +func (oc *Controller) getIPFromOvnAnnotation(ovnAnnotation string) string { + if ovnAnnotation == "" { + return "" + } + + var ovnAnnotationMap map[string]string + err := json.Unmarshal([]byte(ovnAnnotation), &ovnAnnotationMap) + if err != nil { + logrus.Errorf("Error in json unmarshaling ovn annotation "+ + "(%v)", err) + return "" + } + + ipAddressMask := strings.Split(ovnAnnotationMap["ip_address"], "/") + if len(ipAddressMask) != 2 { + logrus.Errorf("Error in splitting ip address") + return "" + } + + return ipAddressMask[0] +} + +func (oc *Controller) getMacFromOvnAnnotation(ovnAnnotation string) string { + if ovnAnnotation == "" { + return "" + } + + var ovnAnnotationMap map[string]string + err := json.Unmarshal([]byte(ovnAnnotation), &ovnAnnotationMap) + if err != nil { + logrus.Errorf("Error in json unmarshaling ovn annotation "+ + "(%v)", err) + return "" + } + + return ovnAnnotationMap["mac_address"] +} + +func stringSliceMembership(slice []string, key string) bool { + for _, val := range slice { + if val == key { + return true + } + } + return false +} + +func (oc *Controller) getNetworkFromOvnAnnotation(ovnAnnotation string) string { + if ovnAnnotation == "" { + logrus.Errorf("getNetworkFromOvnAnnotation ovnAnnotation: %s", ovnAnnotation) + return "" + } + logrus.Infof("getNetworkFromOvnAnnotation ovnAnnotation: %s", ovnAnnotation) + + var ovnAnnotationMap map[string]string + err := json.Unmarshal([]byte(ovnAnnotation), &ovnAnnotationMap) + if err != nil { + logrus.Errorf("Error in json unmarshaling ovn annotation "+ + "(%v)", err) + return "" + } + for key, value := range ovnAnnotationMap { + logrus.Infof("getNetworkFromOvnAnnotation %s: %s", key, value) + } + return ovnAnnotationMap["name"] +} + +func (oc *Controller) parseOvnNetworkObject(ovnnetwork string) ([]map[string]interface{}, error) { + var ovnNet []map[string]interface{} + + if ovnnetwork == "" { + return nil, fmt.Errorf("parseOvnNetworkObject:error") + } + + if err := json.Unmarshal([]byte(ovnnetwork), &ovnNet); err != nil { + return nil, fmt.Errorf("parseOvnNetworkObject: failed to load ovn network err: %v | ovn network: %v", err, ovnnetwork) + } + + return ovnNet, nil +} diff --git a/internal/pkg/ovn/ovn.go b/internal/pkg/ovn/ovn.go new file mode 100644 index 0000000..ec2ccbd --- /dev/null +++ b/internal/pkg/ovn/ovn.go @@ -0,0 +1,71 @@ +package ovn + +import ( + "fmt" + kapi "k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "ovn4nfv-k8s-plugin/internal/pkg/factory" + "ovn4nfv-k8s-plugin/internal/pkg/kube" +) + +// Controller structure is the object which holds the controls for starting +// and reacting upon the watched resources (e.g. pods, endpoints) +type Controller struct { + kube kube.Interface + watchFactory *factory.WatchFactory + + gatewayCache map[string]string + // A cache of all logical switches seen by the watcher + logicalSwitchCache map[string]bool + // A cache of all logical ports seen by the watcher and + // its corresponding logical switch + logicalPortCache map[string]string +} + +// NewOvnController creates a new OVN controller for creating logical network +// infrastructure and policy +func NewOvnController(kubeClient kubernetes.Interface, wf *factory.WatchFactory) *Controller { + return &Controller{ + kube: &kube.Kube{KClient: kubeClient}, + watchFactory: wf, + logicalSwitchCache: make(map[string]bool), + logicalPortCache: make(map[string]string), + gatewayCache: make(map[string]string), + } +} + +// Run starts the actual watching. Also initializes any local structures needed. +func (oc *Controller) Run() error { + fmt.Println("ovn4nfvk8s watching Pods") + for _, f := range []func() error{oc.WatchPods} { + if err := f(); err != nil { + return err + } + } + return nil +} + +// WatchPods starts the watching of Pod resource and calls back the appropriate handler logic +func (oc *Controller) WatchPods() error { + _, err := oc.watchFactory.AddPodHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + pod := obj.(*kapi.Pod) + if pod.Spec.NodeName != "" { + oc.addLogicalPort(pod) + } + }, + UpdateFunc: func(old, newer interface{}) { + podNew := newer.(*kapi.Pod) + podOld := old.(*kapi.Pod) + if podOld.Spec.NodeName == "" && podNew.Spec.NodeName != "" { + oc.addLogicalPort(podNew) + } + }, + DeleteFunc: func(obj interface{}) { + pod := obj.(*kapi.Pod) + oc.deleteLogicalPort(pod) + }, + }, oc.syncPods) + return err +} diff --git a/internal/pkg/ovn/ovn_test.go b/internal/pkg/ovn/ovn_test.go new file mode 100644 index 0000000..2e558a6 --- /dev/null +++ b/internal/pkg/ovn/ovn_test.go @@ -0,0 +1,116 @@ +package ovn + +import ( + "fmt" + "testing" + + "github.com/urfave/cli" + fakeexec "k8s.io/utils/exec/testing" + + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" + "ovn4nfv-k8s-plugin/internal/pkg/config" + "ovn4nfv-k8s-plugin/internal/pkg/factory" + ovntest "ovn4nfv-k8s-plugin/internal/pkg/testing" + "ovn4nfv-k8s-plugin/internal/pkg/util" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +func TestOvn(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "OVN/Pod Test Suite") +} + +var _ = AfterSuite(func() { +}) + +var _ = Describe("Add logical Port", func() { + var app *cli.App + + BeforeEach(func() { + // Restore global default values before each testcase + //config.RestoreDefaultConfig() + + app = cli.NewApp() + app.Name = "test" + app.Flags = config.Flags + }) + + It("tests Pod", func() { + app.Action = func(ctx *cli.Context) error { + const ( + gwIP string = "10.1.1.1" + gwCIDR string = gwIP + "/24" + netName string = "ovn-prot-net" + portName string = "_ok_net0" + macIPAddress string = "0a:00:00:00:00:01 192.168.1.3" + ) + fakeCmds := ovntest.AddFakeCmd(nil, &ovntest.ExpectedCmd{ + Cmd: "ovn-nbctl --timeout=15 --data=bare --no-heading --columns=name find logical_switch " + "name=" + netName, + Output: netName, + }) + fakeCmds = ovntest.AddFakeCmdsNoOutputNoError(fakeCmds, []string{ + "ovn-nbctl --timeout=15 --wait=sb -- --may-exist lsp-add " + netName + " " + portName + " -- lsp-set-addresses " + portName + " dynamic -- set logical_switch_port " + portName + " external-ids:namespace= external-ids:logical_switch=" + netName + " external-ids:pod=true", + }) + + fakeCmds = ovntest.AddFakeCmd(fakeCmds, &ovntest.ExpectedCmd{ + Cmd: "ovn-nbctl --timeout=15 --if-exists get logical_switch " + netName + " external_ids:gateway_ip", + Output: gwCIDR, + }) + fakeCmds = ovntest.AddFakeCmd(fakeCmds, &ovntest.ExpectedCmd{ + Cmd: "ovn-nbctl --timeout=15 get logical_switch_port " + portName + " dynamic_addresses", + Output: macIPAddress, + }) + + fexec := &fakeexec.FakeExec{ + CommandScript: fakeCmds, + LookPathFunc: func(file string) (string, error) { + return fmt.Sprintf("/fake-bin/%s", file), nil + }, + } + + err := util.SetExec(fexec) + Expect(err).NotTo(HaveOccurred()) + + _, err = config.InitConfig(ctx, fexec, nil) + Expect(err).NotTo(HaveOccurred()) + + fakeClient := &fake.Clientset{} + var fakeWatchFactory factory.WatchFactory + + ovnController := NewOvnController(fakeClient, &fakeWatchFactory) + Expect(err).NotTo(HaveOccurred()) + var ( + okPod = v1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "ok", + Annotations: map[string]string{"ovnNetwork": "[{ \"name\": \"ovn-prot-net\", \"interface\": \"net0\" , \"defaultGateway\": \"true\"}]"}, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "by-name", + }, + {}, + }, + }, + } + ) + + ovnController.addLogicalPort(&okPod) + _, _ = ovnController.kube.GetAnnotationsOnPod("", "ok") + + return nil + } + + err := app.Run([]string{app.Name}) + Expect(err).NotTo(HaveOccurred()) + }) +}) diff --git a/internal/pkg/ovn/pods.go b/internal/pkg/ovn/pods.go new file mode 100644 index 0000000..cc3d459 --- /dev/null +++ b/internal/pkg/ovn/pods.go @@ -0,0 +1,267 @@ +package ovn + +import ( + "fmt" + "strings" + "time" + + "github.com/sirupsen/logrus" + kapi "k8s.io/api/core/v1" + "ovn4nfv-k8s-plugin/internal/pkg/util" +) + +func (oc *Controller) syncPods(pods []interface{}) { +} +func (oc *Controller) getGatewayFromSwitch(logicalSwitch string) (string, string, error) { + var gatewayIPMaskStr, stderr string + var ok bool + var err error + logrus.Infof("getGatewayFromSwitch: %s", logicalSwitch) + if gatewayIPMaskStr, ok = oc.gatewayCache[logicalSwitch]; !ok { + gatewayIPMaskStr, stderr, err = util.RunOVNNbctlUnix("--if-exists", + "get", "logical_switch", logicalSwitch, + "external_ids:gateway_ip") + if err != nil { + logrus.Errorf("Failed to get gateway IP: %s, stderr: %q, %v", + gatewayIPMaskStr, stderr, err) + return "", "", err + } + if gatewayIPMaskStr == "" { + return "", "", fmt.Errorf("Empty gateway IP in logical switch %s", + logicalSwitch) + } + oc.gatewayCache[logicalSwitch] = gatewayIPMaskStr + } + gatewayIPMask := strings.Split(gatewayIPMaskStr, "/") + if len(gatewayIPMask) != 2 { + return "", "", fmt.Errorf("Failed to get IP and Mask from gateway CIDR: %s", + gatewayIPMaskStr) + } + gatewayIP := gatewayIPMask[0] + mask := gatewayIPMask[1] + return gatewayIP, mask, nil +} + +func (oc *Controller) deleteLogicalPort(pod *kapi.Pod) { + + if pod.Spec.HostNetwork { + return + } + + logrus.Infof("Deleting pod: %s", pod.Name) + logicalPort := fmt.Sprintf("%s_%s", pod.Namespace, pod.Name) + + // get the list of logical ports from OVN + output, stderr, err := util.RunOVNNbctlUnix("--data=bare", "--no-heading", + "--columns=name", "find", "logical_switch_port", "external_ids:pod=true") + if err != nil { + logrus.Errorf("Error in obtaining list of logical ports, "+ + "stderr: %q, err: %v", + stderr, err) + return + } + logrus.Infof("Exising Ports : %s. ", output) + existingLogicalPorts := strings.Fields(output) + for _, existingPort := range existingLogicalPorts { + if strings.Contains(existingPort, logicalPort) { + // found, delete this logical port + logrus.Infof("Deleting: %s. ", existingPort) + out, stderr, err := util.RunOVNNbctlUnix("--if-exists", "lsp-del", + existingPort) + if err != nil { + logrus.Errorf("Error in deleting pod's logical port "+ + "stdout: %q, stderr: %q err: %v", + out, stderr, err) + } else { + delete(oc.logicalPortCache, existingPort) + } + } + } + return +} + +func (oc *Controller) addLogicalPortWithSwitch(pod *kapi.Pod, logicalSwitch, ipAddress, macAddress, interfaceName string) (annotation string) { + var out, stderr string + var err error + var isStaticIP bool + if pod.Spec.HostNetwork { + return + } + + if !oc.logicalSwitchCache[logicalSwitch] { + oc.logicalSwitchCache[logicalSwitch] = true + } + var portName string + if interfaceName != "" { + portName = fmt.Sprintf("%s_%s_%s", pod.Namespace, pod.Name, interfaceName) + } else { + return + } + + logrus.Infof("Creating logical port for %s on switch %s", portName, logicalSwitch) + + if ipAddress != "" && macAddress != "" { + isStaticIP = true + } + if ipAddress != "" && macAddress == "" { + macAddress = util.GenerateMac() + isStaticIP = true + } + + if isStaticIP { + out, stderr, err = util.RunOVNNbctlUnix("--may-exist", "lsp-add", + logicalSwitch, portName, "--", "lsp-set-addresses", portName, + fmt.Sprintf("%s %s", macAddress, ipAddress), "--", "--if-exists", + "clear", "logical_switch_port", portName, "dynamic_addresses") + if err != nil { + logrus.Errorf("Failed to add logical port to switch "+ + "stdout: %q, stderr: %q (%v)", + out, stderr, err) + return + } + } else { + out, stderr, err = util.RunOVNNbctlUnix("--wait=sb", "--", + "--may-exist", "lsp-add", logicalSwitch, portName, + "--", "lsp-set-addresses", + portName, "dynamic", "--", "set", + "logical_switch_port", portName, + "external-ids:namespace="+pod.Namespace, + "external-ids:logical_switch="+logicalSwitch, + "external-ids:pod=true") + if err != nil { + logrus.Errorf("Error while creating logical port %s "+ + "stdout: %q, stderr: %q (%v)", + portName, out, stderr, err) + return + } + } + oc.logicalPortCache[portName] = logicalSwitch + gatewayIP, mask, err := oc.getGatewayFromSwitch(logicalSwitch) + if err != nil { + logrus.Errorf("Error obtaining gateway address for switch %s: %s", logicalSwitch, err) + return + } + + count := 30 + for count > 0 { + if isStaticIP { + out, stderr, err = util.RunOVNNbctlUnix("get", + "logical_switch_port", portName, "addresses") + } else { + out, stderr, err = util.RunOVNNbctlUnix("get", + "logical_switch_port", portName, "dynamic_addresses") + } + if err == nil && out != "[]" { + break + } + if err != nil { + logrus.Errorf("Error while obtaining addresses for %s - %v", portName, + err) + return + } + time.Sleep(time.Second) + count-- + } + if count == 0 { + logrus.Errorf("Error while obtaining addresses for %s "+ + "stdout: %q, stderr: %q, (%v)", portName, out, stderr, err) + return + } + + // static addresses have format ["0a:00:00:00:00:01 192.168.1.3"], while + // dynamic addresses have format "0a:00:00:00:00:01 192.168.1.3". + outStr := strings.TrimLeft(out, `[`) + outStr = strings.TrimRight(outStr, `]`) + outStr = strings.Trim(outStr, `"`) + addresses := strings.Split(outStr, " ") + if len(addresses) != 2 { + logrus.Errorf("Error while obtaining addresses for %s", portName) + return + } + annotation = fmt.Sprintf(`{\"ip_address\":\"%s/%s\", \"mac_address\":\"%s\", \"gateway_ip\": \"%s\"}`, addresses[1], mask, addresses[0], gatewayIP) + return annotation +} + +func findLogicalSwitch(name string) bool { + // get logical switch from OVN + output, stderr, err := util.RunOVNNbctlUnix("--data=bare", "--no-heading", + "--columns=name", "find", "logical_switch", "name="+name) + if err != nil { + logrus.Errorf("Error in obtaining list of logical switch, "+ + "stderr: %q, err: %v", + stderr, err) + return false + } + + if strings.Compare(name, output) == 0 { + return true + } else { + logrus.Errorf("Error finding Switch %v", name) + return false + } +} + +func (oc *Controller) addLogicalPort(pod *kapi.Pod) { + var logicalSwitch string + var ipAddress, macAddress, interfaceName, defaultGateway string + + annotation := pod.Annotations["ovnNetwork"] + + if annotation != "" { + ovnNetObjs, err := oc.parseOvnNetworkObject(annotation) + if err != nil { + logrus.Errorf("addLogicalPort : Error Parsing OvnNetwork List") + return + } + var ovnString, outStr string + ovnString = "[" + for _, net := range ovnNetObjs { + logicalSwitch = net["name"].(string) + if _, ok := net["interface"]; ok { + interfaceName = net["interface"].(string) + } else { + interfaceName = "" + } + if _, ok := net["ipAddress"]; ok { + ipAddress = net["ipAddress"].(string) + } else { + ipAddress = "" + } + if _, ok := net["macAddress"]; ok { + macAddress = net["macAddress"].(string) + } else { + macAddress = "" + } + if _, ok := net["defaultGateway"]; ok { + defaultGateway = net["defaultGateway"].(string) + } else { + defaultGateway = "false" + } + if !findLogicalSwitch(logicalSwitch) { + return + } + if interfaceName == "" { + logrus.Errorf("Interface name must be provided") + return + } + outStr = oc.addLogicalPortWithSwitch(pod, logicalSwitch, ipAddress, macAddress, interfaceName) + if outStr == "" { + return + } + last := len(outStr) - 1 + tmpString := outStr[:last] + tmpString += "," + "\\\"defaultGateway\\\":" + "\\\"" + defaultGateway + "\\\"" + tmpString += "," + "\\\"interface\\\":" + "\\\"" + interfaceName + "\\\"}" + ovnString += tmpString + ovnString += "," + } + last := len(ovnString) - 1 + ovnString = ovnString[:last] + ovnString += "]" + logrus.Debugf("ovnIfaceList - %v", ovnString) + err = oc.kube.SetAnnotationOnPod(pod, "ovnIfaceList", ovnString) + if err != nil { + logrus.Errorf("Failed to set annotation on pod %s - %v", pod.Name, err) + } + } +} diff --git a/internal/pkg/ovn/router.go b/internal/pkg/ovn/router.go new file mode 100644 index 0000000..d98c463 --- /dev/null +++ b/internal/pkg/ovn/router.go @@ -0,0 +1,50 @@ +package ovn + +import ( + "github.com/sirupsen/logrus" + "ovn4nfv-k8s-plugin/internal/pkg/util" +) + +func SetupMaster(name string) error { + + // Make sure br-int is created. + stdout, stderr, err := util.RunOVSVsctl("--", "--may-exist", "add-br", "br-int") + if err != nil { + logrus.Errorf("Failed to create br-int, stdout: %q, stderr: %q, error: %v", stdout, stderr, err) + return err + } + // Create a single common distributed router for the cluster. + stdout, stderr, err = util.RunOVNNbctlUnix("--", "--may-exist", "lr-add", name, "--", "set", "logical_router", name, "external_ids:ovn4nfv-cluster-router=yes") + if err != nil { + logrus.Errorf("Failed to create a single common distributed router for the cluster, stdout: %q, stderr: %q, error: %v", stdout, stderr, err) + return err + } + // Create a logical switch called "ovn4nfv-join" that will be used to connect gateway routers to the distributed router. + // The "ovn4nfv-join" will be allocated IP addresses in the range 100.64.1.0/24. + stdout, stderr, err = util.RunOVNNbctlUnix("--may-exist", "ls-add", "ovn4nfv-join") + if err != nil { + logrus.Errorf("Failed to create logical switch called \"ovn4nfv-join\", stdout: %q, stderr: %q, error: %v", stdout, stderr, err) + return err + } + // Connect the distributed router to "ovn4nfv-join". + routerMac, stderr, err := util.RunOVNNbctlUnix("--if-exist", "get", "logical_router_port", "rtoj-"+name, "mac") + if err != nil { + logrus.Errorf("Failed to get logical router port rtoj-%v, stderr: %q, error: %v", name, stderr, err) + return err + } + if routerMac == "" { + routerMac = util.GenerateMac() + stdout, stderr, err = util.RunOVNNbctlUnix("--", "--may-exist", "lrp-add", name, "rtoj-"+name, routerMac, "100.64.1.1/24", "--", "set", "logical_router_port", "rtoj-"+name, "external_ids:connect_to_ovn4nfvjoin=yes") + if err != nil { + logrus.Errorf("Failed to add logical router port rtoj-%v, stdout: %q, stderr: %q, error: %v", name, stdout, stderr, err) + return err + } + } + // Connect the switch "ovn4nfv-join" to the router. + stdout, stderr, err = util.RunOVNNbctlUnix("--", "--may-exist", "lsp-add", "ovn4nfv-join", "jtor-"+name, "--", "set", "logical_switch_port", "jtor-"+name, "type=router", "options:router-port=rtoj-"+name, "addresses="+"\""+routerMac+"\"") + if err != nil { + logrus.Errorf("Failed to add logical switch port to logical router, stdout: %q, stderr: %q, error: %v", stdout, stderr, err) + return err + } + return nil +} diff --git a/internal/pkg/testing/testing.go b/internal/pkg/testing/testing.go new file mode 100644 index 0000000..4c2afad --- /dev/null +++ b/internal/pkg/testing/testing.go @@ -0,0 +1,55 @@ +package testing + +import ( + "strings" + + kexec "k8s.io/utils/exec" + fakeexec "k8s.io/utils/exec/testing" + + "github.com/onsi/gomega" +) + +// ExpectedCmd contains properties that the testcase expects a called command +// to have as well as the output that the fake command should return +type ExpectedCmd struct { + // Cmd should be the command-line string of the executable name and all arguments it is expected to be called with + Cmd string + // Output is any stdout output which Cmd should produce + Output string + // Stderr is any stderr output which Cmd should produce + Stderr string + // Err is any error that should be returned for the invocation of Cmd + Err error +} + +// AddFakeCmd takes the ExpectedCmd and appends its runner function to +// a fake command action list +func AddFakeCmd(fakeCmds []fakeexec.FakeCommandAction, expected *ExpectedCmd) []fakeexec.FakeCommandAction { + return append(fakeCmds, func(cmd string, args ...string) kexec.Cmd { + parts := strings.Split(expected.Cmd, " ") + gomega.Expect(cmd).To(gomega.Equal("/fake-bin/" + parts[0])) + gomega.Expect(strings.Join(args, " ")).To(gomega.Equal(strings.Join(parts[1:], " "))) + return &fakeexec.FakeCmd{ + Argv: parts[1:], + CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{ + func() ([]byte, error) { + return []byte(expected.Output), expected.Err + }, + }, + RunScript: []fakeexec.FakeRunAction{ + func() ([]byte, []byte, error) { + return []byte(expected.Output), []byte(expected.Stderr), expected.Err + }, + }, + } + }) +} + +// AddFakeCmdsNoOutputNoError takes a list of commands and appends those commands +// to the expected command set. The command cannot return any output or error. +func AddFakeCmdsNoOutputNoError(fakeCmds []fakeexec.FakeCommandAction, commands []string) []fakeexec.FakeCommandAction { + for _, cmd := range commands { + fakeCmds = AddFakeCmd(fakeCmds, &ExpectedCmd{Cmd: cmd}) + } + return fakeCmds +} diff --git a/internal/pkg/util/.gitkeep b/internal/pkg/util/.gitkeep deleted file mode 100644 index e69de29..0000000 --- a/internal/pkg/util/.gitkeep +++ /dev/null diff --git a/internal/pkg/util/net.go b/internal/pkg/util/net.go new file mode 100644 index 0000000..1ff7fbb --- /dev/null +++ b/internal/pkg/util/net.go @@ -0,0 +1,34 @@ +package util + +import ( + "fmt" + "math/big" + "math/rand" + "net" + "time" +) + +// GenerateMac generates mac address. +func GenerateMac() string { + prefix := "00:00:00" + newRand := rand.New(rand.NewSource(time.Now().UnixNano())) + mac := fmt.Sprintf("%s:%02x:%02x:%02x", prefix, newRand.Intn(255), newRand.Intn(255), newRand.Intn(255)) + return mac +} + +// NextIP returns IP incremented by 1 +func NextIP(ip net.IP) net.IP { + i := ipToInt(ip) + return intToIP(i.Add(i, big.NewInt(1))) +} + +func ipToInt(ip net.IP) *big.Int { + if v := ip.To4(); v != nil { + return big.NewInt(0).SetBytes(v) + } + return big.NewInt(0).SetBytes(ip.To16()) +} + +func intToIP(i *big.Int) net.IP { + return net.IP(i.Bytes()) +} diff --git a/internal/pkg/util/ovs.go b/internal/pkg/util/ovs.go new file mode 100644 index 0000000..22b42b9 --- /dev/null +++ b/internal/pkg/util/ovs.go @@ -0,0 +1,126 @@ +package util + +import ( + "bytes" + "fmt" + "strings" + "time" + "unicode" + + "github.com/sirupsen/logrus" + kexec "k8s.io/utils/exec" +) + +const ( + ovsCommandTimeout = 15 + ovsVsctlCommand = "ovs-vsctl" + ovsOfctlCommand = "ovs-ofctl" + ovnNbctlCommand = "ovn-nbctl" + ipCommand = "ip" +) + +// Exec runs various OVN and OVS utilities +type execHelper struct { + exec kexec.Interface + ofctlPath string + vsctlPath string + nbctlPath string + ipPath string +} + +var runner *execHelper + +// SetExec validates executable paths and saves the given exec interface +// to be used for running various OVS and OVN utilites +func SetExec(exec kexec.Interface) error { + var err error + + runner = &execHelper{exec: exec} + runner.ofctlPath, err = exec.LookPath(ovsOfctlCommand) + if err != nil { + return err + } + runner.vsctlPath, err = exec.LookPath(ovsVsctlCommand) + if err != nil { + return err + } + runner.nbctlPath, err = exec.LookPath(ovnNbctlCommand) + if err != nil { + return err + } + runner.ipPath, err = exec.LookPath(ipCommand) + if err != nil { + return err + } + return nil +} + +// Run the ovn-ctl command and retry if "Connection refused" +// poll waitng for service to become available +func runOVNretry(cmdPath string, args ...string) (*bytes.Buffer, *bytes.Buffer, error) { + + retriesLeft := 200 + for { + stdout, stderr, err := run(cmdPath, args...) + if err == nil { + return stdout, stderr, err + } + + // Connection refused + // Master may not be up so keep trying + if strings.Contains(stderr.String(), "Connection refused") { + if retriesLeft == 0 { + return stdout, stderr, err + } + retriesLeft-- + time.Sleep(2 * time.Second) + } else { + // Some other problem for caller to handle + return stdout, stderr, err + } + } +} + +func run(cmdPath string, args ...string) (*bytes.Buffer, *bytes.Buffer, error) { + stdout := &bytes.Buffer{} + stderr := &bytes.Buffer{} + cmd := runner.exec.Command(cmdPath, args...) + cmd.SetStdout(stdout) + cmd.SetStderr(stderr) + logrus.Debugf("exec: %s %s", cmdPath, strings.Join(args, " ")) + err := cmd.Run() + if err != nil { + logrus.Debugf("exec: %s %s => %v", cmdPath, strings.Join(args, " "), err) + } + return stdout, stderr, err +} + +// RunOVSOfctl runs a command via ovs-ofctl. +func RunOVSOfctl(args ...string) (string, string, error) { + stdout, stderr, err := run(runner.ofctlPath, args...) + return strings.Trim(stdout.String(), "\" \n"), stderr.String(), err +} + +// RunOVSVsctl runs a command via ovs-vsctl. +func RunOVSVsctl(args ...string) (string, string, error) { + cmdArgs := []string{fmt.Sprintf("--timeout=%d", ovsCommandTimeout)} + cmdArgs = append(cmdArgs, args...) + stdout, stderr, err := run(runner.vsctlPath, cmdArgs...) + return strings.Trim(strings.TrimSpace(stdout.String()), "\""), stderr.String(), err +} + +// RunOVNNbctlUnix runs command via ovn-nbctl, with ovn-nbctl using the unix +// domain sockets to connect to the ovsdb-server backing the OVN NB database. +func RunOVNNbctlUnix(args ...string) (string, string, error) { + cmdArgs := []string{fmt.Sprintf("--timeout=%d", ovsCommandTimeout)} + cmdArgs = append(cmdArgs, args...) + stdout, stderr, err := runOVNretry(runner.nbctlPath, cmdArgs...) + return strings.Trim(strings.TrimFunc(stdout.String(), unicode.IsSpace), "\""), + stderr.String(), err +} + +// RunIP runs a command via the iproute2 "ip" utility +func RunIP(args ...string) (string, string, error) { + stdout, stderr, err := run(runner.ipPath, args...) + return strings.TrimSpace(stdout.String()), stderr.String(), err +} |