aboutsummaryrefslogtreecommitdiffstats
path: root/internal
diff options
context:
space:
mode:
authorRitu Sood <ritu.sood@intel.com>2018-11-10 09:56:52 -0800
committerVictor Morales <victor.morales@intel.com>2018-11-20 01:50:58 -0800
commit5026d1d89b05eac5e004279b742df6745a73d93a (patch)
tree8f9aed1e476706e008b746debda6d616bd0ac7a5 /internal
parent9506ae48eb545d502cc3685a99862740d28e7afb (diff)
Seed code for the Plugin
The code includes ovn4nfvk8s Plugin & CNI. It implements multiple OVN interfaces for Pods and assumes Multus (or similar CNI) calls its CNI not as first CNI. Change-Id: I524c1d18752eb6dbc8d34addd3b60d5bbaa06ff4 Signed-off-by: Ritu Sood <ritu.sood@intel.com> Signed-off-by: Victor Morales <victor.morales@intel.com>
Diffstat (limited to 'internal')
-rw-r--r--internal/pkg/config/.gitkeep0
-rw-r--r--internal/pkg/config/config.go359
-rw-r--r--internal/pkg/factory/.gitkeep0
-rw-r--r--internal/pkg/factory/factory.go318
-rw-r--r--internal/pkg/factory/factory_test.go686
-rw-r--r--internal/pkg/kube/.gitkeep0
-rw-r--r--internal/pkg/kube/kube.go141
-rw-r--r--internal/pkg/ovn/.gitkeep0
-rw-r--r--internal/pkg/ovn/common.go89
-rw-r--r--internal/pkg/ovn/ovn.go71
-rw-r--r--internal/pkg/ovn/ovn_test.go116
-rw-r--r--internal/pkg/ovn/pods.go267
-rw-r--r--internal/pkg/ovn/router.go50
-rw-r--r--internal/pkg/testing/testing.go55
-rw-r--r--internal/pkg/util/.gitkeep0
-rw-r--r--internal/pkg/util/net.go34
-rw-r--r--internal/pkg/util/ovs.go126
17 files changed, 2312 insertions, 0 deletions
diff --git a/internal/pkg/config/.gitkeep b/internal/pkg/config/.gitkeep
deleted file mode 100644
index e69de29..0000000
--- a/internal/pkg/config/.gitkeep
+++ /dev/null
diff --git a/internal/pkg/config/config.go b/internal/pkg/config/config.go
new file mode 100644
index 0000000..045a0ac
--- /dev/null
+++ b/internal/pkg/config/config.go
@@ -0,0 +1,359 @@
+package config
+
+import (
+ "fmt"
+ "net/url"
+ "os"
+ "path/filepath"
+ "reflect"
+ "strings"
+
+ "github.com/sirupsen/logrus"
+ "github.com/urfave/cli"
+ gcfg "gopkg.in/gcfg.v1"
+
+ kexec "k8s.io/utils/exec"
+
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/rest"
+ "k8s.io/client-go/tools/clientcmd"
+ "k8s.io/client-go/util/cert"
+)
+
+// The following are global config parameters that other modules may access
+// directly. They hold built-in defaults until InitConfig layers the parsed
+// config file and then the command-line overrides on top (via overrideFields).
+var (
+	// Version is the plugin version string, to be changed with every release.
+	Version = "0.3.0"
+
+	// Default holds parsed config file parameters and command-line overrides
+	Default = DefaultConfig{
+		MTU: 1400,
+	}
+
+	// Logging holds logging-related parsed config file parameters and command-line overrides
+	Logging = LoggingConfig{
+		File:  "", // do not log to a file by default
+		Level: 4,  // info (see the loglevel CLI flag for the scale)
+	}
+
+	// CNI holds CNI-related parsed config file parameters and command-line overrides
+	CNI = CNIConfig{
+		ConfDir: "/etc/cni/net.d",
+		Plugin:  "ovn4nfvk8s-cni",
+	}
+
+	// Kubernetes holds Kubernetes-related parsed config file parameters and command-line overrides
+	Kubernetes = KubernetesConfig{
+		APIServer: "http://localhost:8080",
+	}
+)
+
+// DefaultConfig holds parsed config file parameters and command-line overrides
+// for the generic (default) section. The gcfg tags name the keys in the
+// config file.
+type DefaultConfig struct {
+	// MTU value used for the overlay networks.
+	MTU int `gcfg:"mtu"`
+}
+
+// LoggingConfig holds logging-related parsed config file parameters and command-line overrides
+type LoggingConfig struct {
+	// File is the path of the file to log to
+	File string `gcfg:"logfile"`
+	// Level is the logging verbosity level
+	Level int `gcfg:"loglevel"`
+}
+
+// CNIConfig holds CNI-related parsed config file parameters and command-line overrides
+type CNIConfig struct {
+	// ConfDir specifies the CNI config directory in which to write the overlay CNI config file
+	ConfDir string `gcfg:"conf-dir"`
+	// Plugin specifies the name of the CNI plugin
+	Plugin string `gcfg:"plugin"`
+}
+
+// KubernetesConfig holds Kubernetes-related parsed config file parameters and command-line overrides
+type KubernetesConfig struct {
+	// Kubeconfig is an explicit kubeconfig path; when set it takes priority
+	// over the apiserver/token/cacert fields (see NewClientset).
+	Kubeconfig string `gcfg:"kubeconfig"`
+	// CACert optionally verifies a TLS-secured apiserver.
+	CACert string `gcfg:"cacert"`
+	// APIServer must be an http(s) URL (validated in buildKubernetesConfig).
+	APIServer string `gcfg:"apiserver"`
+	// Token is the bearer token used with TLS-secured apiservers.
+	Token string `gcfg:"token"`
+}
+
+// config is used to read the structured config file and to cache config in
+// testcases; each field corresponds to a gcfg section.
+type config struct {
+	Default    DefaultConfig
+	Logging    LoggingConfig
+	CNI        CNIConfig
+	Kubernetes KubernetesConfig
+}
+
// overrideFields copies every field of struct 'src' into the corresponding
// field of struct 'dst' when the src field holds a non-zero int or a
// non-empty string. Both arguments must be pointers to structs of identical
// shape; mismatched kinds, field counts, or an unsupported field type panic.
func overrideFields(dst, src interface{}) {
	dstVal := reflect.ValueOf(dst).Elem()
	srcVal := reflect.ValueOf(src).Elem()
	if dstVal.Kind() != reflect.Struct || srcVal.Kind() != dstVal.Kind() {
		panic("mismatched value types")
	}
	if srcVal.NumField() != dstVal.NumField() {
		panic("mismatched struct types")
	}

	for i := 0; i < srcVal.NumField(); i++ {
		from := srcVal.Field(i)
		to := dstVal.Field(i)
		if from.Kind() != to.Kind() {
			panic("mismatched struct fields")
		}
		// Skip zero values so lower-priority layers are not clobbered.
		switch from.Kind() {
		case reflect.String:
			if from.String() == "" {
				continue
			}
		case reflect.Int:
			if from.Int() == 0 {
				continue
			}
		default:
			panic(fmt.Sprintf("unhandled struct field type: %v", from.Kind()))
		}
		to.Set(from)
	}
}
+
+var cliConfig config
+
+// Flags are general command-line flags. Apps should add these flags to their
+// own urfave/cli flags and call InitConfig() early in the application.
+var Flags = []cli.Flag{
+ cli.StringFlag{
+ Name: "config-file",
+ Usage: "configuration file path (default: /etc/openvswitch/ovn4nfv_k8s.conf)",
+ },
+
+ // Generic options
+ cli.IntFlag{
+ Name: "mtu",
+ Usage: "MTU value used for the overlay networks (default: 1400)",
+ Destination: &cliConfig.Default.MTU,
+ },
+
+ // Logging options
+ cli.IntFlag{
+ Name: "loglevel",
+ Usage: "log verbosity and level: 5=debug, 4=info, 3=warn, 2=error, 1=fatal (default: 4)",
+ Destination: &cliConfig.Logging.Level,
+ },
+ cli.StringFlag{
+ Name: "logfile",
+ Usage: "path of a file to direct log output to",
+ Destination: &cliConfig.Logging.File,
+ },
+
+ // CNI options
+ cli.StringFlag{
+ Name: "cni-conf-dir",
+ Usage: "the CNI config directory in which to write the overlay CNI config file (default: /etc/cni/net.d)",
+ Destination: &cliConfig.CNI.ConfDir,
+ },
+ cli.StringFlag{
+ Name: "cni-plugin",
+ Usage: "the name of the CNI plugin (default: ovn4nfvk8s-cni)",
+ Destination: &cliConfig.CNI.Plugin,
+ },
+
+ // Kubernetes-related options
+ cli.StringFlag{
+ Name: "k8s-kubeconfig",
+ Usage: "absolute path to the Kubernetes kubeconfig file (not required if the --k8s-apiserver, --k8s-ca-cert, and --k8s-token are given)",
+ Destination: &cliConfig.Kubernetes.Kubeconfig,
+ },
+ cli.StringFlag{
+ Name: "k8s-apiserver",
+ Usage: "URL of the Kubernetes API server (not required if --k8s-kubeconfig is given) (default: http://localhost:8443)",
+ Destination: &cliConfig.Kubernetes.APIServer,
+ },
+ cli.StringFlag{
+ Name: "k8s-cacert",
+ Usage: "the absolute path to the Kubernetes API CA certificate (not required if --k8s-kubeconfig is given)",
+ Destination: &cliConfig.Kubernetes.CACert,
+ },
+ cli.StringFlag{
+ Name: "k8s-token",
+ Usage: "the Kubernetes API authentication token (not required if --k8s-kubeconfig is given)",
+ Destination: &cliConfig.Kubernetes.Token,
+ },
+}
+
+// Defaults records which Kubernetes connection parameters the caller wants
+// handled/auto-detected when not supplied explicitly.
+// NOTE(review): none of the fields are consulted within this file (the
+// defaults argument to buildKubernetesConfig is unused) — confirm intended
+// use in callers before extending.
+type Defaults struct {
+	K8sAPIServer bool
+	K8sToken     bool
+	K8sCert      bool
+}
+
+const (
+	// ovsVsctlCommand is the ovs-vsctl binary name.
+	// NOTE(review): unused within this file; presumably invoked via the
+	// kexec interface elsewhere — verify before removing.
+	ovsVsctlCommand = "ovs-vsctl"
+)
+
+func buildKubernetesConfig(exec kexec.Interface, cli, file *config, defaults *Defaults) error {
+
+ // Copy config file values over default values
+ overrideFields(&Kubernetes, &file.Kubernetes)
+ // And CLI overrides over config file and default values
+ overrideFields(&Kubernetes, &cli.Kubernetes)
+
+ if Kubernetes.Kubeconfig != "" && !pathExists(Kubernetes.Kubeconfig) {
+ return fmt.Errorf("kubernetes kubeconfig file %q not found", Kubernetes.Kubeconfig)
+ }
+ if Kubernetes.CACert != "" && !pathExists(Kubernetes.CACert) {
+ return fmt.Errorf("kubernetes CA certificate file %q not found", Kubernetes.CACert)
+ }
+
+ url, err := url.Parse(Kubernetes.APIServer)
+ if err != nil {
+ return fmt.Errorf("kubernetes API server address %q invalid: %v", Kubernetes.APIServer, err)
+ } else if url.Scheme != "https" && url.Scheme != "http" {
+ return fmt.Errorf("kubernetes API server URL scheme %q invalid", url.Scheme)
+ }
+
+ return nil
+}
+
+// getConfigFilePath returns config file path and 'true' if the config file is
+// the fallback path (eg not given by the user), 'false' if given explicitly
+// by the user
+func getConfigFilePath(ctx *cli.Context) (string, bool) {
+ configFile := ctx.String("config-file")
+ if configFile != "" {
+ return configFile, false
+ }
+
+ // default
+ return filepath.Join("/etc", "openvswitch", "ovn4nfv_k8s.conf"), true
+
+}
+
+// InitConfig reads the config file and common command-line options and
+// constructs the global config object from them. It returns the config file
+// path (if explicitly specified) or an error. It is a thin wrapper around
+// InitConfigWithPath with no pre-chosen config-file path.
+func InitConfig(ctx *cli.Context, exec kexec.Interface, defaults *Defaults) (string, error) {
+	return InitConfigWithPath(ctx, exec, "", defaults)
+}
+
+// InitConfigWithPath reads the given config file (or if empty, reads the config file
+// specified by command-line arguments, or empty, the default config file) and
+// common command-line options and constructs the global config object from
+// them. It returns the config file path (if explicitly specified) or an error.
+//
+// Precedence, lowest to highest: built-in defaults, config-file values,
+// command-line flags — each layer applied via overrideFields, which only
+// copies non-zero values.
+func InitConfigWithPath(ctx *cli.Context, exec kexec.Interface, configFile string, defaults *Defaults) (string, error) {
+	var cfg config
+	var retConfigFile string
+	var configFileIsDefault bool
+
+	// If no specific config file was given, try to find one from command-line
+	// arguments, or the platform-specific default config file path
+	if configFile == "" {
+		configFile, configFileIsDefault = getConfigFilePath(ctx)
+	}
+
+	// Log to stderr until (and unless) a logfile is configured below.
+	logrus.SetOutput(os.Stderr)
+
+	if !configFileIsDefault {
+		// Only return explicitly specified config file
+		retConfigFile = configFile
+	}
+
+	f, err := os.Open(configFile)
+	// Failure to find a default config file is not a hard error
+	if err != nil && !configFileIsDefault {
+		return "", fmt.Errorf("failed to open config file %s: %v", configFile, err)
+	}
+	if f != nil {
+		defer f.Close()
+
+		// Parse ovn4nfvk8s config file.
+		if err = gcfg.ReadInto(&cfg, f); err != nil {
+			return "", fmt.Errorf("failed to parse config file %s: %v", f.Name(), err)
+		}
+		logrus.Infof("Parsed config file %s", f.Name())
+		logrus.Infof("Parsed config: %+v", cfg)
+	}
+
+	if defaults == nil {
+		defaults = &Defaults{}
+	}
+
+	// Build config that needs no special processing
+	overrideFields(&Default, &cfg.Default)
+	overrideFields(&Default, &cliConfig.Default)
+	overrideFields(&CNI, &cfg.CNI)
+	overrideFields(&CNI, &cliConfig.CNI)
+
+	// Logging setup
+	overrideFields(&Logging, &cfg.Logging)
+	overrideFields(&Logging, &cliConfig.Logging)
+	logrus.SetLevel(logrus.Level(Logging.Level))
+	if Logging.File != "" {
+		var file *os.File
+		file, err = os.OpenFile(Logging.File, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0660)
+		if err != nil {
+			// Best-effort: keep logging to stderr if the logfile is unusable.
+			logrus.Errorf("failed to open logfile %s (%v). Ignoring..", Logging.File, err)
+		} else {
+			logrus.SetOutput(file)
+		}
+	}
+
+	if err = buildKubernetesConfig(exec, &cliConfig, &cfg, defaults); err != nil {
+		return "", err
+	}
+	logrus.Debugf("Default config: %+v", Default)
+	logrus.Debugf("Logging config: %+v", Logging)
+	logrus.Debugf("CNI config: %+v", CNI)
+	logrus.Debugf("Kubernetes config: %+v", Kubernetes)
+
+	return retConfigFile, nil
+}
+
// pathExists reports whether a stat of path succeeds. Stat failures other
// than "does not exist" (e.g. permission errors) are deliberately treated
// as "exists", preserving the original best-effort semantics.
func pathExists(path string) bool {
	if _, err := os.Stat(path); err != nil {
		return !os.IsNotExist(err)
	}
	return true
}
+
+// NewClientset creates a Kubernetes clientset from either a kubeconfig,
+// TLS properties, or an apiserver URL
+func NewClientset(conf *KubernetesConfig) (*kubernetes.Clientset, error) {
+ var kconfig *rest.Config
+ var err error
+
+ if conf.Kubeconfig != "" {
+ // uses the current context in kubeconfig
+ kconfig, err = clientcmd.BuildConfigFromFlags("", conf.Kubeconfig)
+ } else if strings.HasPrefix(conf.APIServer, "https") {
+ if conf.APIServer == "" || conf.Token == "" {
+ return nil, fmt.Errorf("TLS-secured apiservers require token and CA certificate")
+ }
+ kconfig = &rest.Config{
+ Host: conf.APIServer,
+ BearerToken: conf.Token,
+ }
+ if conf.CACert != "" {
+ if _, err := cert.NewPool(conf.CACert); err != nil {
+ return nil, err
+ }
+ kconfig.TLSClientConfig = rest.TLSClientConfig{CAFile: conf.CACert}
+ }
+ } else if strings.HasPrefix(conf.APIServer, "http") {
+ kconfig, err = clientcmd.BuildConfigFromFlags(conf.APIServer, "")
+ } else {
+ // Assume we are running from a container managed by kubernetes
+ // and read the apiserver address and tokens from the
+ // container's environment.
+ kconfig, err = rest.InClusterConfig()
+ }
+ if err != nil {
+ return nil, err
+ }
+
+ return kubernetes.NewForConfig(kconfig)
+}
diff --git a/internal/pkg/factory/.gitkeep b/internal/pkg/factory/.gitkeep
deleted file mode 100644
index e69de29..0000000
--- a/internal/pkg/factory/.gitkeep
+++ /dev/null
diff --git a/internal/pkg/factory/factory.go b/internal/pkg/factory/factory.go
new file mode 100644
index 0000000..a635e3f
--- /dev/null
+++ b/internal/pkg/factory/factory.go
@@ -0,0 +1,318 @@
+package factory
+
+import (
+ "fmt"
+ "reflect"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/sirupsen/logrus"
+
+ kapi "k8s.io/api/core/v1"
+ knet "k8s.io/api/networking/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/labels"
+ informerfactory "k8s.io/client-go/informers"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/tools/cache"
+)
+
+// informer pairs a shared informer with the per-caller event handlers
+// registered against it. The embedded Mutex guards the handlers map, which
+// is mutated by addHandler/removeHandler and iterated (under the same lock)
+// by forEachHandler.
+type informer struct {
+	sync.Mutex
+	oType    reflect.Type                         // concrete pointer type of watched objects
+	inf      cache.SharedIndexInformer            // the underlying shared informer
+	handlers map[uint64]cache.ResourceEventHandler // keyed by WatchFactory handler ID
+}
+
+func (i *informer) forEachHandler(obj interface{}, f func(id uint64, handler cache.ResourceEventHandler)) {
+ i.Lock()
+ defer i.Unlock()
+
+ objType := reflect.TypeOf(obj)
+ if objType != i.oType {
+ logrus.Errorf("object type %v did not match expected %v", objType, i.oType)
+ return
+ }
+
+ for id, handler := range i.handlers {
+ f(id, handler)
+ }
+}
+
+// WatchFactory initializes and manages common kube watches. It multiplexes
+// one shared informer per watched resource type out to any number of
+// registered handlers, identified by IDs handed out from handlerCounter.
+type WatchFactory struct {
+	iFactory  informerfactory.SharedInformerFactory
+	informers map[reflect.Type]*informer // keyed by the *Type vars below
+	handlerCounter uint64                // incremented atomically; IDs start at 1
+}
+
+const (
+	// resyncInterval is the shared-informer resync period; see the comment
+	// in NewWatchFactory for why it is deliberately long.
+	resyncInterval = 12 * time.Hour
+)
+
+func newInformer(oType reflect.Type, inf cache.SharedIndexInformer) *informer {
+ return &informer{
+ oType: oType,
+ inf: inf,
+ handlers: make(map[uint64]cache.ResourceEventHandler),
+ }
+}
+
+var (
+ podType reflect.Type = reflect.TypeOf(&kapi.Pod{})
+ serviceType reflect.Type = reflect.TypeOf(&kapi.Service{})
+ endpointsType reflect.Type = reflect.TypeOf(&kapi.Endpoints{})
+ policyType reflect.Type = reflect.TypeOf(&knet.NetworkPolicy{})
+ namespaceType reflect.Type = reflect.TypeOf(&kapi.Namespace{})
+ nodeType reflect.Type = reflect.TypeOf(&kapi.Node{})
+)
+
+// NewWatchFactory initializes a new watch factory: it creates shared
+// informers for the six watched resource types, starts them, waits for
+// their caches to sync, and installs one federated handler per informer
+// that fans events out to the handlers registered via addHandler.
+func NewWatchFactory(c kubernetes.Interface, stopChan <-chan struct{}) (*WatchFactory, error) {
+	// resync time is 12 hours, none of the resources being watched in ovn-kubernetes have
+	// any race condition where a resync may be required e.g. cni executable on node watching for
+	// events on pods and assuming that an 'ADD' event will contain the annotations put in by
+	// ovnkube master (currently, it is just a 'get' loop)
+	// the downside of making it tight (like 10 minutes) is needless spinning on all resources
+	wf := &WatchFactory{
+		iFactory:  informerfactory.NewSharedInformerFactory(c, resyncInterval),
+		informers: make(map[reflect.Type]*informer),
+	}
+
+	// Create shared informers we know we'll use
+	wf.informers[podType] = newInformer(podType, wf.iFactory.Core().V1().Pods().Informer())
+	wf.informers[serviceType] = newInformer(serviceType, wf.iFactory.Core().V1().Services().Informer())
+	wf.informers[endpointsType] = newInformer(endpointsType, wf.iFactory.Core().V1().Endpoints().Informer())
+	wf.informers[policyType] = newInformer(policyType, wf.iFactory.Networking().V1().NetworkPolicies().Informer())
+	wf.informers[namespaceType] = newInformer(namespaceType, wf.iFactory.Core().V1().Namespaces().Informer())
+	wf.informers[nodeType] = newInformer(nodeType, wf.iFactory.Core().V1().Nodes().Informer())
+
+	wf.iFactory.Start(stopChan)
+	// WaitForCacheSync reports one entry per started informer; a false
+	// value means the stop channel fired before that cache synced.
+	res := wf.iFactory.WaitForCacheSync(stopChan)
+	for oType, synced := range res {
+		if !synced {
+			return nil, fmt.Errorf("error in syncing cache for %v informer", oType)
+		}
+		informer := wf.informers[oType]
+		informer.inf.AddEventHandler(wf.newFederatedHandler(informer))
+	}
+
+	return wf, nil
+}
+
+// newFederatedHandler builds the single ResourceEventHandler attached to a
+// shared informer; it fans each ADD/UPDATE/DELETE event out to every handler
+// currently registered on inf (under inf's lock, via forEachHandler).
+func (wf *WatchFactory) newFederatedHandler(inf *informer) cache.ResourceEventHandlerFuncs {
+	return cache.ResourceEventHandlerFuncs{
+		AddFunc: func(obj interface{}) {
+			inf.forEachHandler(obj, func(id uint64, handler cache.ResourceEventHandler) {
+				logrus.Debugf("running %v ADD event for handler %d", inf.oType, id)
+				handler.OnAdd(obj)
+			})
+		},
+		UpdateFunc: func(oldObj, newObj interface{}) {
+			inf.forEachHandler(newObj, func(id uint64, handler cache.ResourceEventHandler) {
+				logrus.Debugf("running %v UPDATE event for handler %d", inf.oType, id)
+				handler.OnUpdate(oldObj, newObj)
+			})
+		},
+		DeleteFunc: func(obj interface{}) {
+			// Deletes may arrive as a DeletedFinalStateUnknown tombstone
+			// when the watch missed the actual deletion; unwrap it and
+			// verify the inner object's type before dispatching.
+			if inf.oType != reflect.TypeOf(obj) {
+				tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
+				if !ok {
+					logrus.Errorf("couldn't get object from tombstone: %+v", obj)
+					return
+				}
+				obj = tombstone.Obj
+				objType := reflect.TypeOf(obj)
+				if inf.oType != objType {
+					logrus.Errorf("expected tombstone object resource type %v but got %v", inf.oType, objType)
+					return
+				}
+			}
+			inf.forEachHandler(obj, func(id uint64, handler cache.ResourceEventHandler) {
+				logrus.Debugf("running %v DELETE event for handler %d", inf.oType, id)
+				handler.OnDelete(obj)
+			})
+		},
+	}
+}
+
+func getObjectMeta(objType reflect.Type, obj interface{}) (*metav1.ObjectMeta, error) {
+ switch objType {
+ case podType:
+ if pod, ok := obj.(*kapi.Pod); ok {
+ return &pod.ObjectMeta, nil
+ }
+ case serviceType:
+ if service, ok := obj.(*kapi.Service); ok {
+ return &service.ObjectMeta, nil
+ }
+ case endpointsType:
+ if endpoints, ok := obj.(*kapi.Endpoints); ok {
+ return &endpoints.ObjectMeta, nil
+ }
+ case policyType:
+ if policy, ok := obj.(*knet.NetworkPolicy); ok {
+ return &policy.ObjectMeta, nil
+ }
+ case namespaceType:
+ if namespace, ok := obj.(*kapi.Namespace); ok {
+ return &namespace.ObjectMeta, nil
+ }
+ case nodeType:
+ if node, ok := obj.(*kapi.Node); ok {
+ return &node.ObjectMeta, nil
+ }
+ }
+ return nil, fmt.Errorf("cannot get ObjectMeta from type %v", objType)
+}
+
+// addHandler registers funcs as an event handler for objType, optionally
+// filtered by namespace and/or label selector, and returns a handler ID
+// for later removal via removeHandler.
+//
+// If processExisting is non-nil it is invoked once, synchronously, with the
+// filtered set of objects already in the informer cache. Afterwards every
+// cached object is replayed to the new handler's OnAdd, because the shared
+// informer delivered its initial ADDs long before this handler existed.
+func (wf *WatchFactory) addHandler(objType reflect.Type, namespace string, lsel *metav1.LabelSelector, funcs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) {
+	inf, ok := wf.informers[objType]
+	if !ok {
+		return 0, fmt.Errorf("unknown object type %v", objType)
+	}
+
+	sel, err := metav1.LabelSelectorAsSelector(lsel)
+	if err != nil {
+		return 0, fmt.Errorf("error creating label selector: %v", err)
+	}
+
+	// filterFunc gates both the processExisting set and all future events
+	// (via FilteringResourceEventHandler below).
+	filterFunc := func(obj interface{}) bool {
+		if namespace == "" && lsel == nil {
+			// Unfiltered handler
+			return true
+		}
+		meta, err := getObjectMeta(objType, obj)
+		if err != nil {
+			logrus.Errorf("watch handler filter error: %v", err)
+			return false
+		}
+		if namespace != "" && meta.Namespace != namespace {
+			return false
+		}
+		if lsel != nil && !sel.Matches(labels.Set(meta.Labels)) {
+			return false
+		}
+		return true
+	}
+
+	// Process existing items as a set so the caller can clean up
+	// after a restart or whatever
+	existingItems := inf.inf.GetStore().List()
+	if processExisting != nil {
+		items := make([]interface{}, 0)
+		for _, obj := range existingItems {
+			if filterFunc(obj) {
+				items = append(items, obj)
+			}
+		}
+		processExisting(items)
+	}
+
+	// IDs start at 1, so callers can treat 0 as "no handler".
+	handlerID := atomic.AddUint64(&wf.handlerCounter, 1)
+
+	inf.Lock()
+	defer inf.Unlock()
+
+	inf.handlers[handlerID] = cache.FilteringResourceEventHandler{
+		FilterFunc: filterFunc,
+		Handler:    funcs,
+	}
+	logrus.Debugf("added %v event handler %d", objType, handlerID)
+
+	// Send existing items to the handler's add function; informers usually
+	// do this but since we share informers, it's long-since happened so
+	// we must emulate that here
+	for _, obj := range existingItems {
+		inf.handlers[handlerID].OnAdd(obj)
+	}
+
+	return handlerID, nil
+}
+
+func (wf *WatchFactory) removeHandler(objType reflect.Type, handlerID uint64) error {
+ inf, ok := wf.informers[objType]
+ if !ok {
+ return fmt.Errorf("tried to remove unknown object type %v event handler", objType)
+ }
+
+ inf.Lock()
+ defer inf.Unlock()
+ if _, ok := inf.handlers[handlerID]; !ok {
+ return fmt.Errorf("tried to remove unknown object type %v event handler %d", objType, handlerID)
+ }
+ delete(inf.handlers, handlerID)
+ logrus.Debugf("removed %v event handler %d", objType, handlerID)
+ return nil
+}
+
+// The wrappers below are the public registration API; each delegates to
+// addHandler/removeHandler with the matching resource-type key.
+
+// AddPodHandler adds a handler function that will be executed on Pod object changes
+func (wf *WatchFactory) AddPodHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) {
+	return wf.addHandler(podType, "", nil, handlerFuncs, processExisting)
+}
+
+// AddFilteredPodHandler adds a handler function that will be executed when Pod objects that match the given filters change
+func (wf *WatchFactory) AddFilteredPodHandler(namespace string, lsel *metav1.LabelSelector, handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) {
+	return wf.addHandler(podType, namespace, lsel, handlerFuncs, processExisting)
+}
+
+// RemovePodHandler removes a Pod object event handler function
+func (wf *WatchFactory) RemovePodHandler(handlerID uint64) error {
+	return wf.removeHandler(podType, handlerID)
+}
+
+// AddServiceHandler adds a handler function that will be executed on Service object changes
+func (wf *WatchFactory) AddServiceHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) {
+	return wf.addHandler(serviceType, "", nil, handlerFuncs, processExisting)
+}
+
+// RemoveServiceHandler removes a Service object event handler function
+func (wf *WatchFactory) RemoveServiceHandler(handlerID uint64) error {
+	return wf.removeHandler(serviceType, handlerID)
+}
+
+// AddEndpointsHandler adds a handler function that will be executed on Endpoints object changes
+func (wf *WatchFactory) AddEndpointsHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) {
+	return wf.addHandler(endpointsType, "", nil, handlerFuncs, processExisting)
+}
+
+// RemoveEndpointsHandler removes an Endpoints object event handler function
+func (wf *WatchFactory) RemoveEndpointsHandler(handlerID uint64) error {
+	return wf.removeHandler(endpointsType, handlerID)
+}
+
+// AddPolicyHandler adds a handler function that will be executed on NetworkPolicy object changes
+func (wf *WatchFactory) AddPolicyHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) {
+	return wf.addHandler(policyType, "", nil, handlerFuncs, processExisting)
+}
+
+// RemovePolicyHandler removes a NetworkPolicy object event handler function
+func (wf *WatchFactory) RemovePolicyHandler(handlerID uint64) error {
+	return wf.removeHandler(policyType, handlerID)
+}
+
+// AddNamespaceHandler adds a handler function that will be executed on Namespace object changes
+func (wf *WatchFactory) AddNamespaceHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) {
+	return wf.addHandler(namespaceType, "", nil, handlerFuncs, processExisting)
+}
+
+// AddFilteredNamespaceHandler adds a handler function that will be executed when Namespace objects that match the given filters change
+func (wf *WatchFactory) AddFilteredNamespaceHandler(namespace string, lsel *metav1.LabelSelector, handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) {
+	return wf.addHandler(namespaceType, namespace, lsel, handlerFuncs, processExisting)
+}
+
+// RemoveNamespaceHandler removes a Namespace object event handler function
+func (wf *WatchFactory) RemoveNamespaceHandler(handlerID uint64) error {
+	return wf.removeHandler(namespaceType, handlerID)
+}
+
+// AddNodeHandler adds a handler function that will be executed on Node object changes
+func (wf *WatchFactory) AddNodeHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) {
+	return wf.addHandler(nodeType, "", nil, handlerFuncs, processExisting)
+}
+
+// RemoveNodeHandler removes a Node object event handler function
+func (wf *WatchFactory) RemoveNodeHandler(handlerID uint64) error {
+	return wf.removeHandler(nodeType, handlerID)
+}
diff --git a/internal/pkg/factory/factory_test.go b/internal/pkg/factory/factory_test.go
new file mode 100644
index 0000000..38fb77e
--- /dev/null
+++ b/internal/pkg/factory/factory_test.go
@@ -0,0 +1,686 @@
+package factory
+
+import (
+ "reflect"
+ "testing"
+
+ "k8s.io/api/core/v1"
+ knet "k8s.io/api/networking/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/types"
+ "k8s.io/apimachinery/pkg/watch"
+ "k8s.io/client-go/kubernetes/fake"
+ core "k8s.io/client-go/testing"
+ "k8s.io/client-go/tools/cache"
+
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+)
+
+// TestFactory is the standard Ginkgo bootstrap: it wires Gomega failures
+// into Ginkgo and runs the "Watch Factory Suite" specs defined below.
+func TestFactory(t *testing.T) {
+	RegisterFailHandler(Fail)
+	RunSpecs(t, "Watch Factory Suite")
+}
+
+func newObjectMeta(name, namespace string) metav1.ObjectMeta {
+ return metav1.ObjectMeta{
+ Name: name,
+ UID: types.UID(name),
+ Namespace: namespace,
+ Labels: map[string]string{
+ "name": name,
+ },
+ }
+}
+
+// newPod returns a running Pod fixture scheduled on "mynode" with a single
+// container; the name doubles as the UID and "name" label (see newObjectMeta).
+func newPod(name, namespace string) *v1.Pod {
+	return &v1.Pod{
+		Status: v1.PodStatus{
+			Phase: v1.PodRunning,
+		},
+		ObjectMeta: newObjectMeta(name, namespace),
+		Spec: v1.PodSpec{
+			Containers: []v1.Container{
+				{
+					Name:  "containerName",
+					Image: "containerImage",
+				},
+			},
+			NodeName: "mynode",
+		},
+	}
+}
+
+// newNamespace returns an active Namespace fixture; the namespace's own
+// name is also used as its ObjectMeta namespace.
+func newNamespace(name string) *v1.Namespace {
+	return &v1.Namespace{
+		Status: v1.NamespaceStatus{
+			Phase: v1.NamespaceActive,
+		},
+		ObjectMeta: newObjectMeta(name, name),
+	}
+}
+
+// newNode returns a running, cluster-scoped Node fixture (empty namespace).
+func newNode(name string) *v1.Node {
+	return &v1.Node{
+		Status: v1.NodeStatus{
+			Phase: v1.NodeRunning,
+		},
+		ObjectMeta: newObjectMeta(name, ""),
+	}
+}
+
+// newPolicy returns a minimal NetworkPolicy fixture.
+func newPolicy(name, namespace string) *knet.NetworkPolicy {
+	return &knet.NetworkPolicy{
+		ObjectMeta: newObjectMeta(name, namespace),
+	}
+}
+
+// newEndpoints returns a minimal Endpoints fixture.
+func newEndpoints(name, namespace string) *v1.Endpoints {
+	return &v1.Endpoints{
+		ObjectMeta: newObjectMeta(name, namespace),
+	}
+}
+
+func newService(name, namespace string) *v1.Service {
+ return &v1.Service{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ UID: types.UID(name),
+ Namespace: namespace,
+ Labels: map[string]string{
+ "name": name,
+ },
+ },
+ }
+}
+
+// objSetup registers a fake watcher plus a "list" reactor for objType on
+// the fake clientset and returns the watcher so tests can inject events.
+func objSetup(c *fake.Clientset, objType string, listFn func(core.Action) (bool, runtime.Object, error)) *watch.FakeWatcher {
+	w := watch.NewFake()
+	c.AddWatchReactor(objType, core.DefaultWatchReactor(w, nil))
+	c.AddReactor("list", objType, listFn)
+	return w
+}
+
+var _ = Describe("Watch Factory Operations", func() {
+ var (
+ fakeClient *fake.Clientset
+ podWatch, namespaceWatch, nodeWatch *watch.FakeWatcher
+ policyWatch, endpointsWatch, serviceWatch *watch.FakeWatcher
+ pods []*v1.Pod
+ namespaces []*v1.Namespace
+ nodes []*v1.Node
+ policies []*knet.NetworkPolicy
+ endpoints []*v1.Endpoints
+ services []*v1.Service
+ stop chan struct{}
+ numAdded, numUpdated, numDeleted int
+ )
+
+ BeforeEach(func() {
+ fakeClient = &fake.Clientset{}
+ stop = make(chan struct{})
+
+ pods = make([]*v1.Pod, 0)
+ podWatch = objSetup(fakeClient, "pods", func(core.Action) (bool, runtime.Object, error) {
+ obj := &v1.PodList{}
+ for _, p := range pods {
+ obj.Items = append(obj.Items, *p)
+ }
+ return true, obj, nil
+ })
+
+ namespaces = make([]*v1.Namespace, 0)
+ namespaceWatch = objSetup(fakeClient, "namespaces", func(core.Action) (bool, runtime.Object, error) {
+ obj := &v1.NamespaceList{}
+ for _, p := range namespaces {
+ obj.Items = append(obj.Items, *p)
+ }
+ return true, obj, nil
+ })
+
+ nodes = make([]*v1.Node, 0)
+ nodeWatch = objSetup(fakeClient, "nodes", func(core.Action) (bool, runtime.Object, error) {
+ obj := &v1.NodeList{}
+ for _, p := range nodes {
+ obj.Items = append(obj.Items, *p)
+ }
+ return true, obj, nil
+ })
+
+ policies = make([]*knet.NetworkPolicy, 0)
+ policyWatch = objSetup(fakeClient, "networkpolicies", func(core.Action) (bool, runtime.Object, error) {
+ obj := &knet.NetworkPolicyList{}
+ for _, p := range policies {
+ obj.Items = append(obj.Items, *p)
+ }
+ return true, obj, nil
+ })
+
+ endpoints = make([]*v1.Endpoints, 0)
+ endpointsWatch = objSetup(fakeClient, "endpoints", func(core.Action) (bool, runtime.Object, error) {
+ obj := &v1.EndpointsList{}
+ for _, p := range endpoints {
+ obj.Items = append(obj.Items, *p)
+ }
+ return true, obj, nil
+ })
+
+ services = make([]*v1.Service, 0)
+ serviceWatch = objSetup(fakeClient, "services", func(core.Action) (bool, runtime.Object, error) {
+ obj := &v1.ServiceList{}
+ for _, p := range services {
+ obj.Items = append(obj.Items, *p)
+ }
+ return true, obj, nil
+ })
+
+ numAdded = 0
+ numUpdated = 0
+ numDeleted = 0
+ })
+
+ Context("when a processExisting is given", func() {
+ testExisting := func(objType reflect.Type, namespace string, lsel *metav1.LabelSelector) {
+ wf, err := NewWatchFactory(fakeClient, stop)
+ Expect(err).NotTo(HaveOccurred())
+ id, err := wf.addHandler(objType, namespace, lsel,
+ cache.ResourceEventHandlerFuncs{},
+ func(objs []interface{}) {
+ Expect(len(objs)).To(Equal(1))
+ })
+ Expect(err).NotTo(HaveOccurred())
+ Expect(id).To(BeNumerically(">", uint64(0)))
+ wf.removeHandler(objType, id)
+ close(stop)
+ }
+
+ It("is called for each existing pod", func() {
+ pods = append(pods, newPod("pod1", "default"))
+ testExisting(podType, "", nil)
+ })
+
+ It("is called for each existing namespace", func() {
+ namespaces = append(namespaces, newNamespace("default"))
+ testExisting(namespaceType, "", nil)
+ })
+
+ It("is called for each existing node", func() {
+ nodes = append(nodes, newNode("default"))
+ testExisting(nodeType, "", nil)
+ })
+
+ It("is called for each existing policy", func() {
+ policies = append(policies, newPolicy("denyall", "default"))
+ testExisting(policyType, "", nil)
+ })
+
+ It("is called for each existing endpoints", func() {
+ endpoints = append(endpoints, newEndpoints("myendpoint", "default"))
+ testExisting(endpointsType, "", nil)
+ })
+
+ It("is called for each existing service", func() {
+ services = append(services, newService("myservice", "default"))
+ testExisting(serviceType, "", nil)
+ })
+
+ It("is called for each existing pod that matches a given namespace and label", func() {
+ pod := newPod("pod1", "default")
+ pod.ObjectMeta.Labels["blah"] = "foobar"
+ pods = append(pods, pod)
+ testExisting(podType, "default", &metav1.LabelSelector{
+ MatchLabels: map[string]string{"blah": "foobar"},
+ })
+ })
+ })
+
	// Seeds TWO objects per type before the factory starts and checks the
	// handler's AddFunc is invoked once per pre-existing object (numAdded
	// must reach exactly 2) when the handler is registered.
	Context("when existing items are known to the informer", func() {
		testExisting := func(objType reflect.Type) {
			wf, err := NewWatchFactory(fakeClient, stop)
			Expect(err).NotTo(HaveOccurred())
			// addHandler replays the informer's cached objects through
			// AddFunc synchronously on registration.
			id, err := wf.addHandler(objType, "", nil,
				cache.ResourceEventHandlerFuncs{
					AddFunc: func(obj interface{}) {
						numAdded++
					},
					UpdateFunc: func(old, new interface{}) {},
					DeleteFunc: func(obj interface{}) {},
				}, nil)
			Expect(err).NotTo(HaveOccurred())
			Expect(numAdded).To(Equal(2))
			wf.removeHandler(objType, id)
			close(stop)
		}

		It("calls ADD for each existing pod", func() {
			pods = append(pods, newPod("pod1", "default"))
			pods = append(pods, newPod("pod2", "default"))
			testExisting(podType)
		})

		It("calls ADD for each existing namespace", func() {
			namespaces = append(namespaces, newNamespace("default"))
			namespaces = append(namespaces, newNamespace("default2"))
			testExisting(namespaceType)
		})

		It("calls ADD for each existing node", func() {
			nodes = append(nodes, newNode("default"))
			nodes = append(nodes, newNode("default2"))
			testExisting(nodeType)
		})

		It("calls ADD for each existing policy", func() {
			policies = append(policies, newPolicy("denyall", "default"))
			policies = append(policies, newPolicy("denyall2", "default"))
			testExisting(policyType)
		})

		It("calls ADD for each existing endpoints", func() {
			endpoints = append(endpoints, newEndpoints("myendpoint", "default"))
			endpoints = append(endpoints, newEndpoints("myendpoint2", "default"))
			testExisting(endpointsType)
		})

		It("calls ADD for each existing service", func() {
			services = append(services, newService("myservice", "default"))
			services = append(services, newService("myservice2", "default"))
			testExisting(serviceType)
		})
	})
+
	// addFilteredHandler registers a handler for objType restricted to the
	// given namespace/label selector, wrapping the caller's funcs so that
	// the shared numAdded/numUpdated/numDeleted counters are bumped on
	// every delivered event. Returns the handler id for later removal.
	addFilteredHandler := func(wf *WatchFactory, objType reflect.Type, namespace string, lsel *metav1.LabelSelector, funcs cache.ResourceEventHandlerFuncs) uint64 {
		id, err := wf.addHandler(objType, namespace, lsel, cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				// GinkgoRecover is required: these run on informer
				// goroutines, not the spec goroutine.
				defer GinkgoRecover()
				numAdded++
				funcs.AddFunc(obj)
			},
			UpdateFunc: func(old, new interface{}) {
				defer GinkgoRecover()
				numUpdated++
				funcs.UpdateFunc(old, new)
			},
			DeleteFunc: func(obj interface{}) {
				defer GinkgoRecover()
				numDeleted++
				funcs.DeleteFunc(obj)
			},
		}, nil)
		Expect(err).NotTo(HaveOccurred())
		Expect(id).To(BeNumerically(">", uint64(0)))
		return id
	}

	// addHandler is the unfiltered convenience form of addFilteredHandler.
	addHandler := func(wf *WatchFactory, objType reflect.Type, funcs cache.ResourceEventHandlerFuncs) uint64 {
		return addFilteredHandler(wf, objType, "", nil, funcs)
	}
