diff options
author | Stephen Wong <stephen.kf.wong@gmail.com> | 2019-04-05 06:16:53 +0000 |
---|---|---|
committer | Stephen Wong <stephen.kf.wong@gmail.com> | 2019-04-05 06:27:39 +0000 |
commit | ce8578f13d1c27d66c12ba07a3439fc453bde8cf (patch) | |
tree | ba1cf38a66f2ca180d85346e8ee45297bea042cc | |
parent | 3f86a3d611f4d2a4f21d9be2a2284ccf120db36e (diff) |
Clovisor ONS demo related fixes
Change-Id: I9449ee5f699a3cdf471dc8b405de650325ae09f6
Signed-off-by: Stephen Wong <stephen.kf.wong@gmail.com>
-rwxr-xr-x | clover/clovisor/bin/clovisor | bin | 39227920 -> 46391480 bytes | |||
-rwxr-xr-x | clover/clovisor/build-docker | 8 | ||||
-rwxr-xr-x | clover/clovisor/build.sh | 15 | ||||
-rw-r--r-- | clover/clovisor/clovisor_main.go | 19 | ||||
-rw-r--r-- | clover/clovisor/libclovisor/clovisor_bcc.go | 687 | ||||
-rw-r--r-- | clover/clovisor/libclovisor/clovisor_cfg.go | 39 | ||||
-rw-r--r-- | clover/clovisor/libclovisor/clovisor_k8s.go | 52 | ||||
-rwxr-xr-x | clover/clovisor/libclovisor/ebpf/node_interface.c | 158 | ||||
-rwxr-xr-x | clover/clovisor/libclovisor/libproto/build-plugin | 9 | ||||
-rw-r--r-- | clover/clovisor/libclovisor/libproto/clovisor_http.go | 84 | ||||
-rw-r--r-- | clover/clovisor/libclovisor/libproto/http_alt.go | 93 | ||||
-rw-r--r-- | clover/clovisor/proto/http.so | bin | 0 -> 11742704 bytes |
12 files changed, 947 insertions, 217 deletions
diff --git a/clover/clovisor/bin/clovisor b/clover/clovisor/bin/clovisor Binary files differindex 240e0bc..a683ee6 100755 --- a/clover/clovisor/bin/clovisor +++ b/clover/clovisor/bin/clovisor diff --git a/clover/clovisor/build-docker b/clover/clovisor/build-docker index 99668d7..4f776ec 100755 --- a/clover/clovisor/build-docker +++ b/clover/clovisor/build-docker @@ -14,7 +14,7 @@ if [ -z "$1" ] fi cp bin/clovisor . docker build --build-arg TARGET_KERNEL_VER=$kernel_ver -t clovisor . -#docker tag clovisor localhost:5000/clovisor -#docker push localhost:5000/clovisor -docker tag clovisor s3wong/clovisor -docker push s3wong/clovisor +docker tag clovisor localhost:5000/clovisor +docker push localhost:5000/clovisor +#docker tag clovisor s3wong/clovisor +#docker push s3wong/clovisor diff --git a/clover/clovisor/build.sh b/clover/clovisor/build.sh index 4503d5a..4a9cfe6 100755 --- a/clover/clovisor/build.sh +++ b/clover/clovisor/build.sh @@ -5,11 +5,11 @@ # which accompanies this distribution, and is available at # http://www.apache.org/licenses/LICENSE-2.0 -GOVERSION=1.10.3 +GOVERSION=1.12 OS=linux ARCH=amd64 -GOPATH=/home/ubuntu/go -CLIENTGOVERSION=v8.0.0 +GOPATH=/home/s3wong/go +CLIENTGOVERSION=v10.0.0 SRCDIR=`pwd` @@ -28,6 +28,7 @@ go get github.com/google/gopacket go get github.com/iovisor/gobpf go get github.com/opentracing/opentracing-go go get github.com/pkg/errors +go get github.com/go-redis/redis go get github.com/uber/jaeger-client-go go get github.com/vishvananda/netlink go get github.com/vishvananda/netns @@ -39,7 +40,7 @@ cd $GOPATH/src/k8s.io/client-go git checkout $CLIENTGOVERSION godep restore ./... -cd $SRCDIR/libclovisor -go build . -cd ../ -go build -o clovisor . +#cd $SRCDIR/libclovisor +#go build . +#cd ../ +#go build -o clovisor . diff --git a/clover/clovisor/clovisor_main.go b/clover/clovisor/clovisor_main.go index e235c50..b8e6508 100644 --- a/clover/clovisor/clovisor_main.go +++ b/clover/clovisor/clovisor_main.go @@ -9,8 +9,10 @@ package main import ( "fmt" + "io/ioutil" "os" "os/signal" + "path/filepath" "syscall" clovisor "./libclovisor" @@ -21,6 +23,23 @@ var podMonitoringMap map[string]*clovisor.ClovisorBCC func main() { node_name := os.Getenv("MY_NODE_NAME") + ex, err := os.Executable() + if err != nil { + fmt.Println(err.Error()) + } else { + exPath := filepath.Dir(ex) + fmt.Printf("Current Working Directory is %v\n", exPath) + files, _ := ioutil.ReadDir(exPath) + for _, f := range files { + fmt.Printf("%v ",f.Name()) + } + fmt.Printf("\n") + } + + clovisor.Monitor_proto_plugin_cfg() + + clovisor.ClovisorPhyInfSetup() + podMonitoringMap = make(map[string]*clovisor.ClovisorBCC) clovisor_k8s_client, err := clovisor.K8s_client_init(node_name) diff --git a/clover/clovisor/libclovisor/clovisor_bcc.go b/clover/clovisor/libclovisor/clovisor_bcc.go index ab5bc33..a6c74ef 100644 --- a/clover/clovisor/libclovisor/clovisor_bcc.go +++ b/clover/clovisor/libclovisor/clovisor_bcc.go @@ -9,15 +9,20 @@ package clovisor import ( "encoding/hex" - "bufio" + //"encoding/json" "bytes" "encoding/binary" + "errors" "fmt" "io/ioutil" - "net/http" + "net" + //"net/http" + "plugin" "strconv" + "strings" "time" + //"github.com/go-redis/redis" "github.com/google/gopacket" "github.com/google/gopacket/layers" "github.com/iovisor/gobpf/bcc" @@ -30,11 +35,15 @@ import ( /* #cgo CFLAGS: -I/usr/include/bcc/compat #cgo LDFLAGS: -lbcc -#include <bcc/bpf_common.h> +#include <bcc/bcc_common.h> #include <bcc/libbpf.h> */ import "C" +type Parser interface { + Parse(session_key string, is_req bool, data []byte)([]byte, map[string]string) +} + type ClovisorBCC struct { stopChan chan bool // TODO(s3wong): remove once k8s watcher available @@ -65,8 +74,12 @@ type egress_match_cfg struct { } type session_info_t struct { - session map[string]string - buf []byte + done bool + service string + generalInfo map[string]string + traces []map[string]string + reqBuf []byte + respBuf []byte } const ( @@ -77,7 +90,12 @@ const ( ) //var sessionMap map[string]map[string]string; -var sessionMap map[string]session_info_t; +var sessionMap map[string]*session_info_t; +var protocolParser = map[string]Parser{}; +var defaultModPath = map[string]string{ + "http": "/proto/http.so", +} +var tracerMap = map[string]opentracing.Tracer{}; var veth_ifidx_command = "cat /sys/class/net/eth0/iflink"; @@ -88,6 +106,20 @@ var protocolMap = map[string]int{ "udp": 4, } +var traceTable string = "NetworkTraces" + +/* + * redisConnect: redis client connecting to redis server +func redisConnect() *redis.Client { + client := redis.NewClient(&redis.Options{ + Addr: fmt.Sprintf("%s:6379", redisServer), + Password: "", + DB: 0, + }) + return client +} + */ + func linkSetup(ifname string) netlink.Link { link, err := netlink.LinkByName(ifname) netlink.LinkSetUp(link) @@ -114,43 +146,117 @@ func dumpBPFTable(table *bcc.Table) { } } -func print_network_traces(tracer opentracing.Tracer) { - for key, sess_info := range sessionMap { - value := sess_info.session - if _, ok := value["done"]; ok { - span := tracer.StartSpan("http-tracing") - span.SetTag("Node-Name", value["nodename"]) - span.SetTag("Pod-Name", value["podname"]) - span.SetTag("Source-IP", value["srcip"]) - span.SetTag("Destination-IP", value["dstip"]) - span.SetTag("Source-Port", value["srcport"]) - span.SetTag("Destination-Port", value["dstport"]) - span.SetTag("HTTP Request Method", value["reqmethod"]) - span.SetTag("HTTP Request URL", value["requrl"]) - span.SetTag("HTTP Request Protocol", value["reqproto"]) - if _, exist := value["host"]; exist { - span.SetTag("HTTP Request Host", value["host"]) - } - if _, exist := value["useragent"]; exist { - span.SetTag("HTTP Client User Agent", value["useragent"]) +func loadProtoParser(protocol string, update bool) error { + var modPath = "" + + if !update { + if _, ok := protocolParser[protocol]; ok { + fmt.Printf("Found parse function for protocol %s\n", protocol) + return nil + } + } + + client := redisConnect() + + redisResult := client.HGet(ProtoCfg, protocol) + if redisResult.Err() == nil { + if len(redisResult.Val()) > 0 { + modPath = redisResult.Val() + } + } + if len(modPath) == 0 { + if _, ok := defaultModPath[protocol]; ok { + modPath = defaultModPath[protocol] + } else { + return errors.New(fmt.Sprintf("Unable to find module path for protocol %s", protocol)) + } + } + + fmt.Printf("Loading plugin for protocol %v with %v\n", protocol, modPath) + + plug, err := plugin.Open(modPath) + if err != nil { + fmt.Println(err) + return err + } + + symParse, err := plug.Lookup("Parser") + if err != nil { + fmt.Println(err) + return err + } + + var parser Parser + parser, ok := symParse.(Parser) + if !ok { + fmt.Printf("Unexpected type from mod %s symbol parse\n", modPath) + return errors.New(fmt.Sprintf("Wrong type for func parse from %s", modPath)) + } + + protocolParser[protocol] = parser + return nil +} + +func print_network_traces() { + /* + client := redisConnect() + + traces, err := client.HGetAll(traceTable).Result() + if err != nil { + fmt.Printf("Error retriving traces from redis: %v\n", err.Error()) + return + } + */ + /* + structure: + "done": "true", + "traces": array of protocol traces + [0] : "admin": map[string]string + [1] : "ipv4 or ipv6": map[string]string + [2] : "tcp or udp": map[string]string + [3] : "http"... + */ + /* + for key, value := range traces { + traceMap := map[string]interface{} + json.Unmarshal([]byte(value), &traceMap) + if _, ok := traceMap["done"]; ok { + span := tracer.StartSpan(fmt.Sprintf("tracing-%s", key)) + for idx, trace := range traceMap["traces"] { + span.SetTag(fmt.Sprintf("protocol-%d", idx), trace['protocol']) + for tag, tagVal := range trace { + if tag == "protocol" { + continue + } + span.SetTag(tag, tagVal) + } } - if _, exist := value["requestid"]; exist { - span.SetTag("OpenTracing Request ID", value["requestid"]) + span.Finish() + ret := client.HDel(traceTable, key) + if ret.Err() != nil { + fmt.Printf("Error deleting %v from %v: %v\n", key, traceTable, ret.Err()) } - if _, exist := value["envoydecorator"]; exist { - span.SetTag("Envoy Decorator", value["envoydecorator"]) + } + } + */ + for key, value := range sessionMap { + if value.done { + tracer := tracerMap[value.service] + span := tracer.StartSpan(fmt.Sprintf("tracing-%s", key)) + for genTag, genVal := range value.generalInfo { + fmt.Printf("general info writing %v: %v\n", genTag, genVal) + span.SetTag(genTag, genVal) } - if _, exist := value["traceid"]; exist { - span.SetTag("Trace ID", value["traceid"]) + for idx, trace := range value.traces { + span.SetTag(fmt.Sprintf("protocol-%d", idx), trace["protocol"]) + for tag, tagVal := range trace { + if tag == "protocol" { + continue + } + fmt.Printf("%v writing %v: %v\n", trace["protocol"], tag, tagVal) + span.SetTag(tag, tagVal) + } } - span.SetTag("HTTP Request Packet Count", value["reqpakcount"]) - span.SetTag("HTTP Request Byte Count", value["reqbytecount"]) - span.SetTag("HTTP Response Status", value["respstatus"]) - span.SetTag("HTTP Response Status Code", value["respstatuscode"]) - span.SetTag("HTTP Response Protocol", value["respproto"]) - span.SetTag("HTTP Response Packet Count", value["resppakcount"]) - span.SetTag("HTTP Response Byte Count", value["respbytecount"]) - span.SetTag("HTTP Session Duration", value["duration"]) span.Finish() delete(sessionMap, key) } @@ -160,12 +266,15 @@ func print_network_traces(tracer opentracing.Tracer) { func handle_skb_event(data *[]byte, node_name string, pod_name string, session_table *bcc.Table, monitoring_info *monitoring_info_t, - egress_match_list []egress_match_t) (error) { + egress_match_list []egress_match_t, + svc_name string) (error) { //fmt.Printf("monitoring info has %v\n", monitoring_info) - fmt.Printf("\n\nnode[%s] pod[%s]\n%s", node_name, pod_name, hex.Dump(*data)) + fmt.Printf("\n\nnode[%s] pod[%s]\n%s\n", node_name, pod_name, hex.Dump(*data)) + var ipproto layers.IPProtocol var src_ip, dst_ip uint32 var src_port, dst_port uint16 var session_key, src_ip_str, dst_ip_str string + is_req := false proto := HTTP is_ingress:= binary.LittleEndian.Uint32((*data)[0:4]) packet := gopacket.NewPacket((*data)[4:len(*data)], @@ -175,6 +284,7 @@ func handle_skb_event(data *[]byte, node_name string, pod_name string, ipv4, _ := ipv4_layer.(*layers.IPv4) src_ip_str = ipv4.SrcIP.String() dst_ip_str = ipv4.DstIP.String() + ipproto = ipv4.Protocol fmt.Printf("Source: %s Dest: %s\n", src_ip_str, dst_ip_str) // Note: the src_ip and dst_ip var here are ONLY being used as // lookup key to eBPF hash table, hence preserving network @@ -210,79 +320,128 @@ func handle_skb_event(data *[]byte, node_name string, pod_name string, break } } - app_layer := packet.ApplicationLayer() - if app_layer == nil { - fmt.Printf("No application layer, TCP packet\n") - proto = TCP - } if dst_port == uint16(monitoring_info.port_num) || egress_port_req { - session_key = fmt.Sprintf("%x.%x:%d:%d", src_ip, dst_ip, src_port, - dst_port) - if _, ok := sessionMap[session_key]; !ok { - sessionMap[session_key] = session_info_t{} - sess_map := sessionMap[session_key] - sess_map.session = make(map[string]string) - sess_map.buf = []byte{} - sessionMap[session_key] = sess_map - zero := strconv.Itoa(0) - sessionMap[session_key].session["reqpakcount"] = zero - sessionMap[session_key].session["reqbytecount"] = zero - sessionMap[session_key].session["resppakcount"] = zero - sessionMap[session_key].session["respbytecount"] = zero + is_req = true + } + if is_req { + session_key = fmt.Sprintf("%x:%x:%d:%d:%d", src_ip, dst_ip, ipproto, + src_port, dst_port) + } else { + session_key = fmt.Sprintf("%x:%x:%d:%d:%d", dst_ip, src_ip, ipproto, + dst_port, src_port) + } + var sess_map *session_info_t + if _, ok := sessionMap[session_key]; !ok { + sess_map = &session_info_t{} + sess_map.done = false + sess_map.service = svc_name + sess_map.generalInfo = make(map[string]string) + sess_map.traces = []map[string]string{} + sess_map.reqBuf = []byte{} + sess_map.respBuf = []byte{} + sessionMap[session_key] = sess_map + zero := strconv.Itoa(0) + sessionMap[session_key].generalInfo["reqpakcount"] = zero + sessionMap[session_key].generalInfo["reqbytecount"] = zero + sessionMap[session_key].generalInfo["resppakcount"] = zero + sessionMap[session_key].generalInfo["respbytecount"] = zero + sessionMap[session_key].generalInfo["nodename"] = node_name + sessionMap[session_key].generalInfo["podname"] = pod_name + } else { + sess_map = sessionMap[session_key] + } + + curr_pak_count := 0 + curr_byte_count := 0 + map_val := sess_map.generalInfo + if is_req { + curr_pak_count, _ = strconv.Atoi(map_val["reqpakcount"]) + curr_byte_count, _ = strconv.Atoi(map_val["reqbytecount"]) + } else { + curr_pak_count, _ = strconv.Atoi(map_val["resppakcount"]) + curr_byte_count, _ = strconv.Atoi(map_val["respbytecount"]) + } + curr_pak_count++ + curr_byte_count += len(packet.Data()) + if is_req { + map_val["reqpakcount"] = strconv.Itoa(curr_pak_count) + map_val["reqbytecount"] = strconv.Itoa(curr_byte_count) + } else { + map_val["resppakcount"] = strconv.Itoa(curr_pak_count) + map_val["respbytecount"] = strconv.Itoa(curr_byte_count) + } + + if is_req { + // TODO (s3wong): just do IPv4 and TCP without using the plugin for now + // the condition check itself is cheating also... + if len(sess_map.traces) <= 1 { + ipv4Map := make(map[string]string) + ipv4Map["protocol"] = "IPv4" + ipv4Map["srcip"] = src_ip_str + ipv4Map["dstip"] = dst_ip_str + sess_map.traces = append(sess_map.traces, ipv4Map) + tcpMap := make(map[string]string) + tcpMap["protocol"] = "TCP" + tcpMap["srcport"] = fmt.Sprintf("%d", src_port) + tcpMap["dstport"] = fmt.Sprintf("%d", dst_port) + sess_map.traces = append(sess_map.traces, tcpMap) } - map_val := sessionMap[session_key].session - map_val["nodename"] = node_name - map_val["podname"] = pod_name - map_val["srcip"] = src_ip_str - map_val["dstip"] = dst_ip_str - map_val["srcport"] = fmt.Sprintf("%d", src_port) - map_val["dstport"] = fmt.Sprintf("%d", dst_port) - curr_pak_count, _ := strconv.Atoi(map_val["reqpakcount"]) - curr_byte_count, _ := strconv.Atoi(map_val["reqbytecount"]) - curr_pak_count++ - if proto == HTTP { - curr_byte_count += len(app_layer.Payload()) - reader := bytes.NewReader(app_layer.Payload()) - buf := bufio.NewReader(reader) - req, err := http.ReadRequest(buf) - if err != nil { - fmt.Printf("Request error: ") - fmt.Println(err) - } else if req == nil { - fmt.Println("request is nil") - } else { - fmt.Printf("HTTP Request Method %s url %v proto %v\n", - req.Method, req.URL, req.Proto) - map_val["reqmethod"] = req.Method - map_val["requrl"] = fmt.Sprintf("%v", req.URL) - map_val["reqproto"] = fmt.Sprintf("%v", req.Proto) - if user_agent := req.UserAgent(); len(user_agent) > 0 { - map_val["useragent"] = user_agent - } - if len(req.Host) > 0 { - map_val["host"] = req.Host - } - header := req.Header - if req_id := header.Get("X-Request-Id"); len(req_id) > 0 { - map_val["requestid"] = req_id - } - if envoy_dec := header.Get("X-Envoy-Decorator-Operation"); len(envoy_dec) > 0 { - map_val["envoydecorator"] = envoy_dec - } - if trace_id := header.Get("X-B3-Traceid"); len(trace_id) > 0 { - map_val["traceid"] = trace_id - } - if _, ok := map_val["respstatus"]; ok { - map_val["done"] = "true" + } + + var dataptr []byte + app_layer := packet.ApplicationLayer() + errStr := "" + if app_layer != nil { + if is_req { + dataptr = append(sess_map.reqBuf, app_layer.Payload()...) + } else { + dataptr = append(sess_map.respBuf, app_layer.Payload()...) + } + for _, protocol := range monitoring_info.protocols { + if _, ok := protocolParser[protocol]; ok { + parser := protocolParser[protocol] + new_dataptr, parseMap := parser.Parse(session_key, is_req, + dataptr) + if parseMap != nil { + protocolTag := strings.ToUpper(protocol) + merged := false + for _, existing := range sess_map.traces { + if existing["protocol"] == protocolTag { + for k, v := range parseMap { + existing[k] = v + } + merged = true + break + } + } + if !merged { + parseMap["protocol"] = strings.ToUpper(protocol) + sess_map.traces = append(sess_map.traces, parseMap) + } + dataptr = new_dataptr + } else { + // offset to packet is off, no need to continue + // parsing, return error + errStr = fmt.Sprintf("Error: unable to parse protocol %v", protocol) + fmt.Println(errStr) + //return errors.New(errStr) + break } } + } + } else { + fmt.Printf("No application layer, TCP packet\n") + return nil + } + + if len(errStr) > 0 { + // buffer + if is_req { + sess_map.reqBuf = append([]byte(nil), dataptr...) } else { - // TODO(s3wong): TCP assumed for now - curr_byte_count += (len(*data) - 4) + sess_map.respBuf = append([]byte(nil), dataptr...) } - map_val["reqpakcount"] = strconv.Itoa(curr_pak_count) - map_val["reqbytecount"] = strconv.Itoa(curr_byte_count) - fmt.Printf("Current session packet count %v and byte count %v\n", map_val["reqpakcount"], map_val["reqbytecount"]) + //sessionMap[session_key] = sess_map } else { session_key := session_key_t { src_ip: dst_ip, @@ -304,7 +463,7 @@ func handle_skb_event(data *[]byte, node_name string, pod_name string, var duration uint64 = 0 leaf_buf := bytes.NewBuffer(leaf) if leaf_buf == nil { - fmt.Println("Error: leaf is nil") + fmt.Println("Error: unable to allocate new byte buffer") return nil } session := session_t{} @@ -314,89 +473,31 @@ func handle_skb_event(data *[]byte, node_name string, pod_name string, return nil } if session.Resp_time == 0 { - fmt.Printf("session response time not set?\n") + fmt.Printf("session response time not set yet\n") } else { duration = (session.Resp_time - session.Req_time)/1000 - fmt.Printf("Leaf %v\n", leaf) + fmt.Printf("session time : %v\n", session) fmt.Printf("Duration: %d usec\n", duration) } - sess_key := fmt.Sprintf("%x.%x:%d:%d", dst_ip, src_ip, - dst_port, src_port) - if _, ok := sessionMap[sess_key]; !ok { - //sessionMap[sess_key] = make(map[string]string) - sessionMap[sess_key] = session_info_t{} - sess_map := sessionMap[sess_key] - sess_map.session = make(map[string]string) - sess_map.buf = []byte{} - sessionMap[sess_key] = sess_map - zero := strconv.Itoa(0) - sessionMap[sess_key].session["reqpakcount"] = zero - sessionMap[sess_key].session["reqbytecount"] = zero - sessionMap[sess_key].session["resppakcount"] = zero - sessionMap[sess_key].session["respbytecount"] = zero - } - var map_val = sessionMap[sess_key].session - map_val["nodename"] = node_name - map_val["podname"] = pod_name - map_val["srcip"] = dst_ip_str - map_val["dstip"] = src_ip_str - map_val["srcport"] = fmt.Sprintf("%d", dst_port) - map_val["dstport"] = fmt.Sprintf("%d", src_port) map_val["duration"] = fmt.Sprintf("%v usec", duration) - curr_pak_count, _ := strconv.Atoi(map_val["resppakcount"]) - curr_byte_count, _ := strconv.Atoi(map_val["respbytecount"]) - curr_pak_count++ - - if proto == HTTP { - curr_byte_count += len(app_layer.Payload()) - reader := bytes.NewReader(app_layer.Payload()) - buf := bufio.NewReader(reader) - resp, err := http.ReadResponse(buf, nil) - read_http := true - if err != nil { - fmt.Printf("Response error: ") - fmt.Println(err) - sess_map := sessionMap[sess_key] - sess_map.buf = append(sess_map.buf, app_layer.Payload()...) - reader = bytes.NewReader(sess_map.buf) - buf = bufio.NewReader(reader) - resp, err = http.ReadResponse(buf, nil) - if err != nil || resp == nil { - if err != nil { - fmt.Printf("Response error: %v\n", err) - } - read_http = false - } - sessionMap[sess_key] = sess_map - } else if resp == nil { - fmt.Println("response is nil") - read_http = false - } - if read_http { - fmt.Printf("HTTP Response Status %v code %v Proto %v\n", - resp.Status, resp.StatusCode, resp.Proto) - map_val["respstatus"] = resp.Status - map_val["respstatuscode"] = fmt.Sprintf("%v", resp.StatusCode) - map_val["respproto"] = fmt.Sprintf("%v", resp.Proto) - //map_val["duration"] = fmt.Sprintf("%v usec", duration) - /* - if _, ok := map_val["reqmethod"]; ok { - map_val["done"] = "true" - } - */ - } - if resp != nil { - resp.Body.Close() - } + + node, node_session, err := getNodeIntfSession(session_key) + if err == nil { + map_val["node-interface"] = node + map_val["node-request-ts"] = fmt.Sprintf("%v", node_session.Req_time) + map_val["node-response-ts"] = fmt.Sprintf("%v", node_session.Resp_time) + delNodeIntfSession(node, key) } else { - // TODO(s3wong): TCP assumed for now - curr_byte_count += (len(*data) - 4) + fmt.Printf("Session not found in any node interface... posssibly local?") } - map_val["resppakcount"] = strconv.Itoa(curr_pak_count) - map_val["respbytecount"] = strconv.Itoa(curr_byte_count) - fmt.Printf("Current session packet count %v and byte count %v\n", map_val["resppakcount"], map_val["respbytecount"]) + if duration > 0 { - map_val["done"] = "true" + sess_map.done = true + err := session_table.Delete(key) + if err != nil { + fmt.Printf("Error deleting key %v: %v\n", key, err) + return err + } } } } @@ -441,6 +542,207 @@ func setEgressTable(egress_table *bcc.Table, return nil } +var nodeintfFilterList = [...]string {"lo", "veth", "docker", "flannel"} + +func filterNodeIntf(intf string) bool { + for _, substring := range nodeintfFilterList { + if strings.Contains(intf, substring) { + return false + } + } + return true +} + +type nodeIntf struct { + bpfMod *bcc.Module + ipTrackTable *bcc.Table + sessionTable *bcc.Table +} + +var nodeIntfMap = map[string]*nodeIntf{} + +func setupNodeIntf(ifindex int) (*nodeIntf, error) { + buf, err := ioutil.ReadFile("libclovisor/ebpf/node_interface.c") + if err != nil { + fmt.Println(err) + return nil, err + } + code := string(buf) + + bpf_mod := bcc.NewModule(code, []string{}) + + ingress_fn, err := bpf_mod.Load("handle_ingress", + C.BPF_PROG_TYPE_SCHED_CLS, + 1, 65536) + if err != nil { + fmt.Printf("Failed to load node interface ingress func: %v\n", err) + return nil, err + } + + egress_fn, err := bpf_mod.Load("handle_egress", + C.BPF_PROG_TYPE_SCHED_CLS, + 1, 65536) + if err != nil { + fmt.Printf("Failed to load node interface egress func: %v\n", err) + return nil, err + } + + ip_track_table := bcc.NewTable(bpf_mod.TableId("ip2track"), bpf_mod) + node_sess_table := bcc.NewTable(bpf_mod.TableId("node_sessions"), bpf_mod) + + // check if qdisc clsact filter for this interface already exists + link, err := netlink.LinkByIndex(ifindex) + if err != nil { + fmt.Println(err) + } else { + qdiscs, err := netlink.QdiscList(link) + if err == nil { + for _, qdisc_ := range qdiscs { + if qdisc_.Type() == "clsact" { + netlink.QdiscDel(qdisc_) + break + } + } + } + } + + attrs := netlink.QdiscAttrs { + LinkIndex: ifindex, + Handle: netlink.MakeHandle(0xffff, 0), + Parent: netlink.HANDLE_CLSACT, + } + + qdisc := &netlink.GenericQdisc { + QdiscAttrs: attrs, + QdiscType: "clsact", + } + + if err := netlink.QdiscAdd(qdisc); err != nil { + fmt.Println(err) + return nil, err + } + + ingress_filter_attrs := netlink.FilterAttrs{ + LinkIndex: ifindex, + Parent: netlink.MakeHandle(0xffff, 0xfff3), + Priority: 1, + Protocol: unix.ETH_P_ALL, + } + ingress_filter := &netlink.BpfFilter{ + FilterAttrs: ingress_filter_attrs, + Fd: ingress_fn, + Name: "handle_ingress", + DirectAction: true, + } + if ingress_filter.Fd < 0 { + fmt.Println("Failed to load node interface ingress bpf program") + return nil, err + } + + if err := netlink.FilterAdd(ingress_filter); err != nil { + fmt.Println(err) + return nil, err + } + + egress_filter_attrs := netlink.FilterAttrs{ + LinkIndex: ifindex, + Parent: netlink.MakeHandle(0xffff, 0xfff2), + Priority: 1, + Protocol: unix.ETH_P_ALL, + } + egress_filter := &netlink.BpfFilter{ + FilterAttrs: egress_filter_attrs, + Fd: egress_fn, + Name: "handle_egress", + DirectAction: true, + } + if egress_filter.Fd < 0 { + fmt.Println("Failed to load node interface egress bpf program") + return nil, err + } + + if err := netlink.FilterAdd(egress_filter); err != nil { + fmt.Println(err) + return nil, err + } + + return &nodeIntf{ + bpfMod: bpf_mod, + ipTrackTable: ip_track_table, + sessionTable: node_sess_table, + }, nil +} + +func ClovisorPhyInfSetup() error { + intfList, err := net.Interfaces() + if err != nil { + fmt.Printf("Failed to get node interfaces: %v\n", err) + return err + } + + for _, f := range intfList { + if !filterNodeIntf(f.Name) { + continue + } + fmt.Printf("Tracking node interface %v w/ index %v\n", f.Name, f.Index) + bpf_node_intf, err := setupNodeIntf(f.Index) + if err != nil { + fmt.Printf("Failed to set up node interface %v: %v\n", f.Name, err) + return err + } + nodeIntfMap[f.Name] = bpf_node_intf + } + return nil +} + +func setIPTrackingTable(table *bcc.Table, ipaddr uint32, action int) error { + key, _ := table.KeyStrToBytes(strconv.Itoa(int(ipaddr))) + leaf, _ := table.LeafStrToBytes(strconv.Itoa(action)) + if err := table.Set(key, leaf); err != nil { + fmt.Printf("Failed to set IP tracking table: %v\n", err) + return err + } + dumpBPFTable(table) + return nil +} + +func setNodeIntfTrackingIP(ipaddr uint32) { + for name, node_intf := range nodeIntfMap { + err := setIPTrackingTable(node_intf.ipTrackTable, ipaddr, 1) + if err != nil { + fmt.Printf("Failed to add ip address %v to node interface %v: %v\n", backtoIP4(int64(ipaddr)), name, err) + } + } +} + +func getNodeIntfSession(session_key session_key_t) (string, *session_t, error) { + key_buf := &bytes.Buffer{} + binary.Write(key_buf, binary.LittleEndian, session_key) + key := append([]byte(nil), key_buf.Bytes()...) + + for node, node_intf := range nodeIntfMap { + fmt.Printf("For node interface %v... ", node) + //dumpBPFTable(node_intf.sessionTable) + if leaf, err := node_intf.sessionTable.Get(key); err == nil { + leaf_buf := bytes.NewBuffer(leaf) + session := session_t{} + binary.Read(leaf_buf, binary.LittleEndian, &session) + return node, &session, nil + } + } + return "", nil, errors.New("session not found") +} + +func delNodeIntfSession(node_iname string, key []byte) error { + nodeIntf := nodeIntfMap[node_iname] + err := nodeIntf.sessionTable.Delete(key) + if err != nil { + fmt.Printf("Error deleting session %v from node interface %v: %v\n", + key, node_iname, err) + } + return err +} + func ClovisorNewPodInit(k8s_client *ClovisorK8s, node_name string, pod_name string, @@ -457,7 +759,7 @@ func ClovisorNewPodInit(k8s_client *ClovisorK8s, return nil, err } - sessionMap = map[string]session_info_t{}; + sessionMap = map[string]*session_info_t{}; fmt.Printf("Beginning network tracing for pod %v\n", pod_name) @@ -492,7 +794,7 @@ func ClovisorNewPodInit(k8s_client *ClovisorK8s, traffic_table := bcc.NewTable(bpf_mod.TableId("dports2proto"), bpf_mod) if err := setTrafficTable(traffic_table, int(monitoring_info.port_num), - monitoring_info.protocol, true); + monitoring_info.protocols[0], true); err != nil { fmt.Printf("Error on setting traffic port") return nil, err @@ -587,6 +889,8 @@ func ClovisorNewPodInit(k8s_client *ClovisorK8s, return nil, err } + setNodeIntfTrackingIP(ip2Long(monitoring_info.pod_ip)) + table := bcc.NewTable(bpf_mod.TableId("skb_events"), bpf_mod) skb_rev_chan := make(chan []byte) @@ -597,17 +901,20 @@ func ClovisorNewPodInit(k8s_client *ClovisorK8s, return nil, err } - tracer, closer := initJaeger(monitoring_info.svc_name) - ticker := time.NewTicker(500 * time.Millisecond) stop := make(chan bool) go func() { + fmt.Printf("Start tracing to Jaeger with service %v\n", monitoring_info.svc_name) + tracer, closer := initJaeger(monitoring_info.svc_name) + tracerMap[monitoring_info.svc_name] = tracer + ticker := time.NewTicker(500 * time.Millisecond) for { select { case <- ticker.C: - print_network_traces(tracer) + print_network_traces() case data := <-skb_rev_chan: err = handle_skb_event(&data, node_name, pod_name, session_table, - monitoring_info, egress_match_list) + monitoring_info, egress_match_list, + monitoring_info.svc_name) if err != nil { fmt.Printf("failed to decode received data: %s\n", err) } diff --git a/clover/clovisor/libclovisor/clovisor_cfg.go b/clover/clovisor/libclovisor/clovisor_cfg.go index f3c631a..9c552da 100644 --- a/clover/clovisor/libclovisor/clovisor_cfg.go +++ b/clover/clovisor/libclovisor/clovisor_cfg.go @@ -25,6 +25,8 @@ import ( var redisServer string = "redis.clover-system" var jaegerCollector string = "jaeger-collector.clover-system:14268" var jaegerAgent string = "jaeger-agent.clover-system:6831" +var ProtoCfg string = "clovisor_proto_cfg" +var protoPluginCfgChan string = "clovisor_proto_plugin_cfg_chan" /* * redisConnect: redis client connecting to redis server @@ -40,6 +42,7 @@ func redisConnect() *redis.Client { func get_cfg_labels(node_name string) ([]string, error) { client := redisConnect() + defer client.Close() labels_list, err := client.LRange("clovisor_labels", 0, -1).Result() if err != nil { fmt.Println(err.Error()) @@ -51,6 +54,7 @@ func get_cfg_labels(node_name string) ([]string, error) { func get_egress_match_list(pod_name string) ([]egress_match_t) { client := redisConnect() + defer client.Close() egress_cfg_list, err := client.LRange("clovior_egress_match", 0, -1).Result() if err != nil { fmt.Println(err.Error()) @@ -85,13 +89,24 @@ func get_egress_match_list(pod_name string) ([]egress_match_t) { // https://www.socketloop.com/tutorials/golang-convert-ip-address-string-to-long-unsigned-32-bit-integer func ip2Long(ip string) uint32 { var long uint32 - binary.Read(bytes.NewBuffer(net.ParseIP(ip).To4()), binary.LittleEndian, &long) + //binary.Read(bytes.NewBuffer(net.ParseIP(ip).To4()), binary.LittleEndian, &long) + binary.Read(bytes.NewBuffer(net.ParseIP(ip).To4()), binary.BigEndian, &long) return long } +func backtoIP4(ipInt int64) string { + // need to do two bit shifting and “0xff” masking + b0 := strconv.FormatInt((ipInt>>24)&0xff, 10) + b1 := strconv.FormatInt((ipInt>>16)&0xff, 10) + b2 := strconv.FormatInt((ipInt>>8)&0xff, 10) + b3 := strconv.FormatInt((ipInt & 0xff), 10) + return b0 + "." + b1 + "." + b2 + "." + b3 +} + func get_cfg_session_match() ([]egress_match_cfg, error) { var ret_list []egress_match_cfg client := redisConnect() + defer client.Close() keys, err := client.HKeys("clovisor_session_match").Result() if err != nil { fmt.Println(err.Error()) @@ -140,5 +155,27 @@ func initJaeger(service string) (opentracing.Tracer, io.Closer) { func get_jaeger_server() (string, error) { client := redisConnect() + defer client.Close() return client.Get("clovisor_jaeger_server").Result() } + +func Monitor_proto_plugin_cfg() { + client := redisConnect() + //defer client.Close() + + pubsub := client.Subscribe(protoPluginCfgChan) + //defer pubsub.Close() + + go func() { + for { + msg, err := pubsub.ReceiveMessage() + if err != nil { + fmt.Printf("Error getting protocol plugin configuration: %v\n", err) + return + } + + fmt.Printf("Update on protocol %v notification received\n", msg.Payload) + loadProtoParser(msg.Payload, true) + } + }() +} diff --git a/clover/clovisor/libclovisor/clovisor_k8s.go b/clover/clovisor/libclovisor/clovisor_k8s.go index 8f4b481..9587437 100644 --- a/clover/clovisor/libclovisor/clovisor_k8s.go +++ b/clover/clovisor/libclovisor/clovisor_k8s.go @@ -33,8 +33,9 @@ type monitoring_info_t struct { svc_name string pod_name string container_name string - protocol string + protocols []string port_num uint32 + pod_ip string } var DEFAULT_NAMESPACE = "default" @@ -87,7 +88,10 @@ func (client *ClovisorK8s) Get_monitoring_info(nodeName string) (map[string]*mon func (client *ClovisorK8s) getPodsForSvc(svc *core_v1.Service, namespace string) (*core_v1.PodList, error) { set := labels.Set(svc.Spec.Selector) + //label := strings.Split(set.AsSelector().String(), ",")[0] + //fmt.Printf("Trying to get pods for service %v with label %v from %v\n", svc.GetName(), label, set.AsSelector().String()) listOptions := metav1.ListOptions{LabelSelector: set.AsSelector().String()} + //listOptions := metav1.ListOptions{LabelSelector: label} return client.client.CoreV1().Pods(namespace).List(listOptions) } @@ -104,7 +108,8 @@ func (client *ClovisorK8s) get_monitoring_pods(nodeName string, monitoring_info := make(map[string]*monitoring_info_t) if len(labels_list) == 0 { // TODO(s3wong): set it to 'default' - namespace = "linux-foundation-gke" + //namespace = "linux-foundation-gke" + namespace = "default" if svcs_list, err := client.client.CoreV1().Services(namespace).List(metav1.ListOptions{}); err != nil { @@ -151,14 +156,25 @@ func (client *ClovisorK8s) get_monitoring_pods(nodeName string, for ns, svc_slice := range ns_svc_map { for _, svc_list_ := range svc_slice { for _, svc := range svc_list_.Items { - fmt.Printf("Looking for supported protocols for service %v:%v\n", ns, svc.GetName()) - var svc_port_map = map[string]core_v1.ServicePort{} + if ns == "default" && svc.GetName() == "kubernetes" { + continue + } + //fmt.Printf("Looking for supported protocols for service %v:%v\n", ns, svc.GetName()) + //var svc_port_map = map[string]core_v1.ServicePort{} + var svc_port_map = map[string][]string{} for _, svc_port := range svc.Spec.Ports { if len(svc_port.Name) > 0 { - for _, sp := range SUPPORTED_PROTOCOLS { - if strings.Contains(svc_port.Name, sp) { - target_port := svc_port.TargetPort.String() - svc_port_map[target_port] = svc_port + svc_protos := strings.Split(svc_port.Name, "-") + for _, proto := range svc_protos { + if err := loadProtoParser(proto, false); err == nil { + for _, sp := range SUPPORTED_PROTOCOLS { + if strings.Contains(proto, sp) { + target_port := svc_port.TargetPort.String() + svc_port_map[target_port] = append(svc_port_map[target_port], proto) + } + } + } else { + fmt.Printf("Unsupported protocol: %v\n", proto) } } } @@ -166,13 +182,23 @@ func (client *ClovisorK8s) get_monitoring_pods(nodeName string, if len(svc_port_map) == 0 { fmt.Printf("Found no port with supported protocol for %v:%v\n", ns, svc.GetName()) continue + } else { + fmt.Printf("svc_port_map for service %v is %v\n", svc.GetName(), svc_port_map) } - fmt.Printf("Fetching pods for namespace %v service: %v\n", ns, svc.GetName()) + //fmt.Printf("Fetching pods for namespace %v service: %v\n", ns, svc.GetName()) pod_list, err := client.getPodsForSvc(&svc, ns) if err != nil { fmt.Print("Error fetching pods for %v:%v [%v]\n", ns, svc.GetName(), err) continue } + /* + labelSet := labels.Set(svc.Spec.Selector) + pod_list, err := client.client.CoreV1().Pods(ns).List(metav1.ListOptions{}) + if err != nil { + fmt.Print("Error fetching pods for %v:%v [%v]\n", ns, svc.GetName(), err) + continue + } + */ for _, pod := range pod_list.Items { if pod.Spec.NodeName == nodeName { for _, container := range pod.Spec.Containers { @@ -191,12 +217,8 @@ func (client *ClovisorK8s) get_monitoring_pods(nodeName string, monitoring_info[pod_name].pod_name = pod_name monitoring_info[pod_name].container_name = container.Name monitoring_info[pod_name].port_num = port_num - svc_port_name := svc_port_map[tp_string].Name - if (strings.Contains(svc_port_name, "-")) { - monitoring_info[pod_name].protocol = svc_port_name[:strings.Index(svc_port_name, "-")] - } else { - monitoring_info[pod_name].protocol = svc_port_name - } + monitoring_info[pod_name].protocols = svc_port_map[tp_string] + monitoring_info[pod_name].pod_ip = pod.Status.PodIP } } } diff --git a/clover/clovisor/libclovisor/ebpf/node_interface.c b/clover/clovisor/libclovisor/ebpf/node_interface.c new file mode 100755 index 0000000..cd14a50 --- /dev/null +++ b/clover/clovisor/libclovisor/ebpf/node_interface.c @@ -0,0 +1,158 @@ +// Copyright (c) Authors of Clover +// +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Apache License, Version 2.0 +// which accompanies this distribution, and is available at +// http://www.apache.org/licenses/LICENSE-2.0 +#include <uapi/linux/if_ether.h> +#include <uapi/linux/in.h> +#include <uapi/linux/ip.h> +#include <uapi/linux/tcp.h> +#include <uapi/linux/pkt_cls.h> +#include <uapi/linux/bpf.h> + +#include <bcc/proto.h> + +#define MAX_SESSION_TABLE_ENTRIES 8192 + +typedef struct session_key_ { + u32 src_ip; + u32 dst_ip; + unsigned short src_port; + unsigned short dst_port; +} session_key_t; + +typedef struct session_ { + u64 req_time; + u64 resp_time; +} session_t; + +BPF_HASH(ip2track, u32, u32); +BPF_HASH(node_sessions, session_key_t, session_t, MAX_SESSION_TABLE_ENTRIES); + +struct eth_hdr { + unsigned char h_dest[ETH_ALEN]; + unsigned char h_source[ETH_ALEN]; + unsigned short h_proto; +}; + +static inline int ipv4_hdrlen(struct iphdr *ip4) +{ + return ip4->ihl * 4; +} + +static inline int tcp_doff(struct tcphdr *tcp_hdr) +{ + return tcp_hdr->doff * 4; +} + +static inline void fill_up_sess_key(session_key_t *key, u32 src_ip, + u32 dst_ip, u16 src_port, u16 dst_port) +{ + key->src_ip = src_ip; + key->dst_ip = dst_ip; + key->src_port = src_port; + key->dst_port = dst_port; +} + +static inline void process_response(u32 src_ip, u32 dst_ip, u16 src_port, + u16 dst_port) +{ + session_key_t sess_key = {}; + session_t *session_ptr = NULL; + fill_up_sess_key(&sess_key, src_ip, dst_ip, src_port, dst_port); + session_ptr = node_sessions.lookup(&sess_key); + if (session_ptr != NULL) { + u64 resp_time = bpf_ktime_get_ns(); + session_t update_session = { + session_ptr->req_time, + resp_time + }; + node_sessions.update(&sess_key, &update_session); + } +} + +static inline void process_request(u32 src_ip, u32 dst_ip, u16 src_port, + u16 dst_port) +{ + session_key_t sess_key = {}; + session_t *session_ptr = NULL; + session_t new_session = { + bpf_ktime_get_ns(), + 0 + }; + fill_up_sess_key(&sess_key, src_ip, dst_ip, src_port, dst_port); + session_ptr = node_sessions.lookup(&sess_key); + if (! session_ptr) { + node_sessions.insert(&sess_key, &new_session); + } +} + +static inline void ingress_parsing(struct tcphdr *tcp_hdr, + struct iphdr *ipv4_hdr) +{ + unsigned int dst_ip = ntohl(ipv4_hdr->daddr); + int ret = 0; + + unsigned int *proto = ip2track.lookup(&dst_ip); + if (proto != NULL) { + process_response(ntohl(ipv4_hdr->daddr), + ntohl(ipv4_hdr->saddr), + ntohs(tcp_hdr->dest), + ntohs(tcp_hdr->source)); + } +} + +static inline void egress_parsing(struct tcphdr *tcp_hdr, + struct iphdr *ipv4_hdr) +{ + unsigned int src_ip = ntohl(ipv4_hdr->saddr); + + unsigned int *proto = ip2track.lookup(&src_ip); + + if (proto != NULL) { + process_request(ntohl(ipv4_hdr->saddr), + ntohl(ipv4_hdr->daddr), + ntohs(tcp_hdr->source), + ntohs(tcp_hdr->dest)); + } +} + +static inline int handle_packet(struct __sk_buff *skb, int is_ingress) +{ + void *data = (void *)(long)skb->data; + void *data_end = (void *)(long)skb->data_end; + struct eth_hdr *eth = data; + struct iphdr *ipv4_hdr = data + sizeof(*eth); + struct tcphdr *tcp_hdr = data + sizeof(*eth) + sizeof(*ipv4_hdr); + + /* TODO(s3wong): assuming TCP only for now */ + /* single length check */ + if (data + sizeof(*eth) + sizeof(*ipv4_hdr) + sizeof(*tcp_hdr) > data_end) + return TC_ACT_OK; + + if (eth->h_proto != htons(ETH_P_IP)) + return TC_ACT_OK; + + // TODO(s3wong): no support for IP options + if (ipv4_hdr->protocol != IPPROTO_TCP || ipv4_hdr->ihl != 5) + return TC_ACT_OK; + + if (is_ingress == 1) { + ingress_parsing(tcp_hdr, ipv4_hdr); + } else{ + egress_parsing(tcp_hdr, ipv4_hdr); + } + + return TC_ACT_OK; +} + +int handle_ingress(struct __sk_buff *skb) +{ + return handle_packet(skb, 1); +} + +int handle_egress(struct __sk_buff *skb) +{ + return handle_packet(skb, 0); +} diff --git a/clover/clovisor/libclovisor/libproto/build-plugin b/clover/clovisor/libclovisor/libproto/build-plugin new file mode 100755 index 0000000..743071b --- /dev/null +++ b/clover/clovisor/libclovisor/libproto/build-plugin @@ -0,0 +1,9 @@ +#!/bin/bash +# +# Copyright (c) Authors of Clover +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +go build --buildmode=plugin -o ../../proto/http.so clovisor_http.go diff --git a/clover/clovisor/libclovisor/libproto/clovisor_http.go b/clover/clovisor/libclovisor/libproto/clovisor_http.go new file mode 100644 index 0000000..6440d5a --- /dev/null +++ b/clover/clovisor/libclovisor/libproto/clovisor_http.go @@ -0,0 +1,84 @@ +// Copyright (c) Authors of Clover +// +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Apache License, Version 2.0 +// which accompanies this distribution, and is available at +// http://www.apache.org/licenses/LICENSE-2.0 + +package main + +import ( + "bufio" + "bytes" + "fmt" + "io/ioutil" + "net/http" +) + +type httpparser string + +func (p httpparser) Parse(session_key string, + is_req bool, + data []byte) ([]byte, map[string]string) { + map_val := make(map[string]string) + reader := bytes.NewReader(data) + buf := bufio.NewReader(reader) + if is_req == true { + req, err := http.ReadRequest(buf) + if err != nil { + fmt.Printf("Request error: ") + fmt.Println(err) + return nil, nil + } else if req == nil { + fmt.Println("request is nil") + return nil, nil + } else { + fmt.Printf("HTTP Request Method %s url %v proto %v\n", + req.Method, req.URL, req.Proto) + map_val["reqmethod"] = req.Method + map_val["requrl"] = fmt.Sprintf("%v", req.URL) + map_val["reqproto"] = fmt.Sprintf("%v", req.Proto) + if user_agent := req.UserAgent(); len(user_agent) > 0 { + map_val["useragent"] = user_agent + } + if len(req.Host) > 0 { + map_val["host"] = req.Host + } + header := req.Header + if req_id := header.Get("X-Request-Id"); len(req_id) > 0 { + map_val["requestid"] = req_id + } + if envoy_dec := header.Get("X-Envoy-Decorator-Operation"); len(envoy_dec) > 0 { + map_val["envoydecorator"] = envoy_dec + } + if trace_id := header.Get("X-B3-Traceid"); len(trace_id) > 0 { + map_val["traceid"] = trace_id + } + body, err := ioutil.ReadAll(req.Body) + if err != nil { + fmt.Printf("Error reading HTTP Request body %v\n", err.Error()) + } + return body, map_val + } + } else { + // response + resp, err := http.ReadResponse(buf, nil) + if err != nil { + fmt.Printf("Response error: ") + fmt.Println(err) + return nil, nil + } + fmt.Printf("HTTP Response Status %v code %v Proto %v\n", + resp.Status, resp.StatusCode, resp.Proto) + map_val["respstatus"] = resp.Status + map_val["respstatuscode"] = fmt.Sprintf("%v", resp.StatusCode) + map_val["respproto"] = fmt.Sprintf("%v", resp.Proto) + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + fmt.Printf("Error reading HTTP Request body %v\n", err.Error()) + } + return body, map_val + } +} + +var Parser httpparser diff --git a/clover/clovisor/libclovisor/libproto/http_alt.go b/clover/clovisor/libclovisor/libproto/http_alt.go new file mode 100644 index 0000000..f7581fb --- /dev/null +++ b/clover/clovisor/libclovisor/libproto/http_alt.go @@ -0,0 +1,93 @@ +// Copyright (c) Authors of Clover +// +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Apache License, Version 2.0 +// which accompanies this distribution, and is available at +// http://www.apache.org/licenses/LICENSE-2.0 + +package main + +import ( + "bufio" + "bytes" + "fmt" + "io/ioutil" + "net/http" +) + +type httpparser string + +func (p httpparser) Parse(session_key string, + is_req bool, + data []byte) ([]byte, map[string]string) { + map_val := make(map[string]string) + reader := bytes.NewReader(data) + buf := bufio.NewReader(reader) + if is_req == true { + req, err := http.ReadRequest(buf) + if err != nil { + fmt.Printf("Request error: ") + fmt.Println(err) + return nil, nil + } else if req == nil { + fmt.Println("request is nil") + return nil, nil + } else { + fmt.Printf("HTTP Request Method %s url %v proto %v\n", + req.Method, req.URL, req.Proto) + map_val["reqmethod"] = req.Method + map_val["requrl"] = fmt.Sprintf("%v", req.URL) + map_val["reqproto"] = fmt.Sprintf("%v", req.Proto) + if user_agent := req.UserAgent(); len(user_agent) > 0 { + map_val["useragent"] = user_agent + } + if len(req.Host) > 0 { + map_val["host"] = req.Host + } + header := req.Header + if req_id := header.Get("X-Request-Id"); len(req_id) > 0 { + map_val["requestid"] = req_id + } + if envoy_dec := header.Get("X-Envoy-Decorator-Operation"); len(envoy_dec) > 0 { + map_val["envoydecorator"] = envoy_dec + } + if trace_id := header.Get("X-B3-Traceid"); len(trace_id) > 0 { + map_val["traceid"] = trace_id + } + body, err := ioutil.ReadAll(req.Body) + if err != nil { + fmt.Printf("Error reading HTTP Request body %v\n", err.Error()) + } + return body, map_val + } + } else { + // response + resp, err := http.ReadResponse(buf, nil) + if err != nil { + fmt.Printf("Response error: ") + fmt.Println(err) + return nil, nil + } + fmt.Printf("HTTP Response Status %v code %v Proto %v\n", + resp.Status, resp.StatusCode, resp.Proto) + map_val["respstatus"] = resp.Status + map_val["respstatuscode"] = fmt.Sprintf("%v", resp.StatusCode) + map_val["respproto"] = fmt.Sprintf("%v", resp.Proto) + header := resp.Header + //fmt.Printf("Response Header contains %v\n", header) + if contentType := header.Get("Content-Type"); len(contentType) > 0 { + map_val["content-type"] = contentType + } + if lastMod := header.Get("Last-Modified"); len(lastMod) > 0 { + map_val["last-modified"] = lastMod + } + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + fmt.Printf("Error reading HTTP Request body %v\n", err.Error()) + } + return body, map_val + } +} + +var Parser httpparser diff --git a/clover/clovisor/proto/http.so b/clover/clovisor/proto/http.so Binary files differnew file mode 100644 index 0000000..1a61fb2 --- /dev/null +++ b/clover/clovisor/proto/http.so |