diff options
Diffstat (limited to 'clover/clovisor/libclovisor/clovisor_k8s.go')
-rw-r--r-- | clover/clovisor/libclovisor/clovisor_k8s.go | 251 |
1 files changed, 251 insertions, 0 deletions
diff --git a/clover/clovisor/libclovisor/clovisor_k8s.go b/clover/clovisor/libclovisor/clovisor_k8s.go new file mode 100644 index 0000000..85b0ea4 --- /dev/null +++ b/clover/clovisor/libclovisor/clovisor_k8s.go @@ -0,0 +1,251 @@ +// Copyright (c) Authors of Clover +// +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Apache License, Version 2.0 +// which accompanies this distribution, and is available at +// http://www.apache.org/licenses/LICENSE-2.0 + +package clovisor + +import ( + "bytes" + "fmt" + "strconv" + "strings" + + core_v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/remotecommand" +) + +type ClovisorK8s struct { + client *kubernetes.Clientset + config *rest.Config +} + +type monitoring_info_t struct { + namespace string + svc_name string + pod_name string + container_name string + protocol string + port_num uint32 +} + +var DEFAULT_NAMESPACE = "default" + +func K8s_client_init(nodeName string) (*ClovisorK8s, error) { + config, err := rest.InClusterConfig() + if err != nil { + fmt.Println(err.Error()) + return nil, err + } + + client, err := kubernetes.NewForConfig(config) + if err != nil { + fmt.Println(err.Error()) + return nil, err + } + + return &ClovisorK8s{ + client: client, + config: config, + }, nil +} + +func parse_label_cfg(label_cfg string) (string, string, string) { + label_slice := strings.Split(label_cfg, ":") + if len(label_slice) == 1 { + return label_slice[0], "", "" + } + return label_slice[0], label_slice[1], label_slice[2] +} + +func (client *ClovisorK8s) Get_monitoring_info(nodeName string) (map[string]*monitoring_info_t, + error) { + + labels_list, err := get_cfg_labels(nodeName) + if err != nil { + fmt.Printf("Error getting cfg labels: %v\n", err) + return nil, err + } + + namespace, svcs, pods, err := client.fetch_svcs_pods(nodeName, labels_list) + if err != nil { + return nil, err + } + + mon_info_map := make(map[string]*monitoring_info_t) + for idx, _ := range svcs { + svc := svcs[idx] + pod := pods[idx] + mon_info := client.get_monitoring_pods(namespace, nodeName, svc, pod) + for k, v := range mon_info { + mon_info_map[k] = v + } + } + return mon_info_map, nil +} + +func (client *ClovisorK8s) fetch_svcs_pods(nodeName string, + labels_list []string) (string, + []*core_v1.ServiceList, + []*core_v1.PodList, error) { + namespace := DEFAULT_NAMESPACE + /* + * Three cases: + * 1.) no configured namespaces, monitoring all pods in default namesapce + * 2.) if any config only has namespace, monitoring all pods in namespace + * 3.) label is configured, only monitor pods with that label + */ + var svcs []*core_v1.ServiceList + var pods []*core_v1.PodList + if len(labels_list) == 0 { + if svcs_list, err := + client.client.CoreV1().Services(namespace).List(metav1.ListOptions{}); + err != nil { + fmt.Printf("Error fetching service list for namespace %s\n", + namespace) + return namespace, nil, nil, err + } else { + svcs = append(svcs, svcs_list) + } + + if pods_list, err := + client.client.CoreV1().Pods(namespace).List(metav1.ListOptions{}); + err != nil { + fmt.Printf("Error fetching pods list for namespace %s\n", + namespace) + return namespace, nil, nil, err + } else { + pods = append(pods, pods_list) + } + } else { + for _, label_str := range labels_list { + var label_selector string + namespace, key, value := parse_label_cfg(label_str) + if len(namespace) == 0 { + fmt.Printf("Error in config: %s not a valid config\n", label_str) + continue + } + if len(key) == 0 { + fmt.Printf("Namespace only config for %s\n", namespace) + } else { + label_selector = fmt.Sprintf("%s=%s", key, value) + } + if svc_list, err := + client.client.CoreV1().Services(namespace).List(metav1.ListOptions{ + LabelSelector: label_selector, + }); err != nil { + fmt.Printf("Error listing services with label %v:%v:%v - %v\n", + key, value, namespace, err.Error()) + continue + } else { + svcs = append(svcs, svc_list) + } + if pod_list, err := + client.client.CoreV1().Pods(namespace).List(metav1.ListOptions{ + LabelSelector: label_selector, + }); err != nil { + fmt.Printf("Error listing pods with label %v:%v:%v - %v\n", + key, value, namespace, err.Error()) + continue + } else { + pods = append(pods, pod_list) + } + } + } + return namespace, svcs, pods, nil +} + +func (client *ClovisorK8s) get_monitoring_pods( + namespace string, + node_name string, + svcs *core_v1.ServiceList, + pods *core_v1.PodList) (map[string]*monitoring_info_t) { + monitoring_info := make(map[string]*monitoring_info_t) + svc_map := make(map[string][]string) + + for _, svc := range svcs.Items { + svc_port := svc.Spec.Ports[0] + target_port := svc_port.TargetPort.String() + svc_port_name := svc_port.Name + svc_map[target_port] = append(svc_map[target_port], svc.GetName()) + if len(svc_port_name) > 0 { + svc_map[target_port] = append(svc_map[target_port], svc_port_name) + } else { + svc_map[target_port] = append(svc_map[target_port], "tcp") + } + } + for _, v := range pods.Items { + if v.Spec.NodeName == node_name { + pod_name := v.GetName() + monitoring_info[pod_name] = &(monitoring_info_t{}) + monitoring_info[pod_name].namespace = namespace + monitoring_info[pod_name].pod_name = pod_name + monitoring_info[pod_name].container_name = v.Spec.Containers[0].Name + monitoring_info[pod_name].port_num = uint32(v.Spec.Containers[0].Ports[0].ContainerPort) + tp_string := strconv.Itoa(int(monitoring_info[pod_name].port_num)) + svc_array := svc_map[tp_string] + monitoring_info[pod_name].svc_name = svc_array[0] + if (strings.Contains(svc_array[1], "-")) { + monitoring_info[pod_name].protocol = svc_array[1][:strings.Index(svc_array[1], "-")] + } else { + monitoring_info[pod_name].protocol = svc_array[1] + } + } + } + return monitoring_info +} + +func (client *ClovisorK8s) exec_command(command string, + monitoring_info *monitoring_info_t) (string, error) { + + // Following code based on: + // https://stackoverflow.com/questions/43314689/example-of-exec-in-k8ss-pod-by-using-go-client + // https://github.com/a4abhishek/Client-Go-Examples/blob/master/exec_to_pod/exec_to_pod.go + exec_req := client.client.CoreV1().RESTClient().Post(). + Resource("pods"). + Name(monitoring_info.pod_name). + Namespace(monitoring_info.namespace). + SubResource("exec") + scheme := runtime.NewScheme() + if err := core_v1.AddToScheme(scheme); err != nil { + fmt.Printf("Error in exec pods: %v\n", err.Error()) + return "", err + } + + parameterCodec := runtime.NewParameterCodec(scheme) + exec_req.VersionedParams(&core_v1.PodExecOptions{ + Command: strings.Fields(command), + Container: monitoring_info.container_name, + Stdin: false, + Stdout: true, + Stderr: true, + TTY: false, + }, parameterCodec) + + exec, err := remotecommand.NewSPDYExecutor(client.config, "POST", exec_req.URL()) + if err != nil { + fmt.Printf("Error in remotecommand exec: %v\n", err.Error()) + return "", err + } + + var stdout, stderr bytes.Buffer + err = exec.Stream(remotecommand.StreamOptions{ + Stdin: nil, + Stdout: &stdout, + Stderr: &stderr, + Tty: false, + }) + if err != nil { + fmt.Printf("Error in exec stream: %v\n", err.Error()) + return "", err + } + + stdout_no_newline := strings.TrimSuffix(stdout.String(), "\n") + return stdout_no_newline, nil +} |