+
	// Full add/modify/delete lifecycle for pods via the fake watch; each
	// callback must see the object in its state at the time of the event.
	It("responds to pod add/update/delete events", func() {
		wf, err := NewWatchFactory(fakeClient, stop)
		Expect(err).NotTo(HaveOccurred())

		added := newPod("pod1", "default")
		id := addHandler(wf, podType, cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				pod := obj.(*v1.Pod)
				Expect(reflect.DeepEqual(pod, added)).To(BeTrue())
			},
			UpdateFunc: func(old, new interface{}) {
				newPod := new.(*v1.Pod)
				Expect(reflect.DeepEqual(newPod, added)).To(BeTrue())
				Expect(newPod.Spec.NodeName).To(Equal("foobar"))
			},
			DeleteFunc: func(obj interface{}) {
				pod := obj.(*v1.Pod)
				Expect(reflect.DeepEqual(pod, added)).To(BeTrue())
			},
		})

		pods = append(pods, added)
		podWatch.Add(added)
		Eventually(func() int { return numAdded }, 2).Should(Equal(1))
		added.Spec.NodeName = "foobar"
		podWatch.Modify(added)
		Eventually(func() int { return numUpdated }, 2).Should(Equal(1))
		pods = pods[:0]
		podWatch.Delete(added)
		Eventually(func() int { return numDeleted }, 2).Should(Equal(1))

		wf.RemovePodHandler(id)
		close(stop)
	})
