diff options
Diffstat (limited to 'clover/clovisor/libclovisor/clovisor_k8s.go')
-rw-r--r-- | clover/clovisor/libclovisor/clovisor_k8s.go | 165 |
1 files changed, 86 insertions, 79 deletions
diff --git a/clover/clovisor/libclovisor/clovisor_k8s.go b/clover/clovisor/libclovisor/clovisor_k8s.go index 85b0ea4..8f4b481 100644 --- a/clover/clovisor/libclovisor/clovisor_k8s.go +++ b/clover/clovisor/libclovisor/clovisor_k8s.go @@ -9,13 +9,15 @@ package clovisor import ( "bytes" + "errors" "fmt" "strconv" "strings" core_v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/runtime" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/remotecommand" @@ -36,6 +38,7 @@ type monitoring_info_t struct { } var DEFAULT_NAMESPACE = "default" +var SUPPORTED_PROTOCOLS = [...]string {"tcp", "http"} func K8s_client_init(nodeName string) (*ClovisorK8s, error) { config, err := rest.InClusterConfig() @@ -73,55 +76,48 @@ func (client *ClovisorK8s) Get_monitoring_info(nodeName string) (map[string]*mon return nil, err } - namespace, svcs, pods, err := client.fetch_svcs_pods(nodeName, labels_list) - if err != nil { - return nil, err + mon_info_map := client.get_monitoring_pods(nodeName, labels_list) + if mon_info_map == nil { + return nil, errors.New("monitoring info empty") } - mon_info_map := make(map[string]*monitoring_info_t) - for idx, _ := range svcs { - svc := svcs[idx] - pod := pods[idx] - mon_info := client.get_monitoring_pods(namespace, nodeName, svc, pod) - for k, v := range mon_info { - mon_info_map[k] = v - } - } return mon_info_map, nil } -func (client *ClovisorK8s) fetch_svcs_pods(nodeName string, - labels_list []string) (string, - []*core_v1.ServiceList, - []*core_v1.PodList, error) { - namespace := DEFAULT_NAMESPACE +func (client *ClovisorK8s) getPodsForSvc(svc *core_v1.Service, + namespace string) (*core_v1.PodList, error) { + set := labels.Set(svc.Spec.Selector) + listOptions := metav1.ListOptions{LabelSelector: set.AsSelector().String()} + return client.client.CoreV1().Pods(namespace).List(listOptions) +} + +func (client *ClovisorK8s) get_monitoring_pods(nodeName string, + labels_list []string) (map[string]*monitoring_info_t) { /* * Three cases: * 1.) no configured namespaces, monitoring all pods in default namesapce * 2.) if any config only has namespace, monitoring all pods in namespace * 3.) label is configured, only monitor pods with that label */ - var svcs []*core_v1.ServiceList - var pods []*core_v1.PodList + var namespace string + ns_svc_map := make(map[string][]*core_v1.ServiceList) + monitoring_info := make(map[string]*monitoring_info_t) if len(labels_list) == 0 { + // TODO(s3wong): set it to 'default' + namespace = "linux-foundation-gke" if svcs_list, err := client.client.CoreV1().Services(namespace).List(metav1.ListOptions{}); err != nil { fmt.Printf("Error fetching service list for namespace %s\n", namespace) - return namespace, nil, nil, err - } else { - svcs = append(svcs, svcs_list) - } - - if pods_list, err := - client.client.CoreV1().Pods(namespace).List(metav1.ListOptions{}); - err != nil { - fmt.Printf("Error fetching pods list for namespace %s\n", - namespace) - return namespace, nil, nil, err + return nil } else { - pods = append(pods, pods_list) + /* + if _, ok := ns_svc_map[namespace]; !ok { + ns_svc_map[namespace] = []*core_v1.ServiceList{} + } + */ + ns_svc_map[namespace] = append(ns_svc_map[namespace], svcs_list) } } else { for _, label_str := range labels_list { @@ -144,60 +140,71 @@ func (client *ClovisorK8s) fetch_svcs_pods(nodeName string, key, value, namespace, err.Error()) continue } else { - svcs = append(svcs, svc_list) - } - if pod_list, err := - client.client.CoreV1().Pods(namespace).List(metav1.ListOptions{ - LabelSelector: label_selector, - }); err != nil { - fmt.Printf("Error listing pods with label %v:%v:%v - %v\n", - key, value, namespace, err.Error()) - continue - } else { - pods = append(pods, pod_list) + if _, ok := ns_svc_map[namespace]; !ok { + ns_svc_map[namespace] = []*core_v1.ServiceList{} + } + ns_svc_map[namespace] = append(ns_svc_map[namespace], svc_list) } } } - return namespace, svcs, pods, nil -} -func (client *ClovisorK8s) get_monitoring_pods( - namespace string, - node_name string, - svcs *core_v1.ServiceList, - pods *core_v1.PodList) (map[string]*monitoring_info_t) { - monitoring_info := make(map[string]*monitoring_info_t) - svc_map := make(map[string][]string) - - for _, svc := range svcs.Items { - svc_port := svc.Spec.Ports[0] - target_port := svc_port.TargetPort.String() - svc_port_name := svc_port.Name - svc_map[target_port] = append(svc_map[target_port], svc.GetName()) - if len(svc_port_name) > 0 { - svc_map[target_port] = append(svc_map[target_port], svc_port_name) - } else { - svc_map[target_port] = append(svc_map[target_port], "tcp") - } - } - for _, v := range pods.Items { - if v.Spec.NodeName == node_name { - pod_name := v.GetName() - monitoring_info[pod_name] = &(monitoring_info_t{}) - monitoring_info[pod_name].namespace = namespace - monitoring_info[pod_name].pod_name = pod_name - monitoring_info[pod_name].container_name = v.Spec.Containers[0].Name - monitoring_info[pod_name].port_num = uint32(v.Spec.Containers[0].Ports[0].ContainerPort) - tp_string := strconv.Itoa(int(monitoring_info[pod_name].port_num)) - svc_array := svc_map[tp_string] - monitoring_info[pod_name].svc_name = svc_array[0] - if (strings.Contains(svc_array[1], "-")) { - monitoring_info[pod_name].protocol = svc_array[1][:strings.Index(svc_array[1], "-")] - } else { - monitoring_info[pod_name].protocol = svc_array[1] + for ns, svc_slice := range ns_svc_map { + for _, svc_list_ := range svc_slice { + for _, svc := range svc_list_.Items { + fmt.Printf("Looking for supported protocols for service %v:%v\n", ns, svc.GetName()) + var svc_port_map = map[string]core_v1.ServicePort{} + for _, svc_port := range svc.Spec.Ports { + if len(svc_port.Name) > 0 { + for _, sp := range SUPPORTED_PROTOCOLS { + if strings.Contains(svc_port.Name, sp) { + target_port := svc_port.TargetPort.String() + svc_port_map[target_port] = svc_port + } + } + } + } + if len(svc_port_map) == 0 { + fmt.Printf("Found no port with supported protocol for %v:%v\n", ns, svc.GetName()) + continue + } + fmt.Printf("Fetching pods for namespace %v service: %v\n", ns, svc.GetName()) + pod_list, err := client.getPodsForSvc(&svc, ns) + if err != nil { + fmt.Print("Error fetching pods for %v:%v [%v]\n", ns, svc.GetName(), err) + continue + } + for _, pod := range pod_list.Items { + if pod.Spec.NodeName == nodeName { + for _, container := range pod.Spec.Containers { + var port_num uint32 + var tp_string string + for _, port := range container.Ports { + port_num = uint32(port.ContainerPort) + tp_string = strconv.Itoa(int(port_num)) + if _, in := svc_port_map[tp_string]; !in { + continue + } + pod_name := pod.GetName() + monitoring_info[pod_name] = &(monitoring_info_t{}) + monitoring_info[pod_name].namespace = ns + monitoring_info[pod_name].svc_name = svc.GetName() + monitoring_info[pod_name].pod_name = pod_name + monitoring_info[pod_name].container_name = container.Name + monitoring_info[pod_name].port_num = port_num + svc_port_name := svc_port_map[tp_string].Name + if (strings.Contains(svc_port_name, "-")) { + monitoring_info[pod_name].protocol = svc_port_name[:strings.Index(svc_port_name, "-")] + } else { + monitoring_info[pod_name].protocol = svc_port_name + } + } + } + } + } } } } + return monitoring_info } |