Diffstat (limited to 'clover/clovisor/libclovisor/clovisor_k8s.go')
-rw-r--r-- clover/clovisor/libclovisor/clovisor_k8s.go | 272
1 file changed, 272 insertions, 0 deletions
diff --git a/clover/clovisor/libclovisor/clovisor_k8s.go b/clover/clovisor/libclovisor/clovisor_k8s.go
new file mode 100644
index 0000000..a53e308
--- /dev/null
+++ b/clover/clovisor/libclovisor/clovisor_k8s.go
@@ -0,0 +1,272 @@
+// Copyright (c) Authors of Clover
+//
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Apache License, Version 2.0
+// which accompanies this distribution, and is available at
+// http://www.apache.org/licenses/LICENSE-2.0
+
+package clovisor
+
+import (
+ "bytes"
+ "errors"
+ "fmt"
+ "strconv"
+ "strings"
+
+ core_v1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/labels"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/rest"
+ "k8s.io/client-go/tools/remotecommand"
+)
+
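+// ClovisorK8s wraps the in-cluster Kubernetes clientset together with the
+// REST config, which is reused later for pod exec requests.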
+type ClovisorK8s struct {
+ client *kubernetes.Clientset
+ config *rest.Config
+}
+
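+// monitoring_info_t describes one pod/container/port to monitor, the service
+// it belongs to, and the protocols expected on that port.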
+type monitoring_info_t struct {
+ namespace string
+ svc_name string
+ pod_name string
+ container_name string
+ protocols []string
+ port_num uint32
+ pod_ip string
+}
+
+var DEFAULT_NAMESPACE = "default"
+var SUPPORTED_PROTOCOLS = [...]string{"tcp", "http"}
+
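+// K8s_client_init builds a Kubernetes clientset from the in-cluster
+// configuration and returns it wrapped in a ClovisorK8s handle.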
+func K8s_client_init(nodeName string) (*ClovisorK8s, error) {
+ config, err := rest.InClusterConfig()
+ if err != nil {
+ fmt.Println(err.Error())
+ return nil, err
+ }
+
+ client, err := kubernetes.NewForConfig(config)
+ if err != nil {
+ fmt.Println(err.Error())
+ return nil, err
+ }
+
+ return &ClovisorK8s{
+ client: client,
+ config: config,
+ }, nil
+}
+
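+// Get_monitoring_info reads the label configuration for this node and returns
+// a map keyed by pod name describing every pod to be monitored on the node.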
+func (client *ClovisorK8s) Get_monitoring_info(nodeName string) (map[string]*monitoring_info_t,
+ error) {
+
+ labels_list, err := get_cfg_labels(nodeName)
+ if err != nil {
+ fmt.Printf("Error getting cfg labels: %v\n", err)
+ return nil, err
+ }
+
+ mon_info_map := client.get_monitoring_pods(nodeName, labels_list)
+ if mon_info_map == nil {
+ return nil, errors.New("monitoring info empty")
+ }
+
+ return mon_info_map, nil
+}
+
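+// getPodsForSvc lists the pods in the given namespace matching the service's
+// label selector.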
+func (client *ClovisorK8s) getPodsForSvc(svc *core_v1.Service,
+ namespace string) (*core_v1.PodList, error) {
+ set := labels.Set(svc.Spec.Selector)
+ //label := strings.Split(set.AsSelector().String(), ",")[0]
+ //fmt.Printf("Trying to get pods for service %v with label %v from %v\n", svc.GetName(), label, set.AsSelector().String())
+ listOptions := metav1.ListOptions{LabelSelector: set.AsSelector().String()}
+ //listOptions := metav1.ListOptions{LabelSelector: label}
+ return client.client.CoreV1().Pods(namespace).List(listOptions)
+}
+
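+// get_monitoring_pods builds the pod-name-to-monitoring-info map for the given
+// node from the configured labels (see the three cases below).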
+func (client *ClovisorK8s) get_monitoring_pods(nodeName string,
+ labels_list []string) (map[string]*monitoring_info_t) {
+ /*
+ * Three cases:
+ * 1.) no namespaces configured: monitor all pods in the default namespace
+ * 2.) a config entry has only a namespace: monitor all pods in that namespace
+ * 3.) a label is configured: only monitor pods carrying that label
+ */
+ var namespace string
+ ns_svc_map := make(map[string][]*core_v1.ServiceList)
+ monitoring_info := make(map[string]*monitoring_info_t)
+ if len(labels_list) == 0 {
+ // TODO(s3wong): set it to 'default'
+ //namespace = "linux-foundation-gke"
+ namespace = "default"
+ if svcs_list, err :=
+ client.client.CoreV1().Services(namespace).List(metav1.ListOptions{});
+ err != nil {
+ fmt.Printf("Error fetching service list for namespace %s\n",
+ namespace)
+ return nil
+ } else {
+ /*
+ if _, ok := ns_svc_map[namespace]; !ok {
+ ns_svc_map[namespace] = []*core_v1.ServiceList{}
+ }
+ */
+ ns_svc_map[namespace] = append(ns_svc_map[namespace], svcs_list)
+ }
+ } else {
+ for _, label_str := range labels_list {
+ var label_selector string
+ namespace, key, value := parse_label_cfg(label_str)
+ if len(namespace) == 0 {
+ fmt.Printf("Error in config: %s not a valid config\n", label_str)
+ continue
+ }
+ if len(key) == 0 {
+ fmt.Printf("Namespace only config for %s\n", namespace)
+ } else {
+ label_selector = fmt.Sprintf("%s=%s", key, value)
+ }
+ if svc_list, err :=
+ client.client.CoreV1().Services(namespace).List(metav1.ListOptions{
+ LabelSelector: label_selector,
+ }); err != nil {
+ fmt.Printf("Error listing services with label %v:%v:%v - %v\n",
+ key, value, namespace, err.Error())
+ continue
+ } else {
+ if _, ok := ns_svc_map[namespace]; !ok {
+ ns_svc_map[namespace] = []*core_v1.ServiceList{}
+ }
+ ns_svc_map[namespace] = append(ns_svc_map[namespace], svc_list)
+ }
+ }
+ }
+
+ for ns, svc_slice := range ns_svc_map {
+ for _, svc_list_ := range svc_slice {
+ for _, svc := range svc_list_.Items {
+ if ns == "default" && svc.GetName() == "kubernetes" {
+ continue
+ }
+ //fmt.Printf("Looking for supported protocols for service %v:%v\n", ns, svc.GetName())
+ //var svc_port_map = map[string]core_v1.ServicePort{}
+ var svc_port_map = map[string][]string{}
+ for _, svc_port := range svc.Spec.Ports {
+ if len(svc_port.Name) > 0 {
+ svc_protos := strings.Split(svc_port.Name, "-")
+ for _, proto := range svc_protos {
+ if err := loadProtoParser(proto, false); err == nil {
+ for _, sp := range SUPPORTED_PROTOCOLS {
+ if strings.Contains(proto, sp) {
+ target_port := svc_port.TargetPort.String()
+ svc_port_map[target_port] = append(svc_port_map[target_port], proto)
+ }
+ }
+ } else {
+ fmt.Printf("Unsupported protocol: %v\n", proto)
+ }
+ }
+ }
+ }
+ if len(svc_port_map) == 0 {
+ fmt.Printf("Found no port with supported protocol for %v:%v\n", ns, svc.GetName())
+ continue
+ } else {
+ fmt.Printf("svc_port_map for service %v is %v\n", svc.GetName(), svc_port_map)
+ }
+ //fmt.Printf("Fetching pods for namespace %v service: %v\n", ns, svc.GetName())
+ pod_list, err := client.getPodsForSvc(&svc, ns)
+ if err != nil {
+ fmt.Print("Error fetching pods for %v:%v [%v]\n", ns, svc.GetName(), err)
+ continue
+ }
+ /*
+ labelSet := labels.Set(svc.Spec.Selector)
+ pod_list, err := client.client.CoreV1().Pods(ns).List(metav1.ListOptions{})
+ if err != nil {
+ fmt.Print("Error fetching pods for %v:%v [%v]\n", ns, svc.GetName(), err)
+ continue
+ }
+ */
+ for _, pod := range pod_list.Items {
+ if pod.Spec.NodeName == nodeName {
+ for _, container := range pod.Spec.Containers {
+ var port_num uint32
+ var tp_string string
+ for _, port := range container.Ports {
+ port_num = uint32(port.ContainerPort)
+ tp_string = strconv.Itoa(int(port_num))
+ if _, in := svc_port_map[tp_string]; !in {
+ continue
+ }
+ pod_name := pod.GetName()
+ monitoring_info[pod_name] = &(monitoring_info_t{})
+ monitoring_info[pod_name].namespace = ns
+ monitoring_info[pod_name].svc_name = svc.GetName()
+ monitoring_info[pod_name].pod_name = pod_name
+ monitoring_info[pod_name].container_name = container.Name
+ monitoring_info[pod_name].port_num = port_num
+ monitoring_info[pod_name].protocols = svc_port_map[tp_string]
+ monitoring_info[pod_name].pod_ip = pod.Status.PodIP
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ return monitoring_info
+}
+
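+// exec_command runs the given command inside the monitored container via the
+// Kubernetes exec subresource and returns its stdout without the trailing
+// newline.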
+func (client *ClovisorK8s) exec_command(command string,
+ monitoring_info *monitoring_info_t) (string, error) {
+
+ // Following code based on:
+ // https://stackoverflow.com/questions/43314689/example-of-exec-in-k8ss-pod-by-using-go-client
+ // https://github.com/a4abhishek/Client-Go-Examples/blob/master/exec_to_pod/exec_to_pod.go
+ exec_req := client.client.CoreV1().RESTClient().Post().
+ Resource("pods").
+ Name(monitoring_info.pod_name).
+ Namespace(monitoring_info.namespace).
+ SubResource("exec")
+ scheme := runtime.NewScheme()
+ if err := core_v1.AddToScheme(scheme); err != nil {
+ fmt.Printf("Error in exec pods: %v\n", err.Error())
+ return "", err
+ }
+
+ parameterCodec := runtime.NewParameterCodec(scheme)
+ exec_req.VersionedParams(&core_v1.PodExecOptions{
+ Command: strings.Fields(command),
+ Container: monitoring_info.container_name,
+ Stdin: false,
+ Stdout: true,
+ Stderr: true,
+ TTY: false,
+ }, parameterCodec)
+
+ exec, err := remotecommand.NewSPDYExecutor(client.config, "POST", exec_req.URL())
+ if err != nil {
+ fmt.Printf("Error in remotecommand exec: %v\n", err.Error())
+ return "", err
+ }
+
+ var stdout, stderr bytes.Buffer
+ err = exec.Stream(remotecommand.StreamOptions{
+ Stdin: nil,
+ Stdout: &stdout,
+ Stderr: &stderr,
+ Tty: false,
+ })
+ if err != nil {
+ fmt.Printf("Error in exec stream: %v\n", err.Error())
+ return "", err
+ }
+
+ stdout_no_newline := strings.TrimSuffix(stdout.String(), "\n")
+ return stdout_no_newline, nil
+}
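
For context, a minimal sketch of how these entry points could be tied together on a node. This is a hypothetical helper in the same clovisor package, not part of the patch; the name example_monitor_node and the "cat /proc/net/dev" command are illustrative assumptions only.

// example_monitor_node initializes the in-cluster client, fetches the
// monitoring map for this node, and runs a command in each monitored
// container, printing the captured stdout.
func example_monitor_node(nodeName string) error {
    k8s, err := K8s_client_init(nodeName)
    if err != nil {
        return err
    }
    mon_info, err := k8s.Get_monitoring_info(nodeName)
    if err != nil {
        return err
    }
    for pod_name, info := range mon_info {
        out, err := k8s.exec_command("cat /proc/net/dev", info)
        if err != nil {
            fmt.Printf("exec in %v failed: %v\n", pod_name, err)
            continue
        }
        fmt.Printf("%v: %v\n", pod_name, out)
    }
    return nil
}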