diff options
Diffstat (limited to 'internal')
-rw-r--r-- | internal/pkg/factory/factory.go | 318 | ||||
-rw-r--r-- | internal/pkg/factory/factory_test.go | 686 | ||||
-rw-r--r-- | internal/pkg/kube/kube.go | 28 | ||||
-rw-r--r-- | internal/pkg/ovn/common.go | 65 | ||||
-rw-r--r-- | internal/pkg/ovn/ovn.go | 368 | ||||
-rw-r--r-- | internal/pkg/ovn/ovn_test.go | 44 | ||||
-rw-r--r-- | internal/pkg/ovn/utils.go | 78 |
7 files changed, 260 insertions, 1327 deletions
diff --git a/internal/pkg/factory/factory.go b/internal/pkg/factory/factory.go deleted file mode 100644 index a635e3f..0000000 --- a/internal/pkg/factory/factory.go +++ /dev/null @@ -1,318 +0,0 @@ -package factory - -import ( - "fmt" - "reflect" - "sync" - "sync/atomic" - "time" - - "github.com/sirupsen/logrus" - - kapi "k8s.io/api/core/v1" - knet "k8s.io/api/networking/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - informerfactory "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" -) - -type informer struct { - sync.Mutex - oType reflect.Type - inf cache.SharedIndexInformer - handlers map[uint64]cache.ResourceEventHandler -} - -func (i *informer) forEachHandler(obj interface{}, f func(id uint64, handler cache.ResourceEventHandler)) { - i.Lock() - defer i.Unlock() - - objType := reflect.TypeOf(obj) - if objType != i.oType { - logrus.Errorf("object type %v did not match expected %v", objType, i.oType) - return - } - - for id, handler := range i.handlers { - f(id, handler) - } -} - -// WatchFactory initializes and manages common kube watches -type WatchFactory struct { - iFactory informerfactory.SharedInformerFactory - informers map[reflect.Type]*informer - handlerCounter uint64 -} - -const ( - resyncInterval = 12 * time.Hour -) - -func newInformer(oType reflect.Type, inf cache.SharedIndexInformer) *informer { - return &informer{ - oType: oType, - inf: inf, - handlers: make(map[uint64]cache.ResourceEventHandler), - } -} - -var ( - podType reflect.Type = reflect.TypeOf(&kapi.Pod{}) - serviceType reflect.Type = reflect.TypeOf(&kapi.Service{}) - endpointsType reflect.Type = reflect.TypeOf(&kapi.Endpoints{}) - policyType reflect.Type = reflect.TypeOf(&knet.NetworkPolicy{}) - namespaceType reflect.Type = reflect.TypeOf(&kapi.Namespace{}) - nodeType reflect.Type = reflect.TypeOf(&kapi.Node{}) -) - -// NewWatchFactory initializes a new watch factory -func NewWatchFactory(c kubernetes.Interface, stopChan <-chan struct{}) (*WatchFactory, error) { - // resync time is 12 hours, none of the resources being watched in ovn-kubernetes have - // any race condition where a resync may be required e.g. cni executable on node watching for - // events on pods and assuming that an 'ADD' event will contain the annotations put in by - // ovnkube master (currently, it is just a 'get' loop) - // the downside of making it tight (like 10 minutes) is needless spinning on all resources - wf := &WatchFactory{ - iFactory: informerfactory.NewSharedInformerFactory(c, resyncInterval), - informers: make(map[reflect.Type]*informer), - } - - // Create shared informers we know we'll use - wf.informers[podType] = newInformer(podType, wf.iFactory.Core().V1().Pods().Informer()) - wf.informers[serviceType] = newInformer(serviceType, wf.iFactory.Core().V1().Services().Informer()) - wf.informers[endpointsType] = newInformer(endpointsType, wf.iFactory.Core().V1().Endpoints().Informer()) - wf.informers[policyType] = newInformer(policyType, wf.iFactory.Networking().V1().NetworkPolicies().Informer()) - wf.informers[namespaceType] = newInformer(namespaceType, wf.iFactory.Core().V1().Namespaces().Informer()) - wf.informers[nodeType] = newInformer(nodeType, wf.iFactory.Core().V1().Nodes().Informer()) - - wf.iFactory.Start(stopChan) - res := wf.iFactory.WaitForCacheSync(stopChan) - for oType, synced := range res { - if !synced { - return nil, fmt.Errorf("error in syncing cache for %v informer", oType) - } - informer := wf.informers[oType] - informer.inf.AddEventHandler(wf.newFederatedHandler(informer)) - } - - return wf, nil -} - -func (wf *WatchFactory) newFederatedHandler(inf *informer) cache.ResourceEventHandlerFuncs { - return cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - inf.forEachHandler(obj, func(id uint64, handler cache.ResourceEventHandler) { - logrus.Debugf("running %v ADD event for handler %d", inf.oType, id) - handler.OnAdd(obj) - }) - }, - UpdateFunc: func(oldObj, newObj interface{}) { - inf.forEachHandler(newObj, func(id uint64, handler cache.ResourceEventHandler) { - logrus.Debugf("running %v UPDATE event for handler %d", inf.oType, id) - handler.OnUpdate(oldObj, newObj) - }) - }, - DeleteFunc: func(obj interface{}) { - if inf.oType != reflect.TypeOf(obj) { - tombstone, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - logrus.Errorf("couldn't get object from tombstone: %+v", obj) - return - } - obj = tombstone.Obj - objType := reflect.TypeOf(obj) - if inf.oType != objType { - logrus.Errorf("expected tombstone object resource type %v but got %v", inf.oType, objType) - return - } - } - inf.forEachHandler(obj, func(id uint64, handler cache.ResourceEventHandler) { - logrus.Debugf("running %v DELETE event for handler %d", inf.oType, id) - handler.OnDelete(obj) - }) - }, - } -} - -func getObjectMeta(objType reflect.Type, obj interface{}) (*metav1.ObjectMeta, error) { - switch objType { - case podType: - if pod, ok := obj.(*kapi.Pod); ok { - return &pod.ObjectMeta, nil - } - case serviceType: - if service, ok := obj.(*kapi.Service); ok { - return &service.ObjectMeta, nil - } - case endpointsType: - if endpoints, ok := obj.(*kapi.Endpoints); ok { - return &endpoints.ObjectMeta, nil - } - case policyType: - if policy, ok := obj.(*knet.NetworkPolicy); ok { - return &policy.ObjectMeta, nil - } - case namespaceType: - if namespace, ok := obj.(*kapi.Namespace); ok { - return &namespace.ObjectMeta, nil - } - case nodeType: - if node, ok := obj.(*kapi.Node); ok { - return &node.ObjectMeta, nil - } - } - return nil, fmt.Errorf("cannot get ObjectMeta from type %v", objType) -} - -func (wf *WatchFactory) addHandler(objType reflect.Type, namespace string, lsel *metav1.LabelSelector, funcs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) { - inf, ok := wf.informers[objType] - if !ok { - return 0, fmt.Errorf("unknown object type %v", objType) - } - - sel, err := metav1.LabelSelectorAsSelector(lsel) - if err != nil { - return 0, fmt.Errorf("error creating label selector: %v", err) - } - - filterFunc := func(obj interface{}) bool { - if namespace == "" && lsel == nil { - // Unfiltered handler - return true - } - meta, err := getObjectMeta(objType, obj) - if err != nil { - logrus.Errorf("watch handler filter error: %v", err) - return false - } - if namespace != "" && meta.Namespace != namespace { - return false - } - if lsel != nil && !sel.Matches(labels.Set(meta.Labels)) { - return false - } - return true - } - - // Process existing items as a set so the caller can clean up - // after a restart or whatever - existingItems := inf.inf.GetStore().List() - if processExisting != nil { - items := make([]interface{}, 0) - for _, obj := range existingItems { - if filterFunc(obj) { - items = append(items, obj) - } - } - processExisting(items) - } - - handlerID := atomic.AddUint64(&wf.handlerCounter, 1) - - inf.Lock() - defer inf.Unlock() - - inf.handlers[handlerID] = cache.FilteringResourceEventHandler{ - FilterFunc: filterFunc, - Handler: funcs, - } - logrus.Debugf("added %v event handler %d", objType, handlerID) - - // Send existing items to the handler's add function; informers usually - // do this but since we share informers, it's long-since happened so - // we must emulate that here - for _, obj := range existingItems { - inf.handlers[handlerID].OnAdd(obj) - } - - return handlerID, nil -} - -func (wf *WatchFactory) removeHandler(objType reflect.Type, handlerID uint64) error { - inf, ok := wf.informers[objType] - if !ok { - return fmt.Errorf("tried to remove unknown object type %v event handler", objType) - } - - inf.Lock() - defer inf.Unlock() - if _, ok := inf.handlers[handlerID]; !ok { - return fmt.Errorf("tried to remove unknown object type %v event handler %d", objType, handlerID) - } - delete(inf.handlers, handlerID) - logrus.Debugf("removed %v event handler %d", objType, handlerID) - return nil -} - -// AddPodHandler adds a handler function that will be executed on Pod object changes -func (wf *WatchFactory) AddPodHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) { - return wf.addHandler(podType, "", nil, handlerFuncs, processExisting) -} - -// AddFilteredPodHandler adds a handler function that will be executed when Pod objects that match the given filters change -func (wf *WatchFactory) AddFilteredPodHandler(namespace string, lsel *metav1.LabelSelector, handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) { - return wf.addHandler(podType, namespace, lsel, handlerFuncs, processExisting) -} - -// RemovePodHandler removes a Pod object event handler function -func (wf *WatchFactory) RemovePodHandler(handlerID uint64) error { - return wf.removeHandler(podType, handlerID) -} - -// AddServiceHandler adds a handler function that will be executed on Service object changes -func (wf *WatchFactory) AddServiceHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) { - return wf.addHandler(serviceType, "", nil, handlerFuncs, processExisting) -} - -// RemoveServiceHandler removes a Service object event handler function -func (wf *WatchFactory) RemoveServiceHandler(handlerID uint64) error { - return wf.removeHandler(serviceType, handlerID) -} - -// AddEndpointsHandler adds a handler function that will be executed on Endpoints object changes -func (wf *WatchFactory) AddEndpointsHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) { - return wf.addHandler(endpointsType, "", nil, handlerFuncs, processExisting) -} - -// RemoveEndpointsHandler removes a Endpoints object event handler function -func (wf *WatchFactory) RemoveEndpointsHandler(handlerID uint64) error { - return wf.removeHandler(endpointsType, handlerID) -} - -// AddPolicyHandler adds a handler function that will be executed on NetworkPolicy object changes -func (wf *WatchFactory) AddPolicyHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) { - return wf.addHandler(policyType, "", nil, handlerFuncs, processExisting) -} - -// RemovePolicyHandler removes a NetworkPolicy object event handler function -func (wf *WatchFactory) RemovePolicyHandler(handlerID uint64) error { - return wf.removeHandler(policyType, handlerID) -} - -// AddNamespaceHandler adds a handler function that will be executed on Namespace object changes -func (wf *WatchFactory) AddNamespaceHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) { - return wf.addHandler(namespaceType, "", nil, handlerFuncs, processExisting) -} - -// AddFilteredNamespaceHandler adds a handler function that will be executed when Namespace objects that match the given filters change -func (wf *WatchFactory) AddFilteredNamespaceHandler(namespace string, lsel *metav1.LabelSelector, handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) { - return wf.addHandler(namespaceType, namespace, lsel, handlerFuncs, processExisting) -} - -// RemoveNamespaceHandler removes a Namespace object event handler function -func (wf *WatchFactory) RemoveNamespaceHandler(handlerID uint64) error { - return wf.removeHandler(namespaceType, handlerID) -} - -// AddNodeHandler adds a handler function that will be executed on Node object changes -func (wf *WatchFactory) AddNodeHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) { - return wf.addHandler(nodeType, "", nil, handlerFuncs, processExisting) -} - -// RemoveNodeHandler removes a Node object event handler function -func (wf *WatchFactory) RemoveNodeHandler(handlerID uint64) error { - return wf.removeHandler(nodeType, handlerID) -} diff --git a/internal/pkg/factory/factory_test.go b/internal/pkg/factory/factory_test.go deleted file mode 100644 index 38fb77e..0000000 --- a/internal/pkg/factory/factory_test.go +++ /dev/null @@ -1,686 +0,0 @@ -package factory - -import ( - "reflect" - "testing" - - "k8s.io/api/core/v1" - knet "k8s.io/api/networking/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/kubernetes/fake" - core "k8s.io/client-go/testing" - "k8s.io/client-go/tools/cache" - - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" -) - -func TestFactory(t *testing.T) { - RegisterFailHandler(Fail) - RunSpecs(t, "Watch Factory Suite") -} - -func newObjectMeta(name, namespace string) metav1.ObjectMeta { - return metav1.ObjectMeta{ - Name: name, - UID: types.UID(name), - Namespace: namespace, - Labels: map[string]string{ - "name": name, - }, - } -} - -func newPod(name, namespace string) *v1.Pod { - return &v1.Pod{ - Status: v1.PodStatus{ - Phase: v1.PodRunning, - }, - ObjectMeta: newObjectMeta(name, namespace), - Spec: v1.PodSpec{ - Containers: []v1.Container{ - { - Name: "containerName", - Image: "containerImage", - }, - }, - NodeName: "mynode", - }, - } -} - -func newNamespace(name string) *v1.Namespace { - return &v1.Namespace{ - Status: v1.NamespaceStatus{ - Phase: v1.NamespaceActive, - }, - ObjectMeta: newObjectMeta(name, name), - } -} - -func newNode(name string) *v1.Node { - return &v1.Node{ - Status: v1.NodeStatus{ - Phase: v1.NodeRunning, - }, - ObjectMeta: newObjectMeta(name, ""), - } -} - -func newPolicy(name, namespace string) *knet.NetworkPolicy { - return &knet.NetworkPolicy{ - ObjectMeta: newObjectMeta(name, namespace), - } -} - -func newEndpoints(name, namespace string) *v1.Endpoints { - return &v1.Endpoints{ - ObjectMeta: newObjectMeta(name, namespace), - } -} - -func newService(name, namespace string) *v1.Service { - return &v1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - UID: types.UID(name), - Namespace: namespace, - Labels: map[string]string{ - "name": name, - }, - }, - } -} - -func objSetup(c *fake.Clientset, objType string, listFn func(core.Action) (bool, runtime.Object, error)) *watch.FakeWatcher { - w := watch.NewFake() - c.AddWatchReactor(objType, core.DefaultWatchReactor(w, nil)) - c.AddReactor("list", objType, listFn) - return w -} - -var _ = Describe("Watch Factory Operations", func() { - var ( - fakeClient *fake.Clientset - podWatch, namespaceWatch, nodeWatch *watch.FakeWatcher - policyWatch, endpointsWatch, serviceWatch *watch.FakeWatcher - pods []*v1.Pod - namespaces []*v1.Namespace - nodes []*v1.Node - policies []*knet.NetworkPolicy - endpoints []*v1.Endpoints - services []*v1.Service - stop chan struct{} - numAdded, numUpdated, numDeleted int - ) - - BeforeEach(func() { - fakeClient = &fake.Clientset{} - stop = make(chan struct{}) - - pods = make([]*v1.Pod, 0) - podWatch = objSetup(fakeClient, "pods", func(core.Action) (bool, runtime.Object, error) { - obj := &v1.PodList{} - for _, p := range pods { - obj.Items = append(obj.Items, *p) - } - return true, obj, nil - }) - - namespaces = make([]*v1.Namespace, 0) - namespaceWatch = objSetup(fakeClient, "namespaces", func(core.Action) (bool, runtime.Object, error) { - obj := &v1.NamespaceList{} - for _, p := range namespaces { - obj.Items = append(obj.Items, *p) - } - return true, obj, nil - }) - - nodes = make([]*v1.Node, 0) - nodeWatch = objSetup(fakeClient, "nodes", func(core.Action) (bool, runtime.Object, error) { - obj := &v1.NodeList{} - for _, p := range nodes { - obj.Items = append(obj.Items, *p) - } - return true, obj, nil - }) - - policies = make([]*knet.NetworkPolicy, 0) - policyWatch = objSetup(fakeClient, "networkpolicies", func(core.Action) (bool, runtime.Object, error) { - obj := &knet.NetworkPolicyList{} - for _, p := range policies { - obj.Items = append(obj.Items, *p) - } - return true, obj, nil - }) - - endpoints = make([]*v1.Endpoints, 0) - endpointsWatch = objSetup(fakeClient, "endpoints", func(core.Action) (bool, runtime.Object, error) { - obj := &v1.EndpointsList{} - for _, p := range endpoints { - obj.Items = append(obj.Items, *p) - } - return true, obj, nil - }) - - services = make([]*v1.Service, 0) - serviceWatch = objSetup(fakeClient, "services", func(core.Action) (bool, runtime.Object, error) { - obj := &v1.ServiceList{} - for _, p := range services { - obj.Items = append(obj.Items, *p) - } - return true, obj, nil - }) - - numAdded = 0 - numUpdated = 0 - numDeleted = 0 - }) - - Context("when a processExisting is given", func() { - testExisting := func(objType reflect.Type, namespace string, lsel *metav1.LabelSelector) { - wf, err := NewWatchFactory(fakeClient, stop) - Expect(err).NotTo(HaveOccurred()) - id, err := wf.addHandler(objType, namespace, lsel, - cache.ResourceEventHandlerFuncs{}, - func(objs []interface{}) { - Expect(len(objs)).To(Equal(1)) - }) - Expect(err).NotTo(HaveOccurred()) - Expect(id).To(BeNumerically(">", uint64(0))) - wf.removeHandler(objType, id) - close(stop) - } - - It("is called for each existing pod", func() { - pods = append(pods, newPod("pod1", "default")) - testExisting(podType, "", nil) - }) - - It("is called for each existing namespace", func() { - namespaces = append(namespaces, newNamespace("default")) - testExisting(namespaceType, "", nil) - }) - - It("is called for each existing node", func() { - nodes = append(nodes, newNode("default")) - testExisting(nodeType, "", nil) - }) - - It("is called for each existing policy", func() { - policies = append(policies, newPolicy("denyall", "default")) - testExisting(policyType, "", nil) - }) - - It("is called for each existing endpoints", func() { - endpoints = append(endpoints, newEndpoints("myendpoint", "default")) - testExisting(endpointsType, "", nil) - }) - - It("is called for each existing service", func() { - services = append(services, newService("myservice", "default")) - testExisting(serviceType, "", nil) - }) - - It("is called for each existing pod that matches a given namespace and label", func() { - pod := newPod("pod1", "default") - pod.ObjectMeta.Labels["blah"] = "foobar" - pods = append(pods, pod) - testExisting(podType, "default", &metav1.LabelSelector{ - MatchLabels: map[string]string{"blah": "foobar"}, - }) - }) - }) - - Context("when existing items are known to the informer", func() { - testExisting := func(objType reflect.Type) { - wf, err := NewWatchFactory(fakeClient, stop) - Expect(err).NotTo(HaveOccurred()) - id, err := wf.addHandler(objType, "", nil, - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - numAdded++ - }, - UpdateFunc: func(old, new interface{}) {}, - DeleteFunc: func(obj interface{}) {}, - }, nil) - Expect(err).NotTo(HaveOccurred()) - Expect(numAdded).To(Equal(2)) - wf.removeHandler(objType, id) - close(stop) - } - - It("calls ADD for each existing pod", func() { - pods = append(pods, newPod("pod1", "default")) - pods = append(pods, newPod("pod2", "default")) - testExisting(podType) - }) - - It("calls ADD for each existing namespace", func() { - namespaces = append(namespaces, newNamespace("default")) - namespaces = append(namespaces, newNamespace("default2")) - testExisting(namespaceType) - }) - - It("calls ADD for each existing node", func() { - nodes = append(nodes, newNode("default")) - nodes = append(nodes, newNode("default2")) - testExisting(nodeType) - }) - - It("calls ADD for each existing policy", func() { - policies = append(policies, newPolicy("denyall", "default")) - policies = append(policies, newPolicy("denyall2", "default")) - testExisting(policyType) - }) - - It("calls ADD for each existing endpoints", func() { - endpoints = append(endpoints, newEndpoints("myendpoint", "default")) - endpoints = append(endpoints, newEndpoints("myendpoint2", "default")) - testExisting(endpointsType) - }) - - It("calls ADD for each existing service", func() { - services = append(services, newService("myservice", "default")) - services = append(services, newService("myservice2", "default")) - testExisting(serviceType) - }) - }) - - addFilteredHandler := func(wf *WatchFactory, objType reflect.Type, namespace string, lsel *metav1.LabelSelector, funcs cache.ResourceEventHandlerFuncs) uint64 { - id, err := wf.addHandler(objType, namespace, lsel, cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - defer GinkgoRecover() - numAdded++ - funcs.AddFunc(obj) - }, - UpdateFunc: func(old, new interface{}) { - defer GinkgoRecover() - numUpdated++ - funcs.UpdateFunc(old, new) - }, - DeleteFunc: func(obj interface{}) { - defer GinkgoRecover() - numDeleted++ - funcs.DeleteFunc(obj) - }, - }, nil) - Expect(err).NotTo(HaveOccurred()) - Expect(id).To(BeNumerically(">", uint64(0))) - return id - } - - addHandler := func(wf *WatchFactory, objType reflect.Type, funcs cache.ResourceEventHandlerFuncs) uint64 { - return addFilteredHandler(wf, objType, "", nil, funcs) - } - - It("responds to pod add/update/delete events", func() { - wf, err := NewWatchFactory(fakeClient, stop) - Expect(err).NotTo(HaveOccurred()) - - added := newPod("pod1", "default") - id := addHandler(wf, podType, cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - pod := obj.(*v1.Pod) - Expect(reflect.DeepEqual(pod, added)).To(BeTrue()) - }, - UpdateFunc: func(old, new interface{}) { - newPod := new.(*v1.Pod) - Expect(reflect.DeepEqual(newPod, added)).To(BeTrue()) - Expect(newPod.Spec.NodeName).To(Equal("foobar")) - }, - DeleteFunc: func(obj interface{}) { - pod := obj.(*v1.Pod) - Expect(reflect.DeepEqual(pod, added)).To(BeTrue()) - }, - }) - - pods = append(pods, added) - podWatch.Add(added) - Eventually(func() int { return numAdded }, 2).Should(Equal(1)) - added.Spec.NodeName = "foobar" - podWatch.Modify(added) - Eventually(func() int { return numUpdated }, 2).Should(Equal(1)) - pods = pods[:0] - podWatch.Delete(added) - Eventually(func() int { return numDeleted }, 2).Should(Equal(1)) - - wf.RemovePodHandler(id) - close(stop) - }) - - It("responds to namespace add/update/delete events", func() { - wf, err := NewWatchFactory(fakeClient, stop) - Expect(err).NotTo(HaveOccurred()) - - added := newNamespace("default") - id := addHandler(wf, namespaceType, cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - ns := obj.(*v1.Namespace) - Expect(reflect.DeepEqual(ns, added)).To(BeTrue()) - }, - UpdateFunc: func(old, new interface{}) { - newNS := new.(*v1.Namespace) - Expect(reflect.DeepEqual(newNS, added)).To(BeTrue()) - Expect(newNS.Status.Phase).To(Equal(v1.NamespaceTerminating)) - }, - DeleteFunc: func(obj interface{}) { - ns := obj.(*v1.Namespace) - Expect(reflect.DeepEqual(ns, added)).To(BeTrue()) - }, - }) - - namespaces = append(namespaces, added) - namespaceWatch.Add(added) - Eventually(func() int { return numAdded }, 2).Should(Equal(1)) - added.Status.Phase = v1.NamespaceTerminating - namespaceWatch.Modify(added) - Eventually(func() int { return numUpdated }, 2).Should(Equal(1)) - namespaces = namespaces[:0] - namespaceWatch.Delete(added) - Eventually(func() int { return numDeleted }, 2).Should(Equal(1)) - - wf.RemoveNamespaceHandler(id) - close(stop) - }) - - It("responds to node add/update/delete events", func() { - wf, err := NewWatchFactory(fakeClient, stop) - Expect(err).NotTo(HaveOccurred()) - - added := newNode("mynode") - id := addHandler(wf, nodeType, cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - node := obj.(*v1.Node) - Expect(reflect.DeepEqual(node, added)).To(BeTrue()) - }, - UpdateFunc: func(old, new interface{}) { - newNode := new.(*v1.Node) - Expect(reflect.DeepEqual(newNode, added)).To(BeTrue()) - Expect(newNode.Status.Phase).To(Equal(v1.NodeTerminated)) - }, - DeleteFunc: func(obj interface{}) { - node := obj.(*v1.Node) - Expect(reflect.DeepEqual(node, added)).To(BeTrue()) - }, - }) - - nodes = append(nodes, added) - nodeWatch.Add(added) - Eventually(func() int { return numAdded }, 2).Should(Equal(1)) - added.Status.Phase = v1.NodeTerminated - nodeWatch.Modify(added) - Eventually(func() int { return numUpdated }, 2).Should(Equal(1)) - nodes = nodes[:0] - nodeWatch.Delete(added) - Eventually(func() int { return numDeleted }, 2).Should(Equal(1)) - - wf.removeHandler(nodeType, id) - close(stop) - }) - - It("responds to policy add/update/delete events", func() { - wf, err := NewWatchFactory(fakeClient, stop) - Expect(err).NotTo(HaveOccurred()) - - added := newPolicy("mypolicy", "default") - id := addHandler(wf, policyType, cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - np := obj.(*knet.NetworkPolicy) - Expect(reflect.DeepEqual(np, added)).To(BeTrue()) - }, - UpdateFunc: func(old, new interface{}) { - newNP := new.(*knet.NetworkPolicy) - Expect(reflect.DeepEqual(newNP, added)).To(BeTrue()) - Expect(newNP.Spec.PolicyTypes).To(Equal([]knet.PolicyType{knet.PolicyTypeIngress})) - }, - DeleteFunc: func(obj interface{}) { - np := obj.(*knet.NetworkPolicy) - Expect(reflect.DeepEqual(np, added)).To(BeTrue()) - }, - }) - - policies = append(policies, added) - policyWatch.Add(added) - Eventually(func() int { return numAdded }, 2).Should(Equal(1)) - added.Spec.PolicyTypes = []knet.PolicyType{knet.PolicyTypeIngress} - policyWatch.Modify(added) - Eventually(func() int { return numUpdated }, 2).Should(Equal(1)) - policies = policies[:0] - policyWatch.Delete(added) - Eventually(func() int { return numDeleted }, 2).Should(Equal(1)) - - wf.removeHandler(policyType, id) - close(stop) - }) - - It("responds to endpoints add/update/delete events", func() { - wf, err := NewWatchFactory(fakeClient, stop) - Expect(err).NotTo(HaveOccurred()) - - added := newEndpoints("myendpoints", "default") - id := addHandler(wf, endpointsType, cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - eps := obj.(*v1.Endpoints) - Expect(reflect.DeepEqual(eps, added)).To(BeTrue()) - }, - UpdateFunc: func(old, new interface{}) { - newEPs := new.(*v1.Endpoints) - Expect(reflect.DeepEqual(newEPs, added)).To(BeTrue()) - Expect(len(newEPs.Subsets)).To(Equal(1)) - }, - DeleteFunc: func(obj interface{}) { - eps := obj.(*v1.Endpoints) - Expect(reflect.DeepEqual(eps, added)).To(BeTrue()) - }, - }) - - endpoints = append(endpoints, added) - endpointsWatch.Add(added) - Eventually(func() int { return numAdded }, 2).Should(Equal(1)) - added.Subsets = append(added.Subsets, v1.EndpointSubset{ - Ports: []v1.EndpointPort{ - { - Name: "foobar", - Port: 1234, - }, - }, - }) - endpointsWatch.Modify(added) - Eventually(func() int { return numUpdated }, 2).Should(Equal(1)) - endpoints = endpoints[:0] - endpointsWatch.Delete(added) - Eventually(func() int { return numDeleted }, 2).Should(Equal(1)) - - wf.removeHandler(endpointsType, id) - close(stop) - }) - - It("responds to service add/update/delete events", func() { - wf, err := NewWatchFactory(fakeClient, stop) - Expect(err).NotTo(HaveOccurred()) - - added := newService("myservice", "default") - id := addHandler(wf, serviceType, cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - service := obj.(*v1.Service) - Expect(reflect.DeepEqual(service, added)).To(BeTrue()) - }, - UpdateFunc: func(old, new interface{}) { - newService := new.(*v1.Service) - Expect(reflect.DeepEqual(newService, added)).To(BeTrue()) - Expect(newService.Spec.ClusterIP).To(Equal("1.1.1.1")) - }, - DeleteFunc: func(obj interface{}) { - service := obj.(*v1.Service) - Expect(reflect.DeepEqual(service, added)).To(BeTrue()) - }, - }) - - services = append(services, added) - serviceWatch.Add(added) - Eventually(func() int { return numAdded }, 2).Should(Equal(1)) - added.Spec.ClusterIP = "1.1.1.1" - serviceWatch.Modify(added) - Eventually(func() int { return numUpdated }, 2).Should(Equal(1)) - services = services[:0] - serviceWatch.Delete(added) - Eventually(func() int { return numDeleted }, 2).Should(Equal(1)) - - wf.removeHandler(serviceType, id) - close(stop) - }) - - It("stops processing events after the handler is removed", func() { - wf, err := NewWatchFactory(fakeClient, stop) - Expect(err).NotTo(HaveOccurred()) - - added := newNamespace("default") - id := addHandler(wf, namespaceType, cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) {}, - UpdateFunc: func(old, new interface{}) {}, - DeleteFunc: func(obj interface{}) {}, - }) - - namespaces = append(namespaces, added) - namespaceWatch.Add(added) - Eventually(func() int { return numAdded }, 2).Should(Equal(1)) - wf.RemoveNamespaceHandler(id) - - added2 := newNamespace("other") - namespaces = append(namespaces, added2) - namespaceWatch.Add(added2) - Consistently(func() int { return numAdded }, 2).Should(Equal(1)) - - added2.Status.Phase = v1.NamespaceTerminating - namespaceWatch.Modify(added2) - Consistently(func() int { return numUpdated }, 2).Should(Equal(0)) - namespaces = []*v1.Namespace{added} - namespaceWatch.Delete(added2) - Consistently(func() int { return numDeleted }, 2).Should(Equal(0)) - - close(stop) - }) - - It("filters correctly by label and namespace", func() { - wf, err := NewWatchFactory(fakeClient, stop) - Expect(err).NotTo(HaveOccurred()) - - passesFilter := newPod("pod1", "default") - passesFilter.ObjectMeta.Labels["blah"] = "foobar" - failsFilter := newPod("pod2", "default") - failsFilter.ObjectMeta.Labels["blah"] = "baz" - failsFilter2 := newPod("pod3", "otherns") - failsFilter2.ObjectMeta.Labels["blah"] = "foobar" - - addFilteredHandler(wf, - podType, - "default", - &metav1.LabelSelector{ - MatchLabels: map[string]string{"blah": "foobar"}, - }, - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - pod := obj.(*v1.Pod) - Expect(reflect.DeepEqual(pod, passesFilter)).To(BeTrue()) - }, - UpdateFunc: func(old, new interface{}) { - newPod := new.(*v1.Pod) - Expect(reflect.DeepEqual(newPod, passesFilter)).To(BeTrue()) - }, - DeleteFunc: func(obj interface{}) { - pod := obj.(*v1.Pod) - Expect(reflect.DeepEqual(pod, passesFilter)).To(BeTrue()) - }, - }) - - pods = append(pods, passesFilter) - podWatch.Add(passesFilter) - Eventually(func() int { return numAdded }, 2).Should(Equal(1)) - - // numAdded should remain 1 - pods = append(pods, failsFilter) - podWatch.Add(failsFilter) - Consistently(func() int { return numAdded }, 2).Should(Equal(1)) - - // numAdded should remain 1 - pods = append(pods, failsFilter2) - podWatch.Add(failsFilter2) - Consistently(func() int { return numAdded }, 2).Should(Equal(1)) - - passesFilter.Status.Phase = v1.PodFailed - podWatch.Modify(passesFilter) - Eventually(func() int { return numUpdated }, 2).Should(Equal(1)) - - // numAdded should remain 1 - failsFilter.Status.Phase = v1.PodFailed - podWatch.Modify(failsFilter) - Consistently(func() int { return numUpdated }, 2).Should(Equal(1)) - - failsFilter2.Status.Phase = v1.PodFailed - podWatch.Modify(failsFilter2) - Consistently(func() int { return numUpdated }, 2).Should(Equal(1)) - - pods = []*v1.Pod{failsFilter, failsFilter2} - podWatch.Delete(passesFilter) - Eventually(func() int { return numDeleted }, 2).Should(Equal(1)) - - close(stop) - }) - - It("correctly handles object updates that cause filter changes", func() { - wf, err := NewWatchFactory(fakeClient, stop) - Expect(err).NotTo(HaveOccurred()) - - pod := newPod("pod1", "default") - pod.ObjectMeta.Labels["blah"] = "baz" - - equalPod := pod - id := addFilteredHandler(wf, - podType, - "default", - &metav1.LabelSelector{ - MatchLabels: map[string]string{"blah": "foobar"}, - }, - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - p := obj.(*v1.Pod) - Expect(reflect.DeepEqual(p, equalPod)).To(BeTrue()) - }, - UpdateFunc: func(old, new interface{}) {}, - DeleteFunc: func(obj interface{}) { - p := obj.(*v1.Pod) - Expect(reflect.DeepEqual(p, equalPod)).To(BeTrue()) - }, - }) - - pods = append(pods, pod) - - // Pod doesn't pass filter; shouldn't be added - podWatch.Add(pod) - Consistently(func() int { return numAdded }, 2).Should(Equal(0)) - - // Update pod to pass filter; should be treated as add. Need - // to deep-copy pod when modifying because it's a pointer all - // the way through when using FakeClient - podCopy := pod.DeepCopy() - podCopy.ObjectMeta.Labels["blah"] = "foobar" - pods = []*v1.Pod{podCopy} - equalPod = podCopy - podWatch.Modify(podCopy) - Eventually(func() int { return numAdded }, 2).Should(Equal(1)) - - // Update pod to fail filter; should be treated as delete - pod.ObjectMeta.Labels["blah"] = "baz" - podWatch.Modify(pod) - Eventually(func() int { return numDeleted }, 2).Should(Equal(1)) - Consistently(func() int { return numAdded }, 2).Should(Equal(1)) - Consistently(func() int { return numUpdated }, 2).Should(Equal(0)) - - wf.RemovePodHandler(id) - close(stop) - }) -}) diff --git a/internal/pkg/kube/kube.go b/internal/pkg/kube/kube.go index cc7c29b..e51963e 100644 --- a/internal/pkg/kube/kube.go +++ b/internal/pkg/kube/kube.go @@ -41,7 +41,7 @@ type Kube struct { func (k *Kube) SetAnnotationOnPod(pod *kapi.Pod, key, value string) error { logrus.Infof("Setting annotations %s=%s on pod %s", key, value, pod.Name) patchData := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, key, value) - _, err := k.KClient.Core().Pods(pod.Namespace).Patch(pod.Name, types.MergePatchType, []byte(patchData)) + _, err := k.KClient.CoreV1().Pods(pod.Namespace).Patch(pod.Name, types.MergePatchType, []byte(patchData)) if err != nil { logrus.Errorf("Error in setting annotation on pod %s/%s: %v", pod.Name, pod.Namespace, err) } @@ -52,7 +52,7 @@ func (k *Kube) SetAnnotationOnPod(pod *kapi.Pod, key, value string) error { func (k *Kube) SetAnnotationOnNode(node *kapi.Node, key, value string) error { logrus.Infof("Setting annotations %s=%s on node %s", key, value, node.Name) patchData := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, key, value) - _, err := k.KClient.Core().Nodes().Patch(node.Name, types.MergePatchType, []byte(patchData)) + _, err := k.KClient.CoreV1().Nodes().Patch(node.Name, types.MergePatchType, []byte(patchData)) if err != nil { logrus.Errorf("Error in setting annotation on node %s: %v", node.Name, err) } @@ -67,7 +67,7 @@ func (k *Kube) SetAnnotationOnNamespace(ns *kapi.Namespace, key, ns.Name) patchData := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, key, value) - _, err := k.KClient.Core().Namespaces().Patch(ns.Name, + _, err := k.KClient.CoreV1().Namespaces().Patch(ns.Name, types.MergePatchType, []byte(patchData)) if err != nil { logrus.Errorf("Error in setting annotation on namespace %s: %v", @@ -78,7 +78,7 @@ func (k *Kube) SetAnnotationOnNamespace(ns *kapi.Namespace, key, // GetAnnotationsOnPod obtains the pod annotations from kubernetes apiserver, given the name and namespace func (k *Kube) GetAnnotationsOnPod(namespace, name string) (map[string]string, error) { - pod, err := k.KClient.Core().Pods(namespace).Get(name, metav1.GetOptions{}) + pod, err := k.KClient.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{}) if err != nil { return nil, err } @@ -87,12 +87,12 @@ func (k *Kube) GetAnnotationsOnPod(namespace, name string) (map[string]string, e // GetPod obtains the Pod resource from kubernetes apiserver, given the name and namespace func (k *Kube) GetPod(namespace, name string) (*kapi.Pod, error) { - return k.KClient.Core().Pods(namespace).Get(name, metav1.GetOptions{}) + return k.KClient.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{}) } // GetPods obtains the Pod resource from kubernetes apiserver, given the name and namespace func (k *Kube) GetPods(namespace string) (*kapi.PodList, error) { - return k.KClient.Core().Pods(namespace).List(metav1.ListOptions{}) + return k.KClient.CoreV1().Pods(namespace).List(metav1.ListOptions{}) } // GetPodsByLabels obtains the Pod resources from kubernetes apiserver, @@ -100,42 +100,42 @@ func (k *Kube) GetPods(namespace string) (*kapi.PodList, error) { func (k *Kube) GetPodsByLabels(namespace string, selector labels.Selector) (*kapi.PodList, error) { options := metav1.ListOptions{} options.LabelSelector = selector.String() - return k.KClient.Core().Pods(namespace).List(options) + return k.KClient.CoreV1().Pods(namespace).List(options) } // GetNodes returns the list of all Node objects from kubernetes func (k *Kube) GetNodes() (*kapi.NodeList, error) { - return k.KClient.Core().Nodes().List(metav1.ListOptions{}) + return k.KClient.CoreV1().Nodes().List(metav1.ListOptions{}) } // GetNode returns the Node resource from kubernetes apiserver, given its name func (k *Kube) GetNode(name string) (*kapi.Node, error) { - return k.KClient.Core().Nodes().Get(name, metav1.GetOptions{}) + return k.KClient.CoreV1().Nodes().Get(name, metav1.GetOptions{}) } // GetService returns the Service resource from kubernetes apiserver, given its name and namespace func (k *Kube) GetService(namespace, name string) (*kapi.Service, error) { - return k.KClient.Core().Services(namespace).Get(name, metav1.GetOptions{}) + return k.KClient.CoreV1().Services(namespace).Get(name, metav1.GetOptions{}) } // GetEndpoints returns all the Endpoint resources from kubernetes // apiserver, given namespace func (k *Kube) GetEndpoints(namespace string) (*kapi.EndpointsList, error) { - return k.KClient.Core().Endpoints(namespace).List(metav1.ListOptions{}) + return k.KClient.CoreV1().Endpoints(namespace).List(metav1.ListOptions{}) } // GetNamespace returns the Namespace resource from kubernetes apiserver, // given its name func (k *Kube) GetNamespace(name string) (*kapi.Namespace, error) { - return k.KClient.Core().Namespaces().Get(name, metav1.GetOptions{}) + return k.KClient.CoreV1().Namespaces().Get(name, metav1.GetOptions{}) } // GetNamespaces returns all Namespace resource from kubernetes apiserver func (k *Kube) GetNamespaces() (*kapi.NamespaceList, error) { - return k.KClient.Core().Namespaces().List(metav1.ListOptions{}) + return k.KClient.CoreV1().Namespaces().List(metav1.ListOptions{}) } // GetNetworkPolicies returns all network policy objects from kubernetes func (k *Kube) GetNetworkPolicies(namespace string) (*kapisnetworking.NetworkPolicyList, error) { - return k.KClient.Networking().NetworkPolicies(namespace).List(metav1.ListOptions{}) + return k.KClient.NetworkingV1().NetworkPolicies(namespace).List(metav1.ListOptions{}) } diff --git a/internal/pkg/ovn/common.go b/internal/pkg/ovn/common.go index b504440..60cd202 100644 --- a/internal/pkg/ovn/common.go +++ b/internal/pkg/ovn/common.go @@ -3,69 +3,82 @@ package ovn import ( "encoding/json" "fmt" - "github.com/sirupsen/logrus" "math/big" "math/rand" "net" + logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" + "strings" "time" ) -func SetupDistributedRouter(name string) error { +var log = logf.Log.WithName("ovn") - // Make sure br-int is created. - stdout, stderr, err := RunOVSVsctlUnix("--", "--may-exist", "add-br", "br-int") - if err != nil { - logrus.Errorf("Failed to create br-int, stdout: %q, stderr: %q, error: %v", stdout, stderr, err) - return err +func parseOvnNetworkObject(ovnnetwork string) ([]map[string]interface{}, error) { + var ovnNet []map[string]interface{} + + if ovnnetwork == "" { + return nil, fmt.Errorf("parseOvnNetworkObject:error") } + + if err := json.Unmarshal([]byte(ovnnetwork), &ovnNet); err != nil { + return nil, fmt.Errorf("parseOvnNetworkObject: failed to load ovn network err: %v | ovn network: %v", err, ovnnetwork) + } + + return ovnNet, nil +} + +func setupDistributedRouter(name string) error { + // Create a single common distributed router for the cluster. - stdout, stderr, err = RunOVNNbctlUnix("--", "--may-exist", "lr-add", name, "--", "set", "logical_router", name, "external_ids:ovn4nfv-cluster-router=yes") + stdout, stderr, err := RunOVNNbctl("--", "--may-exist", "lr-add", name, "--", "set", "logical_router", name, "external_ids:ovn4nfv-cluster-router=yes") if err != nil { - logrus.Errorf("Failed to create a single common distributed router for the cluster, stdout: %q, stderr: %q, error: %v", stdout, stderr, err) + log.Error(err, "Failed to create a single common distributed router for the cluster", "stdout", stdout, "stderr", stderr) return err } // Create a logical switch called "ovn4nfv-join" that will be used to connect gateway routers to the distributed router. // The "ovn4nfv-join" will be allocated IP addresses in the range 100.64.1.0/24. - stdout, stderr, err = RunOVNNbctlUnix("--may-exist", "ls-add", "ovn4nfv-join") + stdout, stderr, err = RunOVNNbctl("--may-exist", "ls-add", "ovn4nfv-join") if err != nil { - logrus.Errorf("Failed to create logical switch called \"ovn4nfv-join\", stdout: %q, stderr: %q, error: %v", stdout, stderr, err) + log.Error(err, "Failed to create logical switch called \"ovn4nfv-join\"", "stdout", stdout, "stderr", stderr) return err } // Connect the distributed router to "ovn4nfv-join". - routerMac, stderr, err := RunOVNNbctlUnix("--if-exist", "get", "logical_router_port", "rtoj-"+name, "mac") + routerMac, stderr, err := RunOVNNbctl("--if-exist", "get", "logical_router_port", "rtoj-"+name, "mac") if err != nil { - logrus.Errorf("Failed to get logical router port rtoj-%v, stderr: %q, error: %v", name, stderr, err) + log.Error(err, "Failed to get logical router port rtoj-", "name", name, "stdout", stdout, "stderr", stderr) return err } if routerMac == "" { routerMac = generateMac() - stdout, stderr, err = RunOVNNbctlUnix("--", "--may-exist", "lrp-add", name, "rtoj-"+name, routerMac, "100.64.1.1/24", "--", "set", "logical_router_port", "rtoj-"+name, "external_ids:connect_to_ovn4nfvjoin=yes") + stdout, stderr, err = RunOVNNbctl("--", "--may-exist", "lrp-add", name, "rtoj-"+name, routerMac, "100.64.1.1/24", "--", "set", "logical_router_port", "rtoj-"+name, "external_ids:connect_to_ovn4nfvjoin=yes") if err != nil { - logrus.Errorf("Failed to add logical router port rtoj-%v, stdout: %q, stderr: %q, error: %v", name, stdout, stderr, err) + log.Error(err, "Failed to add logical router port rtoj", "name", name, "stdout", stdout, "stderr", stderr) return err } } // Connect the switch "ovn4nfv-join" to the router. - stdout, stderr, err = RunOVNNbctlUnix("--", "--may-exist", "lsp-add", "ovn4nfv-join", "jtor-"+name, "--", "set", "logical_switch_port", "jtor-"+name, "type=router", "options:router-port=rtoj-"+name, "addresses="+"\""+routerMac+"\"") + stdout, stderr, err = RunOVNNbctl("--", "--may-exist", "lsp-add", "ovn4nfv-join", "jtor-"+name, "--", "set", "logical_switch_port", "jtor-"+name, "type=router", "options:router-port=rtoj-"+name, "addresses="+"\""+routerMac+"\"") if err != nil { - logrus.Errorf("Failed to add logical switch port to logical router, stdout: %q, stderr: %q, error: %v", stdout, stderr, err) + log.Error(err, "Failed to add logical switch port to logical router", "stdout", stdout, "stderr", stderr) return err } return nil } -func parseOvnNetworkObject(ovnnetwork string) ([]map[string]interface{}, error) { - var ovnNet []map[string]interface{} - - if ovnnetwork == "" { - return nil, fmt.Errorf("parseOvnNetworkObject:error") +// Find if switch exists +func findLogicalSwitch(name string) bool { + // get logical switch from OVN + output, stderr, err := RunOVNNbctl("--data=bare", "--no-heading", + "--columns=name", "find", "logical_switch", "name="+name) + if err != nil { + log.Error(err, "Error in obtaining list of logical switch", "stderr", stderr) + return false } - if err := json.Unmarshal([]byte(ovnnetwork), &ovnNet); err != nil { - return nil, fmt.Errorf("parseOvnNetworkObject: failed to load ovn network err: %v | ovn network: %v", err, ovnnetwork) + if strings.Compare(name, output) == 0 { + return true } - - return ovnNet, nil + return false } // generateMac generates mac address. diff --git a/internal/pkg/ovn/ovn.go b/internal/pkg/ovn/ovn.go index 470416b..dad4641 100644 --- a/internal/pkg/ovn/ovn.go +++ b/internal/pkg/ovn/ovn.go @@ -2,93 +2,174 @@ package ovn import ( "fmt" + "github.com/mitchellh/mapstructure" + kapi "k8s.io/api/core/v1" + kexec "k8s.io/utils/exec" "strings" "time" - - "github.com/sirupsen/logrus" - kapi "k8s.io/api/core/v1" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" - "ovn4nfv-k8s-plugin/internal/pkg/factory" - "ovn4nfv-k8s-plugin/internal/pkg/kube" ) -// Controller structure is the object which holds the controls for starting -// and reacting upon the watched resources (e.g. pods, endpoints) type Controller struct { - kube kube.Interface - watchFactory *factory.WatchFactory - gatewayCache map[string]string - // A cache of all logical switches seen by the watcher - logicalSwitchCache map[string]bool - // A cache of all logical ports seen by the watcher and - // its corresponding logical switch - logicalPortCache map[string]string } -// NewOvnController creates a new OVN controller for creating logical network -// infrastructure and policy -func NewOvnController(kubeClient kubernetes.Interface, wf *factory.WatchFactory) *Controller { - return &Controller{ - kube: &kube.Kube{KClient: kubeClient}, - watchFactory: wf, - logicalSwitchCache: make(map[string]bool), - logicalPortCache: make(map[string]string), - gatewayCache: make(map[string]string), +const ( + ovn4nfvRouterName = "ovn4nfv-master" + Ovn4nfvAnnotationTag = "k8s.plugin.opnfv.org/ovnInterfaces" +) + +type netInterface struct { + Name string + Interface string + NetType string + DefaultGateway string + IPAddress string + MacAddress string +} + +var ovnCtl *Controller + +// NewOvnController creates a new OVN controller for creating logical networks +func NewOvnController(exec kexec.Interface) (*Controller, error) { + + if exec == nil { + exec = kexec.New() + } + + if err := SetExec(exec); err != nil { + log.Error(err, "Failed to initialize exec helper") + return nil, err + } + + if err := SetupOvnUtils(); err != nil { + log.Error(err, "Failed to initialize OVN State") + return nil, err + } + ovnCtl = &Controller{ + gatewayCache: make(map[string]string), } + return ovnCtl, nil } -// Run starts the actual watching. Also initializes any local structures needed. -func (oc *Controller) Run() error { - fmt.Println("ovn4nfvk8s watching Pods") - for _, f := range []func() error{oc.WatchPods} { - if err := f(); err != nil { - return err - } +// GetOvnController returns OVN controller for creating logical networks +func GetOvnController() (*Controller, error) { + if ovnCtl != nil { + return ovnCtl, nil } - return nil + return nil, fmt.Errorf("OVN Controller not initialized") } -// WatchPods starts the watching of Pod resource and calls back the appropriate handler logic -func (oc *Controller) WatchPods() error { - _, err := oc.watchFactory.AddPodHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - pod := obj.(*kapi.Pod) - if pod.Spec.NodeName != "" { - oc.addLogicalPort(pod) +// AddLogicalPorts adds ports to the Pod +func (oc *Controller) AddLogicalPorts(pod *kapi.Pod, ovnNetObjs []map[string]interface{}) (key, value string) { + + if ovnNetObjs == nil { + return + } + + if pod.Spec.HostNetwork { + return + } + + if _, ok := pod.Annotations[Ovn4nfvAnnotationTag]; ok { + log.Info("AddLogicalPorts : Pod annotation found") + return + } + + var ovnString, outStr string + ovnString = "[" + var ns netInterface + for _, net := range ovnNetObjs { + + err := mapstructure.Decode(net, &ns) + if err != nil { + log.Error(err, "mapstruct error", "network", net) + return + } + + if !findLogicalSwitch(ns.Name) { + log.Info("Logical Switch not found") + return + } + if ns.Interface == "" { + log.Info("Interface name must be provided") + return + } + if ns.DefaultGateway == "" { + ns.DefaultGateway = "false" + } + if ns.NetType == "" || ns.NetType != "provider" { + ns.NetType = "virtual" + } + if ns.NetType == "provider" { + if ns.IPAddress == "" { + log.Info("ipAddress must be provided for netType Provider") + return } - }, - UpdateFunc: func(old, newer interface{}) { - podNew := newer.(*kapi.Pod) - podOld := old.(*kapi.Pod) - if podOld.Spec.NodeName == "" && podNew.Spec.NodeName != "" { - oc.addLogicalPort(podNew) + if ns.DefaultGateway == "true" { + log.Info("defaultGateway not supported for provider network - Use ovnNetworkRoutes to add routes") + return } - }, - DeleteFunc: func(obj interface{}) { - pod := obj.(*kapi.Pod) - oc.deleteLogicalPort(pod) - }, - }, oc.syncPods) - return err + + } + + outStr = oc.addLogicalPortWithSwitch(pod, ns.Name, ns.IPAddress, ns.MacAddress, ns.Interface, ns.NetType) + if outStr == "" { + return + } + last := len(outStr) - 1 + tmpString := outStr[:last] + tmpString += "," + "\\\"defaultGateway\\\":" + "\\\"" + ns.DefaultGateway + "\\\"" + tmpString += "," + "\\\"interface\\\":" + "\\\"" + ns.Interface + "\\\"}" + ovnString += tmpString + ovnString += "," + } + last := len(ovnString) - 1 + ovnString = ovnString[:last] + ovnString += "]" + key = Ovn4nfvAnnotationTag + value = ovnString + return key, value } -func (oc *Controller) syncPods(pods []interface{}) { +func (oc *Controller) DeleteLogicalPorts(name, namespace string) { + + log.Info("Deleting pod", "name", name) + logicalPort := fmt.Sprintf("%s_%s", namespace, name) + + // get the list of logical ports from OVN + stdout, stderr, err := RunOVNNbctl("--data=bare", "--no-heading", + "--columns=name", "find", "logical_switch_port", "external_ids:pod=true") + if err != nil { + log.Error(err, "Error in obtaining list of logical ports ", "stdout", stdout, "stderr", stderr) + return + } + log.Info("Deleting", "Existing Ports", stdout) + existingLogicalPorts := strings.Fields(stdout) + for _, existingPort := range existingLogicalPorts { + if strings.Contains(existingPort, logicalPort) { + // found, delete this logical port + log.Info("Deleting", "existingPort", existingPort) + stdout, stderr, err := RunOVNNbctl("--if-exists", "lsp-del", + existingPort) + if err != nil { + log.Error(err, "Error in deleting pod's logical port ", "stdout", stdout, "stderr", stderr) + } + } + } + return } func (oc *Controller) getGatewayFromSwitch(logicalSwitch string) (string, string, error) { var gatewayIPMaskStr, stderr string var ok bool var err error - logrus.Infof("getGatewayFromSwitch: %s", logicalSwitch) + log.Info("getGatewayFromSwitch", "logicalSwitch", logicalSwitch) if gatewayIPMaskStr, ok = oc.gatewayCache[logicalSwitch]; !ok { - gatewayIPMaskStr, stderr, err = RunOVNNbctlUnix("--if-exists", + gatewayIPMaskStr, stderr, err = RunOVNNbctl("--if-exists", "get", "logical_switch", logicalSwitch, "external_ids:gateway_ip") if err != nil { - logrus.Errorf("Failed to get gateway IP: %s, stderr: %q, %v", - gatewayIPMaskStr, stderr, err) + log.Error(err, "Failed to get gateway IP", "stderr", stderr, "gatewayIPMaskStr", gatewayIPMaskStr) return "", "", err } if gatewayIPMaskStr == "" { @@ -107,44 +188,6 @@ func (oc *Controller) getGatewayFromSwitch(logicalSwitch string) (string, string return gatewayIP, mask, nil } -func (oc *Controller) deleteLogicalPort(pod *kapi.Pod) { - - if pod.Spec.HostNetwork { - return - } - - logrus.Infof("Deleting pod: %s", pod.Name) - logicalPort := fmt.Sprintf("%s_%s", pod.Namespace, pod.Name) - - // get the list of logical ports from OVN - output, stderr, err := RunOVNNbctlUnix("--data=bare", "--no-heading", - "--columns=name", "find", "logical_switch_port", "external_ids:pod=true") - if err != nil { - logrus.Errorf("Error in obtaining list of logical ports, "+ - "stderr: %q, err: %v", - stderr, err) - return - } - logrus.Infof("Exising Ports : %s. ", output) - existingLogicalPorts := strings.Fields(output) - for _, existingPort := range existingLogicalPorts { - if strings.Contains(existingPort, logicalPort) { - // found, delete this logical port - logrus.Infof("Deleting: %s. ", existingPort) - out, stderr, err := RunOVNNbctlUnix("--if-exists", "lsp-del", - existingPort) - if err != nil { - logrus.Errorf("Error in deleting pod's logical port "+ - "stdout: %q, stderr: %q err: %v", - out, stderr, err) - } else { - delete(oc.logicalPortCache, existingPort) - } - } - } - return -} - func (oc *Controller) addLogicalPortWithSwitch(pod *kapi.Pod, logicalSwitch, ipAddress, macAddress, interfaceName, netType string) (annotation string) { var out, stderr string var err error @@ -153,9 +196,6 @@ func (oc *Controller) addLogicalPortWithSwitch(pod *kapi.Pod, logicalSwitch, ipA return } - if !oc.logicalSwitchCache[logicalSwitch] { - oc.logicalSwitchCache[logicalSwitch] = true - } var portName string if interfaceName != "" { portName = fmt.Sprintf("%s_%s_%s", pod.Namespace, pod.Name, interfaceName) @@ -163,7 +203,7 @@ func (oc *Controller) addLogicalPortWithSwitch(pod *kapi.Pod, logicalSwitch, ipA return } - logrus.Infof("Creating logical port for %s on switch %s", portName, logicalSwitch) + log.Info("Creating logical port for on switch", "portName", portName, "logicalSwitch", logicalSwitch) if ipAddress != "" && macAddress != "" { isStaticIP = true @@ -174,7 +214,7 @@ func (oc *Controller) addLogicalPortWithSwitch(pod *kapi.Pod, logicalSwitch, ipA } if isStaticIP { - out, stderr, err = RunOVNNbctlUnix("--may-exist", "lsp-add", + out, stderr, err = RunOVNNbctl("--may-exist", "lsp-add", logicalSwitch, portName, "--", "lsp-set-addresses", portName, fmt.Sprintf("%s %s", macAddress, ipAddress), "--", "--if-exists", "clear", "logical_switch_port", portName, "dynamic_addresses", "--", "set", @@ -183,13 +223,11 @@ func (oc *Controller) addLogicalPortWithSwitch(pod *kapi.Pod, logicalSwitch, ipA "external-ids:logical_switch="+logicalSwitch, "external-ids:pod=true") if err != nil { - logrus.Errorf("Failed to add logical port to switch "+ - "stdout: %q, stderr: %q (%v)", - out, stderr, err) + log.Error(err, "Failed to add logical port to switch", "out", out, "stderr", stderr) return } } else { - out, stderr, err = RunOVNNbctlUnix("--wait=sb", "--", + out, stderr, err = RunOVNNbctl("--wait=sb", "--", "--may-exist", "lsp-add", logicalSwitch, portName, "--", "lsp-set-addresses", portName, "dynamic", "--", "set", @@ -198,37 +236,32 @@ func (oc *Controller) addLogicalPortWithSwitch(pod *kapi.Pod, logicalSwitch, ipA "external-ids:logical_switch="+logicalSwitch, "external-ids:pod=true") if err != nil { - logrus.Errorf("Error while creating logical port %s "+ - "stdout: %q, stderr: %q (%v)", - portName, out, stderr, err) + log.Error(err, "Error while creating logical port %s ", "portName", portName, "stdout", out, "stderr", stderr) return } } - oc.logicalPortCache[portName] = logicalSwitch count := 30 for count > 0 { if isStaticIP { - out, stderr, err = RunOVNNbctlUnix("get", + out, stderr, err = RunOVNNbctl("get", "logical_switch_port", portName, "addresses") } else { - out, stderr, err = RunOVNNbctlUnix("get", + out, stderr, err = RunOVNNbctl("get", "logical_switch_port", portName, "dynamic_addresses") } if err == nil && out != "[]" { break } if err != nil { - logrus.Errorf("Error while obtaining addresses for %s - %v", portName, - err) + log.Error(err, "Error while obtaining addresses for", "portName", portName) return } time.Sleep(time.Second) count-- } if count == 0 { - logrus.Errorf("Error while obtaining addresses for %s "+ - "stdout: %q, stderr: %q, (%v)", portName, out, stderr, err) + log.Error(err, "Error while obtaining addresses for", "portName", portName, "stdout", out, "stderr", stderr) return } @@ -239,14 +272,14 @@ func (oc *Controller) addLogicalPortWithSwitch(pod *kapi.Pod, logicalSwitch, ipA outStr = strings.Trim(outStr, `"`) addresses := strings.Split(outStr, " ") if len(addresses) != 2 { - logrus.Errorf("Error while obtaining addresses for %s", portName) + log.Info("Error while obtaining addresses for", "portName", portName) return } if netType == "virtual" { gatewayIP, mask, err := oc.getGatewayFromSwitch(logicalSwitch) if err != nil { - logrus.Errorf("Error obtaining gateway address for switch %s: %s", logicalSwitch, err) + log.Error(err, "Error obtaining gateway address for switch", "logicalSwitch", logicalSwitch) return } annotation = fmt.Sprintf(`{\"ip_address\":\"%s/%s\", \"mac_address\":\"%s\", \"gateway_ip\": \"%s\"}`, addresses[1], mask, addresses[0], gatewayIP) @@ -256,102 +289,3 @@ func (oc *Controller) addLogicalPortWithSwitch(pod *kapi.Pod, logicalSwitch, ipA return annotation } - -func (oc *Controller) findLogicalSwitch(name string) bool { - // get logical switch from OVN - output, stderr, err := RunOVNNbctlUnix("--data=bare", "--no-heading", - "--columns=name", "find", "logical_switch", "name="+name) - if err != nil { - logrus.Errorf("Error in obtaining list of logical switch, "+ - "stderr: %q, err: %v", - stderr, err) - return false - } - - if strings.Compare(name, output) == 0 { - return true - } else { - logrus.Errorf("Error finding Switch %v", name) - return false - } -} - -func (oc *Controller) addLogicalPort(pod *kapi.Pod) { - var logicalSwitch string - var ipAddress, macAddress, interfaceName, defaultGateway, netType string - - annotation := pod.Annotations["ovnNetwork"] - - if annotation != "" { - ovnNetObjs, err := parseOvnNetworkObject(annotation) - if err != nil { - logrus.Errorf("addLogicalPort : Error Parsing OvnNetwork List") - return - } - var ovnString, outStr string - ovnString = "[" - for _, net := range ovnNetObjs { - logicalSwitch = net["name"].(string) - if !oc.findLogicalSwitch(logicalSwitch) { - logrus.Errorf("Logical Switch not found") - return - } - if _, ok := net["interface"]; ok { - interfaceName = net["interface"].(string) - } else { - logrus.Errorf("Interface name must be provided") - return - } - if _, ok := net["ipAddress"]; ok { - ipAddress = net["ipAddress"].(string) - } else { - ipAddress = "" - } - if _, ok := net["macAddress"]; ok { - macAddress = net["macAddress"].(string) - } else { - macAddress = "" - } - if _, ok := net["defaultGateway"]; ok { - defaultGateway = net["defaultGateway"].(string) - } else { - defaultGateway = "false" - } - if _, ok := net["netType"]; ok { - netType = net["netType"].(string) - } else { - netType = "virtual" - } - if netType != "provider" && netType != "virtual" { - logrus.Errorf("netType is not supported") - return - } - if netType == "provider" && ipAddress == "" { - logrus.Errorf("ipAddress must be provided for netType Provider") - return - } - if netType == "provider" && defaultGateway == "true" { - logrus.Errorf("defaultGateway not supported for provider network - Use ovnNetworkRoutes to add routes") - return - } - outStr = oc.addLogicalPortWithSwitch(pod, logicalSwitch, ipAddress, macAddress, interfaceName, netType) - if outStr == "" { - return - } - last := len(outStr) - 1 - tmpString := outStr[:last] - tmpString += "," + "\\\"defaultGateway\\\":" + "\\\"" + defaultGateway + "\\\"" - tmpString += "," + "\\\"interface\\\":" + "\\\"" + interfaceName + "\\\"}" - ovnString += tmpString - ovnString += "," - } - last := len(ovnString) - 1 - ovnString = ovnString[:last] - ovnString += "]" - logrus.Debugf("ovnIfaceList - %v", ovnString) - err = oc.kube.SetAnnotationOnPod(pod, "ovnIfaceList", ovnString) - if err != nil { - logrus.Errorf("Failed to set annotation on pod %s - %v", pod.Name, err) - } - } -} diff --git a/internal/pkg/ovn/ovn_test.go b/internal/pkg/ovn/ovn_test.go index bc33d35..6e38759 100644 --- a/internal/pkg/ovn/ovn_test.go +++ b/internal/pkg/ovn/ovn_test.go @@ -9,9 +9,7 @@ import ( "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes/fake" "ovn4nfv-k8s-plugin/internal/pkg/config" - "ovn4nfv-k8s-plugin/internal/pkg/factory" ovntest "ovn4nfv-k8s-plugin/internal/pkg/testing" . "github.com/onsi/ginkgo" @@ -30,10 +28,10 @@ var _ = Describe("Add logical Port", func() { var app *cli.App BeforeEach(func() { - app = cli.NewApp() app.Name = "test" app.Flags = config.Flags + }) It("tests Pod", func() { @@ -68,15 +66,15 @@ var _ = Describe("Add logical Port", func() { return fmt.Sprintf("/fake-bin/%s", file), nil }, } - - err := SetExec(fexec) + oldSetupOvnUtils := SetupOvnUtils + // as we are exiting, revert ConfigureInterface back at end of function + defer func() { SetupOvnUtils = oldSetupOvnUtils }() + SetupOvnUtils = func() error { + return nil + } + ovnController, err := NewOvnController(fexec) Expect(err).NotTo(HaveOccurred()) - fakeClient := &fake.Clientset{} - var fakeWatchFactory factory.WatchFactory - - ovnController := NewOvnController(fakeClient, &fakeWatchFactory) - Expect(err).NotTo(HaveOccurred()) var ( okPod = v1.Pod{ TypeMeta: metav1.TypeMeta{ @@ -84,8 +82,7 @@ var _ = Describe("Add logical Port", func() { APIVersion: "v1", }, ObjectMeta: metav1.ObjectMeta{ - Name: "ok", - Annotations: map[string]string{"ovnNetwork": "[{ \"name\": \"ovn-prot-net\", \"interface\": \"net0\" , \"defaultGateway\": \"true\"}]"}, + Name: "ok", }, Spec: v1.PodSpec{ Containers: []v1.Container{ @@ -97,8 +94,8 @@ var _ = Describe("Add logical Port", func() { }, } ) - - ovnController.addLogicalPort(&okPod) + a := []map[string]interface{}{{"name": "ovn-prot-net", "interface": "net0"}} + ovnController.AddLogicalPorts(&okPod, a) Expect(fexec.CommandCalls).To(Equal(len(fakeCmds))) return nil @@ -137,13 +134,14 @@ var _ = Describe("Add logical Port", func() { return fmt.Sprintf("/fake-bin/%s", file), nil }, } - err := SetExec(fexec) + oldSetupOvnUtils := SetupOvnUtils + // as we are exiting, revert ConfigureInterface back at end of function + defer func() { SetupOvnUtils = oldSetupOvnUtils }() + SetupOvnUtils = func() error { + return nil + } + ovnController, err := NewOvnController(fexec) Expect(err).NotTo(HaveOccurred()) - - fakeClient := &fake.Clientset{} - var fakeWatchFactory factory.WatchFactory - - ovnController := NewOvnController(fakeClient, &fakeWatchFactory) var ( okPod = v1.Pod{ TypeMeta: metav1.TypeMeta{ @@ -151,8 +149,7 @@ var _ = Describe("Add logical Port", func() { APIVersion: "v1", }, ObjectMeta: metav1.ObjectMeta{ - Name: "ok", - Annotations: map[string]string{"ovnNetwork": "[{ \"name\": \"ovn-prot-net\", \"interface\": \"net0\", \"netType\": \"provider\", \"ipAddress\": \"192.168.1.3/24\", \"macAddress\": \"0a:00:00:00:00:01\" }]"}, + Name: "ok", }, Spec: v1.PodSpec{ Containers: []v1.Container{ @@ -164,7 +161,8 @@ var _ = Describe("Add logical Port", func() { }, } ) - ovnController.addLogicalPort(&okPod) + a := []map[string]interface{}{{"name": "ovn-prot-net", "interface": "net0", "netType": "provider", "ipAddress": "192.168.1.3/24", "macAddress": "0a:00:00:00:00:01"}} + ovnController.AddLogicalPorts(&okPod, a) Expect(fexec.CommandCalls).To(Equal(len(fakeCmds))) return nil diff --git a/internal/pkg/ovn/utils.go b/internal/pkg/ovn/utils.go index 1700bf8..2478ac2 100644 --- a/internal/pkg/ovn/utils.go +++ b/internal/pkg/ovn/utils.go @@ -3,55 +3,53 @@ package ovn import ( "bytes" "fmt" + kexec "k8s.io/utils/exec" + "os" "strings" "time" - "unicode" - - "github.com/sirupsen/logrus" - kexec "k8s.io/utils/exec" ) const ( ovsCommandTimeout = 15 - ovsVsctlCommand = "ovs-vsctl" - ovsOfctlCommand = "ovs-ofctl" ovnNbctlCommand = "ovn-nbctl" - ipCommand = "ip" ) // Exec runs various OVN and OVS utilities type execHelper struct { exec kexec.Interface - ofctlPath string - vsctlPath string nbctlPath string - ipPath string + hostIP string + hostPort string } var runner *execHelper +// SetupOvnUtils does internal OVN initialization +var SetupOvnUtils = func() error { + runner.hostIP = os.Getenv("HOST_IP") + // OVN Host Port + runner.hostPort = "6641" + log.Info("Host Port", "IP", runner.hostIP, "Port", runner.hostPort) + + // Setup Distributed Router + err := setupDistributedRouter(ovn4nfvRouterName) + if err != nil { + log.Error(err, "Failed to initialize OVN Distributed Router") + return err + } + return nil +} + // SetExec validates executable paths and saves the given exec interface // to be used for running various OVS and OVN utilites func SetExec(exec kexec.Interface) error { var err error runner = &execHelper{exec: exec} - runner.ofctlPath, err = exec.LookPath(ovsOfctlCommand) - if err != nil { - return err - } - runner.vsctlPath, err = exec.LookPath(ovsVsctlCommand) - if err != nil { - return err - } runner.nbctlPath, err = exec.LookPath(ovnNbctlCommand) if err != nil { return err } - runner.ipPath, err = exec.LookPath(ipCommand) - if err != nil { - return err - } return nil } @@ -65,7 +63,6 @@ func runOVNretry(cmdPath string, args ...string) (*bytes.Buffer, *bytes.Buffer, if err == nil { return stdout, stderr, err } - // Connection refused // Master may not be up so keep trying if strings.Contains(stderr.String(), "Connection refused") { @@ -87,34 +84,29 @@ func run(cmdPath string, args ...string) (*bytes.Buffer, *bytes.Buffer, error) { cmd := runner.exec.Command(cmdPath, args...) cmd.SetStdout(stdout) cmd.SetStderr(stderr) - logrus.Debugf("exec: %s %s", cmdPath, strings.Join(args, " ")) + log.Info("exec:", "cmdPath", cmdPath, "args", strings.Join(args, " ")) err := cmd.Run() if err != nil { - logrus.Debugf("exec: %s %s => %v", cmdPath, strings.Join(args, " "), err) + log.Error(err, "exec:", "cmdPath", cmdPath, "args", strings.Join(args, " ")) } return stdout, stderr, err } -// RunOVSVsctl runs a command via ovs-vsctl. -func RunOVSVsctlUnix(args ...string) (string, string, error) { - cmdArgs := []string{fmt.Sprintf("--timeout=%d", ovsCommandTimeout)} - cmdArgs = append(cmdArgs, args...) - stdout, stderr, err := run(runner.vsctlPath, cmdArgs...) - return strings.Trim(strings.TrimSpace(stdout.String()), "\""), stderr.String(), err -} - -// RunOVNNbctlUnix runs command via ovn-nbctl, with ovn-nbctl using the unix -// domain sockets to connect to the ovsdb-server backing the OVN NB database. -func RunOVNNbctlUnix(args ...string) (string, string, error) { - cmdArgs := []string{fmt.Sprintf("--timeout=%d", ovsCommandTimeout)} +// RunOVNSbctlWithTimeout runs command via ovn-nbctl with a specific timeout +func RunOVNNbctlWithTimeout(timeout int, args ...string) (string, string, error) { + var cmdArgs []string + if len(runner.hostIP) > 0 { + cmdArgs = []string{ + fmt.Sprintf("--db=tcp:%s:%s", runner.hostIP, runner.hostPort), + } + } + cmdArgs = append(cmdArgs, fmt.Sprintf("--timeout=%d", timeout)) cmdArgs = append(cmdArgs, args...) stdout, stderr, err := runOVNretry(runner.nbctlPath, cmdArgs...) - return strings.Trim(strings.TrimFunc(stdout.String(), unicode.IsSpace), "\""), - stderr.String(), err + return strings.Trim(strings.TrimSpace(stdout.String()), "\""), stderr.String(), err } -// RunIP runs a command via the iproute2 "ip" utility -func RunIP(args ...string) (string, string, error) { - stdout, stderr, err := run(runner.ipPath, args...) - return strings.TrimSpace(stdout.String()), stderr.String(), err +// RunOVNNbctl runs a command via ovn-nbctl. +func RunOVNNbctl(args ...string) (string, string, error) { + return RunOVNNbctlWithTimeout(ovsCommandTimeout, args...) } |