aboutsummaryrefslogtreecommitdiffstats
path: root/internal/pkg/factory/factory.go
diff options
context:
space:
mode:
authorRitu Sood <ritu.sood@intel.com>2019-08-06 19:35:42 -0700
committerRitu Sood <ritu.sood@intel.com>2019-08-15 10:03:47 -0700
commit8295a28f6d6e14f5adb62138271de393015061e9 (patch)
treed11b1e799de55e89d08bc810180d99ce65e6f21e /internal/pkg/factory/factory.go
parentaa41b49246d84b605a76d169f0c861ba0691a4fb (diff)
Use controller runtime and operator sdk
Changing the framework to use controller runtime and operator sdk. This allows to add CRD controllers for Network, Provider Network etc in the same operator. Binary renamed to nfn-operator (Network funtion networking). Change-Id: Ic25a3c3f5f1418fc0614f3aede48b41d9c1156cd Signed-off-by: Ritu Sood <ritu.sood@intel.com>
Diffstat (limited to 'internal/pkg/factory/factory.go')
-rw-r--r--internal/pkg/factory/factory.go318
1 files changed, 0 insertions, 318 deletions
diff --git a/internal/pkg/factory/factory.go b/internal/pkg/factory/factory.go
deleted file mode 100644
index a635e3f..0000000
--- a/internal/pkg/factory/factory.go
+++ /dev/null
@@ -1,318 +0,0 @@
-package factory
-
-import (
- "fmt"
- "reflect"
- "sync"
- "sync/atomic"
- "time"
-
- "github.com/sirupsen/logrus"
-
- kapi "k8s.io/api/core/v1"
- knet "k8s.io/api/networking/v1"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/labels"
- informerfactory "k8s.io/client-go/informers"
- "k8s.io/client-go/kubernetes"
- "k8s.io/client-go/tools/cache"
-)
-
-type informer struct {
- sync.Mutex
- oType reflect.Type
- inf cache.SharedIndexInformer
- handlers map[uint64]cache.ResourceEventHandler
-}
-
-func (i *informer) forEachHandler(obj interface{}, f func(id uint64, handler cache.ResourceEventHandler)) {
- i.Lock()
- defer i.Unlock()
-
- objType := reflect.TypeOf(obj)
- if objType != i.oType {
- logrus.Errorf("object type %v did not match expected %v", objType, i.oType)
- return
- }
-
- for id, handler := range i.handlers {
- f(id, handler)
- }
-}
-
-// WatchFactory initializes and manages common kube watches
-type WatchFactory struct {
- iFactory informerfactory.SharedInformerFactory
- informers map[reflect.Type]*informer
- handlerCounter uint64
-}
-
-const (
- resyncInterval = 12 * time.Hour
-)
-
-func newInformer(oType reflect.Type, inf cache.SharedIndexInformer) *informer {
- return &informer{
- oType: oType,
- inf: inf,
- handlers: make(map[uint64]cache.ResourceEventHandler),
- }
-}
-
-var (
- podType reflect.Type = reflect.TypeOf(&kapi.Pod{})
- serviceType reflect.Type = reflect.TypeOf(&kapi.Service{})
- endpointsType reflect.Type = reflect.TypeOf(&kapi.Endpoints{})
- policyType reflect.Type = reflect.TypeOf(&knet.NetworkPolicy{})
- namespaceType reflect.Type = reflect.TypeOf(&kapi.Namespace{})
- nodeType reflect.Type = reflect.TypeOf(&kapi.Node{})
-)
-
-// NewWatchFactory initializes a new watch factory
-func NewWatchFactory(c kubernetes.Interface, stopChan <-chan struct{}) (*WatchFactory, error) {
- // resync time is 12 hours, none of the resources being watched in ovn-kubernetes have
- // any race condition where a resync may be required e.g. cni executable on node watching for
- // events on pods and assuming that an 'ADD' event will contain the annotations put in by
- // ovnkube master (currently, it is just a 'get' loop)
- // the downside of making it tight (like 10 minutes) is needless spinning on all resources
- wf := &WatchFactory{
- iFactory: informerfactory.NewSharedInformerFactory(c, resyncInterval),
- informers: make(map[reflect.Type]*informer),
- }
-
- // Create shared informers we know we'll use
- wf.informers[podType] = newInformer(podType, wf.iFactory.Core().V1().Pods().Informer())
- wf.informers[serviceType] = newInformer(serviceType, wf.iFactory.Core().V1().Services().Informer())
- wf.informers[endpointsType] = newInformer(endpointsType, wf.iFactory.Core().V1().Endpoints().Informer())
- wf.informers[policyType] = newInformer(policyType, wf.iFactory.Networking().V1().NetworkPolicies().Informer())
- wf.informers[namespaceType] = newInformer(namespaceType, wf.iFactory.Core().V1().Namespaces().Informer())
- wf.informers[nodeType] = newInformer(nodeType, wf.iFactory.Core().V1().Nodes().Informer())
-
- wf.iFactory.Start(stopChan)
- res := wf.iFactory.WaitForCacheSync(stopChan)
- for oType, synced := range res {
- if !synced {
- return nil, fmt.Errorf("error in syncing cache for %v informer", oType)
- }
- informer := wf.informers[oType]
- informer.inf.AddEventHandler(wf.newFederatedHandler(informer))
- }
-
- return wf, nil
-}
-
-func (wf *WatchFactory) newFederatedHandler(inf *informer) cache.ResourceEventHandlerFuncs {
- return cache.ResourceEventHandlerFuncs{
- AddFunc: func(obj interface{}) {
- inf.forEachHandler(obj, func(id uint64, handler cache.ResourceEventHandler) {
- logrus.Debugf("running %v ADD event for handler %d", inf.oType, id)
- handler.OnAdd(obj)
- })
- },
- UpdateFunc: func(oldObj, newObj interface{}) {
- inf.forEachHandler(newObj, func(id uint64, handler cache.ResourceEventHandler) {
- logrus.Debugf("running %v UPDATE event for handler %d", inf.oType, id)
- handler.OnUpdate(oldObj, newObj)
- })
- },
- DeleteFunc: func(obj interface{}) {
- if inf.oType != reflect.TypeOf(obj) {
- tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
- if !ok {
- logrus.Errorf("couldn't get object from tombstone: %+v", obj)
- return
- }
- obj = tombstone.Obj
- objType := reflect.TypeOf(obj)
- if inf.oType != objType {
- logrus.Errorf("expected tombstone object resource type %v but got %v", inf.oType, objType)
- return
- }
- }
- inf.forEachHandler(obj, func(id uint64, handler cache.ResourceEventHandler) {
- logrus.Debugf("running %v DELETE event for handler %d", inf.oType, id)
- handler.OnDelete(obj)
- })
- },
- }
-}
-
-func getObjectMeta(objType reflect.Type, obj interface{}) (*metav1.ObjectMeta, error) {
- switch objType {
- case podType:
- if pod, ok := obj.(*kapi.Pod); ok {
- return &pod.ObjectMeta, nil
- }
- case serviceType:
- if service, ok := obj.(*kapi.Service); ok {
- return &service.ObjectMeta, nil
- }
- case endpointsType:
- if endpoints, ok := obj.(*kapi.Endpoints); ok {
- return &endpoints.ObjectMeta, nil
- }
- case policyType:
- if policy, ok := obj.(*knet.NetworkPolicy); ok {
- return &policy.ObjectMeta, nil
- }
- case namespaceType:
- if namespace, ok := obj.(*kapi.Namespace); ok {
- return &namespace.ObjectMeta, nil
- }
- case nodeType:
- if node, ok := obj.(*kapi.Node); ok {
- return &node.ObjectMeta, nil
- }
- }
- return nil, fmt.Errorf("cannot get ObjectMeta from type %v", objType)
-}
-
-func (wf *WatchFactory) addHandler(objType reflect.Type, namespace string, lsel *metav1.LabelSelector, funcs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) {
- inf, ok := wf.informers[objType]
- if !ok {
- return 0, fmt.Errorf("unknown object type %v", objType)
- }
-
- sel, err := metav1.LabelSelectorAsSelector(lsel)
- if err != nil {
- return 0, fmt.Errorf("error creating label selector: %v", err)
- }
-
- filterFunc := func(obj interface{}) bool {
- if namespace == "" && lsel == nil {
- // Unfiltered handler
- return true
- }
- meta, err := getObjectMeta(objType, obj)
- if err != nil {
- logrus.Errorf("watch handler filter error: %v", err)
- return false
- }
- if namespace != "" && meta.Namespace != namespace {
- return false
- }
- if lsel != nil && !sel.Matches(labels.Set(meta.Labels)) {
- return false
- }
- return true
- }
-
- // Process existing items as a set so the caller can clean up
- // after a restart or whatever
- existingItems := inf.inf.GetStore().List()
- if processExisting != nil {
- items := make([]interface{}, 0)
- for _, obj := range existingItems {
- if filterFunc(obj) {
- items = append(items, obj)
- }
- }
- processExisting(items)
- }
-
- handlerID := atomic.AddUint64(&wf.handlerCounter, 1)
-
- inf.Lock()
- defer inf.Unlock()
-
- inf.handlers[handlerID] = cache.FilteringResourceEventHandler{
- FilterFunc: filterFunc,
- Handler: funcs,
- }
- logrus.Debugf("added %v event handler %d", objType, handlerID)
-
- // Send existing items to the handler's add function; informers usually
- // do this but since we share informers, it's long-since happened so
- // we must emulate that here
- for _, obj := range existingItems {
- inf.handlers[handlerID].OnAdd(obj)
- }
-
- return handlerID, nil
-}
-
-func (wf *WatchFactory) removeHandler(objType reflect.Type, handlerID uint64) error {
- inf, ok := wf.informers[objType]
- if !ok {
- return fmt.Errorf("tried to remove unknown object type %v event handler", objType)
- }
-
- inf.Lock()
- defer inf.Unlock()
- if _, ok := inf.handlers[handlerID]; !ok {
- return fmt.Errorf("tried to remove unknown object type %v event handler %d", objType, handlerID)
- }
- delete(inf.handlers, handlerID)
- logrus.Debugf("removed %v event handler %d", objType, handlerID)
- return nil
-}
-
-// AddPodHandler adds a handler function that will be executed on Pod object changes
-func (wf *WatchFactory) AddPodHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) {
- return wf.addHandler(podType, "", nil, handlerFuncs, processExisting)
-}
-
-// AddFilteredPodHandler adds a handler function that will be executed when Pod objects that match the given filters change
-func (wf *WatchFactory) AddFilteredPodHandler(namespace string, lsel *metav1.LabelSelector, handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) {
- return wf.addHandler(podType, namespace, lsel, handlerFuncs, processExisting)
-}
-
-// RemovePodHandler removes a Pod object event handler function
-func (wf *WatchFactory) RemovePodHandler(handlerID uint64) error {
- return wf.removeHandler(podType, handlerID)
-}
-
-// AddServiceHandler adds a handler function that will be executed on Service object changes
-func (wf *WatchFactory) AddServiceHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) {
- return wf.addHandler(serviceType, "", nil, handlerFuncs, processExisting)
-}
-
-// RemoveServiceHandler removes a Service object event handler function
-func (wf *WatchFactory) RemoveServiceHandler(handlerID uint64) error {
- return wf.removeHandler(serviceType, handlerID)
-}
-
-// AddEndpointsHandler adds a handler function that will be executed on Endpoints object changes
-func (wf *WatchFactory) AddEndpointsHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) {
- return wf.addHandler(endpointsType, "", nil, handlerFuncs, processExisting)
-}
-
-// RemoveEndpointsHandler removes a Endpoints object event handler function
-func (wf *WatchFactory) RemoveEndpointsHandler(handlerID uint64) error {
- return wf.removeHandler(endpointsType, handlerID)
-}
-
-// AddPolicyHandler adds a handler function that will be executed on NetworkPolicy object changes
-func (wf *WatchFactory) AddPolicyHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) {
- return wf.addHandler(policyType, "", nil, handlerFuncs, processExisting)
-}
-
-// RemovePolicyHandler removes a NetworkPolicy object event handler function
-func (wf *WatchFactory) RemovePolicyHandler(handlerID uint64) error {
- return wf.removeHandler(policyType, handlerID)
-}
-
-// AddNamespaceHandler adds a handler function that will be executed on Namespace object changes
-func (wf *WatchFactory) AddNamespaceHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) {
- return wf.addHandler(namespaceType, "", nil, handlerFuncs, processExisting)
-}
-
-// AddFilteredNamespaceHandler adds a handler function that will be executed when Namespace objects that match the given filters change
-func (wf *WatchFactory) AddFilteredNamespaceHandler(namespace string, lsel *metav1.LabelSelector, handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) {
- return wf.addHandler(namespaceType, namespace, lsel, handlerFuncs, processExisting)
-}
-
-// RemoveNamespaceHandler removes a Namespace object event handler function
-func (wf *WatchFactory) RemoveNamespaceHandler(handlerID uint64) error {
- return wf.removeHandler(namespaceType, handlerID)
-}
-
-// AddNodeHandler adds a handler function that will be executed on Node object changes
-func (wf *WatchFactory) AddNodeHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) {
- return wf.addHandler(nodeType, "", nil, handlerFuncs, processExisting)
-}
-
-// RemoveNodeHandler removes a Node object event handler function
-func (wf *WatchFactory) RemoveNodeHandler(handlerID uint64) error {
- return wf.removeHandler(nodeType, handlerID)
-}