summaryrefslogtreecommitdiffstats
path: root/clover/clovisor/libclovisor/clovisor_k8s.go
diff options
context:
space:
mode:
Diffstat (limited to 'clover/clovisor/libclovisor/clovisor_k8s.go')
-rw-r--r--clover/clovisor/libclovisor/clovisor_k8s.go165
1 files changed, 86 insertions, 79 deletions
diff --git a/clover/clovisor/libclovisor/clovisor_k8s.go b/clover/clovisor/libclovisor/clovisor_k8s.go
index 85b0ea4..8f4b481 100644
--- a/clover/clovisor/libclovisor/clovisor_k8s.go
+++ b/clover/clovisor/libclovisor/clovisor_k8s.go
@@ -9,13 +9,15 @@ package clovisor
import (
"bytes"
+ "errors"
"fmt"
"strconv"
"strings"
core_v1 "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/runtime"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/labels"
+ "k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
@@ -36,6 +38,7 @@ type monitoring_info_t struct {
}
var DEFAULT_NAMESPACE = "default"
+var SUPPORTED_PROTOCOLS = [...]string {"tcp", "http"}
func K8s_client_init(nodeName string) (*ClovisorK8s, error) {
config, err := rest.InClusterConfig()
@@ -73,55 +76,48 @@ func (client *ClovisorK8s) Get_monitoring_info(nodeName string) (map[string]*mon
return nil, err
}
- namespace, svcs, pods, err := client.fetch_svcs_pods(nodeName, labels_list)
- if err != nil {
- return nil, err
+ mon_info_map := client.get_monitoring_pods(nodeName, labels_list)
+ if mon_info_map == nil {
+ return nil, errors.New("monitoring info empty")
}
- mon_info_map := make(map[string]*monitoring_info_t)
- for idx, _ := range svcs {
- svc := svcs[idx]
- pod := pods[idx]
- mon_info := client.get_monitoring_pods(namespace, nodeName, svc, pod)
- for k, v := range mon_info {
- mon_info_map[k] = v
- }
- }
return mon_info_map, nil
}
-func (client *ClovisorK8s) fetch_svcs_pods(nodeName string,
- labels_list []string) (string,
- []*core_v1.ServiceList,
- []*core_v1.PodList, error) {
- namespace := DEFAULT_NAMESPACE
+func (client *ClovisorK8s) getPodsForSvc(svc *core_v1.Service,
+ namespace string) (*core_v1.PodList, error) {
+ set := labels.Set(svc.Spec.Selector)
+ listOptions := metav1.ListOptions{LabelSelector: set.AsSelector().String()}
+ return client.client.CoreV1().Pods(namespace).List(listOptions)
+}
+
+func (client *ClovisorK8s) get_monitoring_pods(nodeName string,
+ labels_list []string) (map[string]*monitoring_info_t) {
/*
* Three cases:
* 1.) no configured namespaces, monitoring all pods in default namesapce
* 2.) if any config only has namespace, monitoring all pods in namespace
* 3.) label is configured, only monitor pods with that label
*/
- var svcs []*core_v1.ServiceList
- var pods []*core_v1.PodList
+ var namespace string
+ ns_svc_map := make(map[string][]*core_v1.ServiceList)
+ monitoring_info := make(map[string]*monitoring_info_t)
if len(labels_list) == 0 {
+ // TODO(s3wong): set it to 'default'
+ namespace = "linux-foundation-gke"
if svcs_list, err :=
client.client.CoreV1().Services(namespace).List(metav1.ListOptions{});
err != nil {
fmt.Printf("Error fetching service list for namespace %s\n",
namespace)
- return namespace, nil, nil, err
- } else {
- svcs = append(svcs, svcs_list)
- }
-
- if pods_list, err :=
- client.client.CoreV1().Pods(namespace).List(metav1.ListOptions{});
- err != nil {
- fmt.Printf("Error fetching pods list for namespace %s\n",
- namespace)
- return namespace, nil, nil, err
+ return nil
} else {
- pods = append(pods, pods_list)
+ /*
+ if _, ok := ns_svc_map[namespace]; !ok {
+ ns_svc_map[namespace] = []*core_v1.ServiceList{}
+ }
+ */
+ ns_svc_map[namespace] = append(ns_svc_map[namespace], svcs_list)
}
} else {
for _, label_str := range labels_list {
@@ -144,60 +140,71 @@ func (client *ClovisorK8s) fetch_svcs_pods(nodeName string,
key, value, namespace, err.Error())
continue
} else {
- svcs = append(svcs, svc_list)
- }
- if pod_list, err :=
- client.client.CoreV1().Pods(namespace).List(metav1.ListOptions{
- LabelSelector: label_selector,
- }); err != nil {
- fmt.Printf("Error listing pods with label %v:%v:%v - %v\n",
- key, value, namespace, err.Error())
- continue
- } else {
- pods = append(pods, pod_list)
+ if _, ok := ns_svc_map[namespace]; !ok {
+ ns_svc_map[namespace] = []*core_v1.ServiceList{}
+ }
+ ns_svc_map[namespace] = append(ns_svc_map[namespace], svc_list)
}
}
}
- return namespace, svcs, pods, nil
-}
-func (client *ClovisorK8s) get_monitoring_pods(
- namespace string,
- node_name string,
- svcs *core_v1.ServiceList,
- pods *core_v1.PodList) (map[string]*monitoring_info_t) {
- monitoring_info := make(map[string]*monitoring_info_t)
- svc_map := make(map[string][]string)
-
- for _, svc := range svcs.Items {
- svc_port := svc.Spec.Ports[0]
- target_port := svc_port.TargetPort.String()
- svc_port_name := svc_port.Name
- svc_map[target_port] = append(svc_map[target_port], svc.GetName())
- if len(svc_port_name) > 0 {
- svc_map[target_port] = append(svc_map[target_port], svc_port_name)
- } else {
- svc_map[target_port] = append(svc_map[target_port], "tcp")
- }
- }
- for _, v := range pods.Items {
- if v.Spec.NodeName == node_name {
- pod_name := v.GetName()
- monitoring_info[pod_name] = &(monitoring_info_t{})
- monitoring_info[pod_name].namespace = namespace
- monitoring_info[pod_name].pod_name = pod_name
- monitoring_info[pod_name].container_name = v.Spec.Containers[0].Name
- monitoring_info[pod_name].port_num = uint32(v.Spec.Containers[0].Ports[0].ContainerPort)
- tp_string := strconv.Itoa(int(monitoring_info[pod_name].port_num))
- svc_array := svc_map[tp_string]
- monitoring_info[pod_name].svc_name = svc_array[0]
- if (strings.Contains(svc_array[1], "-")) {
- monitoring_info[pod_name].protocol = svc_array[1][:strings.Index(svc_array[1], "-")]
- } else {
- monitoring_info[pod_name].protocol = svc_array[1]
+ for ns, svc_slice := range ns_svc_map {
+ for _, svc_list_ := range svc_slice {
+ for _, svc := range svc_list_.Items {
+ fmt.Printf("Looking for supported protocols for service %v:%v\n", ns, svc.GetName())
+ var svc_port_map = map[string]core_v1.ServicePort{}
+ for _, svc_port := range svc.Spec.Ports {
+ if len(svc_port.Name) > 0 {
+ for _, sp := range SUPPORTED_PROTOCOLS {
+ if strings.Contains(svc_port.Name, sp) {
+ target_port := svc_port.TargetPort.String()
+ svc_port_map[target_port] = svc_port
+ }
+ }
+ }
+ }
+ if len(svc_port_map) == 0 {
+ fmt.Printf("Found no port with supported protocol for %v:%v\n", ns, svc.GetName())
+ continue
+ }
+ fmt.Printf("Fetching pods for namespace %v service: %v\n", ns, svc.GetName())
+ pod_list, err := client.getPodsForSvc(&svc, ns)
+ if err != nil {
+ fmt.Print("Error fetching pods for %v:%v [%v]\n", ns, svc.GetName(), err)
+ continue
+ }
+ for _, pod := range pod_list.Items {
+ if pod.Spec.NodeName == nodeName {
+ for _, container := range pod.Spec.Containers {
+ var port_num uint32
+ var tp_string string
+ for _, port := range container.Ports {
+ port_num = uint32(port.ContainerPort)
+ tp_string = strconv.Itoa(int(port_num))
+ if _, in := svc_port_map[tp_string]; !in {
+ continue
+ }
+ pod_name := pod.GetName()
+ monitoring_info[pod_name] = &(monitoring_info_t{})
+ monitoring_info[pod_name].namespace = ns
+ monitoring_info[pod_name].svc_name = svc.GetName()
+ monitoring_info[pod_name].pod_name = pod_name
+ monitoring_info[pod_name].container_name = container.Name
+ monitoring_info[pod_name].port_num = port_num
+ svc_port_name := svc_port_map[tp_string].Name
+ if (strings.Contains(svc_port_name, "-")) {
+ monitoring_info[pod_name].protocol = svc_port_name[:strings.Index(svc_port_name, "-")]
+ } else {
+ monitoring_info[pod_name].protocol = svc_port_name
+ }
+ }
+ }
+ }
+ }
}
}
}
+
return monitoring_info
}