author    | 2018-11-10 09:56:52 -0800
committer | 2018-11-20 01:50:58 -0800
commit    | 5026d1d89b05eac5e004279b742df6745a73d93a (patch)
tree      | 8f9aed1e476706e008b746debda6d616bd0ac7a5 /internal/pkg/factory
parent    | 9506ae48eb545d502cc3685a99862740d28e7afb (diff)
Seed code for the Plugin
The code includes the ovn4nfvk8s plugin and its CNI. It implements
multiple OVN interfaces for Pods and assumes that Multus (or a similar
meta-plugin) invokes its CNI as a secondary plugin, not as the first
(primary) CNI.
Change-Id: I524c1d18752eb6dbc8d34addd3b60d5bbaa06ff4
Signed-off-by: Ritu Sood <ritu.sood@intel.com>
Signed-off-by: Victor Morales <victor.morales@intel.com>
Diffstat (limited to 'internal/pkg/factory')
-rw-r--r-- | internal/pkg/factory/.gitkeep        |   0 |
-rw-r--r-- | internal/pkg/factory/factory.go      | 318 |
-rw-r--r-- | internal/pkg/factory/factory_test.go | 686 |
3 files changed, 1004 insertions(+), 0 deletions(-)
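
The new factory.go wraps client-go shared informers behind a WatchFactory with per-type handler registration and removal. A minimal usage sketch of that API follows; it is not part of this change, and the repository import path, package name, and surrounding controller code are assumptions made for illustration.

```go
// Minimal usage sketch for the WatchFactory added in factory.go.
// Assumptions: the import path and the caller's clientset/stop channel
// are illustrative only and not part of this commit.
package controller

import (
	"fmt"

	kapi "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"

	"ovn4nfvk8s/internal/pkg/factory" // hypothetical import path
)

func watchPods(clientset kubernetes.Interface, stop <-chan struct{}) error {
	// NewWatchFactory starts the shared informers and waits for their
	// caches to sync before returning.
	wf, err := factory.NewWatchFactory(clientset, stop)
	if err != nil {
		return err
	}

	// Register an unfiltered Pod handler. Because the informers are shared
	// and long-running, existing Pods are replayed through AddFunc when the
	// handler is registered.
	id, err := wf.AddPodHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			pod := obj.(*kapi.Pod)
			fmt.Printf("pod added: %s/%s\n", pod.Namespace, pod.Name)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {},
		DeleteFunc: func(obj interface{}) {},
	}, nil)
	if err != nil {
		return err
	}
	defer wf.RemovePodHandler(id)

	<-stop
	return nil
}
```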
diff --git a/internal/pkg/factory/.gitkeep b/internal/pkg/factory/.gitkeep deleted file mode 100644 index e69de29..0000000 --- a/internal/pkg/factory/.gitkeep +++ /dev/null diff --git a/internal/pkg/factory/factory.go b/internal/pkg/factory/factory.go new file mode 100644 index 0000000..a635e3f --- /dev/null +++ b/internal/pkg/factory/factory.go @@ -0,0 +1,318 @@ +package factory + +import ( + "fmt" + "reflect" + "sync" + "sync/atomic" + "time" + + "github.com/sirupsen/logrus" + + kapi "k8s.io/api/core/v1" + knet "k8s.io/api/networking/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + informerfactory "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" +) + +type informer struct { + sync.Mutex + oType reflect.Type + inf cache.SharedIndexInformer + handlers map[uint64]cache.ResourceEventHandler +} + +func (i *informer) forEachHandler(obj interface{}, f func(id uint64, handler cache.ResourceEventHandler)) { + i.Lock() + defer i.Unlock() + + objType := reflect.TypeOf(obj) + if objType != i.oType { + logrus.Errorf("object type %v did not match expected %v", objType, i.oType) + return + } + + for id, handler := range i.handlers { + f(id, handler) + } +} + +// WatchFactory initializes and manages common kube watches +type WatchFactory struct { + iFactory informerfactory.SharedInformerFactory + informers map[reflect.Type]*informer + handlerCounter uint64 +} + +const ( + resyncInterval = 12 * time.Hour +) + +func newInformer(oType reflect.Type, inf cache.SharedIndexInformer) *informer { + return &informer{ + oType: oType, + inf: inf, + handlers: make(map[uint64]cache.ResourceEventHandler), + } +} + +var ( + podType reflect.Type = reflect.TypeOf(&kapi.Pod{}) + serviceType reflect.Type = reflect.TypeOf(&kapi.Service{}) + endpointsType reflect.Type = reflect.TypeOf(&kapi.Endpoints{}) + policyType reflect.Type = reflect.TypeOf(&knet.NetworkPolicy{}) + namespaceType reflect.Type = reflect.TypeOf(&kapi.Namespace{}) + nodeType reflect.Type = reflect.TypeOf(&kapi.Node{}) +) + +// NewWatchFactory initializes a new watch factory +func NewWatchFactory(c kubernetes.Interface, stopChan <-chan struct{}) (*WatchFactory, error) { + // resync time is 12 hours, none of the resources being watched in ovn-kubernetes have + // any race condition where a resync may be required e.g. 
cni executable on node watching for + // events on pods and assuming that an 'ADD' event will contain the annotations put in by + // ovnkube master (currently, it is just a 'get' loop) + // the downside of making it tight (like 10 minutes) is needless spinning on all resources + wf := &WatchFactory{ + iFactory: informerfactory.NewSharedInformerFactory(c, resyncInterval), + informers: make(map[reflect.Type]*informer), + } + + // Create shared informers we know we'll use + wf.informers[podType] = newInformer(podType, wf.iFactory.Core().V1().Pods().Informer()) + wf.informers[serviceType] = newInformer(serviceType, wf.iFactory.Core().V1().Services().Informer()) + wf.informers[endpointsType] = newInformer(endpointsType, wf.iFactory.Core().V1().Endpoints().Informer()) + wf.informers[policyType] = newInformer(policyType, wf.iFactory.Networking().V1().NetworkPolicies().Informer()) + wf.informers[namespaceType] = newInformer(namespaceType, wf.iFactory.Core().V1().Namespaces().Informer()) + wf.informers[nodeType] = newInformer(nodeType, wf.iFactory.Core().V1().Nodes().Informer()) + + wf.iFactory.Start(stopChan) + res := wf.iFactory.WaitForCacheSync(stopChan) + for oType, synced := range res { + if !synced { + return nil, fmt.Errorf("error in syncing cache for %v informer", oType) + } + informer := wf.informers[oType] + informer.inf.AddEventHandler(wf.newFederatedHandler(informer)) + } + + return wf, nil +} + +func (wf *WatchFactory) newFederatedHandler(inf *informer) cache.ResourceEventHandlerFuncs { + return cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + inf.forEachHandler(obj, func(id uint64, handler cache.ResourceEventHandler) { + logrus.Debugf("running %v ADD event for handler %d", inf.oType, id) + handler.OnAdd(obj) + }) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + inf.forEachHandler(newObj, func(id uint64, handler cache.ResourceEventHandler) { + logrus.Debugf("running %v UPDATE event for handler %d", inf.oType, id) + handler.OnUpdate(oldObj, newObj) + }) + }, + DeleteFunc: func(obj interface{}) { + if inf.oType != reflect.TypeOf(obj) { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + logrus.Errorf("couldn't get object from tombstone: %+v", obj) + return + } + obj = tombstone.Obj + objType := reflect.TypeOf(obj) + if inf.oType != objType { + logrus.Errorf("expected tombstone object resource type %v but got %v", inf.oType, objType) + return + } + } + inf.forEachHandler(obj, func(id uint64, handler cache.ResourceEventHandler) { + logrus.Debugf("running %v DELETE event for handler %d", inf.oType, id) + handler.OnDelete(obj) + }) + }, + } +} + +func getObjectMeta(objType reflect.Type, obj interface{}) (*metav1.ObjectMeta, error) { + switch objType { + case podType: + if pod, ok := obj.(*kapi.Pod); ok { + return &pod.ObjectMeta, nil + } + case serviceType: + if service, ok := obj.(*kapi.Service); ok { + return &service.ObjectMeta, nil + } + case endpointsType: + if endpoints, ok := obj.(*kapi.Endpoints); ok { + return &endpoints.ObjectMeta, nil + } + case policyType: + if policy, ok := obj.(*knet.NetworkPolicy); ok { + return &policy.ObjectMeta, nil + } + case namespaceType: + if namespace, ok := obj.(*kapi.Namespace); ok { + return &namespace.ObjectMeta, nil + } + case nodeType: + if node, ok := obj.(*kapi.Node); ok { + return &node.ObjectMeta, nil + } + } + return nil, fmt.Errorf("cannot get ObjectMeta from type %v", objType) +} + +func (wf *WatchFactory) addHandler(objType reflect.Type, namespace string, lsel *metav1.LabelSelector, funcs 
cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) { + inf, ok := wf.informers[objType] + if !ok { + return 0, fmt.Errorf("unknown object type %v", objType) + } + + sel, err := metav1.LabelSelectorAsSelector(lsel) + if err != nil { + return 0, fmt.Errorf("error creating label selector: %v", err) + } + + filterFunc := func(obj interface{}) bool { + if namespace == "" && lsel == nil { + // Unfiltered handler + return true + } + meta, err := getObjectMeta(objType, obj) + if err != nil { + logrus.Errorf("watch handler filter error: %v", err) + return false + } + if namespace != "" && meta.Namespace != namespace { + return false + } + if lsel != nil && !sel.Matches(labels.Set(meta.Labels)) { + return false + } + return true + } + + // Process existing items as a set so the caller can clean up + // after a restart or whatever + existingItems := inf.inf.GetStore().List() + if processExisting != nil { + items := make([]interface{}, 0) + for _, obj := range existingItems { + if filterFunc(obj) { + items = append(items, obj) + } + } + processExisting(items) + } + + handlerID := atomic.AddUint64(&wf.handlerCounter, 1) + + inf.Lock() + defer inf.Unlock() + + inf.handlers[handlerID] = cache.FilteringResourceEventHandler{ + FilterFunc: filterFunc, + Handler: funcs, + } + logrus.Debugf("added %v event handler %d", objType, handlerID) + + // Send existing items to the handler's add function; informers usually + // do this but since we share informers, it's long-since happened so + // we must emulate that here + for _, obj := range existingItems { + inf.handlers[handlerID].OnAdd(obj) + } + + return handlerID, nil +} + +func (wf *WatchFactory) removeHandler(objType reflect.Type, handlerID uint64) error { + inf, ok := wf.informers[objType] + if !ok { + return fmt.Errorf("tried to remove unknown object type %v event handler", objType) + } + + inf.Lock() + defer inf.Unlock() + if _, ok := inf.handlers[handlerID]; !ok { + return fmt.Errorf("tried to remove unknown object type %v event handler %d", objType, handlerID) + } + delete(inf.handlers, handlerID) + logrus.Debugf("removed %v event handler %d", objType, handlerID) + return nil +} + +// AddPodHandler adds a handler function that will be executed on Pod object changes +func (wf *WatchFactory) AddPodHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) { + return wf.addHandler(podType, "", nil, handlerFuncs, processExisting) +} + +// AddFilteredPodHandler adds a handler function that will be executed when Pod objects that match the given filters change +func (wf *WatchFactory) AddFilteredPodHandler(namespace string, lsel *metav1.LabelSelector, handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) { + return wf.addHandler(podType, namespace, lsel, handlerFuncs, processExisting) +} + +// RemovePodHandler removes a Pod object event handler function +func (wf *WatchFactory) RemovePodHandler(handlerID uint64) error { + return wf.removeHandler(podType, handlerID) +} + +// AddServiceHandler adds a handler function that will be executed on Service object changes +func (wf *WatchFactory) AddServiceHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) { + return wf.addHandler(serviceType, "", nil, handlerFuncs, processExisting) +} + +// RemoveServiceHandler removes a Service object event handler function +func (wf *WatchFactory) RemoveServiceHandler(handlerID uint64) error { + return 
wf.removeHandler(serviceType, handlerID) +} + +// AddEndpointsHandler adds a handler function that will be executed on Endpoints object changes +func (wf *WatchFactory) AddEndpointsHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) { + return wf.addHandler(endpointsType, "", nil, handlerFuncs, processExisting) +} + +// RemoveEndpointsHandler removes a Endpoints object event handler function +func (wf *WatchFactory) RemoveEndpointsHandler(handlerID uint64) error { + return wf.removeHandler(endpointsType, handlerID) +} + +// AddPolicyHandler adds a handler function that will be executed on NetworkPolicy object changes +func (wf *WatchFactory) AddPolicyHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) { + return wf.addHandler(policyType, "", nil, handlerFuncs, processExisting) +} + +// RemovePolicyHandler removes a NetworkPolicy object event handler function +func (wf *WatchFactory) RemovePolicyHandler(handlerID uint64) error { + return wf.removeHandler(policyType, handlerID) +} + +// AddNamespaceHandler adds a handler function that will be executed on Namespace object changes +func (wf *WatchFactory) AddNamespaceHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) { + return wf.addHandler(namespaceType, "", nil, handlerFuncs, processExisting) +} + +// AddFilteredNamespaceHandler adds a handler function that will be executed when Namespace objects that match the given filters change +func (wf *WatchFactory) AddFilteredNamespaceHandler(namespace string, lsel *metav1.LabelSelector, handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) { + return wf.addHandler(namespaceType, namespace, lsel, handlerFuncs, processExisting) +} + +// RemoveNamespaceHandler removes a Namespace object event handler function +func (wf *WatchFactory) RemoveNamespaceHandler(handlerID uint64) error { + return wf.removeHandler(namespaceType, handlerID) +} + +// AddNodeHandler adds a handler function that will be executed on Node object changes +func (wf *WatchFactory) AddNodeHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) { + return wf.addHandler(nodeType, "", nil, handlerFuncs, processExisting) +} + +// RemoveNodeHandler removes a Node object event handler function +func (wf *WatchFactory) RemoveNodeHandler(handlerID uint64) error { + return wf.removeHandler(nodeType, handlerID) +} diff --git a/internal/pkg/factory/factory_test.go b/internal/pkg/factory/factory_test.go new file mode 100644 index 0000000..38fb77e --- /dev/null +++ b/internal/pkg/factory/factory_test.go @@ -0,0 +1,686 @@ +package factory + +import ( + "reflect" + "testing" + + "k8s.io/api/core/v1" + knet "k8s.io/api/networking/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes/fake" + core "k8s.io/client-go/testing" + "k8s.io/client-go/tools/cache" + + . "github.com/onsi/ginkgo" + . 
"github.com/onsi/gomega" +) + +func TestFactory(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Watch Factory Suite") +} + +func newObjectMeta(name, namespace string) metav1.ObjectMeta { + return metav1.ObjectMeta{ + Name: name, + UID: types.UID(name), + Namespace: namespace, + Labels: map[string]string{ + "name": name, + }, + } +} + +func newPod(name, namespace string) *v1.Pod { + return &v1.Pod{ + Status: v1.PodStatus{ + Phase: v1.PodRunning, + }, + ObjectMeta: newObjectMeta(name, namespace), + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "containerName", + Image: "containerImage", + }, + }, + NodeName: "mynode", + }, + } +} + +func newNamespace(name string) *v1.Namespace { + return &v1.Namespace{ + Status: v1.NamespaceStatus{ + Phase: v1.NamespaceActive, + }, + ObjectMeta: newObjectMeta(name, name), + } +} + +func newNode(name string) *v1.Node { + return &v1.Node{ + Status: v1.NodeStatus{ + Phase: v1.NodeRunning, + }, + ObjectMeta: newObjectMeta(name, ""), + } +} + +func newPolicy(name, namespace string) *knet.NetworkPolicy { + return &knet.NetworkPolicy{ + ObjectMeta: newObjectMeta(name, namespace), + } +} + +func newEndpoints(name, namespace string) *v1.Endpoints { + return &v1.Endpoints{ + ObjectMeta: newObjectMeta(name, namespace), + } +} + +func newService(name, namespace string) *v1.Service { + return &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: types.UID(name), + Namespace: namespace, + Labels: map[string]string{ + "name": name, + }, + }, + } +} + +func objSetup(c *fake.Clientset, objType string, listFn func(core.Action) (bool, runtime.Object, error)) *watch.FakeWatcher { + w := watch.NewFake() + c.AddWatchReactor(objType, core.DefaultWatchReactor(w, nil)) + c.AddReactor("list", objType, listFn) + return w +} + +var _ = Describe("Watch Factory Operations", func() { + var ( + fakeClient *fake.Clientset + podWatch, namespaceWatch, nodeWatch *watch.FakeWatcher + policyWatch, endpointsWatch, serviceWatch *watch.FakeWatcher + pods []*v1.Pod + namespaces []*v1.Namespace + nodes []*v1.Node + policies []*knet.NetworkPolicy + endpoints []*v1.Endpoints + services []*v1.Service + stop chan struct{} + numAdded, numUpdated, numDeleted int + ) + + BeforeEach(func() { + fakeClient = &fake.Clientset{} + stop = make(chan struct{}) + + pods = make([]*v1.Pod, 0) + podWatch = objSetup(fakeClient, "pods", func(core.Action) (bool, runtime.Object, error) { + obj := &v1.PodList{} + for _, p := range pods { + obj.Items = append(obj.Items, *p) + } + return true, obj, nil + }) + + namespaces = make([]*v1.Namespace, 0) + namespaceWatch = objSetup(fakeClient, "namespaces", func(core.Action) (bool, runtime.Object, error) { + obj := &v1.NamespaceList{} + for _, p := range namespaces { + obj.Items = append(obj.Items, *p) + } + return true, obj, nil + }) + + nodes = make([]*v1.Node, 0) + nodeWatch = objSetup(fakeClient, "nodes", func(core.Action) (bool, runtime.Object, error) { + obj := &v1.NodeList{} + for _, p := range nodes { + obj.Items = append(obj.Items, *p) + } + return true, obj, nil + }) + + policies = make([]*knet.NetworkPolicy, 0) + policyWatch = objSetup(fakeClient, "networkpolicies", func(core.Action) (bool, runtime.Object, error) { + obj := &knet.NetworkPolicyList{} + for _, p := range policies { + obj.Items = append(obj.Items, *p) + } + return true, obj, nil + }) + + endpoints = make([]*v1.Endpoints, 0) + endpointsWatch = objSetup(fakeClient, "endpoints", func(core.Action) (bool, runtime.Object, error) { + obj := &v1.EndpointsList{} + for _, p := 
range endpoints { + obj.Items = append(obj.Items, *p) + } + return true, obj, nil + }) + + services = make([]*v1.Service, 0) + serviceWatch = objSetup(fakeClient, "services", func(core.Action) (bool, runtime.Object, error) { + obj := &v1.ServiceList{} + for _, p := range services { + obj.Items = append(obj.Items, *p) + } + return true, obj, nil + }) + + numAdded = 0 + numUpdated = 0 + numDeleted = 0 + }) + + Context("when a processExisting is given", func() { + testExisting := func(objType reflect.Type, namespace string, lsel *metav1.LabelSelector) { + wf, err := NewWatchFactory(fakeClient, stop) + Expect(err).NotTo(HaveOccurred()) + id, err := wf.addHandler(objType, namespace, lsel, + cache.ResourceEventHandlerFuncs{}, + func(objs []interface{}) { + Expect(len(objs)).To(Equal(1)) + }) + Expect(err).NotTo(HaveOccurred()) + Expect(id).To(BeNumerically(">", uint64(0))) + wf.removeHandler(objType, id) + close(stop) + } + + It("is called for each existing pod", func() { + pods = append(pods, newPod("pod1", "default")) + testExisting(podType, "", nil) + }) + + It("is called for each existing namespace", func() { + namespaces = append(namespaces, newNamespace("default")) + testExisting(namespaceType, "", nil) + }) + + It("is called for each existing node", func() { + nodes = append(nodes, newNode("default")) + testExisting(nodeType, "", nil) + }) + + It("is called for each existing policy", func() { + policies = append(policies, newPolicy("denyall", "default")) + testExisting(policyType, "", nil) + }) + + It("is called for each existing endpoints", func() { + endpoints = append(endpoints, newEndpoints("myendpoint", "default")) + testExisting(endpointsType, "", nil) + }) + + It("is called for each existing service", func() { + services = append(services, newService("myservice", "default")) + testExisting(serviceType, "", nil) + }) + + It("is called for each existing pod that matches a given namespace and label", func() { + pod := newPod("pod1", "default") + pod.ObjectMeta.Labels["blah"] = "foobar" + pods = append(pods, pod) + testExisting(podType, "default", &metav1.LabelSelector{ + MatchLabels: map[string]string{"blah": "foobar"}, + }) + }) + }) + + Context("when existing items are known to the informer", func() { + testExisting := func(objType reflect.Type) { + wf, err := NewWatchFactory(fakeClient, stop) + Expect(err).NotTo(HaveOccurred()) + id, err := wf.addHandler(objType, "", nil, + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + numAdded++ + }, + UpdateFunc: func(old, new interface{}) {}, + DeleteFunc: func(obj interface{}) {}, + }, nil) + Expect(err).NotTo(HaveOccurred()) + Expect(numAdded).To(Equal(2)) + wf.removeHandler(objType, id) + close(stop) + } + + It("calls ADD for each existing pod", func() { + pods = append(pods, newPod("pod1", "default")) + pods = append(pods, newPod("pod2", "default")) + testExisting(podType) + }) + + It("calls ADD for each existing namespace", func() { + namespaces = append(namespaces, newNamespace("default")) + namespaces = append(namespaces, newNamespace("default2")) + testExisting(namespaceType) + }) + + It("calls ADD for each existing node", func() { + nodes = append(nodes, newNode("default")) + nodes = append(nodes, newNode("default2")) + testExisting(nodeType) + }) + + It("calls ADD for each existing policy", func() { + policies = append(policies, newPolicy("denyall", "default")) + policies = append(policies, newPolicy("denyall2", "default")) + testExisting(policyType) + }) + + It("calls ADD for each existing endpoints", func() { 
+ endpoints = append(endpoints, newEndpoints("myendpoint", "default")) + endpoints = append(endpoints, newEndpoints("myendpoint2", "default")) + testExisting(endpointsType) + }) + + It("calls ADD for each existing service", func() { + services = append(services, newService("myservice", "default")) + services = append(services, newService("myservice2", "default")) + testExisting(serviceType) + }) + }) + + addFilteredHandler := func(wf *WatchFactory, objType reflect.Type, namespace string, lsel *metav1.LabelSelector, funcs cache.ResourceEventHandlerFuncs) uint64 { + id, err := wf.addHandler(objType, namespace, lsel, cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + defer GinkgoRecover() + numAdded++ + funcs.AddFunc(obj) + }, + UpdateFunc: func(old, new interface{}) { + defer GinkgoRecover() + numUpdated++ + funcs.UpdateFunc(old, new) + }, + DeleteFunc: func(obj interface{}) { + defer GinkgoRecover() + numDeleted++ + funcs.DeleteFunc(obj) + }, + }, nil) + Expect(err).NotTo(HaveOccurred()) + Expect(id).To(BeNumerically(">", uint64(0))) + return id + } + + addHandler := func(wf *WatchFactory, objType reflect.Type, funcs cache.ResourceEventHandlerFuncs) uint64 { + return addFilteredHandler(wf, objType, "", nil, funcs) + } + + It("responds to pod add/update/delete events", func() { + wf, err := NewWatchFactory(fakeClient, stop) + Expect(err).NotTo(HaveOccurred()) + + added := newPod("pod1", "default") + id := addHandler(wf, podType, cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + pod := obj.(*v1.Pod) + Expect(reflect.DeepEqual(pod, added)).To(BeTrue()) + }, + UpdateFunc: func(old, new interface{}) { + newPod := new.(*v1.Pod) + Expect(reflect.DeepEqual(newPod, added)).To(BeTrue()) + Expect(newPod.Spec.NodeName).To(Equal("foobar")) + }, + DeleteFunc: func(obj interface{}) { + pod := obj.(*v1.Pod) + Expect(reflect.DeepEqual(pod, added)).To(BeTrue()) + }, + }) + + pods = append(pods, added) + podWatch.Add(added) + Eventually(func() int { return numAdded }, 2).Should(Equal(1)) + added.Spec.NodeName = "foobar" + podWatch.Modify(added) + Eventually(func() int { return numUpdated }, 2).Should(Equal(1)) + pods = pods[:0] + podWatch.Delete(added) + Eventually(func() int { return numDeleted }, 2).Should(Equal(1)) + + wf.RemovePodHandler(id) + close(stop) + }) + + It("responds to namespace add/update/delete events", func() { + wf, err := NewWatchFactory(fakeClient, stop) + Expect(err).NotTo(HaveOccurred()) + + added := newNamespace("default") + id := addHandler(wf, namespaceType, cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + ns := obj.(*v1.Namespace) + Expect(reflect.DeepEqual(ns, added)).To(BeTrue()) + }, + UpdateFunc: func(old, new interface{}) { + newNS := new.(*v1.Namespace) + Expect(reflect.DeepEqual(newNS, added)).To(BeTrue()) + Expect(newNS.Status.Phase).To(Equal(v1.NamespaceTerminating)) + }, + DeleteFunc: func(obj interface{}) { + ns := obj.(*v1.Namespace) + Expect(reflect.DeepEqual(ns, added)).To(BeTrue()) + }, + }) + + namespaces = append(namespaces, added) + namespaceWatch.Add(added) + Eventually(func() int { return numAdded }, 2).Should(Equal(1)) + added.Status.Phase = v1.NamespaceTerminating + namespaceWatch.Modify(added) + Eventually(func() int { return numUpdated }, 2).Should(Equal(1)) + namespaces = namespaces[:0] + namespaceWatch.Delete(added) + Eventually(func() int { return numDeleted }, 2).Should(Equal(1)) + + wf.RemoveNamespaceHandler(id) + close(stop) + }) + + It("responds to node add/update/delete events", func() { + 
wf, err := NewWatchFactory(fakeClient, stop) + Expect(err).NotTo(HaveOccurred()) + + added := newNode("mynode") + id := addHandler(wf, nodeType, cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + node := obj.(*v1.Node) + Expect(reflect.DeepEqual(node, added)).To(BeTrue()) + }, + UpdateFunc: func(old, new interface{}) { + newNode := new.(*v1.Node) + Expect(reflect.DeepEqual(newNode, added)).To(BeTrue()) + Expect(newNode.Status.Phase).To(Equal(v1.NodeTerminated)) + }, + DeleteFunc: func(obj interface{}) { + node := obj.(*v1.Node) + Expect(reflect.DeepEqual(node, added)).To(BeTrue()) + }, + }) + + nodes = append(nodes, added) + nodeWatch.Add(added) + Eventually(func() int { return numAdded }, 2).Should(Equal(1)) + added.Status.Phase = v1.NodeTerminated + nodeWatch.Modify(added) + Eventually(func() int { return numUpdated }, 2).Should(Equal(1)) + nodes = nodes[:0] + nodeWatch.Delete(added) + Eventually(func() int { return numDeleted }, 2).Should(Equal(1)) + + wf.removeHandler(nodeType, id) + close(stop) + }) + + It("responds to policy add/update/delete events", func() { + wf, err := NewWatchFactory(fakeClient, stop) + Expect(err).NotTo(HaveOccurred()) + + added := newPolicy("mypolicy", "default") + id := addHandler(wf, policyType, cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + np := obj.(*knet.NetworkPolicy) + Expect(reflect.DeepEqual(np, added)).To(BeTrue()) + }, + UpdateFunc: func(old, new interface{}) { + newNP := new.(*knet.NetworkPolicy) + Expect(reflect.DeepEqual(newNP, added)).To(BeTrue()) + Expect(newNP.Spec.PolicyTypes).To(Equal([]knet.PolicyType{knet.PolicyTypeIngress})) + }, + DeleteFunc: func(obj interface{}) { + np := obj.(*knet.NetworkPolicy) + Expect(reflect.DeepEqual(np, added)).To(BeTrue()) + }, + }) + + policies = append(policies, added) + policyWatch.Add(added) + Eventually(func() int { return numAdded }, 2).Should(Equal(1)) + added.Spec.PolicyTypes = []knet.PolicyType{knet.PolicyTypeIngress} + policyWatch.Modify(added) + Eventually(func() int { return numUpdated }, 2).Should(Equal(1)) + policies = policies[:0] + policyWatch.Delete(added) + Eventually(func() int { return numDeleted }, 2).Should(Equal(1)) + + wf.removeHandler(policyType, id) + close(stop) + }) + + It("responds to endpoints add/update/delete events", func() { + wf, err := NewWatchFactory(fakeClient, stop) + Expect(err).NotTo(HaveOccurred()) + + added := newEndpoints("myendpoints", "default") + id := addHandler(wf, endpointsType, cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + eps := obj.(*v1.Endpoints) + Expect(reflect.DeepEqual(eps, added)).To(BeTrue()) + }, + UpdateFunc: func(old, new interface{}) { + newEPs := new.(*v1.Endpoints) + Expect(reflect.DeepEqual(newEPs, added)).To(BeTrue()) + Expect(len(newEPs.Subsets)).To(Equal(1)) + }, + DeleteFunc: func(obj interface{}) { + eps := obj.(*v1.Endpoints) + Expect(reflect.DeepEqual(eps, added)).To(BeTrue()) + }, + }) + + endpoints = append(endpoints, added) + endpointsWatch.Add(added) + Eventually(func() int { return numAdded }, 2).Should(Equal(1)) + added.Subsets = append(added.Subsets, v1.EndpointSubset{ + Ports: []v1.EndpointPort{ + { + Name: "foobar", + Port: 1234, + }, + }, + }) + endpointsWatch.Modify(added) + Eventually(func() int { return numUpdated }, 2).Should(Equal(1)) + endpoints = endpoints[:0] + endpointsWatch.Delete(added) + Eventually(func() int { return numDeleted }, 2).Should(Equal(1)) + + wf.removeHandler(endpointsType, id) + close(stop) + }) + + It("responds to service 
add/update/delete events", func() { + wf, err := NewWatchFactory(fakeClient, stop) + Expect(err).NotTo(HaveOccurred()) + + added := newService("myservice", "default") + id := addHandler(wf, serviceType, cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + service := obj.(*v1.Service) + Expect(reflect.DeepEqual(service, added)).To(BeTrue()) + }, + UpdateFunc: func(old, new interface{}) { + newService := new.(*v1.Service) + Expect(reflect.DeepEqual(newService, added)).To(BeTrue()) + Expect(newService.Spec.ClusterIP).To(Equal("1.1.1.1")) + }, + DeleteFunc: func(obj interface{}) { + service := obj.(*v1.Service) + Expect(reflect.DeepEqual(service, added)).To(BeTrue()) + }, + }) + + services = append(services, added) + serviceWatch.Add(added) + Eventually(func() int { return numAdded }, 2).Should(Equal(1)) + added.Spec.ClusterIP = "1.1.1.1" + serviceWatch.Modify(added) + Eventually(func() int { return numUpdated }, 2).Should(Equal(1)) + services = services[:0] + serviceWatch.Delete(added) + Eventually(func() int { return numDeleted }, 2).Should(Equal(1)) + + wf.removeHandler(serviceType, id) + close(stop) + }) + + It("stops processing events after the handler is removed", func() { + wf, err := NewWatchFactory(fakeClient, stop) + Expect(err).NotTo(HaveOccurred()) + + added := newNamespace("default") + id := addHandler(wf, namespaceType, cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) {}, + UpdateFunc: func(old, new interface{}) {}, + DeleteFunc: func(obj interface{}) {}, + }) + + namespaces = append(namespaces, added) + namespaceWatch.Add(added) + Eventually(func() int { return numAdded }, 2).Should(Equal(1)) + wf.RemoveNamespaceHandler(id) + + added2 := newNamespace("other") + namespaces = append(namespaces, added2) + namespaceWatch.Add(added2) + Consistently(func() int { return numAdded }, 2).Should(Equal(1)) + + added2.Status.Phase = v1.NamespaceTerminating + namespaceWatch.Modify(added2) + Consistently(func() int { return numUpdated }, 2).Should(Equal(0)) + namespaces = []*v1.Namespace{added} + namespaceWatch.Delete(added2) + Consistently(func() int { return numDeleted }, 2).Should(Equal(0)) + + close(stop) + }) + + It("filters correctly by label and namespace", func() { + wf, err := NewWatchFactory(fakeClient, stop) + Expect(err).NotTo(HaveOccurred()) + + passesFilter := newPod("pod1", "default") + passesFilter.ObjectMeta.Labels["blah"] = "foobar" + failsFilter := newPod("pod2", "default") + failsFilter.ObjectMeta.Labels["blah"] = "baz" + failsFilter2 := newPod("pod3", "otherns") + failsFilter2.ObjectMeta.Labels["blah"] = "foobar" + + addFilteredHandler(wf, + podType, + "default", + &metav1.LabelSelector{ + MatchLabels: map[string]string{"blah": "foobar"}, + }, + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + pod := obj.(*v1.Pod) + Expect(reflect.DeepEqual(pod, passesFilter)).To(BeTrue()) + }, + UpdateFunc: func(old, new interface{}) { + newPod := new.(*v1.Pod) + Expect(reflect.DeepEqual(newPod, passesFilter)).To(BeTrue()) + }, + DeleteFunc: func(obj interface{}) { + pod := obj.(*v1.Pod) + Expect(reflect.DeepEqual(pod, passesFilter)).To(BeTrue()) + }, + }) + + pods = append(pods, passesFilter) + podWatch.Add(passesFilter) + Eventually(func() int { return numAdded }, 2).Should(Equal(1)) + + // numAdded should remain 1 + pods = append(pods, failsFilter) + podWatch.Add(failsFilter) + Consistently(func() int { return numAdded }, 2).Should(Equal(1)) + + // numAdded should remain 1 + pods = append(pods, failsFilter2) + 
podWatch.Add(failsFilter2) + Consistently(func() int { return numAdded }, 2).Should(Equal(1)) + + passesFilter.Status.Phase = v1.PodFailed + podWatch.Modify(passesFilter) + Eventually(func() int { return numUpdated }, 2).Should(Equal(1)) + + // numAdded should remain 1 + failsFilter.Status.Phase = v1.PodFailed + podWatch.Modify(failsFilter) + Consistently(func() int { return numUpdated }, 2).Should(Equal(1)) + + failsFilter2.Status.Phase = v1.PodFailed + podWatch.Modify(failsFilter2) + Consistently(func() int { return numUpdated }, 2).Should(Equal(1)) + + pods = []*v1.Pod{failsFilter, failsFilter2} + podWatch.Delete(passesFilter) + Eventually(func() int { return numDeleted }, 2).Should(Equal(1)) + + close(stop) + }) + + It("correctly handles object updates that cause filter changes", func() { + wf, err := NewWatchFactory(fakeClient, stop) + Expect(err).NotTo(HaveOccurred()) + + pod := newPod("pod1", "default") + pod.ObjectMeta.Labels["blah"] = "baz" + + equalPod := pod + id := addFilteredHandler(wf, + podType, + "default", + &metav1.LabelSelector{ + MatchLabels: map[string]string{"blah": "foobar"}, + }, + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + p := obj.(*v1.Pod) + Expect(reflect.DeepEqual(p, equalPod)).To(BeTrue()) + }, + UpdateFunc: func(old, new interface{}) {}, + DeleteFunc: func(obj interface{}) { + p := obj.(*v1.Pod) + Expect(reflect.DeepEqual(p, equalPod)).To(BeTrue()) + }, + }) + + pods = append(pods, pod) + + // Pod doesn't pass filter; shouldn't be added + podWatch.Add(pod) + Consistently(func() int { return numAdded }, 2).Should(Equal(0)) + + // Update pod to pass filter; should be treated as add. Need + // to deep-copy pod when modifying because it's a pointer all + // the way through when using FakeClient + podCopy := pod.DeepCopy() + podCopy.ObjectMeta.Labels["blah"] = "foobar" + pods = []*v1.Pod{podCopy} + equalPod = podCopy + podWatch.Modify(podCopy) + Eventually(func() int { return numAdded }, 2).Should(Equal(1)) + + // Update pod to fail filter; should be treated as delete + pod.ObjectMeta.Labels["blah"] = "baz" + podWatch.Modify(pod) + Eventually(func() int { return numDeleted }, 2).Should(Equal(1)) + Consistently(func() int { return numAdded }, 2).Should(Equal(1)) + Consistently(func() int { return numUpdated }, 2).Should(Equal(0)) + + wf.RemovePodHandler(id) + close(stop) + }) +}) |
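
For the namespace- and label-filtered path exercised in factory_test.go, a handler can be registered with AddFilteredPodHandler; the processExisting callback receives the already-cached objects that pass the filter. The sketch below uses the same assumptions as the earlier one (import path, package name, namespace, and label values are illustrative only).

```go
// Sketch of the filtered-handler path; the namespace and label value are
// examples, and the import path is assumed as above.
package controller

import (
	"log"

	kapi "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"

	"ovn4nfvk8s/internal/pkg/factory" // hypothetical import path
)

func watchLabelledPods(wf *factory.WatchFactory) (uint64, error) {
	sel := &metav1.LabelSelector{
		MatchLabels: map[string]string{"app": "ovn4nfvk8s"}, // example label
	}

	// Events are delivered only for Pods in "default" whose labels match
	// the selector; processExisting lets a restarted controller reconcile
	// the Pods already present in the informer cache.
	return wf.AddFilteredPodHandler("default", sel,
		cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				pod := obj.(*kapi.Pod)
				log.Printf("matching pod added: %s/%s", pod.Namespace, pod.Name)
			},
			UpdateFunc: func(oldObj, newObj interface{}) {},
			DeleteFunc: func(obj interface{}) {},
		},
		func(existing []interface{}) {
			log.Printf("reconciling %d existing pods", len(existing))
		})
}
```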