aboutsummaryrefslogtreecommitdiffstats
path: root/internal/pkg/factory
diff options
context:
space:
mode:
authorRitu Sood <ritu.sood@intel.com>2018-11-10 09:56:52 -0800
committerVictor Morales <victor.morales@intel.com>2018-11-20 01:50:58 -0800
commit5026d1d89b05eac5e004279b742df6745a73d93a (patch)
tree8f9aed1e476706e008b746debda6d616bd0ac7a5 /internal/pkg/factory
parent9506ae48eb545d502cc3685a99862740d28e7afb (diff)
Seed code for the Plugin
The code includes ovn4nfvk8s Plugin & CNI. It implements multiple OVN interfaces for Pods and assumes Multus (or similar CNI) calls its CNI not as first CNI. Change-Id: I524c1d18752eb6dbc8d34addd3b60d5bbaa06ff4 Signed-off-by: Ritu Sood <ritu.sood@intel.com> Signed-off-by: Victor Morales <victor.morales@intel.com>
Diffstat (limited to 'internal/pkg/factory')
-rw-r--r--internal/pkg/factory/.gitkeep0
-rw-r--r--internal/pkg/factory/factory.go318
-rw-r--r--internal/pkg/factory/factory_test.go686
3 files changed, 1004 insertions, 0 deletions
diff --git a/internal/pkg/factory/.gitkeep b/internal/pkg/factory/.gitkeep
deleted file mode 100644
index e69de29..0000000
--- a/internal/pkg/factory/.gitkeep
+++ /dev/null
diff --git a/internal/pkg/factory/factory.go b/internal/pkg/factory/factory.go
new file mode 100644
index 0000000..a635e3f
--- /dev/null
+++ b/internal/pkg/factory/factory.go
@@ -0,0 +1,318 @@
+package factory
+
+import (
+ "fmt"
+ "reflect"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/sirupsen/logrus"
+
+ kapi "k8s.io/api/core/v1"
+ knet "k8s.io/api/networking/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/labels"
+ informerfactory "k8s.io/client-go/informers"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/tools/cache"
+)
+
+type informer struct {
+ sync.Mutex
+ oType reflect.Type
+ inf cache.SharedIndexInformer
+ handlers map[uint64]cache.ResourceEventHandler
+}
+
+func (i *informer) forEachHandler(obj interface{}, f func(id uint64, handler cache.ResourceEventHandler)) {
+ i.Lock()
+ defer i.Unlock()
+
+ objType := reflect.TypeOf(obj)
+ if objType != i.oType {
+ logrus.Errorf("object type %v did not match expected %v", objType, i.oType)
+ return
+ }
+
+ for id, handler := range i.handlers {
+ f(id, handler)
+ }
+}
+
+// WatchFactory initializes and manages common kube watches
+type WatchFactory struct {
+ iFactory informerfactory.SharedInformerFactory
+ informers map[reflect.Type]*informer
+ handlerCounter uint64
+}
+
+const (
+ resyncInterval = 12 * time.Hour
+)
+
+func newInformer(oType reflect.Type, inf cache.SharedIndexInformer) *informer {
+ return &informer{
+ oType: oType,
+ inf: inf,
+ handlers: make(map[uint64]cache.ResourceEventHandler),
+ }
+}
+
+var (
+ podType reflect.Type = reflect.TypeOf(&kapi.Pod{})
+ serviceType reflect.Type = reflect.TypeOf(&kapi.Service{})
+ endpointsType reflect.Type = reflect.TypeOf(&kapi.Endpoints{})
+ policyType reflect.Type = reflect.TypeOf(&knet.NetworkPolicy{})
+ namespaceType reflect.Type = reflect.TypeOf(&kapi.Namespace{})
+ nodeType reflect.Type = reflect.TypeOf(&kapi.Node{})
+)
+
+// NewWatchFactory initializes a new watch factory
+func NewWatchFactory(c kubernetes.Interface, stopChan <-chan struct{}) (*WatchFactory, error) {
+ // resync time is 12 hours, none of the resources being watched in ovn-kubernetes have
+ // any race condition where a resync may be required e.g. cni executable on node watching for
+ // events on pods and assuming that an 'ADD' event will contain the annotations put in by
+ // ovnkube master (currently, it is just a 'get' loop)
+ // the downside of making it tight (like 10 minutes) is needless spinning on all resources
+ wf := &WatchFactory{
+ iFactory: informerfactory.NewSharedInformerFactory(c, resyncInterval),
+ informers: make(map[reflect.Type]*informer),
+ }
+
+ // Create shared informers we know we'll use
+ wf.informers[podType] = newInformer(podType, wf.iFactory.Core().V1().Pods().Informer())
+ wf.informers[serviceType] = newInformer(serviceType, wf.iFactory.Core().V1().Services().Informer())
+ wf.informers[endpointsType] = newInformer(endpointsType, wf.iFactory.Core().V1().Endpoints().Informer())
+ wf.informers[policyType] = newInformer(policyType, wf.iFactory.Networking().V1().NetworkPolicies().Informer())
+ wf.informers[namespaceType] = newInformer(namespaceType, wf.iFactory.Core().V1().Namespaces().Informer())
+ wf.informers[nodeType] = newInformer(nodeType, wf.iFactory.Core().V1().Nodes().Informer())
+
+ wf.iFactory.Start(stopChan)
+ res := wf.iFactory.WaitForCacheSync(stopChan)
+ for oType, synced := range res {
+ if !synced {
+ return nil, fmt.Errorf("error in syncing cache for %v informer", oType)
+ }
+ informer := wf.informers[oType]
+ informer.inf.AddEventHandler(wf.newFederatedHandler(informer))
+ }
+
+ return wf, nil
+}
+
+func (wf *WatchFactory) newFederatedHandler(inf *informer) cache.ResourceEventHandlerFuncs {
+ return cache.ResourceEventHandlerFuncs{
+ AddFunc: func(obj interface{}) {
+ inf.forEachHandler(obj, func(id uint64, handler cache.ResourceEventHandler) {
+ logrus.Debugf("running %v ADD event for handler %d", inf.oType, id)
+ handler.OnAdd(obj)
+ })
+ },
+ UpdateFunc: func(oldObj, newObj interface{}) {
+ inf.forEachHandler(newObj, func(id uint64, handler cache.ResourceEventHandler) {
+ logrus.Debugf("running %v UPDATE event for handler %d", inf.oType, id)
+ handler.OnUpdate(oldObj, newObj)
+ })
+ },
+ DeleteFunc: func(obj interface{}) {
+ if inf.oType != reflect.TypeOf(obj) {
+ tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
+ if !ok {
+ logrus.Errorf("couldn't get object from tombstone: %+v", obj)
+ return
+ }
+ obj = tombstone.Obj
+ objType := reflect.TypeOf(obj)
+ if inf.oType != objType {
+ logrus.Errorf("expected tombstone object resource type %v but got %v", inf.oType, objType)
+ return
+ }
+ }
+ inf.forEachHandler(obj, func(id uint64, handler cache.ResourceEventHandler) {
+ logrus.Debugf("running %v DELETE event for handler %d", inf.oType, id)
+ handler.OnDelete(obj)
+ })
+ },
+ }
+}
+
+func getObjectMeta(objType reflect.Type, obj interface{}) (*metav1.ObjectMeta, error) {
+ switch objType {
+ case podType:
+ if pod, ok := obj.(*kapi.Pod); ok {
+ return &pod.ObjectMeta, nil
+ }
+ case serviceType:
+ if service, ok := obj.(*kapi.Service); ok {
+ return &service.ObjectMeta, nil
+ }
+ case endpointsType:
+ if endpoints, ok := obj.(*kapi.Endpoints); ok {
+ return &endpoints.ObjectMeta, nil
+ }
+ case policyType:
+ if policy, ok := obj.(*knet.NetworkPolicy); ok {
+ return &policy.ObjectMeta, nil
+ }
+ case namespaceType:
+ if namespace, ok := obj.(*kapi.Namespace); ok {
+ return &namespace.ObjectMeta, nil
+ }
+ case nodeType:
+ if node, ok := obj.(*kapi.Node); ok {
+ return &node.ObjectMeta, nil
+ }
+ }
+ return nil, fmt.Errorf("cannot get ObjectMeta from type %v", objType)
+}
+
+func (wf *WatchFactory) addHandler(objType reflect.Type, namespace string, lsel *metav1.LabelSelector, funcs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) {
+ inf, ok := wf.informers[objType]
+ if !ok {
+ return 0, fmt.Errorf("unknown object type %v", objType)
+ }
+
+ sel, err := metav1.LabelSelectorAsSelector(lsel)
+ if err != nil {
+ return 0, fmt.Errorf("error creating label selector: %v", err)
+ }
+
+ filterFunc := func(obj interface{}) bool {
+ if namespace == "" && lsel == nil {
+ // Unfiltered handler
+ return true
+ }
+ meta, err := getObjectMeta(objType, obj)
+ if err != nil {
+ logrus.Errorf("watch handler filter error: %v", err)
+ return false
+ }
+ if namespace != "" && meta.Namespace != namespace {
+ return false
+ }
+ if lsel != nil && !sel.Matches(labels.Set(meta.Labels)) {
+ return false
+ }
+ return true
+ }
+
+ // Process existing items as a set so the caller can clean up
+ // after a restart or whatever
+ existingItems := inf.inf.GetStore().List()
+ if processExisting != nil {
+ items := make([]interface{}, 0)
+ for _, obj := range existingItems {
+ if filterFunc(obj) {
+ items = append(items, obj)
+ }
+ }
+ processExisting(items)
+ }
+
+ handlerID := atomic.AddUint64(&wf.handlerCounter, 1)
+
+ inf.Lock()
+ defer inf.Unlock()
+
+ inf.handlers[handlerID] = cache.FilteringResourceEventHandler{
+ FilterFunc: filterFunc,
+ Handler: funcs,
+ }
+ logrus.Debugf("added %v event handler %d", objType, handlerID)
+
+ // Send existing items to the handler's add function; informers usually
+ // do this but since we share informers, it's long-since happened so
+ // we must emulate that here
+ for _, obj := range existingItems {
+ inf.handlers[handlerID].OnAdd(obj)
+ }
+
+ return handlerID, nil
+}
+
+func (wf *WatchFactory) removeHandler(objType reflect.Type, handlerID uint64) error {
+ inf, ok := wf.informers[objType]
+ if !ok {
+ return fmt.Errorf("tried to remove unknown object type %v event handler", objType)
+ }
+
+ inf.Lock()
+ defer inf.Unlock()
+ if _, ok := inf.handlers[handlerID]; !ok {
+ return fmt.Errorf("tried to remove unknown object type %v event handler %d", objType, handlerID)
+ }
+ delete(inf.handlers, handlerID)
+ logrus.Debugf("removed %v event handler %d", objType, handlerID)
+ return nil
+}
+
+// AddPodHandler adds a handler function that will be executed on Pod object changes
+func (wf *WatchFactory) AddPodHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) {
+ return wf.addHandler(podType, "", nil, handlerFuncs, processExisting)
+}
+
+// AddFilteredPodHandler adds a handler function that will be executed when Pod objects that match the given filters change
+func (wf *WatchFactory) AddFilteredPodHandler(namespace string, lsel *metav1.LabelSelector, handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) {
+ return wf.addHandler(podType, namespace, lsel, handlerFuncs, processExisting)
+}
+
+// RemovePodHandler removes a Pod object event handler function
+func (wf *WatchFactory) RemovePodHandler(handlerID uint64) error {
+ return wf.removeHandler(podType, handlerID)
+}
+
+// AddServiceHandler adds a handler function that will be executed on Service object changes
+func (wf *WatchFactory) AddServiceHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) {
+ return wf.addHandler(serviceType, "", nil, handlerFuncs, processExisting)
+}
+
+// RemoveServiceHandler removes a Service object event handler function
+func (wf *WatchFactory) RemoveServiceHandler(handlerID uint64) error {
+ return wf.removeHandler(serviceType, handlerID)
+}
+
+// AddEndpointsHandler adds a handler function that will be executed on Endpoints object changes
+func (wf *WatchFactory) AddEndpointsHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) {
+ return wf.addHandler(endpointsType, "", nil, handlerFuncs, processExisting)
+}
+
+// RemoveEndpointsHandler removes a Endpoints object event handler function
+func (wf *WatchFactory) RemoveEndpointsHandler(handlerID uint64) error {
+ return wf.removeHandler(endpointsType, handlerID)
+}
+
+// AddPolicyHandler adds a handler function that will be executed on NetworkPolicy object changes
+func (wf *WatchFactory) AddPolicyHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) {
+ return wf.addHandler(policyType, "", nil, handlerFuncs, processExisting)
+}
+
+// RemovePolicyHandler removes a NetworkPolicy object event handler function
+func (wf *WatchFactory) RemovePolicyHandler(handlerID uint64) error {
+ return wf.removeHandler(policyType, handlerID)
+}
+
+// AddNamespaceHandler adds a handler function that will be executed on Namespace object changes
+func (wf *WatchFactory) AddNamespaceHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) {
+ return wf.addHandler(namespaceType, "", nil, handlerFuncs, processExisting)
+}
+
+// AddFilteredNamespaceHandler adds a handler function that will be executed when Namespace objects that match the given filters change
+func (wf *WatchFactory) AddFilteredNamespaceHandler(namespace string, lsel *metav1.LabelSelector, handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) {
+ return wf.addHandler(namespaceType, namespace, lsel, handlerFuncs, processExisting)
+}
+
+// RemoveNamespaceHandler removes a Namespace object event handler function
+func (wf *WatchFactory) RemoveNamespaceHandler(handlerID uint64) error {
+ return wf.removeHandler(namespaceType, handlerID)
+}
+
+// AddNodeHandler adds a handler function that will be executed on Node object changes
+func (wf *WatchFactory) AddNodeHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) {
+ return wf.addHandler(nodeType, "", nil, handlerFuncs, processExisting)
+}
+
+// RemoveNodeHandler removes a Node object event handler function
+func (wf *WatchFactory) RemoveNodeHandler(handlerID uint64) error {
+ return wf.removeHandler(nodeType, handlerID)
+}
diff --git a/internal/pkg/factory/factory_test.go b/internal/pkg/factory/factory_test.go
new file mode 100644
index 0000000..38fb77e
--- /dev/null
+++ b/internal/pkg/factory/factory_test.go
@@ -0,0 +1,686 @@
+package factory
+
+import (
+ "reflect"
+ "testing"
+
+ "k8s.io/api/core/v1"
+ knet "k8s.io/api/networking/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/types"
+ "k8s.io/apimachinery/pkg/watch"
+ "k8s.io/client-go/kubernetes/fake"
+ core "k8s.io/client-go/testing"
+ "k8s.io/client-go/tools/cache"
+
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+)
+
+func TestFactory(t *testing.T) {
+ RegisterFailHandler(Fail)
+ RunSpecs(t, "Watch Factory Suite")
+}
+
+func newObjectMeta(name, namespace string) metav1.ObjectMeta {
+ return metav1.ObjectMeta{
+ Name: name,
+ UID: types.UID(name),
+ Namespace: namespace,
+ Labels: map[string]string{
+ "name": name,
+ },
+ }
+}
+
+func newPod(name, namespace string) *v1.Pod {
+ return &v1.Pod{
+ Status: v1.PodStatus{
+ Phase: v1.PodRunning,
+ },
+ ObjectMeta: newObjectMeta(name, namespace),
+ Spec: v1.PodSpec{
+ Containers: []v1.Container{
+ {
+ Name: "containerName",
+ Image: "containerImage",
+ },
+ },
+ NodeName: "mynode",
+ },
+ }
+}
+
+func newNamespace(name string) *v1.Namespace {
+ return &v1.Namespace{
+ Status: v1.NamespaceStatus{
+ Phase: v1.NamespaceActive,
+ },
+ ObjectMeta: newObjectMeta(name, name),
+ }
+}
+
+func newNode(name string) *v1.Node {
+ return &v1.Node{
+ Status: v1.NodeStatus{
+ Phase: v1.NodeRunning,
+ },
+ ObjectMeta: newObjectMeta(name, ""),
+ }
+}
+
+func newPolicy(name, namespace string) *knet.NetworkPolicy {
+ return &knet.NetworkPolicy{
+ ObjectMeta: newObjectMeta(name, namespace),
+ }
+}
+
+func newEndpoints(name, namespace string) *v1.Endpoints {
+ return &v1.Endpoints{
+ ObjectMeta: newObjectMeta(name, namespace),
+ }
+}
+
+func newService(name, namespace string) *v1.Service {
+ return &v1.Service{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ UID: types.UID(name),
+ Namespace: namespace,
+ Labels: map[string]string{
+ "name": name,
+ },
+ },
+ }
+}
+
+func objSetup(c *fake.Clientset, objType string, listFn func(core.Action) (bool, runtime.Object, error)) *watch.FakeWatcher {
+ w := watch.NewFake()
+ c.AddWatchReactor(objType, core.DefaultWatchReactor(w, nil))
+ c.AddReactor("list", objType, listFn)
+ return w
+}
+
+var _ = Describe("Watch Factory Operations", func() {
+ var (
+ fakeClient *fake.Clientset
+ podWatch, namespaceWatch, nodeWatch *watch.FakeWatcher
+ policyWatch, endpointsWatch, serviceWatch *watch.FakeWatcher
+ pods []*v1.Pod
+ namespaces []*v1.Namespace
+ nodes []*v1.Node
+ policies []*knet.NetworkPolicy
+ endpoints []*v1.Endpoints
+ services []*v1.Service
+ stop chan struct{}
+ numAdded, numUpdated, numDeleted int
+ )
+
+ BeforeEach(func() {
+ fakeClient = &fake.Clientset{}
+ stop = make(chan struct{})
+
+ pods = make([]*v1.Pod, 0)
+ podWatch = objSetup(fakeClient, "pods", func(core.Action) (bool, runtime.Object, error) {
+ obj := &v1.PodList{}
+ for _, p := range pods {
+ obj.Items = append(obj.Items, *p)
+ }
+ return true, obj, nil
+ })
+
+ namespaces = make([]*v1.Namespace, 0)
+ namespaceWatch = objSetup(fakeClient, "namespaces", func(core.Action) (bool, runtime.Object, error) {
+ obj := &v1.NamespaceList{}
+ for _, p := range namespaces {
+ obj.Items = append(obj.Items, *p)
+ }
+ return true, obj, nil
+ })
+
+ nodes = make([]*v1.Node, 0)
+ nodeWatch = objSetup(fakeClient, "nodes", func(core.Action) (bool, runtime.Object, error) {
+ obj := &v1.NodeList{}
+ for _, p := range nodes {
+ obj.Items = append(obj.Items, *p)
+ }
+ return true, obj, nil
+ })
+
+ policies = make([]*knet.NetworkPolicy, 0)
+ policyWatch = objSetup(fakeClient, "networkpolicies", func(core.Action) (bool, runtime.Object, error) {
+ obj := &knet.NetworkPolicyList{}
+ for _, p := range policies {
+ obj.Items = append(obj.Items, *p)
+ }
+ return true, obj, nil
+ })
+
+ endpoints = make([]*v1.Endpoints, 0)
+ endpointsWatch = objSetup(fakeClient, "endpoints", func(core.Action) (bool, runtime.Object, error) {
+ obj := &v1.EndpointsList{}
+ for _, p := range endpoints {
+ obj.Items = append(obj.Items, *p)
+ }
+ return true, obj, nil
+ })
+
+ services = make([]*v1.Service, 0)
+ serviceWatch = objSetup(fakeClient, "services", func(core.Action) (bool, runtime.Object, error) {
+ obj := &v1.ServiceList{}
+ for _, p := range services {
+ obj.Items = append(obj.Items, *p)
+ }
+ return true, obj, nil
+ })
+
+ numAdded = 0
+ numUpdated = 0
+ numDeleted = 0
+ })
+
+ Context("when a processExisting is given", func() {
+ testExisting := func(objType reflect.Type, namespace string, lsel *metav1.LabelSelector) {
+ wf, err := NewWatchFactory(fakeClient, stop)
+ Expect(err).NotTo(HaveOccurred())
+ id, err := wf.addHandler(objType, namespace, lsel,
+ cache.ResourceEventHandlerFuncs{},
+ func(objs []interface{}) {
+ Expect(len(objs)).To(Equal(1))
+ })
+ Expect(err).NotTo(HaveOccurred())
+ Expect(id).To(BeNumerically(">", uint64(0)))
+ wf.removeHandler(objType, id)
+ close(stop)
+ }
+
+ It("is called for each existing pod", func() {
+ pods = append(pods, newPod("pod1", "default"))
+ testExisting(podType, "", nil)
+ })
+
+ It("is called for each existing namespace", func() {
+ namespaces = append(namespaces, newNamespace("default"))
+ testExisting(namespaceType, "", nil)
+ })
+
+ It("is called for each existing node", func() {
+ nodes = append(nodes, newNode("default"))
+ testExisting(nodeType, "", nil)
+ })
+
+ It("is called for each existing policy", func() {
+ policies = append(policies, newPolicy("denyall", "default"))
+ testExisting(policyType, "", nil)
+ })
+
+ It("is called for each existing endpoints", func() {
+ endpoints = append(endpoints, newEndpoints("myendpoint", "default"))
+ testExisting(endpointsType, "", nil)
+ })
+
+ It("is called for each existing service", func() {
+ services = append(services, newService("myservice", "default"))
+ testExisting(serviceType, "", nil)
+ })
+
+ It("is called for each existing pod that matches a given namespace and label", func() {
+ pod := newPod("pod1", "default")
+ pod.ObjectMeta.Labels["blah"] = "foobar"
+ pods = append(pods, pod)
+ testExisting(podType, "default", &metav1.LabelSelector{
+ MatchLabels: map[string]string{"blah": "foobar"},
+ })
+ })
+ })
+
+ Context("when existing items are known to the informer", func() {
+ testExisting := func(objType reflect.Type) {
+ wf, err := NewWatchFactory(fakeClient, stop)
+ Expect(err).NotTo(HaveOccurred())
+ id, err := wf.addHandler(objType, "", nil,
+ cache.ResourceEventHandlerFuncs{
+ AddFunc: func(obj interface{}) {
+ numAdded++
+ },
+ UpdateFunc: func(old, new interface{}) {},
+ DeleteFunc: func(obj interface{}) {},
+ }, nil)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(numAdded).To(Equal(2))
+ wf.removeHandler(objType, id)
+ close(stop)
+ }
+
+ It("calls ADD for each existing pod", func() {
+ pods = append(pods, newPod("pod1", "default"))
+ pods = append(pods, newPod("pod2", "default"))
+ testExisting(podType)
+ })
+
+ It("calls ADD for each existing namespace", func() {
+ namespaces = append(namespaces, newNamespace("default"))
+ namespaces = append(namespaces, newNamespace("default2"))
+ testExisting(namespaceType)
+ })
+
+ It("calls ADD for each existing node", func() {
+ nodes = append(nodes, newNode("default"))
+ nodes = append(nodes, newNode("default2"))
+ testExisting(nodeType)
+ })
+
+ It("calls ADD for each existing policy", func() {
+ policies = append(policies, newPolicy("denyall", "default"))
+ policies = append(policies, newPolicy("denyall2", "default"))
+ testExisting(policyType)
+ })
+
+ It("calls ADD for each existing endpoints", func() {
+ endpoints = append(endpoints, newEndpoints("myendpoint", "default"))
+ endpoints = append(endpoints, newEndpoints("myendpoint2", "default"))
+ testExisting(endpointsType)
+ })
+
+ It("calls ADD for each existing service", func() {
+ services = append(services, newService("myservice", "default"))
+ services = append(services, newService("myservice2", "default"))
+ testExisting(serviceType)
+ })
+ })
+
+ addFilteredHandler := func(wf *WatchFactory, objType reflect.Type, namespace string, lsel *metav1.LabelSelector, funcs cache.ResourceEventHandlerFuncs) uint64 {
+ id, err := wf.addHandler(objType, namespace, lsel, cache.ResourceEventHandlerFuncs{
+ AddFunc: func(obj interface{}) {
+ defer GinkgoRecover()
+ numAdded++
+ funcs.AddFunc(obj)
+ },
+ UpdateFunc: func(old, new interface{}) {
+ defer GinkgoRecover()
+ numUpdated++
+ funcs.UpdateFunc(old, new)
+ },
+ DeleteFunc: func(obj interface{}) {
+ defer GinkgoRecover()
+ numDeleted++
+ funcs.DeleteFunc(obj)
+ },
+ }, nil)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(id).To(BeNumerically(">", uint64(0)))
+ return id
+ }
+
+ addHandler := func(wf *WatchFactory, objType reflect.Type, funcs cache.ResourceEventHandlerFuncs) uint64 {
+ return addFilteredHandler(wf, objType, "", nil, funcs)
+ }
+
+ It("responds to pod add/update/delete events", func() {
+ wf, err := NewWatchFactory(fakeClient, stop)
+ Expect(err).NotTo(HaveOccurred())
+
+ added := newPod("pod1", "default")
+ id := addHandler(wf, podType, cache.ResourceEventHandlerFuncs{
+ AddFunc: func(obj interface{}) {
+ pod := obj.(*v1.Pod)
+ Expect(reflect.DeepEqual(pod, added)).To(BeTrue())
+ },
+ UpdateFunc: func(old, new interface{}) {
+ newPod := new.(*v1.Pod)
+ Expect(reflect.DeepEqual(newPod, added)).To(BeTrue())
+ Expect(newPod.Spec.NodeName).To(Equal("foobar"))
+ },
+ DeleteFunc: func(obj interface{}) {
+ pod := obj.(*v1.Pod)
+ Expect(reflect.DeepEqual(pod, added)).To(BeTrue())
+ },
+ })
+
+ pods = append(pods, added)
+ podWatch.Add(added)
+ Eventually(func() int { return numAdded }, 2).Should(Equal(1))
+ added.Spec.NodeName = "foobar"
+ podWatch.Modify(added)
+ Eventually(func() int { return numUpdated }, 2).Should(Equal(1))
+ pods = pods[:0]
+ podWatch.Delete(added)
+ Eventually(func() int { return numDeleted }, 2).Should(Equal(1))
+
+ wf.RemovePodHandler(id)
+ close(stop)
+ })
+
+ It("responds to namespace add/update/delete events", func() {
+ wf, err := NewWatchFactory(fakeClient, stop)
+ Expect(err).NotTo(HaveOccurred())
+
+ added := newNamespace("default")
+ id := addHandler(wf, namespaceType, cache.ResourceEventHandlerFuncs{
+ AddFunc: func(obj interface{}) {
+ ns := obj.(*v1.Namespace)
+ Expect(reflect.DeepEqual(ns, added)).To(BeTrue())
+ },
+ UpdateFunc: func(old, new interface{}) {
+ newNS := new.(*v1.Namespace)
+ Expect(reflect.DeepEqual(newNS, added)).To(BeTrue())
+ Expect(newNS.Status.Phase).To(Equal(v1.NamespaceTerminating))
+ },
+ DeleteFunc: func(obj interface{}) {
+ ns := obj.(*v1.Namespace)
+ Expect(reflect.DeepEqual(ns, added)).To(BeTrue())
+ },
+ })
+
+ namespaces = append(namespaces, added)
+ namespaceWatch.Add(added)
+ Eventually(func() int { return numAdded }, 2).Should(Equal(1))
+ added.Status.Phase = v1.NamespaceTerminating
+ namespaceWatch.Modify(added)
+ Eventually(func() int { return numUpdated }, 2).Should(Equal(1))
+ namespaces = namespaces[:0]
+ namespaceWatch.Delete(added)
+ Eventually(func() int { return numDeleted }, 2).Should(Equal(1))
+
+ wf.RemoveNamespaceHandler(id)
+ close(stop)
+ })
+
+ It("responds to node add/update/delete events", func() {
+ wf, err := NewWatchFactory(fakeClient, stop)
+ Expect(err).NotTo(HaveOccurred())
+
+ added := newNode("mynode")
+ id := addHandler(wf, nodeType, cache.ResourceEventHandlerFuncs{
+ AddFunc: func(obj interface{}) {
+ node := obj.(*v1.Node)
+ Expect(reflect.DeepEqual(node, added)).To(BeTrue())
+ },
+ UpdateFunc: func(old, new interface{}) {
+ newNode := new.(*v1.Node)
+ Expect(reflect.DeepEqual(newNode, added)).To(BeTrue())
+ Expect(newNode.Status.Phase).To(Equal(v1.NodeTerminated))
+ },
+ DeleteFunc: func(obj interface{}) {
+ node := obj.(*v1.Node)
+ Expect(reflect.DeepEqual(node, added)).To(BeTrue())
+ },
+ })
+
+ nodes = append(nodes, added)
+ nodeWatch.Add(added)
+ Eventually(func() int { return numAdded }, 2).Should(Equal(1))
+ added.Status.Phase = v1.NodeTerminated
+ nodeWatch.Modify(added)
+ Eventually(func() int { return numUpdated }, 2).Should(Equal(1))
+ nodes = nodes[:0]
+ nodeWatch.Delete(added)
+ Eventually(func() int { return numDeleted }, 2).Should(Equal(1))
+
+ wf.removeHandler(nodeType, id)
+ close(stop)
+ })
+
+ It("responds to policy add/update/delete events", func() {
+ wf, err := NewWatchFactory(fakeClient, stop)
+ Expect(err).NotTo(HaveOccurred())
+
+ added := newPolicy("mypolicy", "default")
+ id := addHandler(wf, policyType, cache.ResourceEventHandlerFuncs{
+ AddFunc: func(obj interface{}) {
+ np := obj.(*knet.NetworkPolicy)
+ Expect(reflect.DeepEqual(np, added)).To(BeTrue())
+ },
+ UpdateFunc: func(old, new interface{}) {
+ newNP := new.(*knet.NetworkPolicy)
+ Expect(reflect.DeepEqual(newNP, added)).To(BeTrue())
+ Expect(newNP.Spec.PolicyTypes).To(Equal([]knet.PolicyType{knet.PolicyTypeIngress}))
+ },
+ DeleteFunc: func(obj interface{}) {
+ np := obj.(*knet.NetworkPolicy)
+ Expect(reflect.DeepEqual(np, added)).To(BeTrue())
+ },
+ })
+
+ policies = append(policies, added)
+ policyWatch.Add(added)
+ Eventually(func() int { return numAdded }, 2).Should(Equal(1))
+ added.Spec.PolicyTypes = []knet.PolicyType{knet.PolicyTypeIngress}
+ policyWatch.Modify(added)
+ Eventually(func() int { return numUpdated }, 2).Should(Equal(1))
+ policies = policies[:0]
+ policyWatch.Delete(added)
+ Eventually(func() int { return numDeleted }, 2).Should(Equal(1))
+
+ wf.removeHandler(policyType, id)
+ close(stop)
+ })
+
+ It("responds to endpoints add/update/delete events", func() {
+ wf, err := NewWatchFactory(fakeClient, stop)
+ Expect(err).NotTo(HaveOccurred())
+
+ added := newEndpoints("myendpoints", "default")
+ id := addHandler(wf, endpointsType, cache.ResourceEventHandlerFuncs{
+ AddFunc: func(obj interface{}) {
+ eps := obj.(*v1.Endpoints)
+ Expect(reflect.DeepEqual(eps, added)).To(BeTrue())
+ },
+ UpdateFunc: func(old, new interface{}) {
+ newEPs := new.(*v1.Endpoints)
+ Expect(reflect.DeepEqual(newEPs, added)).To(BeTrue())
+ Expect(len(newEPs.Subsets)).To(Equal(1))
+ },
+ DeleteFunc: func(obj interface{}) {
+ eps := obj.(*v1.Endpoints)
+ Expect(reflect.DeepEqual(eps, added)).To(BeTrue())
+ },
+ })
+
+ endpoints = append(endpoints, added)
+ endpointsWatch.Add(added)
+ Eventually(func() int { return numAdded }, 2).Should(Equal(1))
+ added.Subsets = append(added.Subsets, v1.EndpointSubset{
+ Ports: []v1.EndpointPort{
+ {
+ Name: "foobar",
+ Port: 1234,
+ },
+ },
+ })
+ endpointsWatch.Modify(added)
+ Eventually(func() int { return numUpdated }, 2).Should(Equal(1))
+ endpoints = endpoints[:0]
+ endpointsWatch.Delete(added)
+ Eventually(func() int { return numDeleted }, 2).Should(Equal(1))
+
+ wf.removeHandler(endpointsType, id)
+ close(stop)
+ })
+
+ It("responds to service add/update/delete events", func() {
+ wf, err := NewWatchFactory(fakeClient, stop)
+ Expect(err).NotTo(HaveOccurred())
+
+ added := newService("myservice", "default")
+ id := addHandler(wf, serviceType, cache.ResourceEventHandlerFuncs{
+ AddFunc: func(obj interface{}) {
+ service := obj.(*v1.Service)
+ Expect(reflect.DeepEqual(service, added)).To(BeTrue())
+ },
+ UpdateFunc: func(old, new interface{}) {
+ newService := new.(*v1.Service)
+ Expect(reflect.DeepEqual(newService, added)).To(BeTrue())
+ Expect(newService.Spec.ClusterIP).To(Equal("1.1.1.1"))
+ },
+ DeleteFunc: func(obj interface{}) {
+ service := obj.(*v1.Service)
+ Expect(reflect.DeepEqual(service, added)).To(BeTrue())
+ },
+ })
+
+ services = append(services, added)
+ serviceWatch.Add(added)
+ Eventually(func() int { return numAdded }, 2).Should(Equal(1))
+ added.Spec.ClusterIP = "1.1.1.1"
+ serviceWatch.Modify(added)
+ Eventually(func() int { return numUpdated }, 2).Should(Equal(1))
+ services = services[:0]
+ serviceWatch.Delete(added)
+ Eventually(func() int { return numDeleted }, 2).Should(Equal(1))
+
+ wf.removeHandler(serviceType, id)
+ close(stop)
+ })
+
+ It("stops processing events after the handler is removed", func() {
+ wf, err := NewWatchFactory(fakeClient, stop)
+ Expect(err).NotTo(HaveOccurred())
+
+ added := newNamespace("default")
+ id := addHandler(wf, namespaceType, cache.ResourceEventHandlerFuncs{
+ AddFunc: func(obj interface{}) {},
+ UpdateFunc: func(old, new interface{}) {},
+ DeleteFunc: func(obj interface{}) {},
+ })
+
+ namespaces = append(namespaces, added)
+ namespaceWatch.Add(added)
+ Eventually(func() int { return numAdded }, 2).Should(Equal(1))
+ wf.RemoveNamespaceHandler(id)
+
+ added2 := newNamespace("other")
+ namespaces = append(namespaces, added2)
+ namespaceWatch.Add(added2)
+ Consistently(func() int { return numAdded }, 2).Should(Equal(1))
+
+ added2.Status.Phase = v1.NamespaceTerminating
+ namespaceWatch.Modify(added2)
+ Consistently(func() int { return numUpdated }, 2).Should(Equal(0))
+ namespaces = []*v1.Namespace{added}
+ namespaceWatch.Delete(added2)
+ Consistently(func() int { return numDeleted }, 2).Should(Equal(0))
+
+ close(stop)
+ })
+
+ It("filters correctly by label and namespace", func() {
+ wf, err := NewWatchFactory(fakeClient, stop)
+ Expect(err).NotTo(HaveOccurred())
+
+ passesFilter := newPod("pod1", "default")
+ passesFilter.ObjectMeta.Labels["blah"] = "foobar"
+ failsFilter := newPod("pod2", "default")
+ failsFilter.ObjectMeta.Labels["blah"] = "baz"
+ failsFilter2 := newPod("pod3", "otherns")
+ failsFilter2.ObjectMeta.Labels["blah"] = "foobar"
+
+ addFilteredHandler(wf,
+ podType,
+ "default",
+ &metav1.LabelSelector{
+ MatchLabels: map[string]string{"blah": "foobar"},
+ },
+ cache.ResourceEventHandlerFuncs{
+ AddFunc: func(obj interface{}) {
+ pod := obj.(*v1.Pod)
+ Expect(reflect.DeepEqual(pod, passesFilter)).To(BeTrue())
+ },
+ UpdateFunc: func(old, new interface{}) {
+ newPod := new.(*v1.Pod)
+ Expect(reflect.DeepEqual(newPod, passesFilter)).To(BeTrue())
+ },
+ DeleteFunc: func(obj interface{}) {
+ pod := obj.(*v1.Pod)
+ Expect(reflect.DeepEqual(pod, passesFilter)).To(BeTrue())
+ },
+ })
+
+ pods = append(pods, passesFilter)
+ podWatch.Add(passesFilter)
+ Eventually(func() int { return numAdded }, 2).Should(Equal(1))
+
+ // numAdded should remain 1
+ pods = append(pods, failsFilter)
+ podWatch.Add(failsFilter)
+ Consistently(func() int { return numAdded }, 2).Should(Equal(1))
+
+ // numAdded should remain 1
+ pods = append(pods, failsFilter2)
+ podWatch.Add(failsFilter2)
+ Consistently(func() int { return numAdded }, 2).Should(Equal(1))
+
+ passesFilter.Status.Phase = v1.PodFailed
+ podWatch.Modify(passesFilter)
+ Eventually(func() int { return numUpdated }, 2).Should(Equal(1))
+
+ // numAdded should remain 1
+ failsFilter.Status.Phase = v1.PodFailed
+ podWatch.Modify(failsFilter)
+ Consistently(func() int { return numUpdated }, 2).Should(Equal(1))
+
+ failsFilter2.Status.Phase = v1.PodFailed
+ podWatch.Modify(failsFilter2)
+ Consistently(func() int { return numUpdated }, 2).Should(Equal(1))
+
+ pods = []*v1.Pod{failsFilter, failsFilter2}
+ podWatch.Delete(passesFilter)
+ Eventually(func() int { return numDeleted }, 2).Should(Equal(1))
+
+ close(stop)
+ })
+
+ It("correctly handles object updates that cause filter changes", func() {
+ wf, err := NewWatchFactory(fakeClient, stop)
+ Expect(err).NotTo(HaveOccurred())
+
+ pod := newPod("pod1", "default")
+ pod.ObjectMeta.Labels["blah"] = "baz"
+
+ equalPod := pod
+ id := addFilteredHandler(wf,
+ podType,
+ "default",
+ &metav1.LabelSelector{
+ MatchLabels: map[string]string{"blah": "foobar"},
+ },
+ cache.ResourceEventHandlerFuncs{
+ AddFunc: func(obj interface{}) {
+ p := obj.(*v1.Pod)
+ Expect(reflect.DeepEqual(p, equalPod)).To(BeTrue())
+ },
+ UpdateFunc: func(old, new interface{}) {},
+ DeleteFunc: func(obj interface{}) {
+ p := obj.(*v1.Pod)
+ Expect(reflect.DeepEqual(p, equalPod)).To(BeTrue())
+ },
+ })
+
+ pods = append(pods, pod)
+
+ // Pod doesn't pass filter; shouldn't be added
+ podWatch.Add(pod)
+ Consistently(func() int { return numAdded }, 2).Should(Equal(0))
+
+ // Update pod to pass filter; should be treated as add. Need
+ // to deep-copy pod when modifying because it's a pointer all
+ // the way through when using FakeClient
+ podCopy := pod.DeepCopy()
+ podCopy.ObjectMeta.Labels["blah"] = "foobar"
+ pods = []*v1.Pod{podCopy}
+ equalPod = podCopy
+ podWatch.Modify(podCopy)
+ Eventually(func() int { return numAdded }, 2).Should(Equal(1))
+
+ // Update pod to fail filter; should be treated as delete
+ pod.ObjectMeta.Labels["blah"] = "baz"
+ podWatch.Modify(pod)
+ Eventually(func() int { return numDeleted }, 2).Should(Equal(1))
+ Consistently(func() int { return numAdded }, 2).Should(Equal(1))
+ Consistently(func() int { return numUpdated }, 2).Should(Equal(0))
+
+ wf.RemovePodHandler(id)
+ close(stop)
+ })
+})