aboutsummaryrefslogtreecommitdiffstats
path: root/internal/pkg
diff options
context:
space:
mode:
Diffstat (limited to 'internal/pkg')
-rw-r--r--internal/pkg/factory/factory.go318
-rw-r--r--internal/pkg/factory/factory_test.go686
-rw-r--r--internal/pkg/kube/kube.go28
-rw-r--r--internal/pkg/ovn/common.go65
-rw-r--r--internal/pkg/ovn/ovn.go368
-rw-r--r--internal/pkg/ovn/ovn_test.go44
-rw-r--r--internal/pkg/ovn/utils.go78
7 files changed, 260 insertions, 1327 deletions
diff --git a/internal/pkg/factory/factory.go b/internal/pkg/factory/factory.go
deleted file mode 100644
index a635e3f..0000000
--- a/internal/pkg/factory/factory.go
+++ /dev/null
@@ -1,318 +0,0 @@
-package factory
-
-import (
- "fmt"
- "reflect"
- "sync"
- "sync/atomic"
- "time"
-
- "github.com/sirupsen/logrus"
-
- kapi "k8s.io/api/core/v1"
- knet "k8s.io/api/networking/v1"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/labels"
- informerfactory "k8s.io/client-go/informers"
- "k8s.io/client-go/kubernetes"
- "k8s.io/client-go/tools/cache"
-)
-
-type informer struct {
- sync.Mutex
- oType reflect.Type
- inf cache.SharedIndexInformer
- handlers map[uint64]cache.ResourceEventHandler
-}
-
-func (i *informer) forEachHandler(obj interface{}, f func(id uint64, handler cache.ResourceEventHandler)) {
- i.Lock()
- defer i.Unlock()
-
- objType := reflect.TypeOf(obj)
- if objType != i.oType {
- logrus.Errorf("object type %v did not match expected %v", objType, i.oType)
- return
- }
-
- for id, handler := range i.handlers {
- f(id, handler)
- }
-}
-
-// WatchFactory initializes and manages common kube watches
-type WatchFactory struct {
- iFactory informerfactory.SharedInformerFactory
- informers map[reflect.Type]*informer
- handlerCounter uint64
-}
-
-const (
- resyncInterval = 12 * time.Hour
-)
-
-func newInformer(oType reflect.Type, inf cache.SharedIndexInformer) *informer {
- return &informer{
- oType: oType,
- inf: inf,
- handlers: make(map[uint64]cache.ResourceEventHandler),
- }
-}
-
-var (
- podType reflect.Type = reflect.TypeOf(&kapi.Pod{})
- serviceType reflect.Type = reflect.TypeOf(&kapi.Service{})
- endpointsType reflect.Type = reflect.TypeOf(&kapi.Endpoints{})
- policyType reflect.Type = reflect.TypeOf(&knet.NetworkPolicy{})
- namespaceType reflect.Type = reflect.TypeOf(&kapi.Namespace{})
- nodeType reflect.Type = reflect.TypeOf(&kapi.Node{})
-)
-
-// NewWatchFactory initializes a new watch factory
-func NewWatchFactory(c kubernetes.Interface, stopChan <-chan struct{}) (*WatchFactory, error) {
- // resync time is 12 hours, none of the resources being watched in ovn-kubernetes have
- // any race condition where a resync may be required e.g. cni executable on node watching for
- // events on pods and assuming that an 'ADD' event will contain the annotations put in by
- // ovnkube master (currently, it is just a 'get' loop)
- // the downside of making it tight (like 10 minutes) is needless spinning on all resources
- wf := &WatchFactory{
- iFactory: informerfactory.NewSharedInformerFactory(c, resyncInterval),
- informers: make(map[reflect.Type]*informer),
- }
-
- // Create shared informers we know we'll use
- wf.informers[podType] = newInformer(podType, wf.iFactory.Core().V1().Pods().Informer())
- wf.informers[serviceType] = newInformer(serviceType, wf.iFactory.Core().V1().Services().Informer())
- wf.informers[endpointsType] = newInformer(endpointsType, wf.iFactory.Core().V1().Endpoints().Informer())
- wf.informers[policyType] = newInformer(policyType, wf.iFactory.Networking().V1().NetworkPolicies().Informer())
- wf.informers[namespaceType] = newInformer(namespaceType, wf.iFactory.Core().V1().Namespaces().Informer())
- wf.informers[nodeType] = newInformer(nodeType, wf.iFactory.Core().V1().Nodes().Informer())
-
- wf.iFactory.Start(stopChan)
- res := wf.iFactory.WaitForCacheSync(stopChan)
- for oType, synced := range res {
- if !synced {
- return nil, fmt.Errorf("error in syncing cache for %v informer", oType)
- }
- informer := wf.informers[oType]
- informer.inf.AddEventHandler(wf.newFederatedHandler(informer))
- }
-
- return wf, nil
-}
-
-func (wf *WatchFactory) newFederatedHandler(inf *informer) cache.ResourceEventHandlerFuncs {
- return cache.ResourceEventHandlerFuncs{
- AddFunc: func(obj interface{}) {
- inf.forEachHandler(obj, func(id uint64, handler cache.ResourceEventHandler) {
- logrus.Debugf("running %v ADD event for handler %d", inf.oType, id)
- handler.OnAdd(obj)
- })
- },
- UpdateFunc: func(oldObj, newObj interface{}) {
- inf.forEachHandler(newObj, func(id uint64, handler cache.ResourceEventHandler) {
- logrus.Debugf("running %v UPDATE event for handler %d", inf.oType, id)
- handler.OnUpdate(oldObj, newObj)
- })
- },
- DeleteFunc: func(obj interface{}) {
- if inf.oType != reflect.TypeOf(obj) {
- tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
- if !ok {
- logrus.Errorf("couldn't get object from tombstone: %+v", obj)
- return
- }
- obj = tombstone.Obj
- objType := reflect.TypeOf(obj)
- if inf.oType != objType {
- logrus.Errorf("expected tombstone object resource type %v but got %v", inf.oType, objType)
- return
- }
- }
- inf.forEachHandler(obj, func(id uint64, handler cache.ResourceEventHandler) {
- logrus.Debugf("running %v DELETE event for handler %d", inf.oType, id)
- handler.OnDelete(obj)
- })
- },
- }
-}
-
-func getObjectMeta(objType reflect.Type, obj interface{}) (*metav1.ObjectMeta, error) {
- switch objType {
- case podType:
- if pod, ok := obj.(*kapi.Pod); ok {
- return &pod.ObjectMeta, nil
- }
- case serviceType:
- if service, ok := obj.(*kapi.Service); ok {
- return &service.ObjectMeta, nil
- }
- case endpointsType:
- if endpoints, ok := obj.(*kapi.Endpoints); ok {
- return &endpoints.ObjectMeta, nil
- }
- case policyType:
- if policy, ok := obj.(*knet.NetworkPolicy); ok {
- return &policy.ObjectMeta, nil
- }
- case namespaceType:
- if namespace, ok := obj.(*kapi.Namespace); ok {
- return &namespace.ObjectMeta, nil
- }
- case nodeType:
- if node, ok := obj.(*kapi.Node); ok {
- return &node.ObjectMeta, nil
- }
- }
- return nil, fmt.Errorf("cannot get ObjectMeta from type %v", objType)
-}
-
-func (wf *WatchFactory) addHandler(objType reflect.Type, namespace string, lsel *metav1.LabelSelector, funcs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) {
- inf, ok := wf.informers[objType]
- if !ok {
- return 0, fmt.Errorf("unknown object type %v", objType)
- }
-
- sel, err := metav1.LabelSelectorAsSelector(lsel)
- if err != nil {
- return 0, fmt.Errorf("error creating label selector: %v", err)
- }
-
- filterFunc := func(obj interface{}) bool {
- if namespace == "" && lsel == nil {
- // Unfiltered handler
- return true
- }
- meta, err := getObjectMeta(objType, obj)
- if err != nil {
- logrus.Errorf("watch handler filter error: %v", err)
- return false
- }
- if namespace != "" && meta.Namespace != namespace {
- return false
- }
- if lsel != nil && !sel.Matches(labels.Set(meta.Labels)) {
- return false
- }
- return true
- }
-
- // Process existing items as a set so the caller can clean up
- // after a restart or whatever
- existingItems := inf.inf.GetStore().List()
- if processExisting != nil {
- items := make([]interface{}, 0)
- for _, obj := range existingItems {
- if filterFunc(obj) {
- items = append(items, obj)
- }
- }
- processExisting(items)
- }
-
- handlerID := atomic.AddUint64(&wf.handlerCounter, 1)
-
- inf.Lock()
- defer inf.Unlock()
-
- inf.handlers[handlerID] = cache.FilteringResourceEventHandler{
- FilterFunc: filterFunc,
- Handler: funcs,
- }
- logrus.Debugf("added %v event handler %d", objType, handlerID)
-
- // Send existing items to the handler's add function; informers usually
- // do this but since we share informers, it's long-since happened so
- // we must emulate that here
- for _, obj := range existingItems {
- inf.handlers[handlerID].OnAdd(obj)
- }
-
- return handlerID, nil
-}
-
-func (wf *WatchFactory) removeHandler(objType reflect.Type, handlerID uint64) error {
- inf, ok := wf.informers[objType]
- if !ok {
- return fmt.Errorf("tried to remove unknown object type %v event handler", objType)
- }
-
- inf.Lock()
- defer inf.Unlock()
- if _, ok := inf.handlers[handlerID]; !ok {
- return fmt.Errorf("tried to remove unknown object type %v event handler %d", objType, handlerID)
- }
- delete(inf.handlers, handlerID)
- logrus.Debugf("removed %v event handler %d", objType, handlerID)
- return nil
-}
-
-// AddPodHandler adds a handler function that will be executed on Pod object changes
-func (wf *WatchFactory) AddPodHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) {
- return wf.addHandler(podType, "", nil, handlerFuncs, processExisting)
-}
-
-// AddFilteredPodHandler adds a handler function that will be executed when Pod objects that match the given filters change
-func (wf *WatchFactory) AddFilteredPodHandler(namespace string, lsel *metav1.LabelSelector, handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) {
- return wf.addHandler(podType, namespace, lsel, handlerFuncs, processExisting)
-}
-
-// RemovePodHandler removes a Pod object event handler function
-func (wf *WatchFactory) RemovePodHandler(handlerID uint64) error {
- return wf.removeHandler(podType, handlerID)
-}
-
-// AddServiceHandler adds a handler function that will be executed on Service object changes
-func (wf *WatchFactory) AddServiceHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) {
- return wf.addHandler(serviceType, "", nil, handlerFuncs, processExisting)
-}
-
-// RemoveServiceHandler removes a Service object event handler function
-func (wf *WatchFactory) RemoveServiceHandler(handlerID uint64) error {
- return wf.removeHandler(serviceType, handlerID)
-}
-
-// AddEndpointsHandler adds a handler function that will be executed on Endpoints object changes
-func (wf *WatchFactory) AddEndpointsHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) {
- return wf.addHandler(endpointsType, "", nil, handlerFuncs, processExisting)
-}
-
-// RemoveEndpointsHandler removes a Endpoints object event handler function
-func (wf *WatchFactory) RemoveEndpointsHandler(handlerID uint64) error {
- return wf.removeHandler(endpointsType, handlerID)
-}
-
-// AddPolicyHandler adds a handler function that will be executed on NetworkPolicy object changes
-func (wf *WatchFactory) AddPolicyHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) {
- return wf.addHandler(policyType, "", nil, handlerFuncs, processExisting)
-}
-
-// RemovePolicyHandler removes a NetworkPolicy object event handler function
-func (wf *WatchFactory) RemovePolicyHandler(handlerID uint64) error {
- return wf.removeHandler(policyType, handlerID)
-}
-
-// AddNamespaceHandler adds a handler function that will be executed on Namespace object changes
-func (wf *WatchFactory) AddNamespaceHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) {
- return wf.addHandler(namespaceType, "", nil, handlerFuncs, processExisting)
-}
-
-// AddFilteredNamespaceHandler adds a handler function that will be executed when Namespace objects that match the given filters change
-func (wf *WatchFactory) AddFilteredNamespaceHandler(namespace string, lsel *metav1.LabelSelector, handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) {
- return wf.addHandler(namespaceType, namespace, lsel, handlerFuncs, processExisting)
-}
-
-// RemoveNamespaceHandler removes a Namespace object event handler function
-func (wf *WatchFactory) RemoveNamespaceHandler(handlerID uint64) error {
- return wf.removeHandler(namespaceType, handlerID)
-}
-
-// AddNodeHandler adds a handler function that will be executed on Node object changes
-func (wf *WatchFactory) AddNodeHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) {
- return wf.addHandler(nodeType, "", nil, handlerFuncs, processExisting)
-}
-
-// RemoveNodeHandler removes a Node object event handler function
-func (wf *WatchFactory) RemoveNodeHandler(handlerID uint64) error {
- return wf.removeHandler(nodeType, handlerID)
-}
diff --git a/internal/pkg/factory/factory_test.go b/internal/pkg/factory/factory_test.go
deleted file mode 100644
index 38fb77e..0000000
--- a/internal/pkg/factory/factory_test.go
+++ /dev/null
@@ -1,686 +0,0 @@
-package factory
-
-import (
- "reflect"
- "testing"
-
- "k8s.io/api/core/v1"
- knet "k8s.io/api/networking/v1"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/runtime"
- "k8s.io/apimachinery/pkg/types"
- "k8s.io/apimachinery/pkg/watch"
- "k8s.io/client-go/kubernetes/fake"
- core "k8s.io/client-go/testing"
- "k8s.io/client-go/tools/cache"
-
- . "github.com/onsi/ginkgo"
- . "github.com/onsi/gomega"
-)
-
-func TestFactory(t *testing.T) {
- RegisterFailHandler(Fail)
- RunSpecs(t, "Watch Factory Suite")
-}
-
-func newObjectMeta(name, namespace string) metav1.ObjectMeta {
- return metav1.ObjectMeta{
- Name: name,
- UID: types.UID(name),
- Namespace: namespace,
- Labels: map[string]string{
- "name": name,
- },
- }
-}
-
-func newPod(name, namespace string) *v1.Pod {
- return &v1.Pod{
- Status: v1.PodStatus{
- Phase: v1.PodRunning,
- },
- ObjectMeta: newObjectMeta(name, namespace),
- Spec: v1.PodSpec{
- Containers: []v1.Container{
- {
- Name: "containerName",
- Image: "containerImage",
- },
- },
- NodeName: "mynode",
- },
- }
-}
-
-func newNamespace(name string) *v1.Namespace {
- return &v1.Namespace{
- Status: v1.NamespaceStatus{
- Phase: v1.NamespaceActive,
- },
- ObjectMeta: newObjectMeta(name, name),
- }
-}
-
-func newNode(name string) *v1.Node {
- return &v1.Node{
- Status: v1.NodeStatus{
- Phase: v1.NodeRunning,
- },
- ObjectMeta: newObjectMeta(name, ""),
- }
-}
-
-func newPolicy(name, namespace string) *knet.NetworkPolicy {
- return &knet.NetworkPolicy{
- ObjectMeta: newObjectMeta(name, namespace),
- }
-}
-
-func newEndpoints(name, namespace string) *v1.Endpoints {
- return &v1.Endpoints{
- ObjectMeta: newObjectMeta(name, namespace),
- }
-}
-
-func newService(name, namespace string) *v1.Service {
- return &v1.Service{
- ObjectMeta: metav1.ObjectMeta{
- Name: name,
- UID: types.UID(name),
- Namespace: namespace,
- Labels: map[string]string{
- "name": name,
- },
- },
- }
-}
-
-func objSetup(c *fake.Clientset, objType string, listFn func(core.Action) (bool, runtime.Object, error)) *watch.FakeWatcher {
- w := watch.NewFake()
- c.AddWatchReactor(objType, core.DefaultWatchReactor(w, nil))
- c.AddReactor("list", objType, listFn)
- return w
-}
-
-var _ = Describe("Watch Factory Operations", func() {
- var (
- fakeClient *fake.Clientset
- podWatch, namespaceWatch, nodeWatch *watch.FakeWatcher
- policyWatch, endpointsWatch, serviceWatch *watch.FakeWatcher
- pods []*v1.Pod
- namespaces []*v1.Namespace
- nodes []*v1.Node
- policies []*knet.NetworkPolicy
- endpoints []*v1.Endpoints
- services []*v1.Service
- stop chan struct{}
- numAdded, numUpdated, numDeleted int
- )
-
- BeforeEach(func() {
- fakeClient = &fake.Clientset{}
- stop = make(chan struct{})
-
- pods = make([]*v1.Pod, 0)
- podWatch = objSetup(fakeClient, "pods", func(core.Action) (bool, runtime.Object, error) {
- obj := &v1.PodList{}
- for _, p := range pods {
- obj.Items = append(obj.Items, *p)
- }
- return true, obj, nil
- })
-
- namespaces = make([]*v1.Namespace, 0)
- namespaceWatch = objSetup(fakeClient, "namespaces", func(core.Action) (bool, runtime.Object, error) {
- obj := &v1.NamespaceList{}
- for _, p := range namespaces {
- obj.Items = append(obj.Items, *p)
- }
- return true, obj, nil
- })
-
- nodes = make([]*v1.Node, 0)
- nodeWatch = objSetup(fakeClient, "nodes", func(core.Action) (bool, runtime.Object, error) {
- obj := &v1.NodeList{}
- for _, p := range nodes {
- obj.Items = append(obj.Items, *p)
- }
- return true, obj, nil
- })
-
- policies = make([]*knet.NetworkPolicy, 0)
- policyWatch = objSetup(fakeClient, "networkpolicies", func(core.Action) (bool, runtime.Object, error) {
- obj := &knet.NetworkPolicyList{}
- for _, p := range policies {
- obj.Items = append(obj.Items, *p)
- }
- return true, obj, nil
- })
-
- endpoints = make([]*v1.Endpoints, 0)
- endpointsWatch = objSetup(fakeClient, "endpoints", func(core.Action) (bool, runtime.Object, error) {
- obj := &v1.EndpointsList{}
- for _, p := range endpoints {
- obj.Items = append(obj.Items, *p)
- }
- return true, obj, nil
- })
-
- services = make([]*v1.Service, 0)
- serviceWatch = objSetup(fakeClient, "services", func(core.Action) (bool, runtime.Object, error) {
- obj := &v1.ServiceList{}
- for _, p := range services {
- obj.Items = append(obj.Items, *p)
- }
- return true, obj, nil
- })
-
- numAdded = 0
- numUpdated = 0
- numDeleted = 0
- })
-
- Context("when a processExisting is given", func() {
- testExisting := func(objType reflect.Type, namespace string, lsel *metav1.LabelSelector) {
- wf, err := NewWatchFactory(fakeClient, stop)
- Expect(err).NotTo(HaveOccurred())
- id, err := wf.addHandler(objType, namespace, lsel,
- cache.ResourceEventHandlerFuncs{},
- func(objs []interface{}) {
- Expect(len(objs)).To(Equal(1))
- })
- Expect(err).NotTo(HaveOccurred())
- Expect(id).To(BeNumerically(">", uint64(0)))
- wf.removeHandler(objType, id)
- close(stop)
- }
-
- It("is called for each existing pod", func() {
- pods = append(pods, newPod("pod1", "default"))
- testExisting(podType, "", nil)
- })
-
- It("is called for each existing namespace", func() {
- namespaces = append(namespaces, newNamespace("default"))
- testExisting(namespaceType, "", nil)
- })
-
- It("is called for each existing node", func() {
- nodes = append(nodes, newNode("default"))
- testExisting(nodeType, "", nil)
- })
-
- It("is called for each existing policy", func() {
- policies = append(policies, newPolicy("denyall", "default"))
- testExisting(policyType, "", nil)
- })
-
- It("is called for each existing endpoints", func() {
- endpoints = append(endpoints, newEndpoints("myendpoint", "default"))
- testExisting(endpointsType, "", nil)
- })
-
- It("is called for each existing service", func() {
- services = append(services, newService("myservice", "default"))
- testExisting(serviceType, "", nil)
- })
-
- It("is called for each existing pod that matches a given namespace and label", func() {
- pod := newPod("pod1", "default")
- pod.ObjectMeta.Labels["blah"] = "foobar"
- pods = append(pods, pod)
- testExisting(podType, "default", &metav1.LabelSelector{
- MatchLabels: map[string]string{"blah": "foobar"},
- })
- })
- })
-
- Context("when existing items are known to the informer", func() {
- testExisting := func(objType reflect.Type) {
- wf, err := NewWatchFactory(fakeClient, stop)
- Expect(err).NotTo(HaveOccurred())
- id, err := wf.addHandler(objType, "", nil,
- cache.ResourceEventHandlerFuncs{
- AddFunc: func(obj interface{}) {
- numAdded++
- },
- UpdateFunc: func(old, new interface{}) {},
- DeleteFunc: func(obj interface{}) {},
- }, nil)
- Expect(err).NotTo(HaveOccurred())
- Expect(numAdded).To(Equal(2))
- wf.removeHandler(objType, id)
- close(stop)
- }
-
- It("calls ADD for each existing pod", func() {
- pods = append(pods, newPod("pod1", "default"))
- pods = append(pods, newPod("pod2", "default"))
- testExisting(podType)
- })
-
- It("calls ADD for each existing namespace", func() {
- namespaces = append(namespaces, newNamespace("default"))
- namespaces = append(namespaces, newNamespace("default2"))
- testExisting(namespaceType)
- })
-
- It("calls ADD for each existing node", func() {
- nodes = append(nodes, newNode("default"))
- nodes = append(nodes, newNode("default2"))
- testExisting(nodeType)
- })
-
- It("calls ADD for each existing policy", func() {
- policies = append(policies, newPolicy("denyall", "default"))
- policies = append(policies, newPolicy("denyall2", "default"))
- testExisting(policyType)
- })
-
- It("calls ADD for each existing endpoints", func() {
- endpoints = append(endpoints, newEndpoints("myendpoint", "default"))
- endpoints = append(endpoints, newEndpoints("myendpoint2", "default"))
- testExisting(endpointsType)
- })
-
- It("calls ADD for each existing service", func() {
- services = append(services, newService("myservice", "default"))
- services = append(services, newService("myservice2", "default"))
- testExisting(serviceType)
- })
- })
-
- addFilteredHandler := func(wf *WatchFactory, objType reflect.Type, namespace string, lsel *metav1.LabelSelector, funcs cache.ResourceEventHandlerFuncs) uint64 {
- id, err := wf.addHandler(objType, namespace, lsel, cache.ResourceEventHandlerFuncs{
- AddFunc: func(obj interface{}) {
- defer GinkgoRecover()
- numAdded++
- funcs.AddFunc(obj)
- },
- UpdateFunc: func(old, new interface{}) {
- defer GinkgoRecover()
- numUpdated++
- funcs.UpdateFunc(old, new)
- },
- DeleteFunc: func(obj interface{}) {
- defer GinkgoRecover()
- numDeleted++
- funcs.DeleteFunc(obj)
- },
- }, nil)
- Expect(err).NotTo(HaveOccurred())
- Expect(id).To(BeNumerically(">", uint64(0)))
- return id
- }
-
- addHandler := func(wf *WatchFactory, objType reflect.Type, funcs cache.ResourceEventHandlerFuncs) uint64 {
- return addFilteredHandler(wf, objType, "", nil, funcs)
- }
-
- It("responds to pod add/update/delete events", func() {
- wf, err := NewWatchFactory(fakeClient, stop)
- Expect(err).NotTo(HaveOccurred())
-
- added := newPod("pod1", "default")
- id := addHandler(wf, podType, cache.ResourceEventHandlerFuncs{
- AddFunc: func(obj interface{}) {
- pod := obj.(*v1.Pod)
- Expect(reflect.DeepEqual(pod, added)).To(BeTrue())
- },
- UpdateFunc: func(old, new interface{}) {
- newPod := new.(*v1.Pod)
- Expect(reflect.DeepEqual(newPod, added)).To(BeTrue())
- Expect(newPod.Spec.NodeName).To(Equal("foobar"))
- },
- DeleteFunc: func(obj interface{}) {
- pod := obj.(*v1.Pod)
- Expect(reflect.DeepEqual(pod, added)).To(BeTrue())
- },
- })
-
- pods = append(pods, added)
- podWatch.Add(added)
- Eventually(func() int { return numAdded }, 2).Should(Equal(1))
- added.Spec.NodeName = "foobar"
- podWatch.Modify(added)
- Eventually(func() int { return numUpdated }, 2).Should(Equal(1))
- pods = pods[:0]
- podWatch.Delete(added)
- Eventually(func() int { return numDeleted }, 2).Should(Equal(1))
-
- wf.RemovePodHandler(id)
- close(stop)
- })
-
- It("responds to namespace add/update/delete events", func() {
- wf, err := NewWatchFactory(fakeClient, stop)
- Expect(err).NotTo(HaveOccurred())
-
- added := newNamespace("default")
- id := addHandler(wf, namespaceType, cache.ResourceEventHandlerFuncs{
- AddFunc: func(obj interface{}) {
- ns := obj.(*v1.Namespace)
- Expect(reflect.DeepEqual(ns, added)).To(BeTrue())
- },
- UpdateFunc: func(old, new interface{}) {
- newNS := new.(*v1.Namespace)
- Expect(reflect.DeepEqual(newNS, added)).To(BeTrue())
- Expect(newNS.Status.Phase).To(Equal(v1.NamespaceTerminating))
- },
- DeleteFunc: func(obj interface{}) {
- ns := obj.(*v1.Namespace)
- Expect(reflect.DeepEqual(ns, added)).To(BeTrue())
- },
- })
-
- namespaces = append(namespaces, added)
- namespaceWatch.Add(added)
- Eventually(func() int { return numAdded }, 2).Should(Equal(1))
- added.Status.Phase = v1.NamespaceTerminating
- namespaceWatch.Modify(added)
- Eventually(func() int { return numUpdated }, 2).Should(Equal(1))
- namespaces = namespaces[:0]
- namespaceWatch.Delete(added)
- Eventually(func() int { return numDeleted }, 2).Should(Equal(1))
-
- wf.RemoveNamespaceHandler(id)
- close(stop)
- })
-
- It("responds to node add/update/delete events", func() {
- wf, err := NewWatchFactory(fakeClient, stop)
- Expect(err).NotTo(HaveOccurred())
-
- added := newNode("mynode")
- id := addHandler(wf, nodeType, cache.ResourceEventHandlerFuncs{
- AddFunc: func(obj interface{}) {
- node := obj.(*v1.Node)
- Expect(reflect.DeepEqual(node, added)).To(BeTrue())
- },
- UpdateFunc: func(old, new interface{}) {
- newNode := new.(*v1.Node)
- Expect(reflect.DeepEqual(newNode, added)).To(BeTrue())
- Expect(newNode.Status.Phase).To(Equal(v1.NodeTerminated))
- },
- DeleteFunc: func(obj interface{}) {
- node := obj.(*v1.Node)
- Expect(reflect.DeepEqual(node, added)).To(BeTrue())
- },
- })
-
- nodes = append(nodes, added)
- nodeWatch.Add(added)
- Eventually(func() int { return numAdded }, 2).Should(Equal(1))
- added.Status.Phase = v1.NodeTerminated
- nodeWatch.Modify(added)
- Eventually(func() int { return numUpdated }, 2).Should(Equal(1))
- nodes = nodes[:0]
- nodeWatch.Delete(added)
- Eventually(func() int { return numDeleted }, 2).Should(Equal(1))
-
- wf.removeHandler(nodeType, id)
- close(stop)
- })
-
- It("responds to policy add/update/delete events", func() {
- wf, err := NewWatchFactory(fakeClient, stop)
- Expect(err).NotTo(HaveOccurred())
-
- added := newPolicy("mypolicy", "default")
- id := addHandler(wf, policyType, cache.ResourceEventHandlerFuncs{
- AddFunc: func(obj interface{}) {
- np := obj.(*knet.NetworkPolicy)
- Expect(reflect.DeepEqual(np, added)).To(BeTrue())
- },
- UpdateFunc: func(old, new interface{}) {
- newNP := new.(*knet.NetworkPolicy)
- Expect(reflect.DeepEqual(newNP, added)).To(BeTrue())
- Expect(newNP.Spec.PolicyTypes).To(Equal([]knet.PolicyType{knet.PolicyTypeIngress}))
- },
- DeleteFunc: func(obj interface{}) {
- np := obj.(*knet.NetworkPolicy)
- Expect(reflect.DeepEqual(np, added)).To(BeTrue())
- },
- })
-
- policies = append(policies, added)
- policyWatch.Add(added)
- Eventually(func() int { return numAdded }, 2).Should(Equal(1))
- added.Spec.PolicyTypes = []knet.PolicyType{knet.PolicyTypeIngress}
- policyWatch.Modify(added)
- Eventually(func() int { return numUpdated }, 2).Should(Equal(1))
- policies = policies[:0]
- policyWatch.Delete(added)
- Eventually(func() int { return numDeleted }, 2).Should(Equal(1))
-
- wf.removeHandler(policyType, id)
- close(stop)
- })
-
- It("responds to endpoints add/update/delete events", func() {
- wf, err := NewWatchFactory(fakeClient, stop)
- Expect(err).NotTo(HaveOccurred())
-
- added := newEndpoints("myendpoints", "default")
- id := addHandler(wf, endpointsType, cache.ResourceEventHandlerFuncs{
- AddFunc: func(obj interface{}) {
- eps := obj.(*v1.Endpoints)
- Expect(reflect.DeepEqual(eps, added)).To(BeTrue())
- },
- UpdateFunc: func(old, new interface{}) {
- newEPs := new.(*v1.Endpoints)
- Expect(reflect.DeepEqual(newEPs, added)).To(BeTrue())
- Expect(len(newEPs.Subsets)).To(Equal(1))
- },
- DeleteFunc: func(obj interface{}) {
- eps := obj.(*v1.Endpoints)
- Expect(reflect.DeepEqual(eps, added)).To(BeTrue())
- },
- })
-
- endpoints = append(endpoints, added)
- endpointsWatch.Add(added)
- Eventually(func() int { return numAdded }, 2).Should(Equal(1))
- added.Subsets = append(added.Subsets, v1.EndpointSubset{
- Ports: []v1.EndpointPort{
- {
- Name: "foobar",
- Port: 1234,
- },
- },
- })
- endpointsWatch.Modify(added)
- Eventually(func() int { return numUpdated }, 2).Should(Equal(1))
- endpoints = endpoints[:0]
- endpointsWatch.Delete(added)
- Eventually(func() int { return numDeleted }, 2).Should(Equal(1))
-
- wf.removeHandler(endpointsType, id)
- close(stop)
- })
-
- It("responds to service add/update/delete events", func() {
- wf, err := NewWatchFactory(fakeClient, stop)
- Expect(err).NotTo(HaveOccurred())
-
- added := newService("myservice", "default")
- id := addHandler(wf, serviceType, cache.ResourceEventHandlerFuncs{
- AddFunc: func(obj interface{}) {
- service := obj.(*v1.Service)
- Expect(reflect.DeepEqual(service, added)).To(BeTrue())
- },
- UpdateFunc: func(old, new interface{}) {
- newService := new.(*v1.Service)
- Expect(reflect.DeepEqual(newService, added)).To(BeTrue())
- Expect(newService.Spec.ClusterIP).To(Equal("1.1.1.1"))
- },
- DeleteFunc: func(obj interface{}) {
- service := obj.(*v1.Service)
- Expect(reflect.DeepEqual(service, added)).To(BeTrue())
- },
- })
-
- services = append(services, added)
- serviceWatch.Add(added)
- Eventually(func() int { return numAdded }, 2).Should(Equal(1))
- added.Spec.ClusterIP = "1.1.1.1"
- serviceWatch.Modify(added)
- Eventually(func() int { return numUpdated }, 2).Should(Equal(1))
- services = services[:0]
- serviceWatch.Delete(added)
- Eventually(func() int { return numDeleted }, 2).Should(Equal(1))
-
- wf.removeHandler(serviceType, id)
- close(stop)
- })
-
- It("stops processing events after the handler is removed", func() {
- wf, err := NewWatchFactory(fakeClient, stop)
- Expect(err).NotTo(HaveOccurred())
-
- added := newNamespace("default")
- id := addHandler(wf, namespaceType, cache.ResourceEventHandlerFuncs{
- AddFunc: func(obj interface{}) {},
- UpdateFunc: func(old, new interface{}) {},
- DeleteFunc: func(obj interface{}) {},
- })
-
- namespaces = append(namespaces, added)
- namespaceWatch.Add(added)
- Eventually(func() int { return numAdded }, 2).Should(Equal(1))
- wf.RemoveNamespaceHandler(id)
-
- added2 := newNamespace("other")
- namespaces = append(namespaces, added2)
- namespaceWatch.Add(added2)
- Consistently(func() int { return numAdded }, 2).Should(Equal(1))
-
- added2.Status.Phase = v1.NamespaceTerminating
- namespaceWatch.Modify(added2)
- Consistently(func() int { return numUpdated }, 2).Should(Equal(0))
- namespaces = []*v1.Namespace{added}
- namespaceWatch.Delete(added2)
- Consistently(func() int { return numDeleted }, 2).Should(Equal(0))
-
- close(stop)
- })
-
- It("filters correctly by label and namespace", func() {
- wf, err := NewWatchFactory(fakeClient, stop)
- Expect(err).NotTo(HaveOccurred())
-
- passesFilter := newPod("pod1", "default")
- passesFilter.ObjectMeta.Labels["blah"] = "foobar"
- failsFilter := newPod("pod2", "default")
- failsFilter.ObjectMeta.Labels["blah"] = "baz"
- failsFilter2 := newPod("pod3", "otherns")
- failsFilter2.ObjectMeta.Labels["blah"] = "foobar"
-
- addFilteredHandler(wf,
- podType,
- "default",
- &metav1.LabelSelector{
- MatchLabels: map[string]string{"blah": "foobar"},
- },
- cache.ResourceEventHandlerFuncs{
- AddFunc: func(obj interface{}) {
- pod := obj.(*v1.Pod)
- Expect(reflect.DeepEqual(pod, passesFilter)).To(BeTrue())
- },
- UpdateFunc: func(old, new interface{}) {
- newPod := new.(*v1.Pod)
- Expect(reflect.DeepEqual(newPod, passesFilter)).To(BeTrue())
- },
- DeleteFunc: func(obj interface{}) {
- pod := obj.(*v1.Pod)
- Expect(reflect.DeepEqual(pod, passesFilter)).To(BeTrue())
- },
- })
-
- pods = append(pods, passesFilter)
- podWatch.Add(passesFilter)
- Eventually(func() int { return numAdded }, 2).Should(Equal(1))
-
- // numAdded should remain 1
- pods = append(pods, failsFilter)
- podWatch.Add(failsFilter)
- Consistently(func() int { return numAdded }, 2).Should(Equal(1))
-
- // numAdded should remain 1
- pods = append(pods, failsFilter2)
- podWatch.Add(failsFilter2)
- Consistently(func() int { return numAdded }, 2).Should(Equal(1))
-
- passesFilter.Status.Phase = v1.PodFailed
- podWatch.Modify(passesFilter)
- Eventually(func() int { return numUpdated }, 2).Should(Equal(1))
-
- // numAdded should remain 1
- failsFilter.Status.Phase = v1.PodFailed
- podWatch.Modify(failsFilter)
- Consistently(func() int { return numUpdated }, 2).Should(Equal(1))
-
- failsFilter2.Status.Phase = v1.PodFailed
- podWatch.Modify(failsFilter2)
- Consistently(func() int { return numUpdated }, 2).Should(Equal(1))
-
- pods = []*v1.Pod{failsFilter, failsFilter2}
- podWatch.Delete(passesFilter)
- Eventually(func() int { return numDeleted }, 2).Should(Equal(1))
-
- close(stop)
- })
-
- It("correctly handles object updates that cause filter changes", func() {
- wf, err := NewWatchFactory(fakeClient, stop)
- Expect(err).NotTo(HaveOccurred())
-
- pod := newPod("pod1", "default")
- pod.ObjectMeta.Labels["blah"] = "baz"
-
- equalPod := pod
- id := addFilteredHandler(wf,
- podType,
- "default",
- &metav1.LabelSelector{
- MatchLabels: map[string]string{"blah": "foobar"},
- },
- cache.ResourceEventHandlerFuncs{
- AddFunc: func(obj interface{}) {
- p := obj.(*v1.Pod)
- Expect(reflect.DeepEqual(p, equalPod)).To(BeTrue())
- },
- UpdateFunc: func(old, new interface{}) {},
- DeleteFunc: func(obj interface{}) {
- p := obj.(*v1.Pod)
- Expect(reflect.DeepEqual(p, equalPod)).To(BeTrue())
- },
- })
-
- pods = append(pods, pod)
-
- // Pod doesn't pass filter; shouldn't be added
- podWatch.Add(pod)
- Consistently(func() int { return numAdded }, 2).Should(Equal(0))
-
- // Update pod to pass filter; should be treated as add. Need
- // to deep-copy pod when modifying because it's a pointer all
- // the way through when using FakeClient
- podCopy := pod.DeepCopy()
- podCopy.ObjectMeta.Labels["blah"] = "foobar"
- pods = []*v1.Pod{podCopy}
- equalPod = podCopy
- podWatch.Modify(podCopy)
- Eventually(func() int { return numAdded }, 2).Should(Equal(1))
-
- // Update pod to fail filter; should be treated as delete
- pod.ObjectMeta.Labels["blah"] = "baz"
- podWatch.Modify(pod)
- Eventually(func() int { return numDeleted }, 2).Should(Equal(1))
- Consistently(func() int { return numAdded }, 2).Should(Equal(1))
- Consistently(func() int { return numUpdated }, 2).Should(Equal(0))
-
- wf.RemovePodHandler(id)
- close(stop)
- })
-})
diff --git a/internal/pkg/kube/kube.go b/internal/pkg/kube/kube.go
index cc7c29b..e51963e 100644
--- a/internal/pkg/kube/kube.go
+++ b/internal/pkg/kube/kube.go
@@ -41,7 +41,7 @@ type Kube struct {
func (k *Kube) SetAnnotationOnPod(pod *kapi.Pod, key, value string) error {
logrus.Infof("Setting annotations %s=%s on pod %s", key, value, pod.Name)
patchData := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, key, value)
- _, err := k.KClient.Core().Pods(pod.Namespace).Patch(pod.Name, types.MergePatchType, []byte(patchData))
+ _, err := k.KClient.CoreV1().Pods(pod.Namespace).Patch(pod.Name, types.MergePatchType, []byte(patchData))
if err != nil {
logrus.Errorf("Error in setting annotation on pod %s/%s: %v", pod.Name, pod.Namespace, err)
}
@@ -52,7 +52,7 @@ func (k *Kube) SetAnnotationOnPod(pod *kapi.Pod, key, value string) error {
func (k *Kube) SetAnnotationOnNode(node *kapi.Node, key, value string) error {
logrus.Infof("Setting annotations %s=%s on node %s", key, value, node.Name)
patchData := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, key, value)
- _, err := k.KClient.Core().Nodes().Patch(node.Name, types.MergePatchType, []byte(patchData))
+ _, err := k.KClient.CoreV1().Nodes().Patch(node.Name, types.MergePatchType, []byte(patchData))
if err != nil {
logrus.Errorf("Error in setting annotation on node %s: %v", node.Name, err)
}
@@ -67,7 +67,7 @@ func (k *Kube) SetAnnotationOnNamespace(ns *kapi.Namespace, key,
ns.Name)
patchData := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, key,
value)
- _, err := k.KClient.Core().Namespaces().Patch(ns.Name,
+ _, err := k.KClient.CoreV1().Namespaces().Patch(ns.Name,
types.MergePatchType, []byte(patchData))
if err != nil {
logrus.Errorf("Error in setting annotation on namespace %s: %v",
@@ -78,7 +78,7 @@ func (k *Kube) SetAnnotationOnNamespace(ns *kapi.Namespace, key,
// GetAnnotationsOnPod obtains the pod annotations from kubernetes apiserver, given the name and namespace
func (k *Kube) GetAnnotationsOnPod(namespace, name string) (map[string]string, error) {
- pod, err := k.KClient.Core().Pods(namespace).Get(name, metav1.GetOptions{})
+ pod, err := k.KClient.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{})
if err != nil {
return nil, err
}
@@ -87,12 +87,12 @@ func (k *Kube) GetAnnotationsOnPod(namespace, name string) (map[string]string, e
// GetPod obtains the Pod resource from kubernetes apiserver, given the name and namespace
func (k *Kube) GetPod(namespace, name string) (*kapi.Pod, error) {
- return k.KClient.Core().Pods(namespace).Get(name, metav1.GetOptions{})
+ return k.KClient.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{})
}
// GetPods obtains the Pod resource from kubernetes apiserver, given the name and namespace
func (k *Kube) GetPods(namespace string) (*kapi.PodList, error) {
- return k.KClient.Core().Pods(namespace).List(metav1.ListOptions{})
+ return k.KClient.CoreV1().Pods(namespace).List(metav1.ListOptions{})
}
// GetPodsByLabels obtains the Pod resources from kubernetes apiserver,
@@ -100,42 +100,42 @@ func (k *Kube) GetPods(namespace string) (*kapi.PodList, error) {
func (k *Kube) GetPodsByLabels(namespace string, selector labels.Selector) (*kapi.PodList, error) {
options := metav1.ListOptions{}
options.LabelSelector = selector.String()
- return k.KClient.Core().Pods(namespace).List(options)
+ return k.KClient.CoreV1().Pods(namespace).List(options)
}
// GetNodes returns the list of all Node objects from kubernetes
func (k *Kube) GetNodes() (*kapi.NodeList, error) {
- return k.KClient.Core().Nodes().List(metav1.ListOptions{})
+ return k.KClient.CoreV1().Nodes().List(metav1.ListOptions{})
}
// GetNode returns the Node resource from kubernetes apiserver, given its name
func (k *Kube) GetNode(name string) (*kapi.Node, error) {
- return k.KClient.Core().Nodes().Get(name, metav1.GetOptions{})
+ return k.KClient.CoreV1().Nodes().Get(name, metav1.GetOptions{})
}
// GetService returns the Service resource from kubernetes apiserver, given its name and namespace
func (k *Kube) GetService(namespace, name string) (*kapi.Service, error) {
- return k.KClient.Core().Services(namespace).Get(name, metav1.GetOptions{})
+ return k.KClient.CoreV1().Services(namespace).Get(name, metav1.GetOptions{})
}
// GetEndpoints returns all the Endpoint resources from kubernetes
// apiserver, given namespace
func (k *Kube) GetEndpoints(namespace string) (*kapi.EndpointsList, error) {
- return k.KClient.Core().Endpoints(namespace).List(metav1.ListOptions{})
+ return k.KClient.CoreV1().Endpoints(namespace).List(metav1.ListOptions{})
}
// GetNamespace returns the Namespace resource from kubernetes apiserver,
// given its name
func (k *Kube) GetNamespace(name string) (*kapi.Namespace, error) {
- return k.KClient.Core().Namespaces().Get(name, metav1.GetOptions{})
+ return k.KClient.CoreV1().Namespaces().Get(name, metav1.GetOptions{})
}
// GetNamespaces returns all Namespace resource from kubernetes apiserver
func (k *Kube) GetNamespaces() (*kapi.NamespaceList, error) {
- return k.KClient.Core().Namespaces().List(metav1.ListOptions{})
+ return k.KClient.CoreV1().Namespaces().List(metav1.ListOptions{})
}
// GetNetworkPolicies returns all network policy objects from kubernetes
func (k *Kube) GetNetworkPolicies(namespace string) (*kapisnetworking.NetworkPolicyList, error) {
- return k.KClient.Networking().NetworkPolicies(namespace).List(metav1.ListOptions{})
+ return k.KClient.NetworkingV1().NetworkPolicies(namespace).List(metav1.ListOptions{})
}
diff --git a/internal/pkg/ovn/common.go b/internal/pkg/ovn/common.go
index b504440..60cd202 100644
--- a/internal/pkg/ovn/common.go
+++ b/internal/pkg/ovn/common.go
@@ -3,69 +3,82 @@ package ovn
import (
"encoding/json"
"fmt"
- "github.com/sirupsen/logrus"
"math/big"
"math/rand"
"net"
+ logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
+ "strings"
"time"
)
-func SetupDistributedRouter(name string) error {
+var log = logf.Log.WithName("ovn")
- // Make sure br-int is created.
- stdout, stderr, err := RunOVSVsctlUnix("--", "--may-exist", "add-br", "br-int")
- if err != nil {
- logrus.Errorf("Failed to create br-int, stdout: %q, stderr: %q, error: %v", stdout, stderr, err)
- return err
+func parseOvnNetworkObject(ovnnetwork string) ([]map[string]interface{}, error) {
+ var ovnNet []map[string]interface{}
+
+ if ovnnetwork == "" {
+ return nil, fmt.Errorf("parseOvnNetworkObject:error")
}
+
+ if err := json.Unmarshal([]byte(ovnnetwork), &ovnNet); err != nil {
+ return nil, fmt.Errorf("parseOvnNetworkObject: failed to load ovn network err: %v | ovn network: %v", err, ovnnetwork)
+ }
+
+ return ovnNet, nil
+}
+
+func setupDistributedRouter(name string) error {
+
// Create a single common distributed router for the cluster.
- stdout, stderr, err = RunOVNNbctlUnix("--", "--may-exist", "lr-add", name, "--", "set", "logical_router", name, "external_ids:ovn4nfv-cluster-router=yes")
+ stdout, stderr, err := RunOVNNbctl("--", "--may-exist", "lr-add", name, "--", "set", "logical_router", name, "external_ids:ovn4nfv-cluster-router=yes")
if err != nil {
- logrus.Errorf("Failed to create a single common distributed router for the cluster, stdout: %q, stderr: %q, error: %v", stdout, stderr, err)
+ log.Error(err, "Failed to create a single common distributed router for the cluster", "stdout", stdout, "stderr", stderr)
return err
}
// Create a logical switch called "ovn4nfv-join" that will be used to connect gateway routers to the distributed router.
// The "ovn4nfv-join" will be allocated IP addresses in the range 100.64.1.0/24.
- stdout, stderr, err = RunOVNNbctlUnix("--may-exist", "ls-add", "ovn4nfv-join")
+ stdout, stderr, err = RunOVNNbctl("--may-exist", "ls-add", "ovn4nfv-join")
if err != nil {
- logrus.Errorf("Failed to create logical switch called \"ovn4nfv-join\", stdout: %q, stderr: %q, error: %v", stdout, stderr, err)
+ log.Error(err, "Failed to create logical switch called \"ovn4nfv-join\"", "stdout", stdout, "stderr", stderr)
return err
}
// Connect the distributed router to "ovn4nfv-join".
- routerMac, stderr, err := RunOVNNbctlUnix("--if-exist", "get", "logical_router_port", "rtoj-"+name, "mac")
+ routerMac, stderr, err := RunOVNNbctl("--if-exist", "get", "logical_router_port", "rtoj-"+name, "mac")
if err != nil {
- logrus.Errorf("Failed to get logical router port rtoj-%v, stderr: %q, error: %v", name, stderr, err)
+ log.Error(err, "Failed to get logical router port rtoj-", "name", name, "stdout", stdout, "stderr", stderr)
return err
}
if routerMac == "" {
routerMac = generateMac()
- stdout, stderr, err = RunOVNNbctlUnix("--", "--may-exist", "lrp-add", name, "rtoj-"+name, routerMac, "100.64.1.1/24", "--", "set", "logical_router_port", "rtoj-"+name, "external_ids:connect_to_ovn4nfvjoin=yes")
+ stdout, stderr, err = RunOVNNbctl("--", "--may-exist", "lrp-add", name, "rtoj-"+name, routerMac, "100.64.1.1/24", "--", "set", "logical_router_port", "rtoj-"+name, "external_ids:connect_to_ovn4nfvjoin=yes")
if err != nil {
- logrus.Errorf("Failed to add logical router port rtoj-%v, stdout: %q, stderr: %q, error: %v", name, stdout, stderr, err)
+ log.Error(err, "Failed to add logical router port rtoj", "name", name, "stdout", stdout, "stderr", stderr)
return err
}
}
// Connect the switch "ovn4nfv-join" to the router.
- stdout, stderr, err = RunOVNNbctlUnix("--", "--may-exist", "lsp-add", "ovn4nfv-join", "jtor-"+name, "--", "set", "logical_switch_port", "jtor-"+name, "type=router", "options:router-port=rtoj-"+name, "addresses="+"\""+routerMac+"\"")
+ stdout, stderr, err = RunOVNNbctl("--", "--may-exist", "lsp-add", "ovn4nfv-join", "jtor-"+name, "--", "set", "logical_switch_port", "jtor-"+name, "type=router", "options:router-port=rtoj-"+name, "addresses="+"\""+routerMac+"\"")
if err != nil {
- logrus.Errorf("Failed to add logical switch port to logical router, stdout: %q, stderr: %q, error: %v", stdout, stderr, err)
+ log.Error(err, "Failed to add logical switch port to logical router", "stdout", stdout, "stderr", stderr)
return err
}
return nil
}
-func parseOvnNetworkObject(ovnnetwork string) ([]map[string]interface{}, error) {
- var ovnNet []map[string]interface{}
-
- if ovnnetwork == "" {
- return nil, fmt.Errorf("parseOvnNetworkObject:error")
+// Find if switch exists
+func findLogicalSwitch(name string) bool {
+ // get logical switch from OVN
+ output, stderr, err := RunOVNNbctl("--data=bare", "--no-heading",
+ "--columns=name", "find", "logical_switch", "name="+name)
+ if err != nil {
+ log.Error(err, "Error in obtaining list of logical switch", "stderr", stderr)
+ return false
}
- if err := json.Unmarshal([]byte(ovnnetwork), &ovnNet); err != nil {
- return nil, fmt.Errorf("parseOvnNetworkObject: failed to load ovn network err: %v | ovn network: %v", err, ovnnetwork)
+ if strings.Compare(name, output) == 0 {
+ return true
}
-
- return ovnNet, nil
+ return false
}
// generateMac generates mac address.
diff --git a/internal/pkg/ovn/ovn.go b/internal/pkg/ovn/ovn.go
index 470416b..dad4641 100644
--- a/internal/pkg/ovn/ovn.go
+++ b/internal/pkg/ovn/ovn.go
@@ -2,93 +2,174 @@ package ovn
import (
"fmt"
+ "github.com/mitchellh/mapstructure"
+ kapi "k8s.io/api/core/v1"
+ kexec "k8s.io/utils/exec"
"strings"
"time"
-
- "github.com/sirupsen/logrus"
- kapi "k8s.io/api/core/v1"
- "k8s.io/client-go/kubernetes"
- "k8s.io/client-go/tools/cache"
- "ovn4nfv-k8s-plugin/internal/pkg/factory"
- "ovn4nfv-k8s-plugin/internal/pkg/kube"
)
-// Controller structure is the object which holds the controls for starting
-// and reacting upon the watched resources (e.g. pods, endpoints)
type Controller struct {
- kube kube.Interface
- watchFactory *factory.WatchFactory
-
gatewayCache map[string]string
- // A cache of all logical switches seen by the watcher
- logicalSwitchCache map[string]bool
- // A cache of all logical ports seen by the watcher and
- // its corresponding logical switch
- logicalPortCache map[string]string
}
-// NewOvnController creates a new OVN controller for creating logical network
-// infrastructure and policy
-func NewOvnController(kubeClient kubernetes.Interface, wf *factory.WatchFactory) *Controller {
- return &Controller{
- kube: &kube.Kube{KClient: kubeClient},
- watchFactory: wf,
- logicalSwitchCache: make(map[string]bool),
- logicalPortCache: make(map[string]string),
- gatewayCache: make(map[string]string),
+const (
+ ovn4nfvRouterName = "ovn4nfv-master"
+ Ovn4nfvAnnotationTag = "k8s.plugin.opnfv.org/ovnInterfaces"
+)
+
+type netInterface struct {
+ Name string
+ Interface string
+ NetType string
+ DefaultGateway string
+ IPAddress string
+ MacAddress string
+}
+
+var ovnCtl *Controller
+
+// NewOvnController creates a new OVN controller for creating logical networks
+func NewOvnController(exec kexec.Interface) (*Controller, error) {
+
+ if exec == nil {
+ exec = kexec.New()
+ }
+
+ if err := SetExec(exec); err != nil {
+ log.Error(err, "Failed to initialize exec helper")
+ return nil, err
+ }
+
+ if err := SetupOvnUtils(); err != nil {
+ log.Error(err, "Failed to initialize OVN State")
+ return nil, err
+ }
+ ovnCtl = &Controller{
+ gatewayCache: make(map[string]string),
}
+ return ovnCtl, nil
}
-// Run starts the actual watching. Also initializes any local structures needed.
-func (oc *Controller) Run() error {
- fmt.Println("ovn4nfvk8s watching Pods")
- for _, f := range []func() error{oc.WatchPods} {
- if err := f(); err != nil {
- return err
- }
+// GetOvnController returns OVN controller for creating logical networks
+func GetOvnController() (*Controller, error) {
+ if ovnCtl != nil {
+ return ovnCtl, nil
}
- return nil
+ return nil, fmt.Errorf("OVN Controller not initialized")
}
-// WatchPods starts the watching of Pod resource and calls back the appropriate handler logic
-func (oc *Controller) WatchPods() error {
- _, err := oc.watchFactory.AddPodHandler(cache.ResourceEventHandlerFuncs{
- AddFunc: func(obj interface{}) {
- pod := obj.(*kapi.Pod)
- if pod.Spec.NodeName != "" {
- oc.addLogicalPort(pod)
+// AddLogicalPorts adds ports to the Pod
+func (oc *Controller) AddLogicalPorts(pod *kapi.Pod, ovnNetObjs []map[string]interface{}) (key, value string) {
+
+ if ovnNetObjs == nil {
+ return
+ }
+
+ if pod.Spec.HostNetwork {
+ return
+ }
+
+ if _, ok := pod.Annotations[Ovn4nfvAnnotationTag]; ok {
+ log.Info("AddLogicalPorts : Pod annotation found")
+ return
+ }
+
+ var ovnString, outStr string
+ ovnString = "["
+ var ns netInterface
+ for _, net := range ovnNetObjs {
+
+ err := mapstructure.Decode(net, &ns)
+ if err != nil {
+ log.Error(err, "mapstruct error", "network", net)
+ return
+ }
+
+ if !findLogicalSwitch(ns.Name) {
+ log.Info("Logical Switch not found")
+ return
+ }
+ if ns.Interface == "" {
+ log.Info("Interface name must be provided")
+ return
+ }
+ if ns.DefaultGateway == "" {
+ ns.DefaultGateway = "false"
+ }
+ if ns.NetType == "" || ns.NetType != "provider" {
+ ns.NetType = "virtual"
+ }
+ if ns.NetType == "provider" {
+ if ns.IPAddress == "" {
+ log.Info("ipAddress must be provided for netType Provider")
+ return
}
- },
- UpdateFunc: func(old, newer interface{}) {
- podNew := newer.(*kapi.Pod)
- podOld := old.(*kapi.Pod)
- if podOld.Spec.NodeName == "" && podNew.Spec.NodeName != "" {
- oc.addLogicalPort(podNew)
+ if ns.DefaultGateway == "true" {
+ log.Info("defaultGateway not supported for provider network - Use ovnNetworkRoutes to add routes")
+ return
}
- },
- DeleteFunc: func(obj interface{}) {
- pod := obj.(*kapi.Pod)
- oc.deleteLogicalPort(pod)
- },
- }, oc.syncPods)
- return err
+
+ }
+
+ outStr = oc.addLogicalPortWithSwitch(pod, ns.Name, ns.IPAddress, ns.MacAddress, ns.Interface, ns.NetType)
+ if outStr == "" {
+ return
+ }
+ last := len(outStr) - 1
+ tmpString := outStr[:last]
+ tmpString += "," + "\\\"defaultGateway\\\":" + "\\\"" + ns.DefaultGateway + "\\\""
+ tmpString += "," + "\\\"interface\\\":" + "\\\"" + ns.Interface + "\\\"}"
+ ovnString += tmpString
+ ovnString += ","
+ }
+ last := len(ovnString) - 1
+ ovnString = ovnString[:last]
+ ovnString += "]"
+ key = Ovn4nfvAnnotationTag
+ value = ovnString
+ return key, value
}
-func (oc *Controller) syncPods(pods []interface{}) {
+func (oc *Controller) DeleteLogicalPorts(name, namespace string) {
+
+ log.Info("Deleting pod", "name", name)
+ logicalPort := fmt.Sprintf("%s_%s", namespace, name)
+
+ // get the list of logical ports from OVN
+ stdout, stderr, err := RunOVNNbctl("--data=bare", "--no-heading",
+ "--columns=name", "find", "logical_switch_port", "external_ids:pod=true")
+ if err != nil {
+ log.Error(err, "Error in obtaining list of logical ports ", "stdout", stdout, "stderr", stderr)
+ return
+ }
+ log.Info("Deleting", "Existing Ports", stdout)
+ existingLogicalPorts := strings.Fields(stdout)
+ for _, existingPort := range existingLogicalPorts {
+ if strings.Contains(existingPort, logicalPort) {
+ // found, delete this logical port
+ log.Info("Deleting", "existingPort", existingPort)
+ stdout, stderr, err := RunOVNNbctl("--if-exists", "lsp-del",
+ existingPort)
+ if err != nil {
+ log.Error(err, "Error in deleting pod's logical port ", "stdout", stdout, "stderr", stderr)
+ }
+ }
+ }
+ return
}
func (oc *Controller) getGatewayFromSwitch(logicalSwitch string) (string, string, error) {
var gatewayIPMaskStr, stderr string
var ok bool
var err error
- logrus.Infof("getGatewayFromSwitch: %s", logicalSwitch)
+ log.Info("getGatewayFromSwitch", "logicalSwitch", logicalSwitch)
if gatewayIPMaskStr, ok = oc.gatewayCache[logicalSwitch]; !ok {
- gatewayIPMaskStr, stderr, err = RunOVNNbctlUnix("--if-exists",
+ gatewayIPMaskStr, stderr, err = RunOVNNbctl("--if-exists",
"get", "logical_switch", logicalSwitch,
"external_ids:gateway_ip")
if err != nil {
- logrus.Errorf("Failed to get gateway IP: %s, stderr: %q, %v",
- gatewayIPMaskStr, stderr, err)
+ log.Error(err, "Failed to get gateway IP", "stderr", stderr, "gatewayIPMaskStr", gatewayIPMaskStr)
return "", "", err
}
if gatewayIPMaskStr == "" {
@@ -107,44 +188,6 @@ func (oc *Controller) getGatewayFromSwitch(logicalSwitch string) (string, string
return gatewayIP, mask, nil
}
-func (oc *Controller) deleteLogicalPort(pod *kapi.Pod) {
-
- if pod.Spec.HostNetwork {
- return
- }
-
- logrus.Infof("Deleting pod: %s", pod.Name)
- logicalPort := fmt.Sprintf("%s_%s", pod.Namespace, pod.Name)
-
- // get the list of logical ports from OVN
- output, stderr, err := RunOVNNbctlUnix("--data=bare", "--no-heading",
- "--columns=name", "find", "logical_switch_port", "external_ids:pod=true")
- if err != nil {
- logrus.Errorf("Error in obtaining list of logical ports, "+
- "stderr: %q, err: %v",
- stderr, err)
- return
- }
- logrus.Infof("Exising Ports : %s. ", output)
- existingLogicalPorts := strings.Fields(output)
- for _, existingPort := range existingLogicalPorts {
- if strings.Contains(existingPort, logicalPort) {
- // found, delete this logical port
- logrus.Infof("Deleting: %s. ", existingPort)
- out, stderr, err := RunOVNNbctlUnix("--if-exists", "lsp-del",
- existingPort)
- if err != nil {
- logrus.Errorf("Error in deleting pod's logical port "+
- "stdout: %q, stderr: %q err: %v",
- out, stderr, err)
- } else {
- delete(oc.logicalPortCache, existingPort)
- }
- }
- }
- return
-}
-
func (oc *Controller) addLogicalPortWithSwitch(pod *kapi.Pod, logicalSwitch, ipAddress, macAddress, interfaceName, netType string) (annotation string) {
var out, stderr string
var err error
@@ -153,9 +196,6 @@ func (oc *Controller) addLogicalPortWithSwitch(pod *kapi.Pod, logicalSwitch, ipA
return
}
- if !oc.logicalSwitchCache[logicalSwitch] {
- oc.logicalSwitchCache[logicalSwitch] = true
- }
var portName string
if interfaceName != "" {
portName = fmt.Sprintf("%s_%s_%s", pod.Namespace, pod.Name, interfaceName)
@@ -163,7 +203,7 @@ func (oc *Controller) addLogicalPortWithSwitch(pod *kapi.Pod, logicalSwitch, ipA
return
}
- logrus.Infof("Creating logical port for %s on switch %s", portName, logicalSwitch)
+ log.Info("Creating logical port for on switch", "portName", portName, "logicalSwitch", logicalSwitch)
if ipAddress != "" && macAddress != "" {
isStaticIP = true
@@ -174,7 +214,7 @@ func (oc *Controller) addLogicalPortWithSwitch(pod *kapi.Pod, logicalSwitch, ipA
}
if isStaticIP {
- out, stderr, err = RunOVNNbctlUnix("--may-exist", "lsp-add",
+ out, stderr, err = RunOVNNbctl("--may-exist", "lsp-add",
logicalSwitch, portName, "--", "lsp-set-addresses", portName,
fmt.Sprintf("%s %s", macAddress, ipAddress), "--", "--if-exists",
"clear", "logical_switch_port", portName, "dynamic_addresses", "--", "set",
@@ -183,13 +223,11 @@ func (oc *Controller) addLogicalPortWithSwitch(pod *kapi.Pod, logicalSwitch, ipA
"external-ids:logical_switch="+logicalSwitch,
"external-ids:pod=true")
if err != nil {
- logrus.Errorf("Failed to add logical port to switch "+
- "stdout: %q, stderr: %q (%v)",
- out, stderr, err)
+ log.Error(err, "Failed to add logical port to switch", "out", out, "stderr", stderr)
return
}
} else {
- out, stderr, err = RunOVNNbctlUnix("--wait=sb", "--",
+ out, stderr, err = RunOVNNbctl("--wait=sb", "--",
"--may-exist", "lsp-add", logicalSwitch, portName,
"--", "lsp-set-addresses",
portName, "dynamic", "--", "set",
@@ -198,37 +236,32 @@ func (oc *Controller) addLogicalPortWithSwitch(pod *kapi.Pod, logicalSwitch, ipA
"external-ids:logical_switch="+logicalSwitch,
"external-ids:pod=true")
if err != nil {
- logrus.Errorf("Error while creating logical port %s "+
- "stdout: %q, stderr: %q (%v)",
- portName, out, stderr, err)
+ log.Error(err, "Error while creating logical port %s ", "portName", portName, "stdout", out, "stderr", stderr)
return
}
}
- oc.logicalPortCache[portName] = logicalSwitch
count := 30
for count > 0 {
if isStaticIP {
- out, stderr, err = RunOVNNbctlUnix("get",
+ out, stderr, err = RunOVNNbctl("get",
"logical_switch_port", portName, "addresses")
} else {
- out, stderr, err = RunOVNNbctlUnix("get",
+ out, stderr, err = RunOVNNbctl("get",
"logical_switch_port", portName, "dynamic_addresses")
}
if err == nil && out != "[]" {
break
}
if err != nil {
- logrus.Errorf("Error while obtaining addresses for %s - %v", portName,
- err)
+ log.Error(err, "Error while obtaining addresses for", "portName", portName)
return
}
time.Sleep(time.Second)
count--
}
if count == 0 {
- logrus.Errorf("Error while obtaining addresses for %s "+
- "stdout: %q, stderr: %q, (%v)", portName, out, stderr, err)
+ log.Error(err, "Error while obtaining addresses for", "portName", portName, "stdout", out, "stderr", stderr)
return
}
@@ -239,14 +272,14 @@ func (oc *Controller) addLogicalPortWithSwitch(pod *kapi.Pod, logicalSwitch, ipA
outStr = strings.Trim(outStr, `"`)
addresses := strings.Split(outStr, " ")
if len(addresses) != 2 {
- logrus.Errorf("Error while obtaining addresses for %s", portName)
+ log.Info("Error while obtaining addresses for", "portName", portName)
return
}
if netType == "virtual" {
gatewayIP, mask, err := oc.getGatewayFromSwitch(logicalSwitch)
if err != nil {
- logrus.Errorf("Error obtaining gateway address for switch %s: %s", logicalSwitch, err)
+ log.Error(err, "Error obtaining gateway address for switch", "logicalSwitch", logicalSwitch)
return
}
annotation = fmt.Sprintf(`{\"ip_address\":\"%s/%s\", \"mac_address\":\"%s\", \"gateway_ip\": \"%s\"}`, addresses[1], mask, addresses[0], gatewayIP)
@@ -256,102 +289,3 @@ func (oc *Controller) addLogicalPortWithSwitch(pod *kapi.Pod, logicalSwitch, ipA
return annotation
}
-
-func (oc *Controller) findLogicalSwitch(name string) bool {
- // get logical switch from OVN
- output, stderr, err := RunOVNNbctlUnix("--data=bare", "--no-heading",
- "--columns=name", "find", "logical_switch", "name="+name)
- if err != nil {
- logrus.Errorf("Error in obtaining list of logical switch, "+
- "stderr: %q, err: %v",
- stderr, err)
- return false
- }
-
- if strings.Compare(name, output) == 0 {
- return true
- } else {
- logrus.Errorf("Error finding Switch %v", name)
- return false
- }
-}
-
-func (oc *Controller) addLogicalPort(pod *kapi.Pod) {
- var logicalSwitch string
- var ipAddress, macAddress, interfaceName, defaultGateway, netType string
-
- annotation := pod.Annotations["ovnNetwork"]
-
- if annotation != "" {
- ovnNetObjs, err := parseOvnNetworkObject(annotation)
- if err != nil {
- logrus.Errorf("addLogicalPort : Error Parsing OvnNetwork List")
- return
- }
- var ovnString, outStr string
- ovnString = "["
- for _, net := range ovnNetObjs {
- logicalSwitch = net["name"].(string)
- if !oc.findLogicalSwitch(logicalSwitch) {
- logrus.Errorf("Logical Switch not found")
- return
- }
- if _, ok := net["interface"]; ok {
- interfaceName = net["interface"].(string)
- } else {
- logrus.Errorf("Interface name must be provided")
- return
- }
- if _, ok := net["ipAddress"]; ok {
- ipAddress = net["ipAddress"].(string)
- } else {
- ipAddress = ""
- }
- if _, ok := net["macAddress"]; ok {
- macAddress = net["macAddress"].(string)
- } else {
- macAddress = ""
- }
- if _, ok := net["defaultGateway"]; ok {
- defaultGateway = net["defaultGateway"].(string)
- } else {
- defaultGateway = "false"
- }
- if _, ok := net["netType"]; ok {
- netType = net["netType"].(string)
- } else {
- netType = "virtual"
- }
- if netType != "provider" && netType != "virtual" {
- logrus.Errorf("netType is not supported")
- return
- }
- if netType == "provider" && ipAddress == "" {
- logrus.Errorf("ipAddress must be provided for netType Provider")
- return
- }
- if netType == "provider" && defaultGateway == "true" {
- logrus.Errorf("defaultGateway not supported for provider network - Use ovnNetworkRoutes to add routes")
- return
- }
- outStr = oc.addLogicalPortWithSwitch(pod, logicalSwitch, ipAddress, macAddress, interfaceName, netType)
- if outStr == "" {
- return
- }
- last := len(outStr) - 1
- tmpString := outStr[:last]
- tmpString += "," + "\\\"defaultGateway\\\":" + "\\\"" + defaultGateway + "\\\""
- tmpString += "," + "\\\"interface\\\":" + "\\\"" + interfaceName + "\\\"}"
- ovnString += tmpString
- ovnString += ","
- }
- last := len(ovnString) - 1
- ovnString = ovnString[:last]
- ovnString += "]"
- logrus.Debugf("ovnIfaceList - %v", ovnString)
- err = oc.kube.SetAnnotationOnPod(pod, "ovnIfaceList", ovnString)
- if err != nil {
- logrus.Errorf("Failed to set annotation on pod %s - %v", pod.Name, err)
- }
- }
-}
diff --git a/internal/pkg/ovn/ovn_test.go b/internal/pkg/ovn/ovn_test.go
index bc33d35..6e38759 100644
--- a/internal/pkg/ovn/ovn_test.go
+++ b/internal/pkg/ovn/ovn_test.go
@@ -9,9 +9,7 @@ import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/client-go/kubernetes/fake"
"ovn4nfv-k8s-plugin/internal/pkg/config"
- "ovn4nfv-k8s-plugin/internal/pkg/factory"
ovntest "ovn4nfv-k8s-plugin/internal/pkg/testing"
. "github.com/onsi/ginkgo"
@@ -30,10 +28,10 @@ var _ = Describe("Add logical Port", func() {
var app *cli.App
BeforeEach(func() {
-
app = cli.NewApp()
app.Name = "test"
app.Flags = config.Flags
+
})
It("tests Pod", func() {
@@ -68,15 +66,15 @@ var _ = Describe("Add logical Port", func() {
return fmt.Sprintf("/fake-bin/%s", file), nil
},
}
-
- err := SetExec(fexec)
+ oldSetupOvnUtils := SetupOvnUtils
+ // as we are exiting, revert ConfigureInterface back at end of function
+ defer func() { SetupOvnUtils = oldSetupOvnUtils }()
+ SetupOvnUtils = func() error {
+ return nil
+ }
+ ovnController, err := NewOvnController(fexec)
Expect(err).NotTo(HaveOccurred())
- fakeClient := &fake.Clientset{}
- var fakeWatchFactory factory.WatchFactory
-
- ovnController := NewOvnController(fakeClient, &fakeWatchFactory)
- Expect(err).NotTo(HaveOccurred())
var (
okPod = v1.Pod{
TypeMeta: metav1.TypeMeta{
@@ -84,8 +82,7 @@ var _ = Describe("Add logical Port", func() {
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
- Name: "ok",
- Annotations: map[string]string{"ovnNetwork": "[{ \"name\": \"ovn-prot-net\", \"interface\": \"net0\" , \"defaultGateway\": \"true\"}]"},
+ Name: "ok",
},
Spec: v1.PodSpec{
Containers: []v1.Container{
@@ -97,8 +94,8 @@ var _ = Describe("Add logical Port", func() {
},
}
)
-
- ovnController.addLogicalPort(&okPod)
+ a := []map[string]interface{}{{"name": "ovn-prot-net", "interface": "net0"}}
+ ovnController.AddLogicalPorts(&okPod, a)
Expect(fexec.CommandCalls).To(Equal(len(fakeCmds)))
return nil
@@ -137,13 +134,14 @@ var _ = Describe("Add logical Port", func() {
return fmt.Sprintf("/fake-bin/%s", file), nil
},
}
- err := SetExec(fexec)
+ oldSetupOvnUtils := SetupOvnUtils
+ // as we are exiting, revert ConfigureInterface back at end of function
+ defer func() { SetupOvnUtils = oldSetupOvnUtils }()
+ SetupOvnUtils = func() error {
+ return nil
+ }
+ ovnController, err := NewOvnController(fexec)
Expect(err).NotTo(HaveOccurred())
-
- fakeClient := &fake.Clientset{}
- var fakeWatchFactory factory.WatchFactory
-
- ovnController := NewOvnController(fakeClient, &fakeWatchFactory)
var (
okPod = v1.Pod{
TypeMeta: metav1.TypeMeta{
@@ -151,8 +149,7 @@ var _ = Describe("Add logical Port", func() {
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
- Name: "ok",
- Annotations: map[string]string{"ovnNetwork": "[{ \"name\": \"ovn-prot-net\", \"interface\": \"net0\", \"netType\": \"provider\", \"ipAddress\": \"192.168.1.3/24\", \"macAddress\": \"0a:00:00:00:00:01\" }]"},
+ Name: "ok",
},
Spec: v1.PodSpec{
Containers: []v1.Container{
@@ -164,7 +161,8 @@ var _ = Describe("Add logical Port", func() {
},
}
)
- ovnController.addLogicalPort(&okPod)
+ a := []map[string]interface{}{{"name": "ovn-prot-net", "interface": "net0", "netType": "provider", "ipAddress": "192.168.1.3/24", "macAddress": "0a:00:00:00:00:01"}}
+ ovnController.AddLogicalPorts(&okPod, a)
Expect(fexec.CommandCalls).To(Equal(len(fakeCmds)))
return nil
diff --git a/internal/pkg/ovn/utils.go b/internal/pkg/ovn/utils.go
index 1700bf8..2478ac2 100644
--- a/internal/pkg/ovn/utils.go
+++ b/internal/pkg/ovn/utils.go
@@ -3,55 +3,53 @@ package ovn
import (
"bytes"
"fmt"
+ kexec "k8s.io/utils/exec"
+ "os"
"strings"
"time"
- "unicode"
-
- "github.com/sirupsen/logrus"
- kexec "k8s.io/utils/exec"
)
const (
ovsCommandTimeout = 15
- ovsVsctlCommand = "ovs-vsctl"
- ovsOfctlCommand = "ovs-ofctl"
ovnNbctlCommand = "ovn-nbctl"
- ipCommand = "ip"
)
// Exec runs various OVN and OVS utilities
type execHelper struct {
exec kexec.Interface
- ofctlPath string
- vsctlPath string
nbctlPath string
- ipPath string
+ hostIP string
+ hostPort string
}
var runner *execHelper
+// SetupOvnUtils does internal OVN initialization
+var SetupOvnUtils = func() error {
+ runner.hostIP = os.Getenv("HOST_IP")
+ // OVN Host Port
+ runner.hostPort = "6641"
+ log.Info("Host Port", "IP", runner.hostIP, "Port", runner.hostPort)
+
+ // Setup Distributed Router
+ err := setupDistributedRouter(ovn4nfvRouterName)
+ if err != nil {
+ log.Error(err, "Failed to initialize OVN Distributed Router")
+ return err
+ }
+ return nil
+}
+
// SetExec validates executable paths and saves the given exec interface
// to be used for running various OVS and OVN utilites
func SetExec(exec kexec.Interface) error {
var err error
runner = &execHelper{exec: exec}
- runner.ofctlPath, err = exec.LookPath(ovsOfctlCommand)
- if err != nil {
- return err
- }
- runner.vsctlPath, err = exec.LookPath(ovsVsctlCommand)
- if err != nil {
- return err
- }
runner.nbctlPath, err = exec.LookPath(ovnNbctlCommand)
if err != nil {
return err
}
- runner.ipPath, err = exec.LookPath(ipCommand)
- if err != nil {
- return err
- }
return nil
}
@@ -65,7 +63,6 @@ func runOVNretry(cmdPath string, args ...string) (*bytes.Buffer, *bytes.Buffer,
if err == nil {
return stdout, stderr, err
}
-
// Connection refused
// Master may not be up so keep trying
if strings.Contains(stderr.String(), "Connection refused") {
@@ -87,34 +84,29 @@ func run(cmdPath string, args ...string) (*bytes.Buffer, *bytes.Buffer, error) {
cmd := runner.exec.Command(cmdPath, args...)
cmd.SetStdout(stdout)
cmd.SetStderr(stderr)
- logrus.Debugf("exec: %s %s", cmdPath, strings.Join(args, " "))
+ log.Info("exec:", "cmdPath", cmdPath, "args", strings.Join(args, " "))
err := cmd.Run()
if err != nil {
- logrus.Debugf("exec: %s %s => %v", cmdPath, strings.Join(args, " "), err)
+ log.Error(err, "exec:", "cmdPath", cmdPath, "args", strings.Join(args, " "))
}
return stdout, stderr, err
}
-// RunOVSVsctl runs a command via ovs-vsctl.
-func RunOVSVsctlUnix(args ...string) (string, string, error) {
- cmdArgs := []string{fmt.Sprintf("--timeout=%d", ovsCommandTimeout)}
- cmdArgs = append(cmdArgs, args...)
- stdout, stderr, err := run(runner.vsctlPath, cmdArgs...)
- return strings.Trim(strings.TrimSpace(stdout.String()), "\""), stderr.String(), err
-}
-
-// RunOVNNbctlUnix runs command via ovn-nbctl, with ovn-nbctl using the unix
-// domain sockets to connect to the ovsdb-server backing the OVN NB database.
-func RunOVNNbctlUnix(args ...string) (string, string, error) {
- cmdArgs := []string{fmt.Sprintf("--timeout=%d", ovsCommandTimeout)}
+// RunOVNSbctlWithTimeout runs command via ovn-nbctl with a specific timeout
+func RunOVNNbctlWithTimeout(timeout int, args ...string) (string, string, error) {
+ var cmdArgs []string
+ if len(runner.hostIP) > 0 {
+ cmdArgs = []string{
+ fmt.Sprintf("--db=tcp:%s:%s", runner.hostIP, runner.hostPort),
+ }
+ }
+ cmdArgs = append(cmdArgs, fmt.Sprintf("--timeout=%d", timeout))
cmdArgs = append(cmdArgs, args...)
stdout, stderr, err := runOVNretry(runner.nbctlPath, cmdArgs...)
- return strings.Trim(strings.TrimFunc(stdout.String(), unicode.IsSpace), "\""),
- stderr.String(), err
+ return strings.Trim(strings.TrimSpace(stdout.String()), "\""), stderr.String(), err
}
-// RunIP runs a command via the iproute2 "ip" utility
-func RunIP(args ...string) (string, string, error) {
- stdout, stderr, err := run(runner.ipPath, args...)
- return strings.TrimSpace(stdout.String()), stderr.String(), err
+// RunOVNNbctl runs a command via ovn-nbctl.
+func RunOVNNbctl(args ...string) (string, string, error) {
+ return RunOVNNbctlWithTimeout(ovsCommandTimeout, args...)
}