diff options
author | Stephen Wong <stephen.kf.wong@gmail.com> | 2019-01-21 08:38:02 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@opnfv.org> | 2019-01-21 08:38:02 +0000 |
commit | 43e82fbbf2e62d0aa63a06b8ba5d72f78854cc34 (patch) | |
tree | ad3e40a3f22261e4638f1ccb6b764ff01c1d0c8b | |
parent | 963fa93ae151779ce7240656e6f1302a0336244d (diff) | |
parent | 6aa27547b71bff174e3017f637a002546033bf39 (diff) |
Merge "Various changes to improve Clovisor:"
-rw-r--r-- | clover/clovisor/Dockerfile | 5 | ||||
-rwxr-xr-x | clover/clovisor/bin/clovisor | bin | 39195400 -> 39227600 bytes | |||
-rwxr-xr-x | clover/clovisor/build-docker | 4 | ||||
-rw-r--r-- | clover/clovisor/clovisor_main.go | 4 | ||||
-rw-r--r-- | clover/clovisor/libclovisor/clovisor_bcc.go | 76 | ||||
-rw-r--r-- | clover/clovisor/libclovisor/clovisor_k8s.go | 165 | ||||
-rwxr-xr-x | clover/clovisor/libclovisor/ebpf/session_tracking.c | 45 |
7 files changed, 198 insertions, 101 deletions
diff --git a/clover/clovisor/Dockerfile b/clover/clovisor/Dockerfile index 63375a1..06ddd23 100644 --- a/clover/clovisor/Dockerfile +++ b/clover/clovisor/Dockerfile @@ -1,6 +1,7 @@ FROM ubuntu:18.04 -ARG TARGET_KERNEL_VER="4.15.0-36-generic" +# the following is the Linux version for GKE for k8s 1.11.4-gke.8 +ARG TARGET_KERNEL_VER="linux-headers-4.15.0-1023-gcp" RUN set -ex; \ echo "deb [trusted=yes] http://repo.iovisor.org/apt/bionic bionic main" > /etc/apt/sources.list.d/iovisor.list; \ @@ -8,7 +9,7 @@ RUN set -ex; \ DEBIAN_FRONTEND=noninteractive apt-get install -y \ auditd \ bcc-tools \ - linux-headers-$TARGET_KERNEL_VER \ + $TARGET_KERNEL_VER \ libelf1; COPY . . diff --git a/clover/clovisor/bin/clovisor b/clover/clovisor/bin/clovisor Binary files differindex bd94d65..d35f90e 100755 --- a/clover/clovisor/bin/clovisor +++ b/clover/clovisor/bin/clovisor diff --git a/clover/clovisor/build-docker b/clover/clovisor/build-docker index c724c8c..4f776ec 100755 --- a/clover/clovisor/build-docker +++ b/clover/clovisor/build-docker @@ -8,7 +8,7 @@ # http://www.apache.org/licenses/LICENSE-2.0 if [ -z "$1" ] then - kernel_ver=`uname -r` + kernel_ver=linux-headers-`uname -r` else kernel_ver=$1 fi @@ -16,3 +16,5 @@ cp bin/clovisor . docker build --build-arg TARGET_KERNEL_VER=$kernel_ver -t clovisor . docker tag clovisor localhost:5000/clovisor docker push localhost:5000/clovisor +#docker tag clovisor s3wong/clovisor +#docker push s3wong/clovisor diff --git a/clover/clovisor/clovisor_main.go b/clover/clovisor/clovisor_main.go index 46b1780..e235c50 100644 --- a/clover/clovisor/clovisor_main.go +++ b/clover/clovisor/clovisor_main.go @@ -38,8 +38,8 @@ func main() { fmt.Printf("Clovisor get monitoring info succeed: %v\n", monitoring_info_map) for pod := range monitoring_info_map { - podMon, err := clovisor.ClovisorNewPodInit(clovisor_k8s_client, pod, - monitoring_info_map[pod]) + podMon, err := clovisor.ClovisorNewPodInit(clovisor_k8s_client, node_name, + pod, monitoring_info_map[pod]) if err != nil { fmt.Printf("Clovisor monitoring pod %s failed: %v\n", pod, err) continue diff --git a/clover/clovisor/libclovisor/clovisor_bcc.go b/clover/clovisor/libclovisor/clovisor_bcc.go index 4dc936d..e2d9cd8 100644 --- a/clover/clovisor/libclovisor/clovisor_bcc.go +++ b/clover/clovisor/libclovisor/clovisor_bcc.go @@ -119,6 +119,7 @@ func print_network_traces(tracer opentracing.Tracer) { value := sess_info.session if _, ok := value["done"]; ok { span := tracer.StartSpan("http-tracing") + span.SetTag("Node-Name", value["nodename"]) span.SetTag("Pod-Name", value["podname"]) span.SetTag("Source-IP", value["srcip"]) span.SetTag("Destination-IP", value["dstip"]) @@ -127,9 +128,28 @@ func print_network_traces(tracer opentracing.Tracer) { span.SetTag("HTTP Request Method", value["reqmethod"]) span.SetTag("HTTP Request URL", value["requrl"]) span.SetTag("HTTP Request Protocol", value["reqproto"]) + if _, exist := value["host"]; exist { + span.SetTag("HTTP Request Host", value["host"]) + } + if _, exist := value["useragent"]; exist { + span.SetTag("HTTP Client User Agent", value["useragent"]) + } + if _, exist := value["requestid"]; exist { + span.SetTag("OpenTracing Request ID", value["requestid"]) + } + if _, exist := value["envoydecorator"]; exist { + span.SetTag("Envoy Decorator", value["envoydecorator"]) + } + if _, exist := value["traceid"]; exist { + span.SetTag("Trace ID", value["traceid"]) + } + span.SetTag("HTTP Request Packet Count", value["reqpakcount"]) + span.SetTag("HTTP Request Byte Count", value["reqbytecount"]) span.SetTag("HTTP Response Status", value["respstatus"]) span.SetTag("HTTP Response Status Code", value["respstatuscode"]) span.SetTag("HTTP Response Protocol", value["respproto"]) + span.SetTag("HTTP Response Packet Count", value["resppakcount"]) + span.SetTag("HTTP Response Byte Count", value["respbytecount"]) span.SetTag("HTTP Session Duration", value["duration"]) span.Finish() delete(sessionMap, key) @@ -137,11 +157,12 @@ func print_network_traces(tracer opentracing.Tracer) { } } -func handle_skb_event(data *[]byte, pod_name string, session_table *bcc.Table, +func handle_skb_event(data *[]byte, node_name string, pod_name string, + session_table *bcc.Table, monitoring_info *monitoring_info_t, egress_match_list []egress_match_t) (error) { //fmt.Printf("monitoring info has %v\n", monitoring_info) - fmt.Printf("\n\n%s", hex.Dump(*data)) + fmt.Printf("\n\nnode[%s] pod[%s]\n%s", node_name, pod_name, hex.Dump(*data)) var src_ip, dst_ip uint32 var src_port, dst_port uint16 var session_key, src_ip_str, dst_ip_str string @@ -203,14 +224,24 @@ func handle_skb_event(data *[]byte, pod_name string, session_table *bcc.Table, sess_map.session = make(map[string]string) sess_map.buf = []byte{} sessionMap[session_key] = sess_map + zero := strconv.Itoa(0) + sessionMap[session_key].session["reqpakcount"] = zero + sessionMap[session_key].session["reqbytecount"] = zero + sessionMap[session_key].session["resppakcount"] = zero + sessionMap[session_key].session["respbytecount"] = zero } map_val := sessionMap[session_key].session + map_val["nodename"] = node_name map_val["podname"] = pod_name map_val["srcip"] = src_ip_str map_val["dstip"] = dst_ip_str map_val["srcport"] = fmt.Sprintf("%d", src_port) map_val["dstport"] = fmt.Sprintf("%d", dst_port) + curr_pak_count, _ := strconv.Atoi(map_val["reqpakcount"]) + curr_byte_count, _ := strconv.Atoi(map_val["reqbytecount"]) + curr_pak_count++ if proto == HTTP { + curr_byte_count += len(app_layer.Payload()) reader := bytes.NewReader(app_layer.Payload()) buf := bufio.NewReader(reader) req, err := http.ReadRequest(buf) @@ -225,11 +256,33 @@ func handle_skb_event(data *[]byte, pod_name string, session_table *bcc.Table, map_val["reqmethod"] = req.Method map_val["requrl"] = fmt.Sprintf("%v", req.URL) map_val["reqproto"] = fmt.Sprintf("%v", req.Proto) + if user_agent := req.UserAgent(); len(user_agent) > 0 { + map_val["useragent"] = user_agent + } + if len(req.Host) > 0 { + map_val["host"] = req.Host + } + header := req.Header + if req_id := header.Get("X-Request-Id"); len(req_id) > 0 { + map_val["requestid"] = req_id + } + if envoy_dec := header.Get("X-Envoy-Decorator-Operation"); len(envoy_dec) > 0 { + map_val["envoydecorator"] = envoy_dec + } + if trace_id := header.Get("X-B3-Traceid"); len(trace_id) > 0 { + map_val["traceid"] = trace_id + } if _, ok := map_val["respstatus"]; ok { map_val["done"] = "true" } } + } else { + // TODO(s3wong): TCP assumed for now + curr_byte_count += (len(*data) - 4) } + map_val["reqpakcount"] = strconv.Itoa(curr_pak_count) + map_val["reqbytecount"] = strconv.Itoa(curr_byte_count) + fmt.Printf("Current session packet count %v and byte count %v\n", map_val["reqpakcount"], map_val["reqbytecount"]) } else { session_key := session_key_t { src_ip: dst_ip, @@ -276,16 +329,26 @@ func handle_skb_event(data *[]byte, pod_name string, session_table *bcc.Table, sess_map.session = make(map[string]string) sess_map.buf = []byte{} sessionMap[sess_key] = sess_map + zero := strconv.Itoa(0) + sessionMap[sess_key].session["reqpakcount"] = zero + sessionMap[sess_key].session["reqbytecount"] = zero + sessionMap[sess_key].session["resppakcount"] = zero + sessionMap[sess_key].session["respbytecount"] = zero } var map_val = sessionMap[sess_key].session + map_val["nodename"] = node_name map_val["podname"] = pod_name map_val["srcip"] = dst_ip_str map_val["dstip"] = src_ip_str map_val["srcport"] = fmt.Sprintf("%d", dst_port) map_val["dstport"] = fmt.Sprintf("%d", src_port) map_val["duration"] = fmt.Sprintf("%v usec", duration) + curr_pak_count, _ := strconv.Atoi(map_val["resppakcount"]) + curr_byte_count, _ := strconv.Atoi(map_val["respbytecount"]) + curr_pak_count++ if proto == HTTP { + curr_byte_count += len(app_layer.Payload()) reader := bytes.NewReader(app_layer.Payload()) buf := bufio.NewReader(reader) resp, err := http.ReadResponse(buf, nil) @@ -325,7 +388,13 @@ func handle_skb_event(data *[]byte, pod_name string, session_table *bcc.Table, if resp != nil { resp.Body.Close() } + } else { + // TODO(s3wong): TCP assumed for now + curr_byte_count += (len(*data) - 4) } + map_val["resppakcount"] = strconv.Itoa(curr_pak_count) + map_val["respbytecount"] = strconv.Itoa(curr_byte_count) + fmt.Printf("Current session packet count %v and byte count %v\n", map_val["resppakcount"], map_val["respbytecount"]) if duration > 0 { map_val["done"] = "true" } @@ -373,6 +442,7 @@ func setEgressTable(egress_table *bcc.Table, } func ClovisorNewPodInit(k8s_client *ClovisorK8s, + node_name string, pod_name string, monitoring_info *monitoring_info_t) (*ClovisorBCC, error) { @@ -520,7 +590,7 @@ func ClovisorNewPodInit(k8s_client *ClovisorK8s, case <- ticker.C: print_network_traces(tracer) case data := <-skb_rev_chan: - err = handle_skb_event(&data, pod_name, session_table, + err = handle_skb_event(&data, node_name, pod_name, session_table, monitoring_info, egress_match_list) if err != nil { fmt.Printf("failed to decode received data: %s\n", err) diff --git a/clover/clovisor/libclovisor/clovisor_k8s.go b/clover/clovisor/libclovisor/clovisor_k8s.go index 85b0ea4..8f4b481 100644 --- a/clover/clovisor/libclovisor/clovisor_k8s.go +++ b/clover/clovisor/libclovisor/clovisor_k8s.go @@ -9,13 +9,15 @@ package clovisor import ( "bytes" + "errors" "fmt" "strconv" "strings" core_v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/runtime" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/remotecommand" @@ -36,6 +38,7 @@ type monitoring_info_t struct { } var DEFAULT_NAMESPACE = "default" +var SUPPORTED_PROTOCOLS = [...]string {"tcp", "http"} func K8s_client_init(nodeName string) (*ClovisorK8s, error) { config, err := rest.InClusterConfig() @@ -73,55 +76,48 @@ func (client *ClovisorK8s) Get_monitoring_info(nodeName string) (map[string]*mon return nil, err } - namespace, svcs, pods, err := client.fetch_svcs_pods(nodeName, labels_list) - if err != nil { - return nil, err + mon_info_map := client.get_monitoring_pods(nodeName, labels_list) + if mon_info_map == nil { + return nil, errors.New("monitoring info empty") } - mon_info_map := make(map[string]*monitoring_info_t) - for idx, _ := range svcs { - svc := svcs[idx] - pod := pods[idx] - mon_info := client.get_monitoring_pods(namespace, nodeName, svc, pod) - for k, v := range mon_info { - mon_info_map[k] = v - } - } return mon_info_map, nil } -func (client *ClovisorK8s) fetch_svcs_pods(nodeName string, - labels_list []string) (string, - []*core_v1.ServiceList, - []*core_v1.PodList, error) { - namespace := DEFAULT_NAMESPACE +func (client *ClovisorK8s) getPodsForSvc(svc *core_v1.Service, + namespace string) (*core_v1.PodList, error) { + set := labels.Set(svc.Spec.Selector) + listOptions := metav1.ListOptions{LabelSelector: set.AsSelector().String()} + return client.client.CoreV1().Pods(namespace).List(listOptions) +} + +func (client *ClovisorK8s) get_monitoring_pods(nodeName string, + labels_list []string) (map[string]*monitoring_info_t) { /* * Three cases: * 1.) no configured namespaces, monitoring all pods in default namesapce * 2.) if any config only has namespace, monitoring all pods in namespace * 3.) label is configured, only monitor pods with that label */ - var svcs []*core_v1.ServiceList - var pods []*core_v1.PodList + var namespace string + ns_svc_map := make(map[string][]*core_v1.ServiceList) + monitoring_info := make(map[string]*monitoring_info_t) if len(labels_list) == 0 { + // TODO(s3wong): set it to 'default' + namespace = "linux-foundation-gke" if svcs_list, err := client.client.CoreV1().Services(namespace).List(metav1.ListOptions{}); err != nil { fmt.Printf("Error fetching service list for namespace %s\n", namespace) - return namespace, nil, nil, err - } else { - svcs = append(svcs, svcs_list) - } - - if pods_list, err := - client.client.CoreV1().Pods(namespace).List(metav1.ListOptions{}); - err != nil { - fmt.Printf("Error fetching pods list for namespace %s\n", - namespace) - return namespace, nil, nil, err + return nil } else { - pods = append(pods, pods_list) + /* + if _, ok := ns_svc_map[namespace]; !ok { + ns_svc_map[namespace] = []*core_v1.ServiceList{} + } + */ + ns_svc_map[namespace] = append(ns_svc_map[namespace], svcs_list) } } else { for _, label_str := range labels_list { @@ -144,60 +140,71 @@ func (client *ClovisorK8s) fetch_svcs_pods(nodeName string, key, value, namespace, err.Error()) continue } else { - svcs = append(svcs, svc_list) - } - if pod_list, err := - client.client.CoreV1().Pods(namespace).List(metav1.ListOptions{ - LabelSelector: label_selector, - }); err != nil { - fmt.Printf("Error listing pods with label %v:%v:%v - %v\n", - key, value, namespace, err.Error()) - continue - } else { - pods = append(pods, pod_list) + if _, ok := ns_svc_map[namespace]; !ok { + ns_svc_map[namespace] = []*core_v1.ServiceList{} + } + ns_svc_map[namespace] = append(ns_svc_map[namespace], svc_list) } } } - return namespace, svcs, pods, nil -} -func (client *ClovisorK8s) get_monitoring_pods( - namespace string, - node_name string, - svcs *core_v1.ServiceList, - pods *core_v1.PodList) (map[string]*monitoring_info_t) { - monitoring_info := make(map[string]*monitoring_info_t) - svc_map := make(map[string][]string) - - for _, svc := range svcs.Items { - svc_port := svc.Spec.Ports[0] - target_port := svc_port.TargetPort.String() - svc_port_name := svc_port.Name - svc_map[target_port] = append(svc_map[target_port], svc.GetName()) - if len(svc_port_name) > 0 { - svc_map[target_port] = append(svc_map[target_port], svc_port_name) - } else { - svc_map[target_port] = append(svc_map[target_port], "tcp") - } - } - for _, v := range pods.Items { - if v.Spec.NodeName == node_name { - pod_name := v.GetName() - monitoring_info[pod_name] = &(monitoring_info_t{}) - monitoring_info[pod_name].namespace = namespace - monitoring_info[pod_name].pod_name = pod_name - monitoring_info[pod_name].container_name = v.Spec.Containers[0].Name - monitoring_info[pod_name].port_num = uint32(v.Spec.Containers[0].Ports[0].ContainerPort) - tp_string := strconv.Itoa(int(monitoring_info[pod_name].port_num)) - svc_array := svc_map[tp_string] - monitoring_info[pod_name].svc_name = svc_array[0] - if (strings.Contains(svc_array[1], "-")) { - monitoring_info[pod_name].protocol = svc_array[1][:strings.Index(svc_array[1], "-")] - } else { - monitoring_info[pod_name].protocol = svc_array[1] + for ns, svc_slice := range ns_svc_map { + for _, svc_list_ := range svc_slice { + for _, svc := range svc_list_.Items { + fmt.Printf("Looking for supported protocols for service %v:%v\n", ns, svc.GetName()) + var svc_port_map = map[string]core_v1.ServicePort{} + for _, svc_port := range svc.Spec.Ports { + if len(svc_port.Name) > 0 { + for _, sp := range SUPPORTED_PROTOCOLS { + if strings.Contains(svc_port.Name, sp) { + target_port := svc_port.TargetPort.String() + svc_port_map[target_port] = svc_port + } + } + } + } + if len(svc_port_map) == 0 { + fmt.Printf("Found no port with supported protocol for %v:%v\n", ns, svc.GetName()) + continue + } + fmt.Printf("Fetching pods for namespace %v service: %v\n", ns, svc.GetName()) + pod_list, err := client.getPodsForSvc(&svc, ns) + if err != nil { + fmt.Print("Error fetching pods for %v:%v [%v]\n", ns, svc.GetName(), err) + continue + } + for _, pod := range pod_list.Items { + if pod.Spec.NodeName == nodeName { + for _, container := range pod.Spec.Containers { + var port_num uint32 + var tp_string string + for _, port := range container.Ports { + port_num = uint32(port.ContainerPort) + tp_string = strconv.Itoa(int(port_num)) + if _, in := svc_port_map[tp_string]; !in { + continue + } + pod_name := pod.GetName() + monitoring_info[pod_name] = &(monitoring_info_t{}) + monitoring_info[pod_name].namespace = ns + monitoring_info[pod_name].svc_name = svc.GetName() + monitoring_info[pod_name].pod_name = pod_name + monitoring_info[pod_name].container_name = container.Name + monitoring_info[pod_name].port_num = port_num + svc_port_name := svc_port_map[tp_string].Name + if (strings.Contains(svc_port_name, "-")) { + monitoring_info[pod_name].protocol = svc_port_name[:strings.Index(svc_port_name, "-")] + } else { + monitoring_info[pod_name].protocol = svc_port_name + } + } + } + } + } } } } + return monitoring_info } diff --git a/clover/clovisor/libclovisor/ebpf/session_tracking.c b/clover/clovisor/libclovisor/ebpf/session_tracking.c index 99f704a..ea68788 100755 --- a/clover/clovisor/libclovisor/ebpf/session_tracking.c +++ b/clover/clovisor/libclovisor/ebpf/session_tracking.c @@ -17,6 +17,7 @@ #define MAX_SESSION_TABLE_ENTRIES 8192 typedef enum { + UNDEFINED = 0, HTTP = 1, HTTP2 = 2, TCP = 3, @@ -145,24 +146,37 @@ static inline app_proto_t ingress_tcp_parsing(struct tcphdr *tcp_hdr, unsigned short dest_port = htons(tcp_hdr->dest); egress_match_t egress_match = {}; policy_action_t *policy_ptr = NULL; + app_proto_t ret = TCP; unsigned int *proto = dports2proto.lookup(&dest_port); if (proto != NULL) { + /* if (tcp_hdr->syn && !tcp_hdr->ack) { - return TCP; + return ret; } + */ + ret = HTTP; if (tcp_hdr->fin || tcp_hdr->rst) { process_response(ntohl(ipv4_hdr->saddr), ntohl(ipv4_hdr->daddr), ntohs(tcp_hdr->source), ntohs(tcp_hdr->dest)); - return TCP; + } else { + process_request(ntohl(ipv4_hdr->saddr), + ntohl(ipv4_hdr->daddr), + ntohs(tcp_hdr->source), + ntohs(tcp_hdr->dest)); } - process_request(ntohl(ipv4_hdr->saddr), - ntohl(ipv4_hdr->daddr), - ntohs(tcp_hdr->source), - ntohs(tcp_hdr->dest)); } else { + dest_port = htons(tcp_hdr->source); + proto = dports2proto.lookup(&dest_port); + if (proto != NULL) { + // clock response receiving time + process_response(ntohl(ipv4_hdr->daddr), + ntohl(ipv4_hdr->saddr), + ntohs(tcp_hdr->dest), + ntohs(tcp_hdr->source)); + } egress_match.dst_ip = ntohl(ipv4_hdr->saddr); egress_match.dst_port = ntohs(tcp_hdr->source); policy_ptr = egress_lookup_table.lookup(&egress_match); @@ -173,6 +187,7 @@ static inline app_proto_t ingress_tcp_parsing(struct tcphdr *tcp_hdr, if (policy_ptr != NULL) { if (*policy_ptr == RECORD) { + ret = HTTP; if (tcp_hdr->fin || tcp_hdr->rst) { process_response(ntohl(ipv4_hdr->daddr), ntohl(ipv4_hdr->saddr), @@ -185,7 +200,7 @@ static inline app_proto_t ingress_tcp_parsing(struct tcphdr *tcp_hdr, // everything else drops to TCP //return ((void*)tcp_hdr); - return HTTP; + return ret; } static inline app_proto_t egress_tcp_parsing(struct tcphdr *tcp_hdr, @@ -200,12 +215,13 @@ static inline app_proto_t egress_tcp_parsing(struct tcphdr *tcp_hdr, unsigned int *proto = dports2proto.lookup(&src_port); if (proto != NULL) { - if (tcp_hdr->fin || tcp_hdr->rst) { - process_response(ntohl(ipv4_hdr->daddr), - ntohl(ipv4_hdr->saddr), - ntohs(tcp_hdr->dest), - ntohs(tcp_hdr->source)); - } + //if (tcp_hdr->fin || tcp_hdr->rst) { + process_response(ntohl(ipv4_hdr->daddr), + ntohl(ipv4_hdr->saddr), + ntohs(tcp_hdr->dest), + ntohs(tcp_hdr->source)); + //} + ret = HTTP; } else { egress_match.dst_ip = ntohl(ipv4_hdr->daddr); @@ -222,11 +238,12 @@ static inline app_proto_t egress_tcp_parsing(struct tcphdr *tcp_hdr, ntohl(ipv4_hdr->daddr), ntohs(tcp_hdr->source), ntohs(tcp_hdr->dest)); + ret = HTTP; } } } //return(ret_hdr); - return HTTP; + return ret; } static inline int handle_packet(struct __sk_buff *skb, int is_ingress) |