summaryrefslogtreecommitdiffstats
path: root/clover/clovisor/libclovisor/clovisor_k8s.go
diff options
context:
space:
mode:
authorStephen Wong <stephen.kf.wong@gmail.com>2019-01-18 01:50:08 +0000
committerStephen Wong <stephen.kf.wong@gmail.com>2019-01-18 01:50:08 +0000
commit6aa27547b71bff174e3017f637a002546033bf39 (patch)
treeb89410b7bf14d896c04686ed05d7259f01a3e43e /clover/clovisor/libclovisor/clovisor_k8s.go
parentadf4c7d34840acbc4676d895075d7098c0064f9c (diff)
Various changes to improve Clovisor:
1.) make clovisor work on GKE 2.) running more efficient correlation between k8s service, pods, and service port name for the pod's container port 3.) add per session trace metrics on Clovisor's traces, including request and response sizes, trace-id, request-id, and more HTTP header fields 4.) improve eBPF code to account for TCP sessions which do not finish with either FIN or RST flags 5.) tested with Clover sample app (the "SDC") Change-Id: Ia1a6275caf31a63fb1288c93cea42b32a4606307 Signed-off-by: Stephen Wong <stephen.kf.wong@gmail.com>
Diffstat (limited to 'clover/clovisor/libclovisor/clovisor_k8s.go')
-rw-r--r--clover/clovisor/libclovisor/clovisor_k8s.go165
1 files changed, 86 insertions, 79 deletions
diff --git a/clover/clovisor/libclovisor/clovisor_k8s.go b/clover/clovisor/libclovisor/clovisor_k8s.go
index 85b0ea4..8f4b481 100644
--- a/clover/clovisor/libclovisor/clovisor_k8s.go
+++ b/clover/clovisor/libclovisor/clovisor_k8s.go
@@ -9,13 +9,15 @@ package clovisor
import (
"bytes"
+ "errors"
"fmt"
"strconv"
"strings"
core_v1 "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/runtime"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/labels"
+ "k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
@@ -36,6 +38,7 @@ type monitoring_info_t struct {
}
var DEFAULT_NAMESPACE = "default"
+var SUPPORTED_PROTOCOLS = [...]string {"tcp", "http"}
func K8s_client_init(nodeName string) (*ClovisorK8s, error) {
config, err := rest.InClusterConfig()
@@ -73,55 +76,48 @@ func (client *ClovisorK8s) Get_monitoring_info(nodeName string) (map[string]*mon
return nil, err
}
- namespace, svcs, pods, err := client.fetch_svcs_pods(nodeName, labels_list)
- if err != nil {
- return nil, err
+ mon_info_map := client.get_monitoring_pods(nodeName, labels_list)
+ if mon_info_map == nil {
+ return nil, errors.New("monitoring info empty")
}
- mon_info_map := make(map[string]*monitoring_info_t)
- for idx, _ := range svcs {
- svc := svcs[idx]
- pod := pods[idx]
- mon_info := client.get_monitoring_pods(namespace, nodeName, svc, pod)
- for k, v := range mon_info {
- mon_info_map[k] = v
- }
- }
return mon_info_map, nil
}
-func (client *ClovisorK8s) fetch_svcs_pods(nodeName string,
- labels_list []string) (string,
- []*core_v1.ServiceList,
- []*core_v1.PodList, error) {
- namespace := DEFAULT_NAMESPACE
+func (client *ClovisorK8s) getPodsForSvc(svc *core_v1.Service,
+ namespace string) (*core_v1.PodList, error) {
+ set := labels.Set(svc.Spec.Selector)
+ listOptions := metav1.ListOptions{LabelSelector: set.AsSelector().String()}
+ return client.client.CoreV1().Pods(namespace).List(listOptions)
+}
+
+func (client *ClovisorK8s) get_monitoring_pods(nodeName string,
+ labels_list []string) (map[string]*monitoring_info_t) {
/*
* Three cases:
* 1.) no configured namespaces, monitoring all pods in default namesapce
* 2.) if any config only has namespace, monitoring all pods in namespace
* 3.) label is configured, only monitor pods with that label
*/
- var svcs []*core_v1.ServiceList
- var pods []*core_v1.PodList
+ var namespace string
+ ns_svc_map := make(map[string][]*core_v1.ServiceList)
+ monitoring_info := make(map[string]*monitoring_info_t)
if len(labels_list) == 0 {
+ // TODO(s3wong): set it to 'default'
+ namespace = "linux-foundation-gke"
if svcs_list, err :=
client.client.CoreV1().Services(namespace).List(metav1.ListOptions{});
err != nil {
fmt.Printf("Error fetching service list for namespace %s\n",
namespace)
- return namespace, nil, nil, err
- } else {
- svcs = append(svcs, svcs_list)
- }
-
- if pods_list, err :=
- client.client.CoreV1().Pods(namespace).List(metav1.ListOptions{});
- err != nil {
- fmt.Printf("Error fetching pods list for namespace %s\n",
- namespace)
- return namespace, nil, nil, err
+ return nil
} else {
- pods = append(pods, pods_list)
+ /*
+ if _, ok := ns_svc_map[namespace]; !ok {
+ ns_svc_map[namespace] = []*core_v1.ServiceList{}
+ }
+ */
+ ns_svc_map[namespace] = append(ns_svc_map[namespace], svcs_list)
}
} else {
for _, label_str := range labels_list {
@@ -144,60 +140,71 @@ func (client *ClovisorK8s) fetch_svcs_pods(nodeName string,
key, value, namespace, err.Error())
continue
} else {
- svcs = append(svcs, svc_list)
- }
- if pod_list, err :=
- client.client.CoreV1().Pods(namespace).List(metav1.ListOptions{
- LabelSelector: label_selector,
- }); err != nil {
- fmt.Printf("Error listing pods with label %v:%v:%v - %v\n",
- key, value, namespace, err.Error())
- continue
- } else {
- pods = append(pods, pod_list)
+ if _, ok := ns_svc_map[namespace]; !ok {
+ ns_svc_map[namespace] = []*core_v1.ServiceList{}
+ }
+ ns_svc_map[namespace] = append(ns_svc_map[namespace], svc_list)
}
}
}
- return namespace, svcs, pods, nil
-}
-func (client *ClovisorK8s) get_monitoring_pods(
- namespace string,
- node_name string,
- svcs *core_v1.ServiceList,
- pods *core_v1.PodList) (map[string]*monitoring_info_t) {
- monitoring_info := make(map[string]*monitoring_info_t)
- svc_map := make(map[string][]string)
-
- for _, svc := range svcs.Items {
- svc_port := svc.Spec.Ports[0]
- target_port := svc_port.TargetPort.String()
- svc_port_name := svc_port.Name
- svc_map[target_port] = append(svc_map[target_port], svc.GetName())
- if len(svc_port_name) > 0 {
- svc_map[target_port] = append(svc_map[target_port], svc_port_name)
- } else {
- svc_map[target_port] = append(svc_map[target_port], "tcp")
- }
- }
- for _, v := range pods.Items {
- if v.Spec.NodeName == node_name {
- pod_name := v.GetName()
- monitoring_info[pod_name] = &(monitoring_info_t{})
- monitoring_info[pod_name].namespace = namespace
- monitoring_info[pod_name].pod_name = pod_name
- monitoring_info[pod_name].container_name = v.Spec.Containers[0].Name
- monitoring_info[pod_name].port_num = uint32(v.Spec.Containers[0].Ports[0].ContainerPort)
- tp_string := strconv.Itoa(int(monitoring_info[pod_name].port_num))
- svc_array := svc_map[tp_string]
- monitoring_info[pod_name].svc_name = svc_array[0]
- if (strings.Contains(svc_array[1], "-")) {
- monitoring_info[pod_name].protocol = svc_array[1][:strings.Index(svc_array[1], "-")]
- } else {
- monitoring_info[pod_name].protocol = svc_array[1]
+ for ns, svc_slice := range ns_svc_map {
+ for _, svc_list_ := range svc_slice {
+ for _, svc := range svc_list_.Items {
+ fmt.Printf("Looking for supported protocols for service %v:%v\n", ns, svc.GetName())
+ var svc_port_map = map[string]core_v1.ServicePort{}
+ for _, svc_port := range svc.Spec.Ports {
+ if len(svc_port.Name) > 0 {
+ for _, sp := range SUPPORTED_PROTOCOLS {
+ if strings.Contains(svc_port.Name, sp) {
+ target_port := svc_port.TargetPort.String()
+ svc_port_map[target_port] = svc_port
+ }
+ }
+ }
+ }
+ if len(svc_port_map) == 0 {
+ fmt.Printf("Found no port with supported protocol for %v:%v\n", ns, svc.GetName())
+ continue
+ }
+ fmt.Printf("Fetching pods for namespace %v service: %v\n", ns, svc.GetName())
+ pod_list, err := client.getPodsForSvc(&svc, ns)
+ if err != nil {
+ fmt.Print("Error fetching pods for %v:%v [%v]\n", ns, svc.GetName(), err)
+ continue
+ }
+ for _, pod := range pod_list.Items {
+ if pod.Spec.NodeName == nodeName {
+ for _, container := range pod.Spec.Containers {
+ var port_num uint32
+ var tp_string string
+ for _, port := range container.Ports {
+ port_num = uint32(port.ContainerPort)
+ tp_string = strconv.Itoa(int(port_num))
+ if _, in := svc_port_map[tp_string]; !in {
+ continue
+ }
+ pod_name := pod.GetName()
+ monitoring_info[pod_name] = &(monitoring_info_t{})
+ monitoring_info[pod_name].namespace = ns
+ monitoring_info[pod_name].svc_name = svc.GetName()
+ monitoring_info[pod_name].pod_name = pod_name
+ monitoring_info[pod_name].container_name = container.Name
+ monitoring_info[pod_name].port_num = port_num
+ svc_port_name := svc_port_map[tp_string].Name
+ if (strings.Contains(svc_port_name, "-")) {
+ monitoring_info[pod_name].protocol = svc_port_name[:strings.Index(svc_port_name, "-")]
+ } else {
+ monitoring_info[pod_name].protocol = svc_port_name
+ }
+ }
+ }
+ }
+ }
}
}
}
+
return monitoring_info
}