From 6aa27547b71bff174e3017f637a002546033bf39 Mon Sep 17 00:00:00 2001 From: Stephen Wong Date: Fri, 18 Jan 2019 01:50:08 +0000 Subject: Various changes to improve Clovisor: 1.) make clovisor work on GKE 2.) running more efficient correlation between k8s service, pods, and service port name for the pod's container port 3.) add per session trace metrics on Clovisor's traces, including request and response sizes, trace-id, request-id, and more HTTP header fields 4.) improve eBPF code to account for TCP sessions which do not finish with either FIN or RST flags 5.) tested with Clover sample app (the "SDC") Change-Id: Ia1a6275caf31a63fb1288c93cea42b32a4606307 Signed-off-by: Stephen Wong --- clover/clovisor/libclovisor/clovisor_bcc.go | 76 +++++++++++++++++++++++++++-- 1 file changed, 73 insertions(+), 3 deletions(-) (limited to 'clover/clovisor/libclovisor/clovisor_bcc.go') diff --git a/clover/clovisor/libclovisor/clovisor_bcc.go b/clover/clovisor/libclovisor/clovisor_bcc.go index 4dc936d..e2d9cd8 100644 --- a/clover/clovisor/libclovisor/clovisor_bcc.go +++ b/clover/clovisor/libclovisor/clovisor_bcc.go @@ -119,6 +119,7 @@ func print_network_traces(tracer opentracing.Tracer) { value := sess_info.session if _, ok := value["done"]; ok { span := tracer.StartSpan("http-tracing") + span.SetTag("Node-Name", value["nodename"]) span.SetTag("Pod-Name", value["podname"]) span.SetTag("Source-IP", value["srcip"]) span.SetTag("Destination-IP", value["dstip"]) @@ -127,9 +128,28 @@ func print_network_traces(tracer opentracing.Tracer) { span.SetTag("HTTP Request Method", value["reqmethod"]) span.SetTag("HTTP Request URL", value["requrl"]) span.SetTag("HTTP Request Protocol", value["reqproto"]) + if _, exist := value["host"]; exist { + span.SetTag("HTTP Request Host", value["host"]) + } + if _, exist := value["useragent"]; exist { + span.SetTag("HTTP Client User Agent", value["useragent"]) + } + if _, exist := value["requestid"]; exist { + span.SetTag("OpenTracing Request ID", value["requestid"]) + } + if _, exist := value["envoydecorator"]; exist { + span.SetTag("Envoy Decorator", value["envoydecorator"]) + } + if _, exist := value["traceid"]; exist { + span.SetTag("Trace ID", value["traceid"]) + } + span.SetTag("HTTP Request Packet Count", value["reqpakcount"]) + span.SetTag("HTTP Request Byte Count", value["reqbytecount"]) span.SetTag("HTTP Response Status", value["respstatus"]) span.SetTag("HTTP Response Status Code", value["respstatuscode"]) span.SetTag("HTTP Response Protocol", value["respproto"]) + span.SetTag("HTTP Response Packet Count", value["resppakcount"]) + span.SetTag("HTTP Response Byte Count", value["respbytecount"]) span.SetTag("HTTP Session Duration", value["duration"]) span.Finish() delete(sessionMap, key) @@ -137,11 +157,12 @@ func print_network_traces(tracer opentracing.Tracer) { } } -func handle_skb_event(data *[]byte, pod_name string, session_table *bcc.Table, +func handle_skb_event(data *[]byte, node_name string, pod_name string, + session_table *bcc.Table, monitoring_info *monitoring_info_t, egress_match_list []egress_match_t) (error) { //fmt.Printf("monitoring info has %v\n", monitoring_info) - fmt.Printf("\n\n%s", hex.Dump(*data)) + fmt.Printf("\n\nnode[%s] pod[%s]\n%s", node_name, pod_name, hex.Dump(*data)) var src_ip, dst_ip uint32 var src_port, dst_port uint16 var session_key, src_ip_str, dst_ip_str string @@ -203,14 +224,24 @@ func handle_skb_event(data *[]byte, pod_name string, session_table *bcc.Table, sess_map.session = make(map[string]string) sess_map.buf = []byte{} sessionMap[session_key] = sess_map + zero := strconv.Itoa(0) + sessionMap[session_key].session["reqpakcount"] = zero + sessionMap[session_key].session["reqbytecount"] = zero + sessionMap[session_key].session["resppakcount"] = zero + sessionMap[session_key].session["respbytecount"] = zero } map_val := sessionMap[session_key].session + map_val["nodename"] = node_name map_val["podname"] = pod_name map_val["srcip"] = src_ip_str map_val["dstip"] = dst_ip_str map_val["srcport"] = fmt.Sprintf("%d", src_port) map_val["dstport"] = fmt.Sprintf("%d", dst_port) + curr_pak_count, _ := strconv.Atoi(map_val["reqpakcount"]) + curr_byte_count, _ := strconv.Atoi(map_val["reqbytecount"]) + curr_pak_count++ if proto == HTTP { + curr_byte_count += len(app_layer.Payload()) reader := bytes.NewReader(app_layer.Payload()) buf := bufio.NewReader(reader) req, err := http.ReadRequest(buf) @@ -225,11 +256,33 @@ func handle_skb_event(data *[]byte, pod_name string, session_table *bcc.Table, map_val["reqmethod"] = req.Method map_val["requrl"] = fmt.Sprintf("%v", req.URL) map_val["reqproto"] = fmt.Sprintf("%v", req.Proto) + if user_agent := req.UserAgent(); len(user_agent) > 0 { + map_val["useragent"] = user_agent + } + if len(req.Host) > 0 { + map_val["host"] = req.Host + } + header := req.Header + if req_id := header.Get("X-Request-Id"); len(req_id) > 0 { + map_val["requestid"] = req_id + } + if envoy_dec := header.Get("X-Envoy-Decorator-Operation"); len(envoy_dec) > 0 { + map_val["envoydecorator"] = envoy_dec + } + if trace_id := header.Get("X-B3-Traceid"); len(trace_id) > 0 { + map_val["traceid"] = trace_id + } if _, ok := map_val["respstatus"]; ok { map_val["done"] = "true" } } + } else { + // TODO(s3wong): TCP assumed for now + curr_byte_count += (len(*data) - 4) } + map_val["reqpakcount"] = strconv.Itoa(curr_pak_count) + map_val["reqbytecount"] = strconv.Itoa(curr_byte_count) + fmt.Printf("Current session packet count %v and byte count %v\n", map_val["reqpakcount"], map_val["reqbytecount"]) } else { session_key := session_key_t { src_ip: dst_ip, @@ -276,16 +329,26 @@ func handle_skb_event(data *[]byte, pod_name string, session_table *bcc.Table, sess_map.session = make(map[string]string) sess_map.buf = []byte{} sessionMap[sess_key] = sess_map + zero := strconv.Itoa(0) + sessionMap[sess_key].session["reqpakcount"] = zero + sessionMap[sess_key].session["reqbytecount"] = zero + sessionMap[sess_key].session["resppakcount"] = zero + sessionMap[sess_key].session["respbytecount"] = zero } var map_val = sessionMap[sess_key].session + map_val["nodename"] = node_name map_val["podname"] = pod_name map_val["srcip"] = dst_ip_str map_val["dstip"] = src_ip_str map_val["srcport"] = fmt.Sprintf("%d", dst_port) map_val["dstport"] = fmt.Sprintf("%d", src_port) map_val["duration"] = fmt.Sprintf("%v usec", duration) + curr_pak_count, _ := strconv.Atoi(map_val["resppakcount"]) + curr_byte_count, _ := strconv.Atoi(map_val["respbytecount"]) + curr_pak_count++ if proto == HTTP { + curr_byte_count += len(app_layer.Payload()) reader := bytes.NewReader(app_layer.Payload()) buf := bufio.NewReader(reader) resp, err := http.ReadResponse(buf, nil) @@ -325,7 +388,13 @@ func handle_skb_event(data *[]byte, pod_name string, session_table *bcc.Table, if resp != nil { resp.Body.Close() } + } else { + // TODO(s3wong): TCP assumed for now + curr_byte_count += (len(*data) - 4) } + map_val["resppakcount"] = strconv.Itoa(curr_pak_count) + map_val["respbytecount"] = strconv.Itoa(curr_byte_count) + fmt.Printf("Current session packet count %v and byte count %v\n", map_val["resppakcount"], map_val["respbytecount"]) if duration > 0 { map_val["done"] = "true" } @@ -373,6 +442,7 @@ func setEgressTable(egress_table *bcc.Table, } func ClovisorNewPodInit(k8s_client *ClovisorK8s, + node_name string, pod_name string, monitoring_info *monitoring_info_t) (*ClovisorBCC, error) { @@ -520,7 +590,7 @@ func ClovisorNewPodInit(k8s_client *ClovisorK8s, case <- ticker.C: print_network_traces(tracer) case data := <-skb_rev_chan: - err = handle_skb_event(&data, pod_name, session_table, + err = handle_skb_event(&data, node_name, pod_name, session_table, monitoring_info, egress_match_list) if err != nil { fmt.Printf("failed to decode received data: %s\n", err) -- cgit 1.2.3-korg