+
	// Same lifecycle check as the pod case, for namespaces; the update is
	// signalled via a Status.Phase change.
	It("responds to namespace add/update/delete events", func() {
		wf, err := NewWatchFactory(fakeClient, stop)
		Expect(err).NotTo(HaveOccurred())

		added := newNamespace("default")
		id := addHandler(wf, namespaceType, cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				ns := obj.(*v1.Namespace)
				Expect(reflect.DeepEqual(ns, added)).To(BeTrue())
			},
			UpdateFunc: func(old, new interface{}) {
				newNS := new.(*v1.Namespace)
				Expect(reflect.DeepEqual(newNS, added)).To(BeTrue())
				Expect(newNS.Status.Phase).To(Equal(v1.NamespaceTerminating))
			},
			DeleteFunc: func(obj interface{}) {
				ns := obj.(*v1.Namespace)
				Expect(reflect.DeepEqual(ns, added)).To(BeTrue())
			},
		})

		namespaces = append(namespaces, added)
		namespaceWatch.Add(added)
		Eventually(func() int { return numAdded }, 2).Should(Equal(1))
		added.Status.Phase = v1.NamespaceTerminating
		namespaceWatch.Modify(added)
		Eventually(func() int { return numUpdated }, 2).Should(Equal(1))
		namespaces = namespaces[:0]
		namespaceWatch.Delete(added)
		Eventually(func() int { return numDeleted }, 2).Should(Equal(1))

		wf.RemoveNamespaceHandler(id)
		close(stop)
	})
