diff options
Diffstat (limited to 'internal/pkg/factory/factory.go')
-rw-r--r-- | internal/pkg/factory/factory.go | 318 |
1 files changed, 0 insertions, 318 deletions
diff --git a/internal/pkg/factory/factory.go b/internal/pkg/factory/factory.go deleted file mode 100644 index a635e3f..0000000 --- a/internal/pkg/factory/factory.go +++ /dev/null @@ -1,318 +0,0 @@ -package factory - -import ( - "fmt" - "reflect" - "sync" - "sync/atomic" - "time" - - "github.com/sirupsen/logrus" - - kapi "k8s.io/api/core/v1" - knet "k8s.io/api/networking/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - informerfactory "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" -) - -type informer struct { - sync.Mutex - oType reflect.Type - inf cache.SharedIndexInformer - handlers map[uint64]cache.ResourceEventHandler -} - -func (i *informer) forEachHandler(obj interface{}, f func(id uint64, handler cache.ResourceEventHandler)) { - i.Lock() - defer i.Unlock() - - objType := reflect.TypeOf(obj) - if objType != i.oType { - logrus.Errorf("object type %v did not match expected %v", objType, i.oType) - return - } - - for id, handler := range i.handlers { - f(id, handler) - } -} - -// WatchFactory initializes and manages common kube watches -type WatchFactory struct { - iFactory informerfactory.SharedInformerFactory - informers map[reflect.Type]*informer - handlerCounter uint64 -} - -const ( - resyncInterval = 12 * time.Hour -) - -func newInformer(oType reflect.Type, inf cache.SharedIndexInformer) *informer { - return &informer{ - oType: oType, - inf: inf, - handlers: make(map[uint64]cache.ResourceEventHandler), - } -} - -var ( - podType reflect.Type = reflect.TypeOf(&kapi.Pod{}) - serviceType reflect.Type = reflect.TypeOf(&kapi.Service{}) - endpointsType reflect.Type = reflect.TypeOf(&kapi.Endpoints{}) - policyType reflect.Type = reflect.TypeOf(&knet.NetworkPolicy{}) - namespaceType reflect.Type = reflect.TypeOf(&kapi.Namespace{}) - nodeType reflect.Type = reflect.TypeOf(&kapi.Node{}) -) - -// NewWatchFactory initializes a new watch factory -func NewWatchFactory(c kubernetes.Interface, stopChan <-chan struct{}) (*WatchFactory, error) { - // resync time is 12 hours, none of the resources being watched in ovn-kubernetes have - // any race condition where a resync may be required e.g. cni executable on node watching for - // events on pods and assuming that an 'ADD' event will contain the annotations put in by - // ovnkube master (currently, it is just a 'get' loop) - // the downside of making it tight (like 10 minutes) is needless spinning on all resources - wf := &WatchFactory{ - iFactory: informerfactory.NewSharedInformerFactory(c, resyncInterval), - informers: make(map[reflect.Type]*informer), - } - - // Create shared informers we know we'll use - wf.informers[podType] = newInformer(podType, wf.iFactory.Core().V1().Pods().Informer()) - wf.informers[serviceType] = newInformer(serviceType, wf.iFactory.Core().V1().Services().Informer()) - wf.informers[endpointsType] = newInformer(endpointsType, wf.iFactory.Core().V1().Endpoints().Informer()) - wf.informers[policyType] = newInformer(policyType, wf.iFactory.Networking().V1().NetworkPolicies().Informer()) - wf.informers[namespaceType] = newInformer(namespaceType, wf.iFactory.Core().V1().Namespaces().Informer()) - wf.informers[nodeType] = newInformer(nodeType, wf.iFactory.Core().V1().Nodes().Informer()) - - wf.iFactory.Start(stopChan) - res := wf.iFactory.WaitForCacheSync(stopChan) - for oType, synced := range res { - if !synced { - return nil, fmt.Errorf("error in syncing cache for %v informer", oType) - } - informer := wf.informers[oType] - informer.inf.AddEventHandler(wf.newFederatedHandler(informer)) - } - - return wf, nil -} - -func (wf *WatchFactory) newFederatedHandler(inf *informer) cache.ResourceEventHandlerFuncs { - return cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - inf.forEachHandler(obj, func(id uint64, handler cache.ResourceEventHandler) { - logrus.Debugf("running %v ADD event for handler %d", inf.oType, id) - handler.OnAdd(obj) - }) - }, - UpdateFunc: func(oldObj, newObj interface{}) { - inf.forEachHandler(newObj, func(id uint64, handler cache.ResourceEventHandler) { - logrus.Debugf("running %v UPDATE event for handler %d", inf.oType, id) - handler.OnUpdate(oldObj, newObj) - }) - }, - DeleteFunc: func(obj interface{}) { - if inf.oType != reflect.TypeOf(obj) { - tombstone, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - logrus.Errorf("couldn't get object from tombstone: %+v", obj) - return - } - obj = tombstone.Obj - objType := reflect.TypeOf(obj) - if inf.oType != objType { - logrus.Errorf("expected tombstone object resource type %v but got %v", inf.oType, objType) - return - } - } - inf.forEachHandler(obj, func(id uint64, handler cache.ResourceEventHandler) { - logrus.Debugf("running %v DELETE event for handler %d", inf.oType, id) - handler.OnDelete(obj) - }) - }, - } -} - -func getObjectMeta(objType reflect.Type, obj interface{}) (*metav1.ObjectMeta, error) { - switch objType { - case podType: - if pod, ok := obj.(*kapi.Pod); ok { - return &pod.ObjectMeta, nil - } - case serviceType: - if service, ok := obj.(*kapi.Service); ok { - return &service.ObjectMeta, nil - } - case endpointsType: - if endpoints, ok := obj.(*kapi.Endpoints); ok { - return &endpoints.ObjectMeta, nil - } - case policyType: - if policy, ok := obj.(*knet.NetworkPolicy); ok { - return &policy.ObjectMeta, nil - } - case namespaceType: - if namespace, ok := obj.(*kapi.Namespace); ok { - return &namespace.ObjectMeta, nil - } - case nodeType: - if node, ok := obj.(*kapi.Node); ok { - return &node.ObjectMeta, nil - } - } - return nil, fmt.Errorf("cannot get ObjectMeta from type %v", objType) -} - -func (wf *WatchFactory) addHandler(objType reflect.Type, namespace string, lsel *metav1.LabelSelector, funcs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) { - inf, ok := wf.informers[objType] - if !ok { - return 0, fmt.Errorf("unknown object type %v", objType) - } - - sel, err := metav1.LabelSelectorAsSelector(lsel) - if err != nil { - return 0, fmt.Errorf("error creating label selector: %v", err) - } - - filterFunc := func(obj interface{}) bool { - if namespace == "" && lsel == nil { - // Unfiltered handler - return true - } - meta, err := getObjectMeta(objType, obj) - if err != nil { - logrus.Errorf("watch handler filter error: %v", err) - return false - } - if namespace != "" && meta.Namespace != namespace { - return false - } - if lsel != nil && !sel.Matches(labels.Set(meta.Labels)) { - return false - } - return true - } - - // Process existing items as a set so the caller can clean up - // after a restart or whatever - existingItems := inf.inf.GetStore().List() - if processExisting != nil { - items := make([]interface{}, 0) - for _, obj := range existingItems { - if filterFunc(obj) { - items = append(items, obj) - } - } - processExisting(items) - } - - handlerID := atomic.AddUint64(&wf.handlerCounter, 1) - - inf.Lock() - defer inf.Unlock() - - inf.handlers[handlerID] = cache.FilteringResourceEventHandler{ - FilterFunc: filterFunc, - Handler: funcs, - } - logrus.Debugf("added %v event handler %d", objType, handlerID) - - // Send existing items to the handler's add function; informers usually - // do this but since we share informers, it's long-since happened so - // we must emulate that here - for _, obj := range existingItems { - inf.handlers[handlerID].OnAdd(obj) - } - - return handlerID, nil -} - -func (wf *WatchFactory) removeHandler(objType reflect.Type, handlerID uint64) error { - inf, ok := wf.informers[objType] - if !ok { - return fmt.Errorf("tried to remove unknown object type %v event handler", objType) - } - - inf.Lock() - defer inf.Unlock() - if _, ok := inf.handlers[handlerID]; !ok { - return fmt.Errorf("tried to remove unknown object type %v event handler %d", objType, handlerID) - } - delete(inf.handlers, handlerID) - logrus.Debugf("removed %v event handler %d", objType, handlerID) - return nil -} - -// AddPodHandler adds a handler function that will be executed on Pod object changes -func (wf *WatchFactory) AddPodHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) { - return wf.addHandler(podType, "", nil, handlerFuncs, processExisting) -} - -// AddFilteredPodHandler adds a handler function that will be executed when Pod objects that match the given filters change -func (wf *WatchFactory) AddFilteredPodHandler(namespace string, lsel *metav1.LabelSelector, handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) { - return wf.addHandler(podType, namespace, lsel, handlerFuncs, processExisting) -} - -// RemovePodHandler removes a Pod object event handler function -func (wf *WatchFactory) RemovePodHandler(handlerID uint64) error { - return wf.removeHandler(podType, handlerID) -} - -// AddServiceHandler adds a handler function that will be executed on Service object changes -func (wf *WatchFactory) AddServiceHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) { - return wf.addHandler(serviceType, "", nil, handlerFuncs, processExisting) -} - -// RemoveServiceHandler removes a Service object event handler function -func (wf *WatchFactory) RemoveServiceHandler(handlerID uint64) error { - return wf.removeHandler(serviceType, handlerID) -} - -// AddEndpointsHandler adds a handler function that will be executed on Endpoints object changes -func (wf *WatchFactory) AddEndpointsHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) { - return wf.addHandler(endpointsType, "", nil, handlerFuncs, processExisting) -} - -// RemoveEndpointsHandler removes a Endpoints object event handler function -func (wf *WatchFactory) RemoveEndpointsHandler(handlerID uint64) error { - return wf.removeHandler(endpointsType, handlerID) -} - -// AddPolicyHandler adds a handler function that will be executed on NetworkPolicy object changes -func (wf *WatchFactory) AddPolicyHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) { - return wf.addHandler(policyType, "", nil, handlerFuncs, processExisting) -} - -// RemovePolicyHandler removes a NetworkPolicy object event handler function -func (wf *WatchFactory) RemovePolicyHandler(handlerID uint64) error { - return wf.removeHandler(policyType, handlerID) -} - -// AddNamespaceHandler adds a handler function that will be executed on Namespace object changes -func (wf *WatchFactory) AddNamespaceHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) { - return wf.addHandler(namespaceType, "", nil, handlerFuncs, processExisting) -} - -// AddFilteredNamespaceHandler adds a handler function that will be executed when Namespace objects that match the given filters change -func (wf *WatchFactory) AddFilteredNamespaceHandler(namespace string, lsel *metav1.LabelSelector, handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) { - return wf.addHandler(namespaceType, namespace, lsel, handlerFuncs, processExisting) -} - -// RemoveNamespaceHandler removes a Namespace object event handler function -func (wf *WatchFactory) RemoveNamespaceHandler(handlerID uint64) error { - return wf.removeHandler(namespaceType, handlerID) -} - -// AddNodeHandler adds a handler function that will be executed on Node object changes -func (wf *WatchFactory) AddNodeHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) { - return wf.addHandler(nodeType, "", nil, handlerFuncs, processExisting) -} - -// RemoveNodeHandler removes a Node object event handler function -func (wf *WatchFactory) RemoveNodeHandler(handlerID uint64) error { - return wf.removeHandler(nodeType, handlerID) -} |