summaryrefslogtreecommitdiffstats
path: root/clover/clovisor/libclovisor/clovisor_k8s.go
diff options
context:
space:
mode:
Diffstat (limited to 'clover/clovisor/libclovisor/clovisor_k8s.go')
-rw-r--r--clover/clovisor/libclovisor/clovisor_k8s.go251
1 files changed, 251 insertions, 0 deletions
diff --git a/clover/clovisor/libclovisor/clovisor_k8s.go b/clover/clovisor/libclovisor/clovisor_k8s.go
new file mode 100644
index 0000000..85b0ea4
--- /dev/null
+++ b/clover/clovisor/libclovisor/clovisor_k8s.go
@@ -0,0 +1,251 @@
+// Copyright (c) Authors of Clover
+//
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Apache License, Version 2.0
+// which accompanies this distribution, and is available at
+// http://www.apache.org/licenses/LICENSE-2.0
+
+package clovisor
+
+import (
+ "bytes"
+ "fmt"
+ "strconv"
+ "strings"
+
+ core_v1 "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/rest"
+ "k8s.io/client-go/tools/remotecommand"
+)
+
+type ClovisorK8s struct {
+ client *kubernetes.Clientset
+ config *rest.Config
+}
+
+type monitoring_info_t struct {
+ namespace string
+ svc_name string
+ pod_name string
+ container_name string
+ protocol string
+ port_num uint32
+}
+
+var DEFAULT_NAMESPACE = "default"
+
+func K8s_client_init(nodeName string) (*ClovisorK8s, error) {
+ config, err := rest.InClusterConfig()
+ if err != nil {
+ fmt.Println(err.Error())
+ return nil, err
+ }
+
+ client, err := kubernetes.NewForConfig(config)
+ if err != nil {
+ fmt.Println(err.Error())
+ return nil, err
+ }
+
+ return &ClovisorK8s{
+ client: client,
+ config: config,
+ }, nil
+}
+
+func parse_label_cfg(label_cfg string) (string, string, string) {
+ label_slice := strings.Split(label_cfg, ":")
+ if len(label_slice) == 1 {
+ return label_slice[0], "", ""
+ }
+ return label_slice[0], label_slice[1], label_slice[2]
+}
+
+func (client *ClovisorK8s) Get_monitoring_info(nodeName string) (map[string]*monitoring_info_t,
+ error) {
+
+ labels_list, err := get_cfg_labels(nodeName)
+ if err != nil {
+ fmt.Printf("Error getting cfg labels: %v\n", err)
+ return nil, err
+ }
+
+ namespace, svcs, pods, err := client.fetch_svcs_pods(nodeName, labels_list)
+ if err != nil {
+ return nil, err
+ }
+
+ mon_info_map := make(map[string]*monitoring_info_t)
+ for idx, _ := range svcs {
+ svc := svcs[idx]
+ pod := pods[idx]
+ mon_info := client.get_monitoring_pods(namespace, nodeName, svc, pod)
+ for k, v := range mon_info {
+ mon_info_map[k] = v
+ }
+ }
+ return mon_info_map, nil
+}
+
+func (client *ClovisorK8s) fetch_svcs_pods(nodeName string,
+ labels_list []string) (string,
+ []*core_v1.ServiceList,
+ []*core_v1.PodList, error) {
+ namespace := DEFAULT_NAMESPACE
+ /*
+ * Three cases:
+ * 1.) no configured namespaces, monitoring all pods in default namesapce
+ * 2.) if any config only has namespace, monitoring all pods in namespace
+ * 3.) label is configured, only monitor pods with that label
+ */
+ var svcs []*core_v1.ServiceList
+ var pods []*core_v1.PodList
+ if len(labels_list) == 0 {
+ if svcs_list, err :=
+ client.client.CoreV1().Services(namespace).List(metav1.ListOptions{});
+ err != nil {
+ fmt.Printf("Error fetching service list for namespace %s\n",
+ namespace)
+ return namespace, nil, nil, err
+ } else {
+ svcs = append(svcs, svcs_list)
+ }
+
+ if pods_list, err :=
+ client.client.CoreV1().Pods(namespace).List(metav1.ListOptions{});
+ err != nil {
+ fmt.Printf("Error fetching pods list for namespace %s\n",
+ namespace)
+ return namespace, nil, nil, err
+ } else {
+ pods = append(pods, pods_list)
+ }
+ } else {
+ for _, label_str := range labels_list {
+ var label_selector string
+ namespace, key, value := parse_label_cfg(label_str)
+ if len(namespace) == 0 {
+ fmt.Printf("Error in config: %s not a valid config\n", label_str)
+ continue
+ }
+ if len(key) == 0 {
+ fmt.Printf("Namespace only config for %s\n", namespace)
+ } else {
+ label_selector = fmt.Sprintf("%s=%s", key, value)
+ }
+ if svc_list, err :=
+ client.client.CoreV1().Services(namespace).List(metav1.ListOptions{
+ LabelSelector: label_selector,
+ }); err != nil {
+ fmt.Printf("Error listing services with label %v:%v:%v - %v\n",
+ key, value, namespace, err.Error())
+ continue
+ } else {
+ svcs = append(svcs, svc_list)
+ }
+ if pod_list, err :=
+ client.client.CoreV1().Pods(namespace).List(metav1.ListOptions{
+ LabelSelector: label_selector,
+ }); err != nil {
+ fmt.Printf("Error listing pods with label %v:%v:%v - %v\n",
+ key, value, namespace, err.Error())
+ continue
+ } else {
+ pods = append(pods, pod_list)
+ }
+ }
+ }
+ return namespace, svcs, pods, nil
+}
+
+func (client *ClovisorK8s) get_monitoring_pods(
+ namespace string,
+ node_name string,
+ svcs *core_v1.ServiceList,
+ pods *core_v1.PodList) (map[string]*monitoring_info_t) {
+ monitoring_info := make(map[string]*monitoring_info_t)
+ svc_map := make(map[string][]string)
+
+ for _, svc := range svcs.Items {
+ svc_port := svc.Spec.Ports[0]
+ target_port := svc_port.TargetPort.String()
+ svc_port_name := svc_port.Name
+ svc_map[target_port] = append(svc_map[target_port], svc.GetName())
+ if len(svc_port_name) > 0 {
+ svc_map[target_port] = append(svc_map[target_port], svc_port_name)
+ } else {
+ svc_map[target_port] = append(svc_map[target_port], "tcp")
+ }
+ }
+ for _, v := range pods.Items {
+ if v.Spec.NodeName == node_name {
+ pod_name := v.GetName()
+ monitoring_info[pod_name] = &(monitoring_info_t{})
+ monitoring_info[pod_name].namespace = namespace
+ monitoring_info[pod_name].pod_name = pod_name
+ monitoring_info[pod_name].container_name = v.Spec.Containers[0].Name
+ monitoring_info[pod_name].port_num = uint32(v.Spec.Containers[0].Ports[0].ContainerPort)
+ tp_string := strconv.Itoa(int(monitoring_info[pod_name].port_num))
+ svc_array := svc_map[tp_string]
+ monitoring_info[pod_name].svc_name = svc_array[0]
+ if (strings.Contains(svc_array[1], "-")) {
+ monitoring_info[pod_name].protocol = svc_array[1][:strings.Index(svc_array[1], "-")]
+ } else {
+ monitoring_info[pod_name].protocol = svc_array[1]
+ }
+ }
+ }
+ return monitoring_info
+}
+
+func (client *ClovisorK8s) exec_command(command string,
+ monitoring_info *monitoring_info_t) (string, error) {
+
+ // Following code based on:
+ // https://stackoverflow.com/questions/43314689/example-of-exec-in-k8ss-pod-by-using-go-client
+ // https://github.com/a4abhishek/Client-Go-Examples/blob/master/exec_to_pod/exec_to_pod.go
+ exec_req := client.client.CoreV1().RESTClient().Post().
+ Resource("pods").
+ Name(monitoring_info.pod_name).
+ Namespace(monitoring_info.namespace).
+ SubResource("exec")
+ scheme := runtime.NewScheme()
+ if err := core_v1.AddToScheme(scheme); err != nil {
+ fmt.Printf("Error in exec pods: %v\n", err.Error())
+ return "", err
+ }
+
+ parameterCodec := runtime.NewParameterCodec(scheme)
+ exec_req.VersionedParams(&core_v1.PodExecOptions{
+ Command: strings.Fields(command),
+ Container: monitoring_info.container_name,
+ Stdin: false,
+ Stdout: true,
+ Stderr: true,
+ TTY: false,
+ }, parameterCodec)
+
+ exec, err := remotecommand.NewSPDYExecutor(client.config, "POST", exec_req.URL())
+ if err != nil {
+ fmt.Printf("Error in remotecommand exec: %v\n", err.Error())
+ return "", err
+ }
+
+ var stdout, stderr bytes.Buffer
+ err = exec.Stream(remotecommand.StreamOptions{
+ Stdin: nil,
+ Stdout: &stdout,
+ Stderr: &stderr,
+ Tty: false,
+ })
+ if err != nil {
+ fmt.Printf("Error in exec stream: %v\n", err.Error())
+ return "", err
+ }
+
+ stdout_no_newline := strings.TrimSuffix(stdout.String(), "\n")
+ return stdout_no_newline, nil
+}