+
	// Node lifecycle check.
	// NOTE(review): the pod/namespace specs clean up via the exported
	// Remove*Handler wrappers, but this one calls the internal
	// removeHandler — confirm whether a RemoveNodeHandler wrapper exists
	// in factory.go and should be used here for consistency.
	It("responds to node add/update/delete events", func() {
		wf, err := NewWatchFactory(fakeClient, stop)
		Expect(err).NotTo(HaveOccurred())

		added := newNode("mynode")
		id := addHandler(wf, nodeType, cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				node := obj.(*v1.Node)
				Expect(reflect.DeepEqual(node, added)).To(BeTrue())
			},
			UpdateFunc: func(old, new interface{}) {
				newNode := new.(*v1.Node)
				Expect(reflect.DeepEqual(newNode, added)).To(BeTrue())
				Expect(newNode.Status.Phase).To(Equal(v1.NodeTerminated))
			},
			DeleteFunc: func(obj interface{}) {
				node := obj.(*v1.Node)
				Expect(reflect.DeepEqual(node, added)).To(BeTrue())
			},
		})

		nodes = append(nodes, added)
		nodeWatch.Add(added)
		Eventually(func() int { return numAdded }, 2).Should(Equal(1))
		added.Status.Phase = v1.NodeTerminated
		nodeWatch.Modify(added)
		Eventually(func() int { return numUpdated }, 2).Should(Equal(1))
		nodes = nodes[:0]
		nodeWatch.Delete(added)
		Eventually(func() int { return numDeleted }, 2).Should(Equal(1))

		wf.removeHandler(nodeType, id)
		close(stop)
	})
+
	// NetworkPolicy lifecycle check; the update flips Spec.PolicyTypes.
	It("responds to policy add/update/delete events", func() {
		wf, err := NewWatchFactory(fakeClient, stop)
		Expect(err).NotTo(HaveOccurred())

		added := newPolicy("mypolicy", "default")
		id := addHandler(wf, policyType, cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				np := obj.(*knet.NetworkPolicy)
				Expect(reflect.DeepEqual(np, added)).To(BeTrue())
			},
			UpdateFunc: func(old, new interface{}) {
				newNP := new.(*knet.NetworkPolicy)
				Expect(reflect.DeepEqual(newNP, added)).To(BeTrue())
				Expect(newNP.Spec.PolicyTypes).To(Equal([]knet.PolicyType{knet.PolicyTypeIngress}))
			},
			DeleteFunc: func(obj interface{}) {
				np := obj.(*knet.NetworkPolicy)
				Expect(reflect.DeepEqual(np, added)).To(BeTrue())
			},
		})

		policies = append(policies, added)
		policyWatch.Add(added)
		Eventually(func() int { return numAdded }, 2).Should(Equal(1))
		added.Spec.PolicyTypes = []knet.PolicyType{knet.PolicyTypeIngress}
		policyWatch.Modify(added)
		Eventually(func() int { return numUpdated }, 2).Should(Equal(1))
		policies = policies[:0]
		policyWatch.Delete(added)
		Eventually(func() int { return numDeleted }, 2).Should(Equal(1))

		wf.removeHandler(policyType, id)
		close(stop)
	})
+
	// Endpoints lifecycle check; the update adds one EndpointSubset.
	It("responds to endpoints add/update/delete events", func() {
		wf, err := NewWatchFactory(fakeClient, stop)
		Expect(err).NotTo(HaveOccurred())

		added := newEndpoints("myendpoints", "default")
		id := addHandler(wf, endpointsType, cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				eps := obj.(*v1.Endpoints)
				Expect(reflect.DeepEqual(eps, added)).To(BeTrue())
			},
			UpdateFunc: func(old, new interface{}) {
				newEPs := new.(*v1.Endpoints)
				Expect(reflect.DeepEqual(newEPs, added)).To(BeTrue())
				Expect(len(newEPs.Subsets)).To(Equal(1))
			},
			DeleteFunc: func(obj interface{}) {
				eps := obj.(*v1.Endpoints)
				Expect(reflect.DeepEqual(eps, added)).To(BeTrue())
			},
		})

		endpoints = append(endpoints, added)
		endpointsWatch.Add(added)
		Eventually(func() int { return numAdded }, 2).Should(Equal(1))
		added.Subsets = append(added.Subsets, v1.EndpointSubset{
			Ports: []v1.EndpointPort{
				{
					Name: "foobar",
					Port: 1234,
				},
			},
		})
		endpointsWatch.Modify(added)
		Eventually(func() int { return numUpdated }, 2).Should(Equal(1))
		endpoints = endpoints[:0]
		endpointsWatch.Delete(added)
		Eventually(func() int { return numDeleted }, 2).Should(Equal(1))

		wf.removeHandler(endpointsType, id)
		close(stop)
	})
