path: root/pkg
diff options
Diffstat (limited to 'pkg')
3 files changed, 233 insertions, 0 deletions
diff --git a/pkg/controller/add_pod.go b/pkg/controller/add_pod.go
new file mode 100644
index 0000000..cc5f562
--- /dev/null
+++ b/pkg/controller/add_pod.go
@@ -0,0 +1,10 @@
+package controller
+import (
+ "ovn4nfv-k8s-plugin/pkg/controller/pod"
+func init() {
+ // AddToManagerFuncs is a list of functions to create controllers and add them to a manager.
+ AddToManagerFuncs = append(AddToManagerFuncs, pod.Add)
diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go
new file mode 100644
index 0000000..7c069f3
--- /dev/null
+++ b/pkg/controller/controller.go
@@ -0,0 +1,18 @@
+package controller
+import (
+ "sigs.k8s.io/controller-runtime/pkg/manager"
+// AddToManagerFuncs is a list of functions to add all Controllers to the Manager
+var AddToManagerFuncs []func(manager.Manager) error
+// AddToManager adds all Controllers to the Manager
+func AddToManager(m manager.Manager) error {
+ for _, f := range AddToManagerFuncs {
+ if err := f(m); err != nil {
+ return err
+ }
+ }
+ return nil
diff --git a/pkg/controller/pod/pod_controller.go b/pkg/controller/pod/pod_controller.go
new file mode 100644
index 0000000..8792985
--- /dev/null
+++ b/pkg/controller/pod/pod_controller.go
@@ -0,0 +1,205 @@
+package pod
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ corev1 "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/types"
+ "ovn4nfv-k8s-plugin/internal/pkg/ovn"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+ "sigs.k8s.io/controller-runtime/pkg/controller"
+ "sigs.k8s.io/controller-runtime/pkg/event"
+ "sigs.k8s.io/controller-runtime/pkg/handler"
+ "sigs.k8s.io/controller-runtime/pkg/manager"
+ "sigs.k8s.io/controller-runtime/pkg/predicate"
+ "sigs.k8s.io/controller-runtime/pkg/reconcile"
+ logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
+ "sigs.k8s.io/controller-runtime/pkg/source"
+var log = logf.Log.WithName("controller_pod")
+const (
+ nfnNetworkAnnotation = "k8s.plugin.opnfv.org/nfn-network"
+type nfnNetwork struct {
+ Type string "json:\"type\""
+ Interface []map[string]interface{} "json:\"interface\""
+// Add creates a new Pod Controller and adds it to the Manager. The Manager will set fields on the Controller
+// and Start it when the Manager is Started.
+func Add(mgr manager.Manager) error {
+ return add(mgr, newReconciler(mgr))
+// newReconciler returns a new reconcile.Reconciler
+func newReconciler(mgr manager.Manager) reconcile.Reconciler {
+ return &ReconcilePod{client: mgr.GetClient(), scheme: mgr.GetScheme()}
+// add adds a new Controller to mgr with r as the reconcile.Reconciler
+func add(mgr manager.Manager, r reconcile.Reconciler) error {
+ // Create a new Controller that will call the provided Reconciler function in response
+ // to events.
+ c, err := controller.New("pod-controller", mgr, controller.Options{Reconciler: r})
+ if err != nil {
+ return err
+ }
+ // Define Predicates On Create and Update function
+ p := predicate.Funcs{
+ UpdateFunc: func(e event.UpdateEvent) bool {
+ annotaion := e.MetaNew.GetAnnotations()
+ // The object doesn't contain annotation ,nfnNetworkAnnotation so the event will be
+ // ignored.
+ if _, ok := annotaion[nfnNetworkAnnotation]; !ok {
+ return false
+ }
+ // If pod is already processed by OVN don't add event
+ if _, ok := annotaion[ovn.Ovn4nfvAnnotationTag]; ok {
+ return false
+ }
+ return true
+ },
+ CreateFunc: func(e event.CreateEvent) bool {
+ // The object doesn't contain annotation ,nfnNetworkAnnotation so the event will be
+ // ignored.
+ annotaion := e.Meta.GetAnnotations()
+ if _, ok := annotaion[nfnNetworkAnnotation]; !ok {
+ return false
+ }
+ return true
+ },
+ DeleteFunc: func(e event.DeleteEvent) bool {
+ // The object doesn't contain annotation ,nfnNetworkAnnotation so the event will be
+ // ignored.
+ annotaion := e.Meta.GetAnnotations()
+ if _, ok := annotaion[nfnNetworkAnnotation]; !ok {
+ return false
+ }
+ return true
+ },
+ }
+ // Watch for Pod create / update / delete events and call Reconcile
+ err = c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForObject{}, p)
+ if err != nil {
+ return err
+ }
+ return nil
+// blank assignment to verify that ReconcuilePod implements reconcile.Reconciler
+var _ reconcile.Reconciler = &ReconcilePod{}
+// ReconcilePod reconciles a ProviderNetwork object
+type ReconcilePod struct {
+ // This client, initialized using mgr.Client() above, is a split client
+ // that reads objects from the cache and writes to the apiserver
+ client client.Client
+ scheme *runtime.Scheme
+// Reconcile function
+// The Controller will requeue the Request to be processed again if the returned error is non-nil or
+// Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
+func (r *ReconcilePod) Reconcile(request reconcile.Request) (reconcile.Result, error) {
+ reqLogger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)
+ reqLogger.Info("Enter Reconciling Pod")
+ // Fetch the Pod instance
+ instance := &corev1.Pod{}
+ err := r.client.Get(context.TODO(), request.NamespacedName, instance)
+ if err != nil {
+ if errors.IsNotFound(err) {
+ // Request object not found, could have been deleted after reconcile request.
+ // Owned objects are automatically garbage collected. For additional cleanup logic use finalizers.
+ // Return and don't requeue
+ if instance.Name == "" || instance.Namespace == "" {
+ return reconcile.Result{}, nil
+ }
+ r.deleteLogicalPorts(request.Name, request.Namespace)
+ return reconcile.Result{}, nil
+ }
+ // Error reading the object - requeue the request.
+ return reconcile.Result{}, err
+ }
+ if instance.Name == "" || instance.Namespace == "" {
+ return reconcile.Result{}, nil
+ }
+ err = r.addLogicalPorts(instance)
+ if err != nil && err.Error() == "Failed to add ports" {
+ // Requeue the object
+ return reconcile.Result{}, err
+ }
+ reqLogger.Info("Exit Reconciling Pod")
+ return reconcile.Result{}, nil
+// annotatePod annotates pod with the given annotations
+func (r *ReconcilePod) setPodAnnotation(pod *corev1.Pod, key, value string) error {
+ patchData := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, key, value)
+ err := r.client.Patch(context.TODO(), pod, client.ConstantPatch(types.MergePatchType, []byte(patchData)))
+ if err != nil {
+ log.Error(err, "Updating pod failed", "pod", pod, "key", key, "value", value)
+ return err
+ }
+ return nil
+func (r *ReconcilePod) addLogicalPorts(pod *corev1.Pod) error {
+ nfn, err := r.readPodAnnotation(pod)
+ if err != nil {
+ return err
+ }
+ switch {
+ case nfn.Type == "ovn4nfv":
+ ovnCtl, err := ovn.GetOvnController()
+ if err != nil {
+ return err
+ }
+ key, value := ovnCtl.AddLogicalPorts(pod, nfn.Interface)
+ if len(key) > 0 {
+ return r.setPodAnnotation(pod, key, value)
+ }
+ return fmt.Errorf("Failed to add ports")
+ default:
+ return fmt.Errorf("Unsupported Networking type %s", nfn.Type)
+ // Add other types here
+ }
+func (r *ReconcilePod) deleteLogicalPorts(name, namesapce string) error {
+ // Run delete for all controllers; pod annonations inaccessible
+ ovnCtl, err := ovn.GetOvnController()
+ if err != nil {
+ return err
+ }
+ ovnCtl.DeleteLogicalPorts(name, namesapce)
+ return nil
+ // Add other types here
+func (r *ReconcilePod) readPodAnnotation(pod *corev1.Pod) (*nfnNetwork, error) {
+ annotaion, ok := pod.Annotations[nfnNetworkAnnotation]
+ if !ok {
+ return nil, fmt.Errorf("Invalid annotations")
+ }
+ var nfn nfnNetwork
+ err := json.Unmarshal([]byte(annotaion), &nfn)
+ if err != nil {
+ log.Error(err, "Invalid nfn annotaion", "annotaiton", annotaion)
+ return nil, err
+ }
+ return &nfn, nil