aboutsummaryrefslogtreecommitdiffstats
path: root/internal
diff options
context:
space:
mode:
authorRitu Sood <ritu.sood@intel.com>2019-08-06 19:35:42 -0700
committerRitu Sood <ritu.sood@intel.com>2019-08-15 10:03:47 -0700
commit8295a28f6d6e14f5adb62138271de393015061e9 (patch)
treed11b1e799de55e89d08bc810180d99ce65e6f21e /internal
parentaa41b49246d84b605a76d169f0c861ba0691a4fb (diff)
Use controller runtime and operator sdk
Changing the framework to use controller runtime and operator sdk. This allows to add CRD controllers for Network, Provider Network etc in the same operator. Binary renamed to nfn-operator (Network funtion networking). Change-Id: Ic25a3c3f5f1418fc0614f3aede48b41d9c1156cd Signed-off-by: Ritu Sood <ritu.sood@intel.com>
Diffstat (limited to 'internal')
-rw-r--r--internal/pkg/factory/factory.go318
-rw-r--r--internal/pkg/factory/factory_test.go686
-rw-r--r--internal/pkg/kube/kube.go28
-rw-r--r--internal/pkg/ovn/common.go65
-rw-r--r--internal/pkg/ovn/ovn.go368
-rw-r--r--internal/pkg/ovn/ovn_test.go44
-rw-r--r--internal/pkg/ovn/utils.go78
7 files changed, 260 insertions, 1327 deletions
diff --git a/internal/pkg/factory/factory.go b/internal/pkg/factory/factory.go
deleted file mode 100644
index a635e3f..0000000
--- a/internal/pkg/factory/factory.go
+++ /dev/null
@@ -1,318 +0,0 @@
-package factory
-
-import (
- "fmt"
- "reflect"
- "sync"
- "sync/atomic"
- "time"
-
- "github.com/sirupsen/logrus"
-
- kapi "k8s.io/api/core/v1"
- knet "k8s.io/api/networking/v1"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/labels"
- informerfactory "k8s.io/client-go/informers"
- "k8s.io/client-go/kubernetes"
- "k8s.io/client-go/tools/cache"
-)
-
-type informer struct {
- sync.Mutex
- oType reflect.Type
- inf cache.SharedIndexInformer
- handlers map[uint64]cache.ResourceEventHandler
-}
-
-func (i *informer) forEachHandler(obj interface{}, f func(id uint64, handler cache.ResourceEventHandler)) {
- i.Lock()
- defer i.Unlock()
-
- objType := reflect.TypeOf(obj)
- if objType != i.oType {
- logrus.Errorf("object type %v did not match expected %v", objType, i.oType)
- return
- }
-
- for id, handler := range i.handlers {
- f(id, handler)
- }
-}
-
-// WatchFactory initializes and manages common kube watches
-type WatchFactory struct {
- iFactory informerfactory.SharedInformerFactory
- informers map[reflect.Type]*informer
- handlerCounter uint64
-}
-
-const (
- resyncInterval = 12 * time.Hour
-)
-
-func newInformer(oType reflect.Type, inf cache.SharedIndexInformer) *informer {
- return &informer{
- oType: oType,
- inf: inf,
- handlers: make(map[uint64]cache.ResourceEventHandler),
- }
-}
-
-var (
- podType reflect.Type = reflect.TypeOf(&kapi.Pod{})
- serviceType reflect.Type = reflect.TypeOf(&kapi.Service{})
- endpointsType reflect.Type = reflect.TypeOf(&kapi.Endpoints{})
- policyType reflect.Type = reflect.TypeOf(&knet.NetworkPolicy{})
- namespaceType reflect.Type = reflect.TypeOf(&kapi.Namespace{})
- nodeType reflect.Type = reflect.TypeOf(&kapi.Node{})
-)
-
-// NewWatchFactory initializes a new watch factory
-func NewWatchFactory(c kubernetes.Interface, stopChan <-chan struct{}) (*WatchFactory, error) {
- // resync time is 12 hours, none of the resources being watched in ovn-kubernetes have
- // any race condition where a resync may be required e.g. cni executable on node watching for
- // events on pods and assuming that an 'ADD' event will contain the annotations put in by
- // ovnkube master (currently, it is just a 'get' loop)
- // the downside of making it tight (like 10 minutes) is needless spinning on all resources
- wf := &WatchFactory{
- iFactory: informerfactory.NewSharedInformerFactory(c, resyncInterval),
- informers: make(map[reflect.Type]*informer),
- }
-
- // Create shared informers we know we'll use
- wf.informers[podType] = newInformer(podType, wf.iFactory.Core().V1().Pods().Informer())
- wf.informers[serviceType] = newInformer(serviceType, wf.iFactory.Core().V1().Services().Informer())
- wf.informers[endpointsType] = newInformer(endpointsType, wf.iFactory.Core().V1().Endpoints().Informer())
- wf.informers[policyType] = newInformer(policyType, wf.iFactory.Networking().V1().NetworkPolicies().Informer())
- wf.informers[namespaceType] = newInformer(namespaceType, wf.iFactory.Core().V1().Namespaces().Informer())
- wf.informers[nodeType] = newInformer(nodeType, wf.iFactory.Core().V1().Nodes().Informer())
-
- wf.iFactory.Start(stopChan)
- res := wf.iFactory.WaitForCacheSync(stopChan)
- for oType, synced := range res {
- if !synced {
- return nil, fmt.Errorf("error in syncing cache for %v informer", oType)
- }
- informer := wf.informers[oType]
- informer.inf.AddEventHandler(wf.newFederatedHandler(informer))
- }
-
- return wf, nil
-}
-
-func (wf *WatchFactory) newFederatedHandler(inf *informer) cache.ResourceEventHandlerFuncs {
- return cache.ResourceEventHandlerFuncs{
- AddFunc: func(obj interface{}) {
- inf.forEachHandler(obj, func(id uint64, handler cache.ResourceEventHandler) {
- logrus.Debugf("running %v ADD event for handler %d", inf.oType, id)
- handler.OnAdd(obj)
- })
- },
- UpdateFunc: func(oldObj, newObj interface{}) {
- inf.forEachHandler(newObj, func(id uint64, handler cache.ResourceEventHandler) {
- logrus.Debugf("running %v UPDATE event for handler %d", inf.oType, id)
- handler.OnUpdate(oldObj, newObj)
- })
- },
- DeleteFunc: func(obj interface{}) {
- if inf.oType != reflect.TypeOf(obj) {
- tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
- if !ok {
- logrus.Errorf("couldn't get object from tombstone: %+v", obj)
- return
- }
- obj = tombstone.Obj
- objType := reflect.TypeOf(obj)
- if inf.oType != objType {
- logrus.Errorf("expected tombstone object resource type %v but got %v", inf.oType, objType)
- return
- }
- }
- inf.forEachHandler(obj, func(id uint64, handler cache.ResourceEventHandler) {
- logrus.Debugf("running %v DELETE event for handler %d", inf.oType, id)
- handler.OnDelete(obj)
- })
- },
- }
-}
-
-func getObjectMeta(objType reflect.Type, obj interface{}) (*metav1.ObjectMeta, error) {
- switch objType {
- case podType:
- if pod, ok := obj.(*kapi.Pod); ok {
- return &pod.ObjectMeta, nil
- }
- case serviceType:
- if service, ok := obj.(*kapi.Service); ok {
- return &service.ObjectMeta, nil
- }
- case endpointsType:
- if endpoints, ok := obj.(*kapi.Endpoints); ok {
- return &endpoints.ObjectMeta, nil
- }
- case policyType:
- if policy, ok := obj.(*knet.NetworkPolicy); ok {
- return &policy.ObjectMeta, nil
- }
- case namespaceType:
- if namespace, ok := obj.(*kapi.Namespace); ok {
- return &namespace.ObjectMeta, nil
- }
- case nodeType:
- if node, ok := obj.(*kapi.Node); ok {
- return &node.ObjectMeta, nil
- }
- }
- return nil, fmt.Errorf("cannot get ObjectMeta from type %v", objType)
-}
-
-func (wf *WatchFactory) addHandler(objType reflect.Type, namespace string, lsel *metav1.LabelSelector, funcs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) {
- inf, ok := wf.informers[objType]
- if !ok {
- return 0, fmt.Errorf("unknown object type %v", objType)
- }
-
- sel, err := metav1.LabelSelectorAsSelector(lsel)
- if err != nil {
- return 0, fmt.Errorf("error creating label selector: %v", err)
- }
-
- filterFunc := func(obj interface{}) bool {
- if namespace == "" && lsel == nil {
- // Unfiltered handler
- return true
- }
- meta, err := getObjectMeta(objType, obj)
- if err != nil {
- logrus.Errorf("watch handler filter error: %v", err)
- return false
- }
- if namespace != "" && meta.Namespace != namespace {
- return false
- }
- if lsel != nil && !sel.Matches(labels.Set(meta.Labels)) {
- return false
- }
- return true
- }
-
- // Process existing items as a set so the caller can clean up
- // after a restart or whatever
- existingItems := inf.inf.GetStore().List()
- if processExisting != nil {
- items := make([]interface{}, 0)
- for _, obj := range existingItems {
- if filterFunc(obj) {
- items = append(items, obj)
- }
- }
- processExisting(items)
- }
-
- handlerID := atomic.AddUint64(&wf.handlerCounter, 1)
-
- inf.Lock()
- defer inf.Unlock()
-
- inf.handlers[handlerID] = cache.FilteringResourceEventHandler{
- FilterFunc: filterFunc,
- Handler: funcs,
- }
- logrus.Debugf("added %v event handler %d", objType, handlerID)
-
- // Send existing items to the handler's add function; informers usually
- // do this but since we share informers, it's long-since happened so
- // we must emulate that here
- for _, obj := range existingItems {
- inf.handlers[handlerID].OnAdd(obj)
- }
-
- return handlerID, nil
-}
-
-func (wf *WatchFactory) removeHandler(objType reflect.Type, handlerID uint64) error {
- inf, ok := wf.informers[objType]
- if !ok {
- return fmt.Errorf("tried to remove unknown object type %v event handler", objType)
- }
-
- inf.Lock()
- defer inf.Unlock()
- if _, ok := inf.handlers[handlerID]; !ok {
- return fmt.Errorf("tried to remove unknown object type %v event handler %d", objType, handlerID)
- }
- delete(inf.handlers, handlerID)
- logrus.Debugf("removed %v event handler %d", objType, handlerID)
- return nil
-}
-
-// AddPodHandler adds a handler function that will be executed on Pod object changes
-func (wf *WatchFactory) AddPodHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) {
- return wf.addHandler(podType, "", nil, handlerFuncs, processExisting)
-}
-
-// AddFilteredPodHandler adds a handler function that will be executed when Pod objects that match the given filters change
-func (wf *WatchFactory) AddFilteredPodHandler(namespace string, lsel *metav1.LabelSelector, handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) {
- return wf.addHandler(podType, namespace, lsel, handlerFuncs, processExisting)
-}
-
-// RemovePodHandler removes a Pod object event handler function
-func (wf *WatchFactory) RemovePodHandler(handlerID uint64) error {
- return wf.removeHandler(podType, handlerID)
-}
-
-// AddServiceHandler adds a handler function that will be executed on Service object changes
-func (wf *WatchFactory) AddServiceHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) {
- return wf.addHandler(serviceType, "", nil, handlerFuncs, processExisting)
-}
-
-// RemoveServiceHandler removes a Service object event handler function
-func (wf *WatchFactory) RemoveServiceHandler(handlerID uint64) error {
- return wf.removeHandler(serviceType, handlerID)
-}
-
-// AddEndpointsHandler adds a handler function that will be executed on Endpoints object changes
-func (wf *WatchFactory) AddEndpointsHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) {
- return wf.addHandler(endpointsType, "", nil, handlerFuncs, processExisting)
-}
-
-// RemoveEndpointsHandler removes a Endpoints object event handler function
-func (wf *WatchFactory) RemoveEndpointsHandler(handlerID uint64) error {
- return wf.removeHandler(endpointsType, handlerID)
-}
-
-// AddPolicyHandler adds a handler function that will be executed on NetworkPolicy object changes
-func (wf *WatchFactory) AddPolicyHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) {
- return wf.addHandler(policyType, "", nil, handlerFuncs, processExisting)
-}
-
-// RemovePolicyHandler removes a NetworkPolicy object event handler function
-func (wf *WatchFactory) RemovePolicyHandler(handlerID uint64) error {
- return wf.removeHandler(policyType, handlerID)
-}
-
-// AddNamespaceHandler adds a handler function that will be executed on Namespace object changes
-func (wf *WatchFactory) AddNamespaceHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) {
- return wf.addHandler(namespaceType, "", nil, handlerFuncs, processExisting)
-}
-
-// AddFilteredNamespaceHandler adds a handler function that will be executed when Namespace objects that match the given filters change
-func (wf *WatchFactory) AddFilteredNamespaceHandler(namespace string, lsel *metav1.LabelSelector, handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) {
- return wf.addHandler(namespaceType, namespace, lsel, handlerFuncs, processExisting)
-}
-
-// RemoveNamespaceHandler removes a Namespace object event handler function
-func (wf *WatchFactory) RemoveNamespaceHandler(handlerID uint64) error {
- return wf.removeHandler(namespaceType, handlerID)
-}
-
-// AddNodeHandler adds a handler function that will be executed on Node object changes
-func (wf *WatchFactory) AddNodeHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (uint64, error) {
- return wf.addHandler(nodeType, "", nil, handlerFuncs, processExisting)
-}
-
-// RemoveNodeHandler removes a Node object event handler function
-func (wf *WatchFactory) RemoveNodeHandler(handlerID uint64) error {
- return wf.removeHandler(nodeType, handlerID)
-}
diff --git a/internal/pkg/factory/factory_test.go b/internal/pkg/factory/factory_test.go
deleted file mode 100644
index 38fb77e..0000000
--- a/internal/pkg/factory/factory_test.go
+++ /dev/null
@@ -1,686 +0,0 @@
-package factory
-
-import (
- "reflect"
- "testing"
-
- "k8s.io/api/core/v1"
- knet "k8s.io/api/networking/v1"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/runtime"
- "k8s.io/apimachinery/pkg/types"
- "k8s.io/apimachinery/pkg/watch"
- "k8s.io/client-go/kubernetes/fake"
- core "k8s.io/client-go/testing"
- "k8s.io/client-go/tools/cache"
-
- . "github.com/onsi/ginkgo"
- . "github.com/onsi/gomega"
-)
-
-func TestFactory(t *testing.T) {
- RegisterFailHandler(Fail)
- RunSpecs(t, "Watch Factory Suite")
-}
-
-func newObjectMeta(name, namespace string) metav1.ObjectMeta {
- return metav1.ObjectMeta{
- Name: name,
- UID: types.UID(name),
- Namespace: namespace,
- Labels: map[string]string{
- "name": name,
- },
- }
-}
-
-func newPod(name, namespace string) *v1.Pod {
- return &v1.Pod{
- Status: v1.PodStatus{
- Phase: v1.PodRunning,
- },
- ObjectMeta: newObjectMeta(name, namespace),
- Spec: v1.PodSpec{
- Containers: []v1.Container{
- {
- Name: "containerName",
- Image: "containerImage",
- },
- },
- NodeName: "mynode",
- },
- }
-}
-
-func newNamespace(name string) *v1.Namespace {
- return &v1.Namespace{
- Status: v1.NamespaceStatus{
- Phase: v1.NamespaceActive,
- },
- ObjectMeta: newObjectMeta(name, name),
- }
-}
-
-func newNode(name string) *v1.Node {
- return &v1.Node{
- Status: v1.NodeStatus{
- Phase: v1.NodeRunning,
- },
- ObjectMeta: newObjectMeta(name, ""),
- }
-}
-
-func newPolicy(name, namespace string) *knet.NetworkPolicy {
- return &knet.NetworkPolicy{
- ObjectMeta: newObjectMeta(name, namespace),
- }
-}
-
-func newEndpoints(name, namespace string) *v1.Endpoints {
- return &v1.Endpoints{
- ObjectMeta: newObjectMeta(name, namespace),
- }
-}
-
-func newService(name, namespace string) *v1.Service {
- return &v1.Service{
- ObjectMeta: metav1.ObjectMeta{
- Name: name,
- UID: types.UID(name),
- Namespace: namespace,
- Labels: map[string]string{
- "name": name,
- },
- },
- }
-}
-
-func objSetup(c *fake.Clientset, objType string, listFn func(core.Action) (bool, runtime.Object, error)) *watch.FakeWatcher {
- w := watch.NewFake()
- c.AddWatchReactor(objType, core.DefaultWatchReactor(w, nil))
- c.AddReactor("list", objType, listFn)
- return w
-}
-
-var _ = Describe("Watch Factory Operations", func() {
- var (
- fakeClient *fake.Clientset
- podWatch, namespaceWatch, nodeWatch *watch.FakeWatcher
- policyWatch, endpointsWatch, serviceWatch *watch.FakeWatcher
- pods []*v1.Pod
- namespaces []*v1.Namespace
- nodes []*v1.Node
- policies []*knet.NetworkPolicy
- endpoints []*v1.Endpoints
- services []*v1.Service
- stop chan struct{}
- numAdded, numUpdated, numDeleted int
- )
-
- BeforeEach(func() {
- fakeClient = &fake.Clientset{}
- stop = make(chan struct{})
-
- pods = make([]*v1.Pod, 0)
- podWatch = objSetup(fakeClient, "pods", func(core.Action) (bool, runtime.Object, error) {
- obj := &v1.PodList{}
- for _, p := range pods {
- obj.Items = append(obj.Items, *p)
- }
- return true, obj, nil
- })
-
- namespaces = make([]*v1.Namespace, 0)
- namespaceWatch = objSetup(fakeClient, "namespaces", func(core.Action) (bool, runtime.Object, error) {
- obj := &v1.NamespaceList{}
- for _, p := range namespaces {
- obj.Items = append(obj.Items, *p)
- }
- return true, obj, nil
- })
-
- nodes = make([]*v1.Node, 0)
- nodeWatch = objSetup(fakeClient, "nodes", func(core.Action) (bool, runtime.Object, error) {
- obj := &v1.NodeList{}
- for _, p := range nodes {
- obj.Items = append(obj.Items, *p)
- }
- return true, obj, nil
- })
-
- policies = make([]*knet.NetworkPolicy, 0)
- policyWatch = objSetup(fakeClient, "networkpolicies", func(core.Action) (bool, runtime.Object, error) {
- obj := &knet.NetworkPolicyList{}
- for _, p := range policies {
- obj.Items = append(obj.Items, *p)
- }
- return true, obj, nil
- })
-
- endpoints = make([]*v1.Endpoints, 0)
- endpointsWatch = objSetup(fakeClient, "endpoints", func(core.Action) (bool, runtime.Object, error) {
- obj := &v1.EndpointsList{}
- for _, p := range endpoints {
- obj.Items = append(obj.Items, *p)
- }
- return true, obj, nil
- })
-
- services = make([]*v1.Service, 0)
- serviceWatch = objSetup(fakeClient, "services", func(core.Action) (bool, runtime.Object, error) {
- obj := &v1.ServiceList{}
- for _, p := range services {
- obj.Items = append(obj.Items, *p)
- }
- return true, obj, nil
- })
-
- numAdded = 0
- numUpdated = 0
- numDeleted = 0
- })
-
- Context("when a processExisting is given", func() {
- testExisting := func(objType reflect.Type, namespace string, lsel *metav1.LabelSelector) {
- wf, err := NewWatchFactory(fakeClient, stop)
- Expect(err).NotTo(HaveOccurred())
- id, err := wf.addHandler(objType, namespace, lsel,
- cache.ResourceEventHandlerFuncs{},
- func(objs []interface{}) {
- Expect(len(objs)).To(Equal(1))
- })
- Expect(err).NotTo(HaveOccurred())
- Expect(id).To(BeNumerically(">", uint64(0)))
- wf.removeHandler(objType, id)
- close(stop)
- }
-
- It("is called for each existing pod", func() {
- pods = append(pods, newPod("pod1", "default"))
- testExisting(podType, "", nil)
- })
-
- It("is called for each existing namespace", func() {
- namespaces = append(namespaces, newNamespace("default"))
- testExisting(namespaceType, "", nil)
- })
-
- It("is called for each existing node", func() {
- nodes = append(nodes, newNode("default"))
- testExisting(nodeType, "", nil)
- })
-
- It("is called for each existing policy", func() {
- policies = append(policies, newPolicy("denyall", "default"))
- testExisting(policyType, "", nil)
- })
-
- It("is called for each existing endpoints", func() {
- endpoints = append(endpoints, newEndpoints("myendpoint", "default"))
- testExisting(endpointsType, "", nil)
- })
-
- It("is called for each existing service", func() {
- services = append(services, newService("myservice", "default"))
- testExisting(serviceType, "", nil)
- })
-
- It("is called for each existing pod that matches a given namespace and label", func() {
- pod := newPod("pod1", "default")
- pod.ObjectMeta.Labels["blah"] = "foobar"
- pods = append(pods, pod)
- testExisting(podType, "default", &metav1.LabelSelector{
- MatchLabels: map[string]string{"blah": "foobar"},
- })
- })
- })
-
- Context("when existing items are known to the informer", func() {
- testExisting := func(objType reflect.Type) {
- wf, err := NewWatchFactory(fakeClient, stop)
- Expect(err).NotTo(HaveOccurred())
- id, err := wf.addHandler(objType, "", nil,
- cache.ResourceEventHandlerFuncs{
- AddFunc: func(obj interface{}) {
- numAdded++
- },
- UpdateFunc: func(old, new interface{}) {},
- DeleteFunc: func(obj interface{}) {},
- }, nil)
- Expect(err).NotTo(HaveOccurred())
- Expect(numAdded).To(Equal(2))
- wf.removeHandler(objType, id)
- close(stop)
- }
-
- It("calls ADD for each existing pod", func() {
- pods = append(pods, newPod("pod1", "default"))
- pods = append(pods, newPod("pod2", "default"))
- testExisting(podType)
- })
-
- It("calls ADD for each existing namespace", func() {
- namespaces = append(namespaces, newNamespace("default"))
- namespaces = append(namespaces, newNamespace("default2"))
- testExisting(namespaceType)
- })
-
- It("calls ADD for each existing node", func() {
- nodes = append(nodes, newNode("default"))
- nodes = append(nodes, newNode("default2"))
- testExisting(nodeType)
- })
-
- It("calls ADD for each existing policy", func() {
- policies = append(policies, newPolicy("denyall", "default"))
- policies = append(policies, newPolicy("denyall2", "default"))
- testExisting(policyType)
- })
-
- It("calls ADD for each existing endpoints", func() {
- endpoints = append(endpoints, newEndpoints("myendpoint", "default"))
- endpoints = append(endpoints, newEndpoints("myendpoint2", "default"))
- testExisting(endpointsType)
- })
-
- It("calls ADD for each existing service", func() {
- services = append(services, newService("myservice", "default"))
- services = append(services, newService("myservice2", "default"))
- testExisting(serviceType)
- })
- })
-
- addFilteredHandler := func(wf *WatchFactory, objType reflect.Type, namespace string, lsel *metav1.LabelSelector, funcs cache.ResourceEventHandlerFuncs) uint64 {
- id, err := wf.addHandler(objType, namespace, lsel, cache.ResourceEventHandlerFuncs{
- AddFunc: func(obj interface{}) {
- defer GinkgoRecover()
- numAdded++
- funcs.AddFunc(obj)
- },
- UpdateFunc: func(old, new interface{}) {
- defer GinkgoRecover()
- numUpdated++
- funcs.UpdateFunc(old, new)
- },
- DeleteFunc: func(obj interface{}) {
- defer GinkgoRecover()
- numDeleted++
- funcs.DeleteFunc(obj)
- },
- }, nil)
- Expect(err).NotTo(HaveOccurred())
- Expect(id).To(BeNumerically(">", uint64(0)))
- return id
- }
-
- addHandler := func(wf *WatchFactory, objType reflect.Type, funcs cache.ResourceEventHandlerFuncs) uint64 {
- return addFilteredHandler(wf, objType, "", nil, funcs)
- }
-
- It("responds to pod add/update/delete events", func() {
- wf, err := NewWatchFactory(fakeClient, stop)
- Expect(err).NotTo(HaveOccurred())
-
- added := newPod("pod1", "default")
- id := addHandler(wf, podType, cache.ResourceEventHandlerFuncs{
- AddFunc: func(obj interface{}) {
- pod := obj.(*v1.Pod)
- Expect(reflect.DeepEqual(pod, added)).To(BeTrue())
- },
- UpdateFunc: func(old, new interface{}) {
- newPod := new.(*v1.Pod)
- Expect(reflect.DeepEqual(newPod, added)).To(BeTrue())
- Expect(newPod.Spec.NodeName).To(Equal("foobar"))
- },
- DeleteFunc: func(obj interface{}) {
- pod := obj.(*v1.Pod)
- Expect(reflect.DeepEqual(pod, added)).To(BeTrue())
- },
- })
-
- pods = append(pods, added)
- podWatch.Add(added)
- Eventually(func() int { return numAdded }, 2).Should(Equal(1))
- added.Spec.NodeName = "foobar"
- podWatch.Modify(added)
- Eventually(func() int { return numUpdated }, 2).Should(Equal(1))
- pods = pods[:0]
- podWatch.Delete(added)
- Eventually(func() int { return numDeleted }, 2).Should(Equal(1))
-
- wf.RemovePodHandler(id)
- close(stop)
- })
-
- It("responds to namespace add/update/delete events", func() {
- wf, err := NewWatchFactory(fakeClient, stop)
- Expect(err).NotTo(HaveOccurred())
-
- added := newNamespace("default")
- id := addHandler(wf, namespaceType, cache.ResourceEventHandlerFuncs{
- AddFunc: func(obj interface{}) {
- ns := obj.(*v1.Namespace)
- Expect(reflect.DeepEqual(ns, added)).To(BeTrue())
- },
- UpdateFunc: func(old, new interface{}) {
- newNS := new.(*v1.Namespace)
- Expect(reflect.DeepEqual(newNS, added)).To(BeTrue())
- Expect(newNS.Status.Phase).To(Equal(v1.NamespaceTerminating))
- },
- DeleteFunc: func(obj interface{}) {
- ns := obj.(*v1.Namespace)
- Expect(reflect.DeepEqual(ns, added)).To(BeTrue())
- },
- })
-
- namespaces = append(namespaces, added)
- namespaceWatch.Add(added)
- Eventually(func() int { return numAdded }, 2).Should(Equal(1))
- added.Status.Phase = v1.NamespaceTerminating
- namespaceWatch.Modify(added)
- Eventually(func() int { return numUpdated }, 2).Should(Equal(1))
- namespaces = namespaces[:0]
- namespaceWatch.Delete(added)
- Eventually(func() int { return numDeleted }, 2).Should(Equal(1))
-
- wf.RemoveNamespaceHandler(id)
- close(stop)
- })
-
- It("responds to node add/update/delete events", func() {
- wf, err := NewWatchFactory(fakeClient, stop)
- Expect(err).NotTo(HaveOccurred())
-
- added := newNode("mynode")
- id := addHandler(wf, nodeType, cache.ResourceEventHandlerFuncs{
- AddFunc: func(obj interface{}) {
- node := obj.(*v1.Node)
- Expect(reflect.DeepEqual(node, added)).To(BeTrue())
- },
- UpdateFunc: func(old, new interface{}) {
- newNode := new.(*v1.Node)
- Expect(reflect.DeepEqual(newNode, added)).To(BeTrue())
- Expect(newNode.Status.Phase).To(Equal(v1.NodeTerminated))
- },
- DeleteFunc: func(obj interface{}) {
- node := obj.(*v1.Node)
- Expect(reflect.DeepEqual(node, added)).To(BeTrue())
- },
- })
-
- nodes = append(nodes, added)
- nodeWatch.Add(added)
- Eventually(func() int { return numAdded }, 2).Should(Equal(1))
- added.Status.Phase = v1.NodeTerminated
- nodeWatch.Modify(added)
- Eventually(func() int { return numUpdated }, 2).Should(Equal(1))
- nodes = nodes[:0]
- nodeWatch.Delete(added)
- Eventually(func() int { return numDeleted }, 2).Should(Equal(1))
-
- wf.removeHandler(nodeType, id)
- close(stop)
- })
-
- It("responds to policy add/update/delete events", func() {
- wf, err := NewWatchFactory(fakeClient, stop)
- Expect(err).NotTo(HaveOccurred())
-
- added := newPolicy("mypolicy", "default")
- id := addHandler(wf, policyType, cache.ResourceEventHandlerFuncs{
- AddFunc: func(obj interface{}) {
- np := obj.(*knet.NetworkPolicy)
- Expect(reflect.DeepEqual(np, added)).To(BeTrue())
- },
- UpdateFunc: func(old, new interface{}) {
- newNP := new.(*knet.NetworkPolicy)
- Expect(reflect.DeepEqual(newNP, added)).To(BeTrue())
- Expect(newNP.Spec.PolicyTypes).To(Equal([]knet.PolicyType{knet.PolicyTypeIngress}))
- },
- DeleteFunc: func(obj interface{}) {
- np := obj.(*knet.NetworkPolicy)
- Expect(reflect.DeepEqual(np, added)).To(BeTrue())
- },
- })
-
- policies = append(policies, added)
- policyWatch.Add(added)
- Eventually(func() int { return numAdded }, 2).Should(Equal(1))
- added.Spec.PolicyTypes = []knet.PolicyType{knet.PolicyTypeIngress}
- policyWatch.Modify(added)
- Eventually(func() int { return numUpdated }, 2).Should(Equal(1))
- policies = policies[:0]
- policyWatch.Delete(added)
- Eventually(func() int { return numDeleted }, 2).Should(Equal(1))
-
- wf.removeHandler(policyType, id)
- close(stop)
- })
-
- It("responds to endpoints add/update/delete events", func() {
- wf, err := NewWatchFactory(fakeClient, stop)
- Expect(err).NotTo(HaveOccurred())
-
- added := newEndpoints("myendpoints", "default")
- id := addHandler(wf, endpointsType, cache.ResourceEventHandlerFuncs{
- AddFunc: func(obj interface{}) {
- eps := obj.(*v1.Endpoints)
- Expect(reflect.DeepEqual(eps, added)).To(BeTrue())
- },
- UpdateFunc: func(old, new interface{}) {
- newEPs := new.(*v1.Endpoints)
- Expect(reflect.DeepEqual(newEPs, added)).To(BeTrue())
- Expect(len(newEPs.Subsets)).To(Equal(1))
- },
- DeleteFunc: func(obj interface{}) {
- eps := obj.(*v1.Endpoints)
- Expect(reflect.DeepEqual(eps, added)).To(BeTrue())
- },
- })
-
- endpoints = append(endpoints, added)
- endpointsWatch.Add(added)
- Eventually(func() int { return numAdded }, 2).Should(Equal(1))
- added.Subsets = append(added.Subsets, v1.EndpointSubset{
- Ports: []v1.EndpointPort{
- {
- Name: "foobar",
- Port: 1234,
- },
- },
- })
- endpointsWatch.Modify(added)
- Eventually(func() int { return numUpdated }, 2).Should(Equal(1))
- endpoints = endpoints[:0]
- endpointsWatch.Delete(added)
- Eventually(func() int { return numDeleted }, 2).Should(Equal(1))
-
- wf.removeHandler(endpointsType, id)
- close(stop)
- })
-
- It("responds to service add/update/delete events", func() {
- wf, err := NewWatchFactory(fakeClient, stop)
- Expect(err).NotTo(HaveOccurred())
-
- added := newService("myservice", "default")
- id := addHandler(wf, serviceType, cache.ResourceEventHandlerFuncs{
- AddFunc: func(obj interface{}) {
- service := obj.(*v1.Service)
- Expect(reflect.DeepEqual(service, added)).To(BeTrue())
- },
- UpdateFunc: func(old, new interface{}) {
- newService := new.(*v1.Service)
- Expect(reflect.DeepEqual(newService, added)).To(BeTrue())
- Expect(newService.Spec.ClusterIP).To(Equal("1.1.1.1"))
- },
- DeleteFunc: func(obj interface{}) {
- service := obj.(*v1.Service)
- Expect(reflect.DeepEqual(service, added)).To(BeTrue())
- },
- })
-
- services = append(services, added)
- serviceWatch.Add(added)
- Eventually(func() int { return numAdded }, 2).Should(Equal(1))
- added.Spec.ClusterIP = "1.1.1.1"
- serviceWatch.Modify(added)
- Eventually(func() int { return numUpdated }, 2).Should(Equal(1))
- services = services[:0]
- serviceWatch.Delete(added)
- Eventually(func() int { return numDeleted }, 2).Should(Equal(1))
-
- wf.removeHandler(serviceType, id)
- close(stop)
- })
-
- It("stops processing events after the handler is removed", func() {
- wf, err := NewWatchFactory(fakeClient, stop)
- Expect(err).NotTo(HaveOccurred())
-
- added := newNamespace("default")
- id := addHandler(wf, namespaceType, cache.ResourceEventHandlerFuncs{
- AddFunc: func(obj interface{}) {},
- UpdateFunc: func(old, new interface{}) {},
- DeleteFunc: func(obj interface{}) {},
- })
-
- namespaces = append(namespaces, added)
- namespaceWatch.Add(added)
- Eventually(func() int { return numAdded }, 2).Should(Equal(1))
- wf.RemoveNamespaceHandler(id)
-
- added2 := newNamespace("other")
- namespaces = append(namespaces, added2)
- namespaceWatch.Add(added2)
- Consistently(func() int { return numAdded }, 2).Should(Equal(1))
-
- added2.Status.Phase = v1.NamespaceTerminating
- namespaceWatch.Modify(added2)
- Consistently(func() int { return numUpdated }, 2).Should(Equal(0))
- namespaces = []*v1.Namespace{added}
- namespaceWatch.Delete(added2)
- Consistently(func() int { return numDeleted }, 2).Should(Equal(0))
-
- close(stop)
- })
-
- It("filters correctly by label and namespace", func() {
- wf, err := NewWatchFactory(fakeClient, stop)
- Expect(err).NotTo(HaveOccurred())
-
- passesFilter := newPod("pod1", "default")
- passesFilter.ObjectMeta.Labels["blah"] = "foobar"
- failsFilter := newPod("pod2", "default")
- failsFilter.ObjectMeta.Labels["blah"] = "baz"
- failsFilter2 := newPod("pod3", "otherns")
- failsFilter2.ObjectMeta.Labels["blah"] = "foobar"
-
- addFilteredHandler(wf,
- podType,
- "default",
- &metav1.LabelSelector{
- MatchLabels: map[string]string{"blah": "foobar"},
- },
- cache.ResourceEventHandlerFuncs{
- AddFunc: func(obj interface{}) {
- pod := obj.(*v1.Pod)
- Expect(reflect.DeepEqual(pod, passesFilter)).To(BeTrue())
- },
- UpdateFunc: func(old, new interface{}) {
- newPod := new.(*v1.Pod)
- Expect(reflect.DeepEqual(newPod, passesFilter)).To(BeTrue())
- },
- DeleteFunc: func(obj interface{}) {
- pod := obj.(*v1.Pod)
- Expect(reflect.DeepEqual(pod, passesFilter)).To(BeTrue())
- },
- })
-
- pods = append(pods, passesFilter)
- podWatch.Add(passesFilter)
- Eventually(func() int { return numAdded }, 2).Should(Equal(1))
-
- // numAdded should remain 1
- pods = append(pods, failsFilter)
- podWatch.Add(failsFilter)
- Consistently(func() int { return numAdded }, 2).Should(Equal(1))
-
- // numAdded should remain 1
- pods = append(pods, failsFilter2)
- podWatch.Add(failsFilter2)
- Consistently(func() int { return numAdded }, 2).Should(Equal(1))
-
- passesFilter.Status.Phase = v1.PodFailed
- podWatch.Modify(passesFilter)
- Eventually(func() int { return numUpdated }, 2).Should(Equal(1))
-
- // numAdded should remain 1
- failsFilter.Status.Phase = v1.PodFailed
- podWatch.Modify(failsFilter)
- Consistently(func() int { return numUpdated }, 2).Should(Equal(1))
-
- failsFilter2.Status.Phase = v1.PodFailed
- podWatch.Modify(failsFilter2)
- Consistently(func() int { return numUpdated }, 2).Should(Equal(1))
-
- pods = []*v1.Pod{failsFilter, failsFilter2}
- podWatch.Delete(passesFilter)
- Eventually(func() int { return numDeleted }, 2).Should(Equal(1))
-
- close(stop)
- })
-
- It("correctly handles object updates that cause filter changes", func() {
- wf, err := NewWatchFactory(fakeClient, stop)
- Expect(err).NotTo(HaveOccurred())
-
- pod := newPod("pod1", "default")
- pod.ObjectMeta.Labels["blah"] = "baz"
-
- equalPod := pod
- id := addFilteredHandler(wf,
- podType,
- "default",
- &metav1.LabelSelector{
- MatchLabels: map[string]string{"blah": "foobar"},
- },
- cache.ResourceEventHandlerFuncs{
- AddFunc: func(obj interface{}) {
- p := obj.(*v1.Pod)
- Expect(reflect.DeepEqual(p, equalPod)).To(BeTrue())
- },
- UpdateFunc: func(old, new interface{}) {},
- DeleteFunc: func(obj interface{}) {
- p := obj.(*v1.Pod)
- Expect(reflect.DeepEqual(p, equalPod)).To(BeTrue())
- },
- })
-
- pods = append(pods, pod)
-
- // Pod doesn't pass filter; shouldn't be added
- podWatch.Add(pod)
- Consistently(func() int { return numAdded }, 2).Should(Equal(0))
-
- // Update pod to pass filter; should be treated as add. Need
- // to deep-copy pod when modifying because it's a pointer all
- // the way through when using FakeClient
- podCopy := pod.DeepCopy()
- podCopy.ObjectMeta.Labels["blah"] = "foobar"
- pods = []*v1.Pod{podCopy}
- equalPod = podCopy
- podWatch.Modify(podCopy)
- Eventually(func() int { return numAdded }, 2).Should(Equal(1))
-
- // Update pod to fail filter; should be treated as delete
- pod.ObjectMeta.Labels["blah"] = "baz"
- podWatch.Modify(pod)
- Eventually(func() int { return numDeleted }, 2).Should(Equal(1))
- Consistently(func() int { return numAdded }, 2).Should(Equal(1))
- Consistently(func() int { return numUpdated }, 2).Should(Equal(0))
-
- wf.RemovePodHandler(id)
- close(stop)
- })
-})
diff --git a/internal/pkg/kube/kube.go b/internal/pkg/kube/kube.go
index cc7c29b..e51963e 100644
--- a/internal/pkg/kube/kube.go
+++ b/internal/pkg/kube/kube.go
@@ -41,7 +41,7 @@ type Kube struct {
func (k *Kube) SetAnnotationOnPod(pod *kapi.Pod, key, value string) error {
logrus.Infof("Setting annotations %s=%s on pod %s", key, value, pod.Name)
patchData := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, key, value)
- _, err := k.KClient.Core().Pods(pod.Namespace).Patch(pod.Name, types.MergePatchType, []byte(patchData))
+ _, err := k.KClient.CoreV1().Pods(pod.Namespace).Patch(pod.Name, types.MergePatchType, []byte(patchData))
if err != nil {
logrus.Errorf("Error in setting annotation on pod %s/%s: %v", pod.Name, pod.Namespace, err)
}
@@ -52,7 +52,7 @@ func (k *Kube) SetAnnotationOnPod(pod *kapi.Pod, key, value string) error {
func (k *Kube) SetAnnotationOnNode(node *kapi.Node, key, value string) error {
logrus.Infof("Setting annotations %s=%s on node %s", key, value, node.Name)
patchData := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, key, value)
- _, err := k.KClient.Core().Nodes().Patch(node.Name, types.MergePatchType, []byte(patchData))
+ _, err := k.KClient.CoreV1().Nodes().Patch(node.Name, types.MergePatchType, []byte(patchData))
if err != nil {
logrus.Errorf("Error in setting annotation on node %s: %v", node.Name, err)
}
@@ -67,7 +67,7 @@ func (k *Kube) SetAnnotationOnNamespace(ns *kapi.Namespace, key,
ns.Name)
patchData := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, key,
value)
- _, err := k.KClient.Core().Namespaces().Patch(ns.Name,
+ _, err := k.KClient.CoreV1().Namespaces().Patch(ns.Name,
types.MergePatchType, []byte(patchData))
if err != nil {
logrus.Errorf("Error in setting annotation on namespace %s: %v",
@@ -78,7 +78,7 @@ func (k *Kube) SetAnnotationOnNamespace(ns *kapi.Namespace, key,
// GetAnnotationsOnPod obtains the pod annotations from kubernetes apiserver, given the name and namespace
func (k *Kube) GetAnnotationsOnPod(namespace, name string) (map[string]string, error) {
- pod, err := k.KClient.Core().Pods(namespace).Get(name, metav1.GetOptions{})
+ pod, err := k.KClient.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{})
if err != nil {
return nil, err
}
@@ -87,12 +87,12 @@ func (k *Kube) GetAnnotationsOnPod(namespace, name string) (map[string]string, e
// GetPod obtains the Pod resource from kubernetes apiserver, given the name and namespace
func (k *Kube) GetPod(namespace, name string) (*kapi.Pod, error) {
- return k.KClient.Core().Pods(namespace).Get(name, metav1.GetOptions{})
+ return k.KClient.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{})
}
// GetPods obtains the Pod resource from kubernetes apiserver, given the name and namespace
func (k *Kube) GetPods(namespace string) (*kapi.PodList, error) {
- return k.KClient.Core().Pods(namespace).List(metav1.ListOptions{})
+ return k.KClient.CoreV1().Pods(namespace).List(metav1.ListOptions{})
}
// GetPodsByLabels obtains the Pod resources from kubernetes apiserver,
@@ -100,42 +100,42 @@ func (k *Kube) GetPods(namespace string) (*kapi.PodList, error) {
func (k *Kube) GetPodsByLabels(namespace string, selector labels.Selector) (*kapi.PodList, error) {
options := metav1.ListOptions{}
options.LabelSelector = selector.String()
- return k.KClient.Core().Pods(namespace).List(options)
+ return k.KClient.CoreV1().Pods(namespace).List(options)
}
// GetNodes returns the list of all Node objects from kubernetes
func (k *Kube) GetNodes() (*kapi.NodeList, error) {
- return k.KClient.Core().Nodes().List(metav1.ListOptions{})
+ return k.KClient.CoreV1().Nodes().List(metav1.ListOptions{})
}
// GetNode returns the Node resource from kubernetes apiserver, given its name
func (k *Kube) GetNode(name string) (*kapi.Node, error) {
- return k.KClient.Core().Nodes().Get(name, metav1.GetOptions{})
+ return k.KClient.CoreV1().Nodes().Get(name, metav1.GetOptions{})
}
// GetService returns the Service resource from kubernetes apiserver, given its name and namespace
func (k *Kube) GetService(namespace, name string) (*kapi.Service, error) {
- return k.KClient.Core().Services(namespace).Get(name, metav1.GetOptions{})
+ return k.KClient.CoreV1().Services(namespace).Get(name, metav1.GetOptions{})
}
// GetEndpoints returns all the Endpoint resources from kubernetes
// apiserver, given namespace
func (k *Kube) GetEndpoints(namespace string) (*kapi.EndpointsList, error) {
- return k.KClient.Core().Endpoints(namespace).List(metav1.ListOptions{})
+ return k.KClient.CoreV1().Endpoints(namespace).List(metav1.ListOptions{})
}
// GetNamespace returns the Namespace resource from kubernetes apiserver,
// given its name
func (k *Kube) GetNamespace(name string) (*kapi.Namespace, error) {
- return k.KClient.Core().Namespaces().Get(name, metav1.GetOptions{})
+ return k.KClient.CoreV1().Namespaces().Get(name, metav1.GetOptions{})
}
// GetNamespaces returns all Namespace resource from kubernetes apiserver
func (k *Kube) GetNamespaces() (*kapi.NamespaceList, error) {
- return k.KClient.Core().Namespaces().List(metav1.ListOptions{})
+ return k.KClient.CoreV1().Namespaces().List(metav1.ListOptions{})
}
// GetNetworkPolicies returns all network policy objects from kubernetes
func (k *Kube) GetNetworkPolicies(namespace string) (*kapisnetworking.NetworkPolicyList, error) {
- return k.KClient.Networking().NetworkPolicies(namespace).List(metav1.ListOptions{})
+ return k.KClient.NetworkingV1().NetworkPolicies(namespace).List(metav1.ListOptions{})
}
diff --git a/internal/pkg/ovn/common.go b/internal/pkg/ovn/common.go
index b504440..60cd202 100644
--- a/internal/pkg/ovn/common.go
+++ b/internal/pkg/ovn/common.go
@@ -3,69 +3,82 @@ package ovn
import (
"encoding/json"
"fmt"
- "github.com/sirupsen/logrus"
"math/big"
"math/rand"
"net"
+ logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
+ "strings"
"time"
)
-func SetupDistributedRouter(name string) error {
+var log = logf.Log.WithName("ovn")
- // Make sure br-int is created.
- stdout, stderr, err := RunOVSVsctlUnix("--", "--may-exist", "add-br", "br-int")
- if err != nil {
- logrus.Errorf("Failed to create br-int, stdout: %q, stderr: %q, error: %v", stdout, stderr, err)
- return err
+func parseOvnNetworkObject(ovnnetwork string) ([]map[string]interface{}, error) {
+ var ovnNet []map[string]interface{}
+
+ if ovnnetwork == "" {
+ return nil, fmt.Errorf("parseOvnNetworkObject:error")
}
+
+ if err := json.Unmarshal([]byte(ovnnetwork), &ovnNet); err != nil {
+ return nil, fmt.Errorf("parseOvnNetworkObject: failed to load ovn network err: %v | ovn network: %v", err, ovnnetwork)
+ }
+
+ return ovnNet, nil
+}
+
+func setupDistributedRouter(name string) error {
+
// Create a single common distributed router for the cluster.
- stdout, stderr, err = RunOVNNbctlUnix("--", "--may-exist", "lr-add", name, "--", "set", "logical_router", name, "external_ids:ovn4nfv-cluster-router=yes")
+ stdout, stderr, err := RunOVNNbctl("--", "--may-exist", "lr-add", name, "--", "set", "logical_router", name, "external_ids:ovn4nfv-cluster-router=yes")
if err != nil {
- logrus.Errorf("Failed to create a single common distributed router for the cluster, stdout: %q, stderr: %q, error: %v", stdout, stderr, err)
+ log.Error(err, "Failed to create a single common distributed router for the cluster", "stdout", stdout, "stderr", stderr)
return err
}
// Create a logical switch called "ovn4nfv-join" that will be used to connect gateway routers to the distributed router.
// The "ovn4nfv-join" will be allocated IP addresses in the range 100.64.1.0/24.
- stdout, stderr, err = RunOVNNbctlUnix("--may-exist", "ls-add", "ovn4nfv-join")
+ stdout, stderr, err = RunOVNNbctl("--may-exist", "ls-add", "ovn4nfv-join")
if err != nil {
- logrus.Errorf("Failed to create logical switch called \"ovn4nfv-join\", stdout: %q, stderr: %q, error: %v", stdout, stderr, err)
+ log.Error(err, "Failed to create logical switch called \"ovn4nfv-join\"", "stdout", stdout, "stderr", stderr)
return err
}
// Connect the distributed router to "ovn4nfv-join".
- routerMac, stderr, err := RunOVNNbctlUnix("--if-exist", "get", "logical_router_port", "rtoj-"+name, "mac")
+ routerMac, stderr, err := RunOVNNbctl("--if-exist", "get", "logical_router_port", "rtoj-"+name, "mac")
if err != nil {
- logrus.Errorf("Failed to get logical router port rtoj-%v, stderr: %q, error: %v", name, stderr, err)
+ log.Error(err, "Failed to get logical router port rtoj-", "name", name, "stdout", stdout, "stderr", stderr)
return err
}
if routerMac == "" {
routerMac = generateMac()
- stdout, stderr, err = RunOVNNbctlUnix("--", "--may-exist", "lrp-add", name, "rtoj-"+name, routerMac, "100.64.1.1/24", "--", "set", "logical_router_port", "rtoj-"+name, "external_ids:connect_to_ovn4nfvjoin=yes")
+ stdout, stderr, err = RunOVNNbctl("--", "--may-exist", "lrp-add", name, "rtoj-"+name, routerMac, "100.64.1.1/24", "--", "set", "logical_router_port", "rtoj-"+name, "external_ids:connect_to_ovn4nfvjoin=yes")
if err != nil {
- logrus.Errorf("Failed to add logical router port rtoj-%v, stdout: %q, stderr: %q, error: %v", name, stdout, stderr, err)
+ log.Error(err, "Failed to add logical router port rtoj", "name", name, "stdout", stdout, "stderr", stderr)
return err
}
}
// Connect the switch "ovn4nfv-join" to the router.
- stdout, stderr, err = RunOVNNbctlUnix("--", "--may-exist", "lsp-add", "ovn4nfv-join", "jtor-"+name, "--", "set", "logical_switch_port", "jtor-"+name, "type=router", "options:router-port=rtoj-"+name, "addresses="+"\""+routerMac+"\"")
+ stdout, stderr, err = RunOVNNbctl("--", "--may-exist", "lsp-add", "ovn4nfv-join", "jtor-"+name, "--", "set", "logical_switch_port", "jtor-"+name, "type=router", "options:router-port=rtoj-"+name, "addresses="+"\""+routerMac+"\"")
if err != nil {
- logrus.Errorf("Failed to add logical switch port to logical router, stdout: %q, stderr: %q, error: %v", stdout, stderr, err)
+ log.Error(err, "Failed to add logical switch port to logical router", "stdout", stdout, "stderr", stderr)
return err
}
return nil
}
-func parseOvnNetworkObject(ovnnetwork string) ([]map[string]interface{}, error) {
- var ovnNet []map[string]interface{}
-
- if ovnnetwork == "" {
- return nil, fmt.Errorf("parseOvnNetworkObject:error")
+// Find if switch exists
+func findLogicalSwitch(name string) bool {
+ // get logical switch from OVN
+ output, stderr, err := RunOVNNbctl("--data=bare", "--no-heading",
+ "--columns=name", "find", "logical_switch", "name="+name)
+ if err != nil {
+ log.Error(err, "Error in obtaining list of logical switch", "stderr", stderr)
+ return false
}
- if err := json.Unmarshal([]byte(ovnnetwork), &ovnNet); err != nil {
- return nil, fmt.Errorf("parseOvnNetworkObject: failed to load ovn network err: %v | ovn network: %v", err, ovnnetwork)
+ if strings.Compare(name, output) == 0 {
+ return true
}
-
- return ovnNet, nil
+ return false
}
// generateMac generates mac address.
diff --git a/internal/pkg/ovn/ovn.go b/internal/pkg/ovn/ovn.go
index 470416b..dad4641 100644
--- a/internal/pkg/ovn/ovn.go
+++ b/internal/pkg/ovn/ovn.go
@@ -2,93 +2,174 @@ package ovn
import (
"fmt"
+ "github.com/mitchellh/mapstructure"
+ kapi "k8s.io/api/core/v1"
+ kexec "k8s.io/utils/exec"
"strings"
"time"
-
- "github.com/sirupsen/logrus"
- kapi "k8s.io/api/core/v1"
- "k8s.io/client-go/kubernetes"
- "k8s.io/client-go/tools/cache"
- "ovn4nfv-k8s-plugin/internal/pkg/factory"
- "ovn4nfv-k8s-plugin/internal/pkg/kube"
)
-// Controller structure is the object which holds the controls for starting
-// and reacting upon the watched resources (e.g. pods, endpoints)
type Controller struct {
- kube kube.Interface
- watchFactory *factory.WatchFactory
-
gatewayCache map[string]string
- // A cache of all logical switches seen by the watcher
- logicalSwitchCache map[string]bool
- // A cache of all logical ports seen by the watcher and
- // its corresponding logical switch
- logicalPortCache map[string]string
}
-// NewOvnController creates a new OVN controller for creating logical network
-// infrastructure and policy
-func NewOvnController(kubeClient kubernetes.Interface, wf *factory.WatchFactory) *Controller {
- return &Controller{
- kube: &kube.Kube{KClient: kubeClient},
- watchFactory: wf,
- logicalSwitchCache: make(map[string]bool),
- logicalPortCache: make(map[string]string),
- gatewayCache: make(map[string]string),
+const (
+ ovn4nfvRouterName = "ovn4nfv-master"
+ Ovn4nfvAnnotationTag = "k8s.plugin.opnfv.org/ovnInterfaces"
+)
+
+type netInterface struct {
+ Name string
+ Interface string
+ NetType string
+ DefaultGateway string
+ IPAddress string
+ MacAddress string
+}
+
+var ovnCtl *Controller
+
+// NewOvnController creates a new OVN controller for creating logical networks
+func NewOvnController(exec kexec.Interface) (*Controller, error) {
+
+ if exec == nil {
+ exec = kexec.New()
+ }
+
+ if err := SetExec(exec); err != nil {
+ log.Error(err, "Failed to initialize exec helper")
+ return nil, err
+ }
+
+ if err := SetupOvnUtils(); err != nil {
+ log.Error(err, "Failed to initialize OVN State")
+ return nil, err
+ }
+ ovnCtl = &Controller{
+ gatewayCache: make(map[string]string),
}
+ return ovnCtl, nil
}
-// Run starts the actual watching. Also initializes any local structures needed.
-func (oc *Controller) Run() error {
- fmt.Println("ovn4nfvk8s watching Pods")
- for _, f := range []func() error{oc.WatchPods} {
- if err := f(); err != nil {
- return err
- }
+// GetOvnController returns OVN controller for creating logical networks
+func GetOvnController() (*Controller, error) {
+ if ovnCtl != nil {
+ return ovnCtl, nil
}
- return nil
+ return nil, fmt.Errorf("OVN Controller not initialized")
}
-// WatchPods starts the watching of Pod resource and calls back the appropriate handler logic
-func (oc *Controller) WatchPods() error {
- _, err := oc.watchFactory.AddPodHandler(cache.ResourceEventHandlerFuncs{
- AddFunc: func(obj interface{}) {
- pod := obj.(*kapi.Pod)
- if pod.Spec.NodeName != "" {
- oc.addLogicalPort(pod)
+// AddLogicalPorts adds ports to the Pod
+func (oc *Controller) AddLogicalPorts(pod *kapi.Pod, ovnNetObjs []map[string]interface{}) (key, value string) {
+
+ if ovnNetObjs == nil {
+ return
+ }
+
+ if pod.Spec.HostNetwork {
+ return
+ }
+
+ if _, ok := pod.Annotations[Ovn4nfvAnnotationTag]; ok {
+ log.Info("AddLogicalPorts : Pod annotation found")
+ return
+ }
+
+ var ovnString, outStr string
+ ovnString = "["
+ var ns netInterface
+ for _, net := range ovnNetObjs {
+
+ err := mapstructure.Decode(net, &ns)
+ if err != nil {
+ log.Error(err, "mapstruct error", "network", net)
+ return
+ }
+
+ if !findLogicalSwitch(ns.Name) {
+ log.Info("Logical Switch not found")
+ return
+ }
+ if ns.Interface == "" {
+ log.Info("Interface name must be provided")
+ return
+ }
+ if ns.DefaultGateway == "" {
+ ns.DefaultGateway = "false"
+ }
+ if ns.NetType == "" || ns.NetType != "provider" {
+ ns.NetType = "virtual"
+ }
+ if ns.NetType == "provider" {
+ if ns.IPAddress == "" {
+ log.Info("ipAddress must be provided for netType Provider")
+ return
}
- },
- UpdateFunc: func(old, newer interface{}) {
- podNew := newer.(*kapi.Pod)
- podOld := old.(*kapi.Pod)
- if podOld.Spec.NodeName == "" && podNew.Spec.NodeName != "" {
- oc.addLogicalPort(podNew)
+ if ns.DefaultGateway == "true" {
+ log.Info("defaultGateway not supported for provider network - Use ovnNetworkRoutes to add routes")
+ return
}
- },
- DeleteFunc: func(obj interface{}) {
- pod := obj.(*kapi.Pod)
- oc.deleteLogicalPort(pod)
- },
- }, oc.syncPods)
- return err
+
+ }
+
+ outStr = oc.addLogicalPortWithSwitch(pod, ns.Name, ns.IPAddress, ns.MacAddress, ns.Interface, ns.NetType)
+ if outStr == "" {
+ return
+ }
+ last := len(outStr) - 1
+ tmpString := outStr[:last]
+ tmpString += "," + "\\\"defaultGateway\\\":" + "\\\"" + ns.DefaultGateway + "\\\""
+ tmpString += "," + "\\\"interface\\\":" + "\\\"" + ns.Interface + "\\\"}"
+ ovnString += tmpString
+ ovnString += ","
+ }
+ last := len(ovnString) - 1
+ ovnString = ovnString[:last]
+ ovnString += "]"
+ key = Ovn4nfvAnnotationTag
+ value = ovnString
+ return key, value
}
-func (oc *Controller) syncPods(pods []interface{}) {
+func (oc *Controller) DeleteLogicalPorts(name, namespace string) {
+
+ log.Info("Deleting pod", "name", name)
+ logicalPort := fmt.Sprintf("%s_%s", namespace, name)
+
+ // get the list of logical ports from OVN
+ stdout, stderr, err := RunOVNNbctl("--data=bare", "--no-heading",
+ "--columns=name", "find", "logical_switch_port", "external_ids:pod=true")
+ if err != nil {
+ log.Error(err, "Error in obtaining list of logical ports ", "stdout", stdout, "stderr", stderr)
+ return
+ }
+ log.Info("Deleting", "Existing Ports", stdout)
+ existingLogicalPorts := strings.Fields(stdout)
+ for _, existingPort := range existingLogicalPorts {
+ if strings.Contains(existingPort, logicalPort) {
+ // found, delete this logical port
+ log.Info("Deleting", "existingPort", existingPort)
+ stdout, stderr, err := RunOVNNbctl("--if-exists", "lsp-del",
+ existingPort)
+ if err != nil {
+ log.Error(err, "Error in deleting pod's logical port ", "stdout", stdout, "stderr", stderr)
+ }
+ }
+ }
+ return
}
func (oc *Controller) getGatewayFromSwitch(logicalSwitch string) (string, string, error) {
var gatewayIPMaskStr, stderr string
var ok bool
var err error
- logrus.Infof("getGatewayFromSwitch: %s", logicalSwitch)
+ log.Info("getGatewayFromSwitch", "logicalSwitch", logicalSwitch)
if gatewayIPMaskStr, ok = oc.gatewayCache[logicalSwitch]; !ok {
- gatewayIPMaskStr, stderr, err = RunOVNNbctlUnix("--if-exists",
+ gatewayIPMaskStr, stderr, err = RunOVNNbctl("--if-exists",
"get", "logical_switch", logicalSwitch,
"external_ids:gateway_ip")
if err != nil {
- logrus.Errorf("Failed to get gateway IP: %s, stderr: %q, %v",
- gatewayIPMaskStr, stderr, err)
+ log.Error(err, "Failed to get gateway IP", "stderr", stderr, "gatewayIPMaskStr", gatewayIPMaskStr)
return "", "", err
}
if gatewayIPMaskStr == "" {
@@ -107,44 +188,6 @@ func (oc *Controller) getGatewayFromSwitch(logicalSwitch string) (string, string
return gatewayIP, mask, nil
}
-func (oc *Controller) deleteLogicalPort(pod *kapi.Pod) {
-
- if pod.Spec.HostNetwork {
- return
- }
-
- logrus.Infof("Deleting pod: %s", pod.Name)
- logicalPort := fmt.Sprintf("%s_%s", pod.Namespace, pod.Name)
-
- // get the list of logical ports from OVN
- output, stderr, err := RunOVNNbctlUnix("--data=bare", "--no-heading",
- "--columns=name", "find", "logical_switch_port", "external_ids:pod=true")
- if err != nil {
- logrus.Errorf("Error in obtaining list of logical ports, "+
- "stderr: %q, err: %v",
- stderr, err)
- return
- }
- logrus.Infof("Exising Ports : %s. ", output)
- existingLogicalPorts := strings.Fields(output)
- for _, existingPort := range existingLogicalPorts {
- if strings.Contains(existingPort, logicalPort) {
- // found, delete this logical port
- logrus.Infof("Deleting: %s. ", existingPort)
- out, stderr, err := RunOVNNbctlUnix("--if-exists", "lsp-del",
- existingPort)
- if err != nil {
- logrus.Errorf("Error in deleting pod's logical port "+
- "stdout: %q, stderr: %q err: %v",
- out, stderr, err)
- } else {
- delete(oc.logicalPortCache, existingPort)
- }
- }
- }
- return
-}
-
func (oc *Controller) addLogicalPortWithSwitch(pod *kapi.Pod, logicalSwitch, ipAddress, macAddress, interfaceName, netType string) (annotation string) {
var out, stderr string
var err error
@@ -153,9 +196,6 @@ func (oc *Controller) addLogicalPortWithSwitch(pod *kapi.Pod, logicalSwitch, ipA
return
}
- if !oc.logicalSwitchCache[logicalSwitch] {
- oc.logicalSwitchCache[logicalSwitch] = true
- }
var portName string
if interfaceName != "" {
portName = fmt.Sprintf("%s_%s_%s", pod.Namespace, pod.Name, interfaceName)
@@ -163,7 +203,7 @@ func (oc *Controller) addLogicalPortWithSwitch(pod *kapi.Pod, logicalSwitch, ipA
return
}
- logrus.Infof("Creating logical port for %s on switch %s", portName, logicalSwitch)
+ log.Info("Creating logical port for on switch", "portName", portName, "logicalSwitch", logicalSwitch)
if ipAddress != "" && macAddress != "" {
isStaticIP = true
@@ -174,7 +214,7 @@ func (oc *Controller) addLogicalPortWithSwitch(pod *kapi.Pod, logicalSwitch, ipA
}
if isStaticIP {
- out, stderr, err = RunOVNNbctlUnix("--may-exist", "lsp-add",
+ out, stderr, err = RunOVNNbctl("--may-exist", "lsp-add",
logicalSwitch, portName, "--", "lsp-set-addresses", portName,
fmt.Sprintf("%s %s", macAddress, ipAddress), "--", "--if-exists",
"clear", "logical_switch_port", portName, "dynamic_addresses", "--", "set",
@@ -183,13 +223,11 @@ func (oc *Controller) addLogicalPortWithSwitch(pod *kapi.Pod, logicalSwitch, ipA
"external-ids:logical_switch="+logicalSwitch,
"external-ids:pod=true")
if err != nil {
- logrus.Errorf("Failed to add logical port to switch "+
- "stdout: %q, stderr: %q (%v)",
- out, stderr, err)
+ log.Error(err, "Failed to add logical port to switch", "out", out, "stderr", stderr)
return
}
} else {
- out, stderr, err = RunOVNNbctlUnix("--wait=sb", "--",
+ out, stderr, err = RunOVNNbctl("--wait=sb", "--",
"--may-exist", "lsp-add", logicalSwitch, portName,
"--", "lsp-set-addresses",
portName, "dynamic", "--", "set",
@@ -198,37 +236,32 @@ func (oc *Controller) addLogicalPortWithSwitch(pod *kapi.Pod, logicalSwitch, ipA
"external-ids:logical_switch="+logicalSwitch,
"external-ids:pod=true")
if err != nil {
- logrus.Errorf("Error while creating logical port %s "+
- "stdout: %q, stderr: %q (%v)",
- portName, out, stderr, err)
+ log.Error(err, "Error while creating logical port %s ", "portName", portName, "stdout", out, "stderr", stderr)
return
}
}
- oc.logicalPortCache[portName] = logicalSwitch
count := 30
for count > 0 {
if isStaticIP {
- out, stderr, err = RunOVNNbctlUnix("get",
+ out, stderr, err = RunOVNNbctl("get",
"logical_switch_port", portName, "addresses")
} else {
- out, stderr, err = RunOVNNbctlUnix("get",
+ out, stderr, err = RunOVNNbctl("get",
"logical_switch_port", portName, "dynamic_addresses")
}
if err == nil && out != "[]" {
break
}
if err != nil {
- logrus.Errorf("Error while obtaining addresses for %s - %v", portName,
- err)
+ log.Error(err, "Error while obtaining addresses for", "portName", portName)
return
}
time.Sleep(time.Second)
count--
}
if count == 0 {
- logrus.Errorf("Error while obtaining addresses for %s "+
- "stdout: %q, stderr: %q, (%v)", portName, out, stderr, err)
+ log.Error(err, "Error while obtaining addresses for", "portName", portName, "stdout", out, "stderr", stderr)
return
}
@@ -239,14 +272,14 @@ func (oc *Controller) addLogicalPortWithSwitch(pod *kapi.Pod, logicalSwitch, ipA
outStr = strings.Trim(outStr, `"`)
addresses := strings.Split(outStr, " ")
if len(addresses) != 2 {
- logrus.Errorf("Error while obtaining addresses for %s", portName)
+ log.Info("Error while obtaining addresses for", "portName", portName)
return
}
if netType == "virtual" {
gatewayIP, mask, err := oc.getGatewayFromSwitch(logicalSwitch)
if err != nil {
- logrus.Errorf("Error obtaining gateway address for switch %s: %s", logicalSwitch, err)
+ log.Error(err, "Error obtaining gateway address for switch", "logicalSwitch", logicalSwitch)
return
}
annotation = fmt.Sprintf(`{\"ip_address\":\"%s/%s\", \"mac_address\":\"%s\", \"gateway_ip\": \"%s\"}`, addresses[1], mask, addresses[0], gatewayIP)
@@ -256,102 +289,3 @@ func (oc *Controller) addLogicalPortWithSwitch(pod *kapi.Pod, logicalSwitch, ipA
return annotation
}
-
-func (oc *Controller) findLogicalSwitch(name string) bool {
- // get logical switch from OVN
- output, stderr, err := RunOVNNbctlUnix("--data=bare", "--no-heading",
- "--columns=name", "find", "logical_switch", "name="+name)
- if err != nil {
- logrus.Errorf("Error in obtaining list of logical switch, "+
- "stderr: %q, err: %v",
- stderr, err)
- return false
- }
-
- if strings.Compare(name, output) == 0 {
- return true
- } else {
- logrus.Errorf("Error finding Switch %v", name)
- return false
- }
-}
-
-func (oc *Controller) addLogicalPort(pod *kapi.Pod) {
- var logicalSwitch string
- var ipAddress, macAddress, interfaceName, defaultGateway, netType string
-
- annotation := pod.Annotations["ovnNetwork"]
-
- if annotation != "" {
- ovnNetObjs, err := parseOvnNetworkObject(annotation)
- if err != nil {
- logrus.Errorf("addLogicalPort : Error Parsing OvnNetwork List")
- return
- }
- var ovnString, outStr string
- ovnString = "["
- for _, net := range ovnNetObjs {
- logicalSwitch = net["name"].(string)
- if !oc.findLogicalSwitch(logicalSwitch) {
- logrus.Errorf("Logical Switch not found")
- return
- }
- if _, ok := net["interface"]; ok {
- interfaceName = net["interface"].(string)
- } else {
- logrus.Errorf("Interface name must be provided")
- return
- }
- if _, ok := net["ipAddress"]; ok {
- ipAddress = net["ipAddress"].(string)
- } else {
- ipAddress = ""
- }
- if _, ok := net["macAddress"]; ok {
- macAddress = net["macAddress"].(string)
- } else {
- macAddress = ""
- }
- if _, ok := net["defaultGateway"]; ok {
- defaultGateway = net["defaultGateway"].(string)
- } else {
- defaultGateway = "false"
- }
- if _, ok := net["netType"]; ok {
- netType = net["netType"].(string)
- } else {
- netType = "virtual"
- }
- if netType != "provider" && netType != "virtual" {
- logrus.Errorf("netType is not supported")
- return
- }
- if netType == "provider" && ipAddress == "" {
- logrus.Errorf("ipAddress must be provided for netType Provider")
- return
- }
- if netType == "provider" && defaultGateway == "true" {
- logrus.Errorf("defaultGateway not supported for provider network - Use ovnNetworkRoutes to add routes")
- return
- }
- outStr = oc.addLogicalPortWithSwitch(pod, logicalSwitch, ipAddress, macAddress, interfaceName, netType)
- if outStr == "" {
- return
- }
- last := len(outStr) - 1
- tmpString := outStr[:last]
- tmpString += "," + "\\\"defaultGateway\\\":" + "\\\"" + defaultGateway + "\\\""
- tmpString += "," + "\\\"interface\\\":" + "\\\"" + interfaceName + "\\\"}"
- ovnString += tmpString
- ovnString += ","
- }
- last := len(ovnString) - 1
- ovnString = ovnString[:last]
- ovnString += "]"
- logrus.Debugf("ovnIfaceList - %v", ovnString)
- err = oc.kube.SetAnnotationOnPod(pod, "ovnIfaceList", ovnString)
- if err != nil {
- logrus.Errorf("Failed to set annotation on pod %s - %v", pod.Name, err)
- }
- }
-}
diff --git a/internal/pkg/ovn/ovn_test.go b/internal/pkg/ovn/ovn_test.go
index bc33d35..6e38759 100644
--- a/internal/pkg/ovn/ovn_test.go
+++ b/internal/pkg/ovn/ovn_test.go
@@ -9,9 +9,7 @@ import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/client-go/kubernetes/fake"
"ovn4nfv-k8s-plugin/internal/pkg/config"
- "ovn4nfv-k8s-plugin/internal/pkg/factory"
ovntest "ovn4nfv-k8s-plugin/internal/pkg/testing"
. "github.com/onsi/ginkgo"
@@ -30,10 +28,10 @@ var _ = Describe("Add logical Port", func() {
var app *cli.App
BeforeEach(func() {
-
app = cli.NewApp()
app.Name = "test"
app.Flags = config.Flags
+
})
It("tests Pod", func() {
@@ -68,15 +66,15 @@ var _ = Describe("Add logical Port", func() {
return fmt.Sprintf("/fake-bin/%s", file), nil
},
}
-
- err := SetExec(fexec)
+ oldSetupOvnUtils := SetupOvnUtils
+ // as we are exiting, revert ConfigureInterface back at end of function
+ defer func() { SetupOvnUtils = oldSetupOvnUtils }()
+ SetupOvnUtils = func() error {
+ return nil
+ }
+ ovnController, err := NewOvnController(fexec)
Expect(err).NotTo(HaveOccurred())
- fakeClient := &fake.Clientset{}
- var fakeWatchFactory factory.WatchFactory
-
- ovnController := NewOvnController(fakeClient, &fakeWatchFactory)
- Expect(err).NotTo(HaveOccurred())
var (
okPod = v1.Pod{
TypeMeta: metav1.TypeMeta{
@@ -84,8 +82,7 @@ var _ = Describe("Add logical Port", func() {
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
- Name: "ok",
- Annotations: map[string]string{"ovnNetwork": "[{ \"name\": \"ovn-prot-net\", \"interface\": \"net0\" , \"defaultGateway\": \"true\"}]"},
+ Name: "ok",
},
Spec: v1.PodSpec{
Containers: []v1.Container{
@@ -97,8 +94,8 @@ var _ = Describe("Add logical Port", func() {
},
}
)
-
- ovnController.addLogicalPort(&okPod)
+ a := []map[string]interface{}{{"name": "ovn-prot-net", "interface": "net0"}}
+ ovnController.AddLogicalPorts(&okPod, a)
Expect(fexec.CommandCalls).To(Equal(len(fakeCmds)))
return nil
@@ -137,13 +134,14 @@ var _ = Describe("Add logical Port", func() {
return fmt.Sprintf("/fake-bin/%s", file), nil
},
}
- err := SetExec(fexec)
+ oldSetupOvnUtils := SetupOvnUtils
+ // as we are exiting, revert ConfigureInterface back at end of function
+ defer func() { SetupOvnUtils = oldSetupOvnUtils }()
+ SetupOvnUtils = func() error {
+ return nil
+ }
+ ovnController, err := NewOvnController(fexec)
Expect(err).NotTo(HaveOccurred())
-
- fakeClient := &fake.Clientset{}
- var fakeWatchFactory factory.WatchFactory
-
- ovnController := NewOvnController(fakeClient, &fakeWatchFactory)
var (
okPod = v1.Pod{
TypeMeta: metav1.TypeMeta{
@@ -151,8 +149,7 @@ var _ = Describe("Add logical Port", func() {
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
- Name: "ok",
- Annotations: map[string]string{"ovnNetwork": "[{ \"name\": \"ovn-prot-net\", \"interface\": \"net0\", \"netType\": \"provider\", \"ipAddress\": \"192.168.1.3/24\", \"macAddress\": \"0a:00:00:00:00:01\" }]"},
+ Name: "ok",
},
Spec: v1.PodSpec{
Containers: []v1.Container{
@@ -164,7 +161,8 @@ var _ = Describe("Add logical Port", func() {
},
}
)
- ovnController.addLogicalPort(&okPod)
+ a := []map[string]interface{}{{"name": "ovn-prot-net", "interface": "net0", "netType": "provider", "ipAddress": "192.168.1.3/24", "macAddress": "0a:00:00:00:00:01"}}
+ ovnController.AddLogicalPorts(&okPod, a)
Expect(fexec.CommandCalls).To(Equal(len(fakeCmds)))
return nil
diff --git a/internal/pkg/ovn/utils.go b/internal/pkg/ovn/utils.go
index 1700bf8..2478ac2 100644
--- a/internal/pkg/ovn/utils.go
+++ b/internal/pkg/ovn/utils.go
@@ -3,55 +3,53 @@ package ovn
import (
"bytes"
"fmt"
+ kexec "k8s.io/utils/exec"
+ "os"
"strings"
"time"
- "unicode"
-
- "github.com/sirupsen/logrus"
- kexec "k8s.io/utils/exec"
)
const (
ovsCommandTimeout = 15
- ovsVsctlCommand = "ovs-vsctl"
- ovsOfctlCommand = "ovs-ofctl"
ovnNbctlCommand = "ovn-nbctl"
- ipCommand = "ip"
)
// Exec runs various OVN and OVS utilities
type execHelper struct {
exec kexec.Interface
- ofctlPath string
- vsctlPath string
nbctlPath string
- ipPath string
+ hostIP string
+ hostPort string
}
var runner *execHelper
+// SetupOvnUtils does internal OVN initialization
+var SetupOvnUtils = func() error {
+ runner.hostIP = os.Getenv("HOST_IP")
+ // OVN Host Port
+ runner.hostPort = "6641"
+ log.Info("Host Port", "IP", runner.hostIP, "Port", runner.hostPort)
+
+ // Setup Distributed Router
+ err := setupDistributedRouter(ovn4nfvRouterName)
+ if err != nil {
+ log.Error(err, "Failed to initialize OVN Distributed Router")
+ return err
+ }
+ return nil
+}
+
// SetExec validates executable paths and saves the given exec interface
// to be used for running various OVS and OVN utilites
func SetExec(exec kexec.Interface) error {
var err error
runner = &execHelper{exec: exec}
- runner.ofctlPath, err = exec.LookPath(ovsOfctlCommand)
- if err != nil {
- return err
- }
- runner.vsctlPath, err = exec.LookPath(ovsVsctlCommand)
- if err != nil {
- return err
- }
runner.nbctlPath, err = exec.LookPath(ovnNbctlCommand)
if err != nil {
return err
}
- runner.ipPath, err = exec.LookPath(ipCommand)
- if err != nil {
- return err
- }
return nil
}
@@ -65,7 +63,6 @@ func runOVNretry(cmdPath string, args ...string) (*bytes.Buffer, *bytes.Buffer,
if err == nil {
return stdout, stderr, err
}
-
// Connection refused
// Master may not be up so keep trying
if strings.Contains(stderr.String(), "Connection refused") {
@@ -87,34 +84,29 @@ func run(cmdPath string, args ...string) (*bytes.Buffer, *bytes.Buffer, error) {
cmd := runner.exec.Command(cmdPath, args...)
cmd.SetStdout(stdout)
cmd.SetStderr(stderr)
- logrus.Debugf("exec: %s %s", cmdPath, strings.Join(args, " "))
+ log.Info("exec:", "cmdPath", cmdPath, "args", strings.Join(args, " "))
err := cmd.Run()
if err != nil {
- logrus.Debugf("exec: %s %s => %v", cmdPath, strings.Join(args, " "), err)
+ log.Error(err, "exec:", "cmdPath", cmdPath, "args", strings.Join(args, " "))
}
return stdout, stderr, err
}
-// RunOVSVsctl runs a command via ovs-vsctl.
-func RunOVSVsctlUnix(args ...string) (string, string, error) {
- cmdArgs := []string{fmt.Sprintf("--timeout=%d", ovsCommandTimeout)}
- cmdArgs = append(cmdArgs, args...)
- stdout, stderr, err := run(runner.vsctlPath, cmdArgs...)
- return strings.Trim(strings.TrimSpace(stdout.String()), "\""), stderr.String(), err
-}
-
-// RunOVNNbctlUnix runs command via ovn-nbctl, with ovn-nbctl using the unix
-// domain sockets to connect to the ovsdb-server backing the OVN NB database.
-func RunOVNNbctlUnix(args ...string) (string, string, error) {
- cmdArgs := []string{fmt.Sprintf("--timeout=%d", ovsCommandTimeout)}
+// RunOVNSbctlWithTimeout runs command via ovn-nbctl with a specific timeout
+func RunOVNNbctlWithTimeout(timeout int, args ...string) (string, string, error) {
+ var cmdArgs []string
+ if len(runner.hostIP) > 0 {
+ cmdArgs = []string{
+ fmt.Sprintf("--db=tcp:%s:%s", runner.hostIP, runner.hostPort),
+ }
+ }
+ cmdArgs = append(cmdArgs, fmt.Sprintf("--timeout=%d", timeout))
cmdArgs = append(cmdArgs, args...)
stdout, stderr, err := runOVNretry(runner.nbctlPath, cmdArgs...)
- return strings.Trim(strings.TrimFunc(stdout.String(), unicode.IsSpace), "\""),
- stderr.String(), err
+ return strings.Trim(strings.TrimSpace(stdout.String()), "\""), stderr.String(), err
}
-// RunIP runs a command via the iproute2 "ip" utility
-func RunIP(args ...string) (string, string, error) {
- stdout, stderr, err := run(runner.ipPath, args...)
- return strings.TrimSpace(stdout.String()), stderr.String(), err
+// RunOVNNbctl runs a command via ovn-nbctl.
+func RunOVNNbctl(args ...string) (string, string, error) {
+ return RunOVNNbctlWithTimeout(ovsCommandTimeout, args...)
}