+
	// Service lifecycle check; the update sets Spec.ClusterIP.
	It("responds to service add/update/delete events", func() {
		wf, err := NewWatchFactory(fakeClient, stop)
		Expect(err).NotTo(HaveOccurred())

		added := newService("myservice", "default")
		id := addHandler(wf, serviceType, cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				service := obj.(*v1.Service)
				Expect(reflect.DeepEqual(service, added)).To(BeTrue())
			},
			UpdateFunc: func(old, new interface{}) {
				newService := new.(*v1.Service)
				Expect(reflect.DeepEqual(newService, added)).To(BeTrue())
				Expect(newService.Spec.ClusterIP).To(Equal("1.1.1.1"))
			},
			DeleteFunc: func(obj interface{}) {
				service := obj.(*v1.Service)
				Expect(reflect.DeepEqual(service, added)).To(BeTrue())
			},
		})

		services = append(services, added)
		serviceWatch.Add(added)
		Eventually(func() int { return numAdded }, 2).Should(Equal(1))
		added.Spec.ClusterIP = "1.1.1.1"
		serviceWatch.Modify(added)
		Eventually(func() int { return numUpdated }, 2).Should(Equal(1))
		services = services[:0]
		serviceWatch.Delete(added)
		Eventually(func() int { return numDeleted }, 2).Should(Equal(1))

		wf.removeHandler(serviceType, id)
		close(stop)
	})
+
	// After RemoveNamespaceHandler, further add/update/delete events must
	// not reach the handler; Consistently() guards against late delivery.
	It("stops processing events after the handler is removed", func() {
		wf, err := NewWatchFactory(fakeClient, stop)
		Expect(err).NotTo(HaveOccurred())

		added := newNamespace("default")
		id := addHandler(wf, namespaceType, cache.ResourceEventHandlerFuncs{
			AddFunc:    func(obj interface{}) {},
			UpdateFunc: func(old, new interface{}) {},
			DeleteFunc: func(obj interface{}) {},
		})

		namespaces = append(namespaces, added)
		namespaceWatch.Add(added)
		Eventually(func() int { return numAdded }, 2).Should(Equal(1))
		wf.RemoveNamespaceHandler(id)

		added2 := newNamespace("other")
		namespaces = append(namespaces, added2)
		namespaceWatch.Add(added2)
		Consistently(func() int { return numAdded }, 2).Should(Equal(1))

		added2.Status.Phase = v1.NamespaceTerminating
		namespaceWatch.Modify(added2)
		Consistently(func() int { return numUpdated }, 2).Should(Equal(0))
		namespaces = []*v1.Namespace{added}
		namespaceWatch.Delete(added2)
		Consistently(func() int { return numDeleted }, 2).Should(Equal(0))

		close(stop)
	})
+
	// Verifies that a handler registered with namespace "default" and a
	// blah=foobar label selector only sees matching pods: a wrong label
	// and a wrong namespace must both be filtered out for add, update and
	// delete.
	It("filters correctly by label and namespace", func() {
		wf, err := NewWatchFactory(fakeClient, stop)
		Expect(err).NotTo(HaveOccurred())

		passesFilter := newPod("pod1", "default")
		passesFilter.ObjectMeta.Labels["blah"] = "foobar"
		failsFilter := newPod("pod2", "default")
		failsFilter.ObjectMeta.Labels["blah"] = "baz"
		failsFilter2 := newPod("pod3", "otherns")
		failsFilter2.ObjectMeta.Labels["blah"] = "foobar"

		addFilteredHandler(wf,
			podType,
			"default",
			&metav1.LabelSelector{
				MatchLabels: map[string]string{"blah": "foobar"},
			},
			cache.ResourceEventHandlerFuncs{
				AddFunc: func(obj interface{}) {
					pod := obj.(*v1.Pod)
					Expect(reflect.DeepEqual(pod, passesFilter)).To(BeTrue())
				},
				UpdateFunc: func(old, new interface{}) {
					newPod := new.(*v1.Pod)
					Expect(reflect.DeepEqual(newPod, passesFilter)).To(BeTrue())
				},
				DeleteFunc: func(obj interface{}) {
					pod := obj.(*v1.Pod)
					Expect(reflect.DeepEqual(pod, passesFilter)).To(BeTrue())
				},
			})

		pods = append(pods, passesFilter)
		podWatch.Add(passesFilter)
		Eventually(func() int { return numAdded }, 2).Should(Equal(1))

		// numAdded should remain 1
		pods = append(pods, failsFilter)
		podWatch.Add(failsFilter)
		Consistently(func() int { return numAdded }, 2).Should(Equal(1))

		// numAdded should remain 1
		pods = append(pods, failsFilter2)
		podWatch.Add(failsFilter2)
		Consistently(func() int { return numAdded }, 2).Should(Equal(1))

		passesFilter.Status.Phase = v1.PodFailed
		podWatch.Modify(passesFilter)
		Eventually(func() int { return numUpdated }, 2).Should(Equal(1))

		// numAdded should remain 1
		failsFilter.Status.Phase = v1.PodFailed
		podWatch.Modify(failsFilter)
		Consistently(func() int { return numUpdated }, 2).Should(Equal(1))

		failsFilter2.Status.Phase = v1.PodFailed
		podWatch.Modify(failsFilter2)
		Consistently(func() int { return numUpdated }, 2).Should(Equal(1))

		pods = []*v1.Pod{failsFilter, failsFilter2}
		podWatch.Delete(passesFilter)
		Eventually(func() int { return numDeleted }, 2).Should(Equal(1))

		close(stop)
	})
+
	// When a label edit makes an object start (or stop) matching the
	// filter, the factory must synthesize an ADD (or DELETE) rather than
	// an UPDATE.
	It("correctly handles object updates that cause filter changes", func() {
		wf, err := NewWatchFactory(fakeClient, stop)
		Expect(err).NotTo(HaveOccurred())

		pod := newPod("pod1", "default")
		pod.ObjectMeta.Labels["blah"] = "baz"

		// equalPod tracks which pointer the callbacks should observe; it
		// is swapped to the deep copy below before the Modify.
		equalPod := pod
		id := addFilteredHandler(wf,
			podType,
			"default",
			&metav1.LabelSelector{
				MatchLabels: map[string]string{"blah": "foobar"},
			},
			cache.ResourceEventHandlerFuncs{
				AddFunc: func(obj interface{}) {
					p := obj.(*v1.Pod)
					Expect(reflect.DeepEqual(p, equalPod)).To(BeTrue())
				},
				UpdateFunc: func(old, new interface{}) {},
				DeleteFunc: func(obj interface{}) {
					p := obj.(*v1.Pod)
					Expect(reflect.DeepEqual(p, equalPod)).To(BeTrue())
				},
			})

		pods = append(pods, pod)

		// Pod doesn't pass filter; shouldn't be added
		podWatch.Add(pod)
		Consistently(func() int { return numAdded }, 2).Should(Equal(0))

		// Update pod to pass filter; should be treated as add. Need
		// to deep-copy pod when modifying because it's a pointer all
		// the way through when using FakeClient
		podCopy := pod.DeepCopy()
		podCopy.ObjectMeta.Labels["blah"] = "foobar"
		pods = []*v1.Pod{podCopy}
		equalPod = podCopy
		podWatch.Modify(podCopy)
		Eventually(func() int { return numAdded }, 2).Should(Equal(1))

		// Update pod to fail filter; should be treated as delete
		pod.ObjectMeta.Labels["blah"] = "baz"
		podWatch.Modify(pod)
		Eventually(func() int { return numDeleted }, 2).Should(Equal(1))
		Consistently(func() int { return numAdded }, 2).Should(Equal(1))
		Consistently(func() int { return numUpdated }, 2).Should(Equal(0))

		wf.RemovePodHandler(id)
		close(stop)
	})
+})
diff --git a/internal/pkg/kube/.gitkeep b/internal/pkg/kube/.gitkeep
deleted file mode 100644
index e69de29..0000000
--- a/internal/pkg/kube/.gitkeep
+++ /dev/null
diff --git a/internal/pkg/kube/kube.go b/internal/pkg/kube/kube.go
new file mode 100644
index 0000000..cc7c29b
--- /dev/null
+++ b/internal/pkg/kube/kube.go
@@ -0,0 +1,141 @@
+package kube
+
import (
	"encoding/json"
	"fmt"

	"github.com/sirupsen/logrus"

	kapi "k8s.io/api/core/v1"
	kapisnetworking "k8s.io/api/networking/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
)
+
// Interface represents the exported methods for dealing with getting/setting
// kubernetes resources
type Interface interface {
	// Annotation setters; each applies a JSON merge patch server-side.
	SetAnnotationOnPod(pod *kapi.Pod, key, value string) error
	SetAnnotationOnNode(node *kapi.Node, key, value string) error
	SetAnnotationOnNamespace(ns *kapi.Namespace, key, value string) error
	// Read-only accessors backed by direct apiserver GET/LIST calls.
	GetAnnotationsOnPod(namespace, name string) (map[string]string, error)
	GetPod(namespace, name string) (*kapi.Pod, error)
	GetPods(namespace string) (*kapi.PodList, error)
	GetPodsByLabels(namespace string, selector labels.Selector) (*kapi.PodList, error)
	GetNodes() (*kapi.NodeList, error)
	GetNode(name string) (*kapi.Node, error)
	GetService(namespace, name string) (*kapi.Service, error)
	GetEndpoints(namespace string) (*kapi.EndpointsList, error)
	GetNamespace(name string) (*kapi.Namespace, error)
	GetNamespaces() (*kapi.NamespaceList, error)
	GetNetworkPolicies(namespace string) (*kapisnetworking.NetworkPolicyList, error)
}
+
// Kube is the structure object upon which the Interface is implemented
type Kube struct {
	// KClient is the client-go clientset used for every apiserver call.
	KClient kubernetes.Interface
}
+
+// SetAnnotationOnPod takes the pod object and key/value string pair to set it as an annotation
+func (k *Kube) SetAnnotationOnPod(pod *kapi.Pod, key, value string) error {
+ logrus.Infof("Setting annotations %s=%s on pod %s", key, value, pod.Name)
+ patchData := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, key, value)
+ _, err := k.KClient.Core().Pods(pod.Namespace).Patch(pod.Name, types.MergePatchType, []byte(patchData))
+ if err != nil {
+ logrus.Errorf("Error in setting annotation on pod %s/%s: %v", pod.Name, pod.Namespace, err)
+ }
+ return err
+}
+
+// SetAnnotationOnNode takes the node object and key/value string pair to set it as an annotation
+func (k *Kube) SetAnnotationOnNode(node *kapi.Node, key, value string) error {
+ logrus.Infof("Setting annotations %s=%s on node %s", key, value, node.Name)
+ patchData := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, key, value)
+ _, err := k.KClient.Core().Nodes().Patch(node.Name, types.MergePatchType, []byte(patchData))
+ if err != nil {
+ logrus.Errorf("Error in setting annotation on node %s: %v", node.Name, err)
+ }
+ return err
+}
+
// SetAnnotationOnNamespace takes the Namespace object and key/value pair
// to set it as an annotation, applied to the apiserver as a JSON merge
// patch. Returns the error (if any) from the Patch call.
func (k *Kube) SetAnnotationOnNamespace(ns *kapi.Namespace, key,
	value string) error {
	logrus.Infof("Setting annotations %s=%s on namespace %s", key, value,
		ns.Name)
	// NOTE(review): Sprintf does not JSON-escape key/value; a key or
	// value containing a quote would yield an invalid patch. Fine for
	// the controlled strings used today — confirm callers before
	// passing arbitrary input.
	patchData := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, key,
		value)
	_, err := k.KClient.Core().Namespaces().Patch(ns.Name,
		types.MergePatchType, []byte(patchData))
	if err != nil {
		logrus.Errorf("Error in setting annotation on namespace %s: %v",
			ns.Name, err)
	}
	return err
}
+
+// GetAnnotationsOnPod obtains the pod annotations from kubernetes apiserver, given the name and namespace
+func (k *Kube) GetAnnotationsOnPod(namespace, name string) (map[string]string, error) {
+ pod, err := k.KClient.Core().Pods(namespace).Get(name, metav1.GetOptions{})
+ if err != nil {
+ return nil, err
+ }
+ return pod.ObjectMeta.Annotations, nil
+}
+
+// GetPod obtains the Pod resource from kubernetes apiserver, given the name and namespace
+func (k *Kube) GetPod(namespace, name string) (*kapi.Pod, error) {
+ return k.KClient.Core().Pods(namespace).Get(name, metav1.GetOptions{})
+}
+
+// GetPods obtains the Pod resource from kubernetes apiserver, given the name and namespace
+func (k *Kube) GetPods(namespace string) (*kapi.PodList, error) {
+ return k.KClient.Core().Pods(namespace).List(metav1.ListOptions{})
+}
+
+// GetPodsByLabels obtains the Pod resources from kubernetes apiserver,
+// given the namespace and label
+func (k *Kube) GetPodsByLabels(namespace string, selector labels.Selector) (*kapi.PodList, error) {
+ options := metav1.ListOptions{}
+ options.LabelSelector = selector.String()
+ return k.KClient.Core().Pods(namespace).List(options)
+}
+
+// GetNodes returns the list of all Node objects from kubernetes
+func (k *Kube) GetNodes() (*kapi.NodeList, error) {
+ return k.KClient.Core().Nodes().List(metav1.ListOptions{})
+}
+
+// GetNode returns the Node resource from kubernetes apiserver, given its name
+func (k *Kube) GetNode(name string) (*kapi.Node, error) {
+ return k.KClient.Core().Nodes().Get(name, metav1.GetOptions{})
+}
+
+// GetService returns the Service resource from kubernetes apiserver, given its name and namespace
+func (k *Kube) GetService(namespace, name string) (*kapi.Service, error) {
+ return k.KClient.Core().Services(namespace).Get(name, metav1.GetOptions{})
+}
+
+// GetEndpoints returns all the Endpoint resources from kubernetes
+// apiserver, given namespace
+func (k *Kube) GetEndpoints(namespace string) (*kapi.EndpointsList, error) {
+ return k.KClient.Core().Endpoints(namespace).List(metav1.ListOptions{})
+}
+
+// GetNamespace returns the Namespace resource from kubernetes apiserver,
+// given its name
+func (k *Kube) GetNamespace(name string) (*kapi.Namespace, error) {
+ return k.KClient.Core().Namespaces().Get(name, metav1.GetOptions{})
+}
+
+// GetNamespaces returns all Namespace resource from kubernetes apiserver
+func (k *Kube) GetNamespaces() (*kapi.NamespaceList, error) {
+ return k.KClient.Core().Namespaces().List(metav1.ListOptions{})
+}
+
+// GetNetworkPolicies returns all network policy objects from kubernetes
+func (k *Kube) GetNetworkPolicies(namespace string) (*kapisnetworking.NetworkPolicyList, error) {
+ return k.KClient.Networking().NetworkPolicies(namespace).List(metav1.ListOptions{})
+}
diff --git a/internal/pkg/ovn/.gitkeep b/internal/pkg/ovn/.gitkeep
deleted file mode 100644
index e69de29..0000000
--- a/internal/pkg/ovn/.gitkeep
+++ /dev/null
diff --git a/internal/pkg/ovn/common.go b/internal/pkg/ovn/common.go
new file mode 100644
index 0000000..16923ea
--- /dev/null
+++ b/internal/pkg/ovn/common.go
@@ -0,0 +1,89 @@
+package ovn
+
+import (
+ "encoding/json"
+ "fmt"
+ "github.com/sirupsen/logrus"
+ "strings"
+)
+
+func (oc *Controller) getIPFromOvnAnnotation(ovnAnnotation string) string {
+ if ovnAnnotation == "" {
+ return ""
+ }
+
+ var ovnAnnotationMap map[string]string
+ err := json.Unmarshal([]byte(ovnAnnotation), &ovnAnnotationMap)
+ if err != nil {
+ logrus.Errorf("Error in json unmarshaling ovn annotation "+
+ "(%v)", err)
+ return ""
+ }
+
+ ipAddressMask := strings.Split(ovnAnnotationMap["ip_address"], "/")
+ if len(ipAddressMask) != 2 {
+ logrus.Errorf("Error in splitting ip address")
+ return ""
+ }
+
+ return ipAddressMask[0]
+}
+
+func (oc *Controller) getMacFromOvnAnnotation(ovnAnnotation string) string {
+ if ovnAnnotation == "" {
+ return ""
+ }
+
+ var ovnAnnotationMap map[string]string
+ err := json.Unmarshal([]byte(ovnAnnotation), &ovnAnnotationMap)
+ if err != nil {
+ logrus.Errorf("Error in json unmarshaling ovn annotation "+
+ "(%v)", err)
+ return ""
+ }
+
+ return ovnAnnotationMap["mac_address"]
+}
+
// stringSliceMembership reports whether key occurs in slice.
func stringSliceMembership(slice []string, key string) bool {
	for i := range slice {
		if slice[i] == key {
			return true
		}
	}
	return false
}
+
+func (oc *Controller) getNetworkFromOvnAnnotation(ovnAnnotation string) string {
+ if ovnAnnotation == "" {
+ logrus.Errorf("getNetworkFromOvnAnnotation ovnAnnotation: %s", ovnAnnotation)
+ return ""
+ }
+ logrus.Infof("getNetworkFromOvnAnnotation ovnAnnotation: %s", ovnAnnotation)
+
+ var ovnAnnotationMap map[string]string
+ err := json.Unmarshal([]byte(ovnAnnotation), &ovnAnnotationMap)
+ if err != nil {
+ logrus.Errorf("Error in json unmarshaling ovn annotation "+
+ "(%v)", err)
+ return ""
+ }
+ for key, value := range ovnAnnotationMap {
+ logrus.Infof("getNetworkFromOvnAnnotation %s: %s", key, value)
+ }
+ return ovnAnnotationMap["name"]
+}
+
+func (oc *Controller) parseOvnNetworkObject(ovnnetwork string) ([]map[string]interface{}, error) {
+ var ovnNet []map[string]interface{}
+
+ if ovnnetwork == "" {
+ return nil, fmt.Errorf("parseOvnNetworkObject:error")
+ }
+
+ if err := json.Unmarshal([]byte(ovnnetwork), &ovnNet); err != nil {
+ return nil, fmt.Errorf("parseOvnNetworkObject: failed to load ovn network err: %v | ovn network: %v", err, ovnnetwork)
+ }
+
+ return ovnNet, nil
+}
diff --git a/internal/pkg/ovn/ovn.go b/internal/pkg/ovn/ovn.go
new file mode 100644
index 0000000..ec2ccbd
--- /dev/null
+++ b/internal/pkg/ovn/ovn.go
@@ -0,0 +1,71 @@
+package ovn
+
+import (
+ "fmt"
+ kapi "k8s.io/api/core/v1"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/tools/cache"
+ "ovn4nfv-k8s-plugin/internal/pkg/factory"
+ "ovn4nfv-k8s-plugin/internal/pkg/kube"
+)
+
// Controller structure is the object which holds the controls for starting
// and reacting upon the watched resources (e.g. pods, endpoints)
type Controller struct {
	// kube wraps apiserver access (annotation get/set, object reads).
	kube kube.Interface
	// watchFactory delivers add/update/delete events for watched resources.
	watchFactory *factory.WatchFactory

	// gatewayCache maps a logical switch name to its gateway CIDR string
	// (as read from external_ids:gateway_ip).
	gatewayCache map[string]string
	// A cache of all logical switches seen by the watcher
	logicalSwitchCache map[string]bool
	// A cache of all logical ports seen by the watcher and
	// its corresponding logical switch
	logicalPortCache map[string]string
}
+
+// NewOvnController creates a new OVN controller for creating logical network
+// infrastructure and policy
+func NewOvnController(kubeClient kubernetes.Interface, wf *factory.WatchFactory) *Controller {
+ return &Controller{
+ kube: &kube.Kube{KClient: kubeClient},
+ watchFactory: wf,
+ logicalSwitchCache: make(map[string]bool),
+ logicalPortCache: make(map[string]string),
+ gatewayCache: make(map[string]string),
+ }
+}
+
+// Run starts the actual watching. Also initializes any local structures needed.
+func (oc *Controller) Run() error {
+ fmt.Println("ovn4nfvk8s watching Pods")
+ for _, f := range []func() error{oc.WatchPods} {
+ if err := f(); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// WatchPods starts the watching of Pod resource and calls back the appropriate handler logic
+func (oc *Controller) WatchPods() error {
+ _, err := oc.watchFactory.AddPodHandler(cache.ResourceEventHandlerFuncs{
+ AddFunc: func(obj interface{}) {
+ pod := obj.(*kapi.Pod)
+ if pod.Spec.NodeName != "" {
+ oc.addLogicalPort(pod)
+ }
+ },
+ UpdateFunc: func(old, newer interface{}) {
+ podNew := newer.(*kapi.Pod)
+ podOld := old.(*kapi.Pod)
+ if podOld.Spec.NodeName == "" && podNew.Spec.NodeName != "" {
+ oc.addLogicalPort(podNew)
+ }
+ },
+ DeleteFunc: func(obj interface{}) {
+ pod := obj.(*kapi.Pod)
+ oc.deleteLogicalPort(pod)
+ },
+ }, oc.syncPods)
+ return err
+}
diff --git a/internal/pkg/ovn/ovn_test.go b/internal/pkg/ovn/ovn_test.go
new file mode 100644
index 0000000..2e558a6
--- /dev/null
+++ b/internal/pkg/ovn/ovn_test.go
@@ -0,0 +1,116 @@
+package ovn
+
+import (
+ "fmt"
+ "testing"
+
+ "github.com/urfave/cli"
+ fakeexec "k8s.io/utils/exec/testing"
+
+ "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/client-go/kubernetes/fake"
+ "ovn4nfv-k8s-plugin/internal/pkg/config"
+ "ovn4nfv-k8s-plugin/internal/pkg/factory"
+ ovntest "ovn4nfv-k8s-plugin/internal/pkg/testing"
+ "ovn4nfv-k8s-plugin/internal/pkg/util"
+
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+)
+
// TestOvn is the standard Go test entry point; it wires Gomega failures
// into Ginkgo and runs this package's specs.
func TestOvn(t *testing.T) {
	RegisterFailHandler(Fail)
	RunSpecs(t, "OVN/Pod Test Suite")
}
+
// No suite-level teardown is needed yet; kept as a placeholder.
var _ = AfterSuite(func() {
})
+
// Exercises Controller.addLogicalPort end-to-end against a scripted fake
// exec: every ovn-nbctl invocation the controller makes must match the
// expected command strings below, in order.
var _ = Describe("Add logical Port", func() {
	var app *cli.App

	BeforeEach(func() {
		// Restore global default values before each testcase
		//config.RestoreDefaultConfig()

		app = cli.NewApp()
		app.Name = "test"
		app.Flags = config.Flags
	})

	It("tests Pod", func() {
		app.Action = func(ctx *cli.Context) error {
			const (
				gwIP    string = "10.1.1.1"
				gwCIDR  string = gwIP + "/24"
				netName string = "ovn-prot-net"
				// portName is "<namespace>_<pod>_<interface>"; the pod
				// below has an empty namespace — hence the leading "_".
				portName     string = "_ok_net0"
				macIPAddress string = "0a:00:00:00:00:01 192.168.1.3"
			)
			// Scripted command sequence: (1) look up the logical switch,
			// (2) create the port with dynamic addresses, (3) read the
			// switch's gateway CIDR, (4) read the assigned MAC/IP.
			fakeCmds := ovntest.AddFakeCmd(nil, &ovntest.ExpectedCmd{
				Cmd:    "ovn-nbctl --timeout=15 --data=bare --no-heading --columns=name find logical_switch " + "name=" + netName,
				Output: netName,
			})
			fakeCmds = ovntest.AddFakeCmdsNoOutputNoError(fakeCmds, []string{
				"ovn-nbctl --timeout=15 --wait=sb -- --may-exist lsp-add " + netName + " " + portName + " -- lsp-set-addresses " + portName + " dynamic -- set logical_switch_port " + portName + " external-ids:namespace= external-ids:logical_switch=" + netName + " external-ids:pod=true",
			})

			fakeCmds = ovntest.AddFakeCmd(fakeCmds, &ovntest.ExpectedCmd{
				Cmd:    "ovn-nbctl --timeout=15 --if-exists get logical_switch " + netName + " external_ids:gateway_ip",
				Output: gwCIDR,
			})
			fakeCmds = ovntest.AddFakeCmd(fakeCmds, &ovntest.ExpectedCmd{
				Cmd:    "ovn-nbctl --timeout=15 get logical_switch_port " + portName + " dynamic_addresses",
				Output: macIPAddress,
			})

			fexec := &fakeexec.FakeExec{
				CommandScript: fakeCmds,
				LookPathFunc: func(file string) (string, error) {
					return fmt.Sprintf("/fake-bin/%s", file), nil
				},
			}

			err := util.SetExec(fexec)
			Expect(err).NotTo(HaveOccurred())

			_, err = config.InitConfig(ctx, fexec, nil)
			Expect(err).NotTo(HaveOccurred())

			fakeClient := &fake.Clientset{}
			var fakeWatchFactory factory.WatchFactory

			ovnController := NewOvnController(fakeClient, &fakeWatchFactory)
			Expect(err).NotTo(HaveOccurred())
			var (
				// okPod carries the ovnNetwork annotation addLogicalPort
				// parses to pick switch, interface name and gateway.
				okPod = v1.Pod{
					TypeMeta: metav1.TypeMeta{
						Kind:       "Pod",
						APIVersion: "v1",
					},
					ObjectMeta: metav1.ObjectMeta{
						Name:        "ok",
						Annotations: map[string]string{"ovnNetwork": "[{ \"name\": \"ovn-prot-net\", \"interface\": \"net0\" , \"defaultGateway\": \"true\"}]"},
					},
					Spec: v1.PodSpec{
						Containers: []v1.Container{
							{
								Name: "by-name",
							},
							{},
						},
					},
				}
			)

			ovnController.addLogicalPort(&okPod)
			_, _ = ovnController.kube.GetAnnotationsOnPod("", "ok")

			return nil
		}

		err := app.Run([]string{app.Name})
		Expect(err).NotTo(HaveOccurred())
	})
})
diff --git a/internal/pkg/ovn/pods.go b/internal/pkg/ovn/pods.go
new file mode 100644
index 0000000..cc3d459
--- /dev/null
+++ b/internal/pkg/ovn/pods.go
@@ -0,0 +1,267 @@
+package ovn
+
+import (
+ "fmt"
+ "strings"
+ "time"
+
+ "github.com/sirupsen/logrus"
+ kapi "k8s.io/api/core/v1"
+ "ovn4nfv-k8s-plugin/internal/pkg/util"
+)
+
// syncPods reconciles OVN logical-port state against the given pod list.
// Currently a no-op stub: stale logical ports left over from a previous run
// are not cleaned up here.
func (oc *Controller) syncPods(pods []interface{}) {
}
+func (oc *Controller) getGatewayFromSwitch(logicalSwitch string) (string, string, error) {
+ var gatewayIPMaskStr, stderr string
+ var ok bool
+ var err error
+ logrus.Infof("getGatewayFromSwitch: %s", logicalSwitch)
+ if gatewayIPMaskStr, ok = oc.gatewayCache[logicalSwitch]; !ok {
+ gatewayIPMaskStr, stderr, err = util.RunOVNNbctlUnix("--if-exists",
+ "get", "logical_switch", logicalSwitch,
+ "external_ids:gateway_ip")
+ if err != nil {
+ logrus.Errorf("Failed to get gateway IP: %s, stderr: %q, %v",
+ gatewayIPMaskStr, stderr, err)
+ return "", "", err
+ }
+ if gatewayIPMaskStr == "" {
+ return "", "", fmt.Errorf("Empty gateway IP in logical switch %s",
+ logicalSwitch)
+ }
+ oc.gatewayCache[logicalSwitch] = gatewayIPMaskStr
+ }
+ gatewayIPMask := strings.Split(gatewayIPMaskStr, "/")
+ if len(gatewayIPMask) != 2 {
+ return "", "", fmt.Errorf("Failed to get IP and Mask from gateway CIDR: %s",
+ gatewayIPMaskStr)
+ }
+ gatewayIP := gatewayIPMask[0]
+ mask := gatewayIPMask[1]
+ return gatewayIP, mask, nil
+}
+
+func (oc *Controller) deleteLogicalPort(pod *kapi.Pod) {
+
+ if pod.Spec.HostNetwork {
+ return
+ }
+
+ logrus.Infof("Deleting pod: %s", pod.Name)
+ logicalPort := fmt.Sprintf("%s_%s", pod.Namespace, pod.Name)
+
+ // get the list of logical ports from OVN
+ output, stderr, err := util.RunOVNNbctlUnix("--data=bare", "--no-heading",
+ "--columns=name", "find", "logical_switch_port", "external_ids:pod=true")
+ if err != nil {
+ logrus.Errorf("Error in obtaining list of logical ports, "+
+ "stderr: %q, err: %v",
+ stderr, err)
+ return
+ }
+ logrus.Infof("Exising Ports : %s. ", output)
+ existingLogicalPorts := strings.Fields(output)
+ for _, existingPort := range existingLogicalPorts {
+ if strings.Contains(existingPort, logicalPort) {
+ // found, delete this logical port
+ logrus.Infof("Deleting: %s. ", existingPort)
+ out, stderr, err := util.RunOVNNbctlUnix("--if-exists", "lsp-del",
+ existingPort)
+ if err != nil {
+ logrus.Errorf("Error in deleting pod's logical port "+
+ "stdout: %q, stderr: %q err: %v",
+ out, stderr, err)
+ } else {
+ delete(oc.logicalPortCache, existingPort)
+ }
+ }
+ }
+ return
+}
+
// addLogicalPortWithSwitch creates (or reuses) a logical switch port named
// "namespace_pod_interface" on logicalSwitch, waits for OVN to report the
// port's addresses, and returns a pre-escaped JSON fragment holding
// ip_address, mac_address and gateway_ip. On any failure the error is logged
// and the empty string is returned.
func (oc *Controller) addLogicalPortWithSwitch(pod *kapi.Pod, logicalSwitch, ipAddress, macAddress, interfaceName string) (annotation string) {
	var out, stderr string
	var err error
	var isStaticIP bool
	// Host-network pods get no OVN interfaces.
	if pod.Spec.HostNetwork {
		return
	}

	// Remember that this switch has been seen.
	if !oc.logicalSwitchCache[logicalSwitch] {
		oc.logicalSwitchCache[logicalSwitch] = true
	}
	var portName string
	if interfaceName != "" {
		portName = fmt.Sprintf("%s_%s_%s", pod.Namespace, pod.Name, interfaceName)
	} else {
		// Without an interface name no unique port name can be formed.
		return
	}

	logrus.Infof("Creating logical port for %s on switch %s", portName, logicalSwitch)

	// A caller-supplied IP selects static addressing; generate a MAC when
	// the caller provided only the IP.
	if ipAddress != "" && macAddress != "" {
		isStaticIP = true
	}
	if ipAddress != "" && macAddress == "" {
		macAddress = util.GenerateMac()
		isStaticIP = true
	}

	if isStaticIP {
		// Static case: set addresses explicitly and clear any previously
		// assigned dynamic addresses.
		out, stderr, err = util.RunOVNNbctlUnix("--may-exist", "lsp-add",
			logicalSwitch, portName, "--", "lsp-set-addresses", portName,
			fmt.Sprintf("%s %s", macAddress, ipAddress), "--", "--if-exists",
			"clear", "logical_switch_port", portName, "dynamic_addresses")
		if err != nil {
			logrus.Errorf("Failed to add logical port to switch "+
				"stdout: %q, stderr: %q (%v)",
				out, stderr, err)
			return
		}
	} else {
		// Dynamic case: let OVN assign addresses and tag the port with
		// external-ids so it can be found (and cleaned up) later.
		out, stderr, err = util.RunOVNNbctlUnix("--wait=sb", "--",
			"--may-exist", "lsp-add", logicalSwitch, portName,
			"--", "lsp-set-addresses",
			portName, "dynamic", "--", "set",
			"logical_switch_port", portName,
			"external-ids:namespace="+pod.Namespace,
			"external-ids:logical_switch="+logicalSwitch,
			"external-ids:pod=true")
		if err != nil {
			logrus.Errorf("Error while creating logical port %s "+
				"stdout: %q, stderr: %q (%v)",
				portName, out, stderr, err)
			return
		}
	}
	oc.logicalPortCache[portName] = logicalSwitch
	gatewayIP, mask, err := oc.getGatewayFromSwitch(logicalSwitch)
	if err != nil {
		logrus.Errorf("Error obtaining gateway address for switch %s: %s", logicalSwitch, err)
		return
	}

	// Poll for up to ~30 seconds until OVN reports the port's addresses;
	// "[]" means the dynamic assignment has not happened yet.
	count := 30
	for count > 0 {
		if isStaticIP {
			out, stderr, err = util.RunOVNNbctlUnix("get",
				"logical_switch_port", portName, "addresses")
		} else {
			out, stderr, err = util.RunOVNNbctlUnix("get",
				"logical_switch_port", portName, "dynamic_addresses")
		}
		if err == nil && out != "[]" {
			break
		}
		if err != nil {
			logrus.Errorf("Error while obtaining addresses for %s - %v", portName,
				err)
			return
		}
		time.Sleep(time.Second)
		count--
	}
	if count == 0 {
		logrus.Errorf("Error while obtaining addresses for %s "+
			"stdout: %q, stderr: %q, (%v)", portName, out, stderr, err)
		return
	}

	// static addresses have format ["0a:00:00:00:00:01 192.168.1.3"], while
	// dynamic addresses have format "0a:00:00:00:00:01 192.168.1.3".
	outStr := strings.TrimLeft(out, `[`)
	outStr = strings.TrimRight(outStr, `]`)
	outStr = strings.Trim(outStr, `"`)
	addresses := strings.Split(outStr, " ")
	if len(addresses) != 2 {
		logrus.Errorf("Error while obtaining addresses for %s", portName)
		return
	}
	// The fragment is emitted with escaped quotes because the caller embeds
	// it inside a larger JSON-in-string pod annotation.
	annotation = fmt.Sprintf(`{\"ip_address\":\"%s/%s\", \"mac_address\":\"%s\", \"gateway_ip\": \"%s\"}`, addresses[1], mask, addresses[0], gatewayIP)
	return annotation
}
+
+func findLogicalSwitch(name string) bool {
+ // get logical switch from OVN
+ output, stderr, err := util.RunOVNNbctlUnix("--data=bare", "--no-heading",
+ "--columns=name", "find", "logical_switch", "name="+name)
+ if err != nil {
+ logrus.Errorf("Error in obtaining list of logical switch, "+
+ "stderr: %q, err: %v",
+ stderr, err)
+ return false
+ }
+
+ if strings.Compare(name, output) == 0 {
+ return true
+ } else {
+ logrus.Errorf("Error finding Switch %v", name)
+ return false
+ }
+}
+
// addLogicalPort reads the pod's "ovnNetwork" annotation — a JSON list of
// {name, interface, ipAddress, macAddress, defaultGateway} objects — creates
// one logical switch port per entry via addLogicalPortWithSwitch, and writes
// the accumulated interface descriptions back onto the pod as the
// "ovnIfaceList" annotation. Any failure aborts the whole operation.
func (oc *Controller) addLogicalPort(pod *kapi.Pod) {
	var logicalSwitch string
	var ipAddress, macAddress, interfaceName, defaultGateway string

	annotation := pod.Annotations["ovnNetwork"]

	if annotation != "" {
		ovnNetObjs, err := oc.parseOvnNetworkObject(annotation)
		if err != nil {
			logrus.Errorf("addLogicalPort : Error Parsing OvnNetwork List")
			return
		}
		var ovnString, outStr string
		ovnString = "["
		for _, net := range ovnNetObjs {
			// "name" (the target logical switch) is mandatory; the other
			// keys are optional and default to "" / "false".
			// NOTE(review): these .(string) assertions panic on non-string
			// JSON values — presumably parseOvnNetworkObject guarantees
			// strings; confirm against its implementation.
			logicalSwitch = net["name"].(string)
			if _, ok := net["interface"]; ok {
				interfaceName = net["interface"].(string)
			} else {
				interfaceName = ""
			}
			if _, ok := net["ipAddress"]; ok {
				ipAddress = net["ipAddress"].(string)
			} else {
				ipAddress = ""
			}
			if _, ok := net["macAddress"]; ok {
				macAddress = net["macAddress"].(string)
			} else {
				macAddress = ""
			}
			if _, ok := net["defaultGateway"]; ok {
				defaultGateway = net["defaultGateway"].(string)
			} else {
				defaultGateway = "false"
			}
			// The referenced switch must already exist in OVN.
			if !findLogicalSwitch(logicalSwitch) {
				return
			}
			if interfaceName == "" {
				logrus.Errorf("Interface name must be provided")
				return
			}
			outStr = oc.addLogicalPortWithSwitch(pod, logicalSwitch, ipAddress, macAddress, interfaceName)
			if outStr == "" {
				return
			}
			// Splice defaultGateway and interface into the escaped-JSON
			// fragment: drop its trailing '}', append the two fields, and
			// re-close the object before adding it to the list.
			last := len(outStr) - 1
			tmpString := outStr[:last]
			tmpString += "," + "\\\"defaultGateway\\\":" + "\\\"" + defaultGateway + "\\\""
			tmpString += "," + "\\\"interface\\\":" + "\\\"" + interfaceName + "\\\"}"
			ovnString += tmpString
			ovnString += ","
		}
		// Replace the trailing comma with the closing bracket.
		last := len(ovnString) - 1
		ovnString = ovnString[:last]
		ovnString += "]"
		logrus.Debugf("ovnIfaceList - %v", ovnString)
		err = oc.kube.SetAnnotationOnPod(pod, "ovnIfaceList", ovnString)
		if err != nil {
			logrus.Errorf("Failed to set annotation on pod %s - %v", pod.Name, err)
		}
	}
}
diff --git a/internal/pkg/ovn/router.go b/internal/pkg/ovn/router.go
new file mode 100644
index 0000000..d98c463
--- /dev/null
+++ b/internal/pkg/ovn/router.go
@@ -0,0 +1,50 @@
+package ovn
+
+import (
+ "github.com/sirupsen/logrus"
+ "ovn4nfv-k8s-plugin/internal/pkg/util"
+)
+
// SetupMaster initializes the OVN topology on the cluster master: it ensures
// the br-int integration bridge exists, creates the cluster-wide distributed
// logical router <name>, creates the "ovn4nfv-join" logical switch (addressed
// out of 100.64.1.0/24), and cross-connects router and switch through the
// rtoj-<name>/jtor-<name> port pair. Every step is idempotent (--may-exist).
func SetupMaster(name string) error {

	// Make sure br-int is created.
	stdout, stderr, err := util.RunOVSVsctl("--", "--may-exist", "add-br", "br-int")
	if err != nil {
		logrus.Errorf("Failed to create br-int, stdout: %q, stderr: %q, error: %v", stdout, stderr, err)
		return err
	}
	// Create a single common distributed router for the cluster.
	stdout, stderr, err = util.RunOVNNbctlUnix("--", "--may-exist", "lr-add", name, "--", "set", "logical_router", name, "external_ids:ovn4nfv-cluster-router=yes")
	if err != nil {
		logrus.Errorf("Failed to create a single common distributed router for the cluster, stdout: %q, stderr: %q, error: %v", stdout, stderr, err)
		return err
	}
	// Create a logical switch called "ovn4nfv-join" that will be used to connect gateway routers to the distributed router.
	// The "ovn4nfv-join" will be allocated IP addresses in the range 100.64.1.0/24.
	stdout, stderr, err = util.RunOVNNbctlUnix("--may-exist", "ls-add", "ovn4nfv-join")
	if err != nil {
		logrus.Errorf("Failed to create logical switch called \"ovn4nfv-join\", stdout: %q, stderr: %q, error: %v", stdout, stderr, err)
		return err
	}
	// Connect the distributed router to "ovn4nfv-join".
	routerMac, stderr, err := util.RunOVNNbctlUnix("--if-exist", "get", "logical_router_port", "rtoj-"+name, "mac")
	if err != nil {
		logrus.Errorf("Failed to get logical router port rtoj-%v, stderr: %q, error: %v", name, stderr, err)
		return err
	}
	// An empty MAC means the router port does not exist yet: create it with
	// a freshly generated MAC and the join network's gateway address.
	if routerMac == "" {
		routerMac = util.GenerateMac()
		stdout, stderr, err = util.RunOVNNbctlUnix("--", "--may-exist", "lrp-add", name, "rtoj-"+name, routerMac, "100.64.1.1/24", "--", "set", "logical_router_port", "rtoj-"+name, "external_ids:connect_to_ovn4nfvjoin=yes")
		if err != nil {
			logrus.Errorf("Failed to add logical router port rtoj-%v, stdout: %q, stderr: %q, error: %v", name, stdout, stderr, err)
			return err
		}
	}
	// Connect the switch "ovn4nfv-join" to the router.
	stdout, stderr, err = util.RunOVNNbctlUnix("--", "--may-exist", "lsp-add", "ovn4nfv-join", "jtor-"+name, "--", "set", "logical_switch_port", "jtor-"+name, "type=router", "options:router-port=rtoj-"+name, "addresses="+"\""+routerMac+"\"")
	if err != nil {
		logrus.Errorf("Failed to add logical switch port to logical router, stdout: %q, stderr: %q, error: %v", stdout, stderr, err)
		return err
	}
	return nil
}
diff --git a/internal/pkg/testing/testing.go b/internal/pkg/testing/testing.go
new file mode 100644
index 0000000..4c2afad
--- /dev/null
+++ b/internal/pkg/testing/testing.go
@@ -0,0 +1,55 @@
+package testing
+
+import (
+ "strings"
+
+ kexec "k8s.io/utils/exec"
+ fakeexec "k8s.io/utils/exec/testing"
+
+ "github.com/onsi/gomega"
+)
+
// ExpectedCmd contains properties that the testcase expects a called command
// to have as well as the output that the fake command should return.
type ExpectedCmd struct {
	// Cmd should be the command-line string of the executable name and all arguments it is expected to be called with
	Cmd string
	// Output is any stdout output which Cmd should produce
	Output string
	// Stderr is any stderr output which Cmd should produce
	Stderr string
	// Err is any error that should be returned for the invocation of Cmd
	Err error
}
+
+// AddFakeCmd takes the ExpectedCmd and appends its runner function to
+// a fake command action list
+func AddFakeCmd(fakeCmds []fakeexec.FakeCommandAction, expected *ExpectedCmd) []fakeexec.FakeCommandAction {
+ return append(fakeCmds, func(cmd string, args ...string) kexec.Cmd {
+ parts := strings.Split(expected.Cmd, " ")
+ gomega.Expect(cmd).To(gomega.Equal("/fake-bin/" + parts[0]))
+ gomega.Expect(strings.Join(args, " ")).To(gomega.Equal(strings.Join(parts[1:], " ")))
+ return &fakeexec.FakeCmd{
+ Argv: parts[1:],
+ CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{
+ func() ([]byte, error) {
+ return []byte(expected.Output), expected.Err
+ },
+ },
+ RunScript: []fakeexec.FakeRunAction{
+ func() ([]byte, []byte, error) {
+ return []byte(expected.Output), []byte(expected.Stderr), expected.Err
+ },
+ },
+ }
+ })
+}
+
+// AddFakeCmdsNoOutputNoError takes a list of commands and appends those commands
+// to the expected command set. The command cannot return any output or error.
+func AddFakeCmdsNoOutputNoError(fakeCmds []fakeexec.FakeCommandAction, commands []string) []fakeexec.FakeCommandAction {
+ for _, cmd := range commands {
+ fakeCmds = AddFakeCmd(fakeCmds, &ExpectedCmd{Cmd: cmd})
+ }
+ return fakeCmds
+}
diff --git a/internal/pkg/util/.gitkeep b/internal/pkg/util/.gitkeep
deleted file mode 100644
index e69de29..0000000
--- a/internal/pkg/util/.gitkeep
+++ /dev/null
diff --git a/internal/pkg/util/net.go b/internal/pkg/util/net.go
new file mode 100644
index 0000000..1ff7fbb
--- /dev/null
+++ b/internal/pkg/util/net.go
@@ -0,0 +1,34 @@
+package util
+
+import (
+ "fmt"
+ "math/big"
+ "math/rand"
+ "net"
+ "time"
+)
+
// GenerateMac generates a pseudo-random MAC address with the fixed prefix
// 00:00:00, formatted as "00:00:00:xx:xx:xx".
func GenerateMac() string {
	prefix := "00:00:00"
	newRand := rand.New(rand.NewSource(time.Now().UnixNano()))
	// Intn(256) covers the full 0x00-0xff byte range; the previous
	// Intn(255) could never produce 0xff.
	mac := fmt.Sprintf("%s:%02x:%02x:%02x", prefix, newRand.Intn(256), newRand.Intn(256), newRand.Intn(256))
	return mac
}
+
+// NextIP returns IP incremented by 1
+func NextIP(ip net.IP) net.IP {
+ i := ipToInt(ip)
+ return intToIP(i.Add(i, big.NewInt(1)))
+}
+
+func ipToInt(ip net.IP) *big.Int {
+ if v := ip.To4(); v != nil {
+ return big.NewInt(0).SetBytes(v)
+ }
+ return big.NewInt(0).SetBytes(ip.To16())
+}
+
+func intToIP(i *big.Int) net.IP {
+ return net.IP(i.Bytes())
+}
diff --git a/internal/pkg/util/ovs.go b/internal/pkg/util/ovs.go
new file mode 100644
index 0000000..22b42b9
--- /dev/null
+++ b/internal/pkg/util/ovs.go
@@ -0,0 +1,126 @@
+package util
+
+import (
+ "bytes"
+ "fmt"
+ "strings"
+ "time"
+ "unicode"
+
+ "github.com/sirupsen/logrus"
+ kexec "k8s.io/utils/exec"
+)
+
const (
	// ovsCommandTimeout is the value (seconds) passed as --timeout to
	// ovs-vsctl and ovn-nbctl invocations.
	ovsCommandTimeout = 15
	ovsVsctlCommand   = "ovs-vsctl"
	ovsOfctlCommand   = "ovs-ofctl"
	ovnNbctlCommand   = "ovn-nbctl"
	ipCommand         = "ip"
)

// execHelper bundles the exec interface used to launch processes with the
// resolved filesystem paths of the OVN and OVS utilities.
type execHelper struct {
	exec      kexec.Interface
	ofctlPath string
	vsctlPath string
	nbctlPath string
	ipPath    string
}

// runner is the package-wide helper installed by SetExec; the Run* functions
// below execute commands through it.
var runner *execHelper
+
+// SetExec validates executable paths and saves the given exec interface
+// to be used for running various OVS and OVN utilites
+func SetExec(exec kexec.Interface) error {
+ var err error
+
+ runner = &execHelper{exec: exec}
+ runner.ofctlPath, err = exec.LookPath(ovsOfctlCommand)
+ if err != nil {
+ return err
+ }
+ runner.vsctlPath, err = exec.LookPath(ovsVsctlCommand)
+ if err != nil {
+ return err
+ }
+ runner.nbctlPath, err = exec.LookPath(ovnNbctlCommand)
+ if err != nil {
+ return err
+ }
+ runner.ipPath, err = exec.LookPath(ipCommand)
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+// Run the ovn-ctl command and retry if "Connection refused"
+// poll waitng for service to become available
+func runOVNretry(cmdPath string, args ...string) (*bytes.Buffer, *bytes.Buffer, error) {
+
+ retriesLeft := 200
+ for {
+ stdout, stderr, err := run(cmdPath, args...)
+ if err == nil {
+ return stdout, stderr, err
+ }
+
+ // Connection refused
+ // Master may not be up so keep trying
+ if strings.Contains(stderr.String(), "Connection refused") {
+ if retriesLeft == 0 {
+ return stdout, stderr, err
+ }
+ retriesLeft--
+ time.Sleep(2 * time.Second)
+ } else {
+ // Some other problem for caller to handle
+ return stdout, stderr, err
+ }
+ }
+}
+
+func run(cmdPath string, args ...string) (*bytes.Buffer, *bytes.Buffer, error) {
+ stdout := &bytes.Buffer{}
+ stderr := &bytes.Buffer{}
+ cmd := runner.exec.Command(cmdPath, args...)
+ cmd.SetStdout(stdout)
+ cmd.SetStderr(stderr)
+ logrus.Debugf("exec: %s %s", cmdPath, strings.Join(args, " "))
+ err := cmd.Run()
+ if err != nil {
+ logrus.Debugf("exec: %s %s => %v", cmdPath, strings.Join(args, " "), err)
+ }
+ return stdout, stderr, err
+}
+
+// RunOVSOfctl runs a command via ovs-ofctl.
+func RunOVSOfctl(args ...string) (string, string, error) {
+ stdout, stderr, err := run(runner.ofctlPath, args...)
+ return strings.Trim(stdout.String(), "\" \n"), stderr.String(), err
+}
+
+// RunOVSVsctl runs a command via ovs-vsctl.
+func RunOVSVsctl(args ...string) (string, string, error) {
+ cmdArgs := []string{fmt.Sprintf("--timeout=%d", ovsCommandTimeout)}
+ cmdArgs = append(cmdArgs, args...)
+ stdout, stderr, err := run(runner.vsctlPath, cmdArgs...)
+ return strings.Trim(strings.TrimSpace(stdout.String()), "\""), stderr.String(), err
+}
+
+// RunOVNNbctlUnix runs command via ovn-nbctl, with ovn-nbctl using the unix
+// domain sockets to connect to the ovsdb-server backing the OVN NB database.
+func RunOVNNbctlUnix(args ...string) (string, string, error) {
+ cmdArgs := []string{fmt.Sprintf("--timeout=%d", ovsCommandTimeout)}
+ cmdArgs = append(cmdArgs, args...)
+ stdout, stderr, err := runOVNretry(runner.nbctlPath, cmdArgs...)
+ return strings.Trim(strings.TrimFunc(stdout.String(), unicode.IsSpace), "\""),
+ stderr.String(), err
+}
+
+// RunIP runs a command via the iproute2 "ip" utility
+func RunIP(args ...string) (string, string, error) {
+ stdout, stderr, err := run(runner.ipPath, args...)
+ return strings.TrimSpace(stdout.String()), stderr.String(), err
+}