diff options
Diffstat (limited to 'clover/clovisor')
-rw-r--r-- | clover/clovisor/Dockerfile | 19 | ||||
-rwxr-xr-x | clover/clovisor/bin/clovisor | bin | 0 -> 46419408 bytes | |||
-rwxr-xr-x | clover/clovisor/build-docker | 20 | ||||
-rwxr-xr-x | clover/clovisor/build.sh | 49 | ||||
-rw-r--r-- | clover/clovisor/clovisor.yaml | 55 | ||||
-rw-r--r-- | clover/clovisor/clovisor_main.go | 77 | ||||
-rw-r--r-- | clover/clovisor/libclovisor/clovisor_bcc.go | 948 | ||||
-rw-r--r-- | clover/clovisor/libclovisor/clovisor_cfg.go | 189 | ||||
-rw-r--r-- | clover/clovisor/libclovisor/clovisor_k8s.go | 272 | ||||
-rwxr-xr-x | clover/clovisor/libclovisor/ebpf/node_interface.c | 158 | ||||
-rwxr-xr-x | clover/clovisor/libclovisor/ebpf/session_tracking.c | 292 | ||||
-rw-r--r-- | clover/clovisor/libclovisor/jaeger-all-in-one-template.yml | 151 | ||||
-rwxr-xr-x | clover/clovisor/libclovisor/libproto/build-plugin | 9 | ||||
-rw-r--r-- | clover/clovisor/libclovisor/libproto/clovisor_http.go | 84 | ||||
-rw-r--r-- | clover/clovisor/libclovisor/libproto/http_alt.go | 93 | ||||
-rw-r--r-- | clover/clovisor/libclovisor/mongo.yaml | 41 | ||||
-rw-r--r-- | clover/clovisor/libclovisor/redis.yaml | 53 | ||||
-rw-r--r-- | clover/clovisor/proto/http.so | bin | 0 -> 11742704 bytes |
18 files changed, 2510 insertions, 0 deletions
diff --git a/clover/clovisor/Dockerfile b/clover/clovisor/Dockerfile new file mode 100644 index 0000000..06ddd23 --- /dev/null +++ b/clover/clovisor/Dockerfile @@ -0,0 +1,19 @@ +FROM ubuntu:18.04 + +# the following is the Linux version for GKE for k8s 1.11.4-gke.8 +ARG TARGET_KERNEL_VER="linux-headers-4.15.0-1023-gcp" + +RUN set -ex; \ + echo "deb [trusted=yes] http://repo.iovisor.org/apt/bionic bionic main" > /etc/apt/sources.list.d/iovisor.list; \ + apt-get update -y; \ + DEBIAN_FRONTEND=noninteractive apt-get install -y \ + auditd \ + bcc-tools \ + $TARGET_KERNEL_VER \ + libelf1; + +COPY . . +COPY bin/clovisor . +RUN chmod +x clovisor + +CMD ["./clovisor"] diff --git a/clover/clovisor/bin/clovisor b/clover/clovisor/bin/clovisor Binary files differnew file mode 100755 index 0000000..c601de3 --- /dev/null +++ b/clover/clovisor/bin/clovisor diff --git a/clover/clovisor/build-docker b/clover/clovisor/build-docker new file mode 100755 index 0000000..4f776ec --- /dev/null +++ b/clover/clovisor/build-docker @@ -0,0 +1,20 @@ +#!/bin/bash +# +# Copyright (c) Authors of Clover +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +if [ -z "$1" ] + then + kernel_ver=linux-headers-`uname -r` + else + kernel_ver=$1 +fi +cp bin/clovisor . +docker build --build-arg TARGET_KERNEL_VER=$kernel_ver -t clovisor . +docker tag clovisor localhost:5000/clovisor +docker push localhost:5000/clovisor +#docker tag clovisor s3wong/clovisor +#docker push s3wong/clovisor diff --git a/clover/clovisor/build.sh b/clover/clovisor/build.sh new file mode 100755 index 0000000..95c0768 --- /dev/null +++ b/clover/clovisor/build.sh @@ -0,0 +1,49 @@ +# Copyright (c) Authors of Clover +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 + +GOVERSION=1.12 +OS=linux +ARCH=amd64 +GOPATH=/home/s3wong/go +GOLANGUNIXVERSION=release-branch.go1.11 +CLIENTGOVERSION=v10.0.0 + +SRCDIR=`pwd` + +wget https://dl.google.com/go/go$GOVERSION.$OS-$ARCH.tar.gz +sudo tar -C /usr/local -xzf go$GOVERSION.$OS-$ARCH.tar.gz +export PATH=$PATH:/usr/local/go/bin +export PATH=$GOPATH/bin:$PATH + +sudo apt install -y gcc +sudo apt-key adv --keyserver keyserver.ubuntu.com --recv-keys D4284CDD +echo "deb https://repo.iovisor.org/apt/bionic bionic main" | sudo tee /etc/apt/sources.list.d/iovisor.list +sudo apt-get update -y +sudo apt-get install -y bcc-tools libbcc-examples linux-headers-$(uname -r) + +go get github.com/google/gopacket +go get github.com/iovisor/gobpf +go get github.com/opentracing/opentracing-go +go get github.com/pkg/errors +go get github.com/go-redis/redis +go get github.com/uber/jaeger-client-go +go get github.com/vishvananda/netlink +go get github.com/vishvananda/netns +go get golang.org/x/sys/unix +cd $GOPATH/src/golang.org/x/sys/unix +git checkout $GOLANGUNIXVERSION + +go get github.com/tools/godep +go get k8s.io/client-go/... +cd $GOPATH/src/k8s.io/client-go +git checkout $CLIENTGOVERSION +godep restore ./... + +#cd $SRCDIR/libclovisor +#go build . +#cd ../ +#go build -o clovisor . diff --git a/clover/clovisor/clovisor.yaml b/clover/clovisor/clovisor.yaml new file mode 100644 index 0000000..7d5e43b --- /dev/null +++ b/clover/clovisor/clovisor.yaml @@ -0,0 +1,55 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: clovisor + labels: + name: clovisor +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: clovisor + namespace: clovisor +--- +apiVersion: rbac.authorization.k8s.io/v1beta1 +kind: ClusterRoleBinding +metadata: + name: serv-account-rbac-clovisor +subjects: + - kind: ServiceAccount + # Reference to upper's `metadata.name` + name: default + # Reference to upper's `metadata.namespace` + namespace: clovisor +roleRef: + kind: ClusterRole + name: cluster-admin + apiGroup: rbac.authorization.k8s.io +--- +apiVersion: extensions/v1beta1 +kind: DaemonSet +metadata: + name: clovisor + namespace: clovisor +spec: + selector: + matchLabels: + app: clovisor + template: + metadata: + name: clovisor + labels: + app: clovisor + spec: + hostNetwork: true + dnsPolicy: ClusterFirstWithHostNet + containers: + - name: clovisor + image: localhost:5000/clovisor + securityContext: + privileged: true + env: + - name: MY_NODE_NAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName diff --git a/clover/clovisor/clovisor_main.go b/clover/clovisor/clovisor_main.go new file mode 100644 index 0000000..b8e6508 --- /dev/null +++ b/clover/clovisor/clovisor_main.go @@ -0,0 +1,77 @@ +// Copyright (c) Authors of Clover +// +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Apache License, Version 2.0 +// which accompanies this distribution, and is available at +// http://www.apache.org/licenses/LICENSE-2.0 + +package main + +import ( + "fmt" + "io/ioutil" + "os" + "os/signal" + "path/filepath" + "syscall" + + clovisor "./libclovisor" +) + +var podMonitoringMap map[string]*clovisor.ClovisorBCC + +func main() { + node_name := os.Getenv("MY_NODE_NAME") + + ex, err := os.Executable() + if err != nil { + fmt.Println(err.Error()) + } else { + exPath := filepath.Dir(ex) + fmt.Printf("Current Working Directory is %v\n", exPath) + files, _ := ioutil.ReadDir(exPath) + for _, f := range files { + fmt.Printf("%v ",f.Name()) + } + fmt.Printf("\n") + } + + clovisor.Monitor_proto_plugin_cfg() + + clovisor.ClovisorPhyInfSetup() + + podMonitoringMap = make(map[string]*clovisor.ClovisorBCC) + + clovisor_k8s_client, err := clovisor.K8s_client_init(node_name) + if err != nil { + fmt.Printf("Clovisor to Kubernetes connectivity failed: %v\n", err) + return + } + fmt.Printf("Clovisor got k8s client succeed\n") + + monitoring_info_map, err := clovisor_k8s_client.Get_monitoring_info(node_name) + if err != nil { + fmt.Printf("Clovisor getting monitoring info failed: %v\n", err) + return + } + fmt.Printf("Clovisor get monitoring info succeed: %v\n", monitoring_info_map) + + for pod := range monitoring_info_map { + podMon, err := clovisor.ClovisorNewPodInit(clovisor_k8s_client, node_name, + pod, monitoring_info_map[pod]) + if err != nil { + fmt.Printf("Clovisor monitoring pod %s failed: %v\n", pod, err) + continue + } + podMonitoringMap[pod] = podMon + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, os.Kill, syscall.SIGTERM) + <-sig + for pod := range podMonitoringMap { + fmt.Printf("Send stop pod to pod %v\n", pod) + podMonitoringMap[pod].StopPod() + } +} + diff --git a/clover/clovisor/libclovisor/clovisor_bcc.go b/clover/clovisor/libclovisor/clovisor_bcc.go new file mode 100644 index 0000000..8f9a7bf --- /dev/null +++ b/clover/clovisor/libclovisor/clovisor_bcc.go @@ -0,0 +1,948 @@ +// Copyright (c) Authors of Clover +// +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Apache License, Version 2.0 +// which accompanies this distribution, and is available at +// http://www.apache.org/licenses/LICENSE-2.0 + +package clovisor + +import ( + "encoding/hex" + //"encoding/json" + "bytes" + "encoding/binary" + "errors" + "fmt" + "io/ioutil" + "net" + //"net/http" + "plugin" + "strconv" + "strings" + "time" + + //"github.com/go-redis/redis" + "github.com/google/gopacket" + "github.com/google/gopacket/layers" + "github.com/iovisor/gobpf/bcc" + opentracing "github.com/opentracing/opentracing-go" + "github.com/vishvananda/netlink" + + "golang.org/x/sys/unix" +) + +/* +#cgo CFLAGS: -I/usr/include/bcc/compat +#cgo LDFLAGS: -lbcc +#include <bcc/bcc_common.h> +#include <bcc/libbpf.h> +*/ +import "C" + +type Parser interface { + Parse(session_key string, is_req bool, data []byte)([]byte, map[string]string) +} + +type ClovisorBCC struct { + stopChan chan bool + // TODO(s3wong): remove once k8s watcher available + qdisc *netlink.GenericQdisc +} + +type session_key_t struct { + src_ip uint32 + dst_ip uint32 + src_port uint16 + dst_port uint16 +} + +type session_t struct { + Req_time uint64 + Resp_time uint64 +} + +type egress_match_t struct { + dst_ip uint32 + dst_port uint16 +} + +type egress_match_cfg struct { + egress_match egress_match_t + action string + params string +} + +type session_info_t struct { + done bool + service string + generalInfo map[string]string + traces []map[string]string + reqBuf []byte + respBuf []byte +} + +const ( + HTTP = 1 << iota + HTTP2 = 1 << iota + TCP = 1 << iota + UDP = 1 << iota +) + +//var sessionMap map[string]map[string]string; +var sessionMap map[string]*session_info_t; +var protocolParser = map[string]Parser{}; +var defaultModPath = map[string]string{ + "http": "/proto/http.so", +} +var tracerMap = map[string]opentracing.Tracer{}; + +var veth_ifidx_command = "cat /sys/class/net/eth0/iflink"; + +var protocolMap = map[string]int{ + "http": 1, + "http2": 2, + "tcp": 3, + "udp": 4, +} + +var traceTable string = "NetworkTraces" + +/* + * redisConnect: redis client connecting to redis server +func redisConnect() *redis.Client { + client := redis.NewClient(&redis.Options{ + Addr: fmt.Sprintf("%s:6379", redisServer), + Password: "", + DB: 0, + }) + return client +} + */ + +func linkSetup(ifname string) netlink.Link { + link, err := netlink.LinkByName(ifname) + netlink.LinkSetUp(link) + if err != nil { + fmt.Println(err) + return nil + } + return link +} + +/* + * dumpBPFTable: for debug purpose + */ +func dumpBPFTable(table *bcc.Table) { + iterator := table.Iter() + if iterator == nil { + fmt.Printf("Table %v does not exist\n", table.Name()) + } else { + for iterator.Next() { + key_str, _ := table.KeyBytesToStr(iterator.Key()) + leaf_str, _ := table.LeafBytesToStr(iterator.Leaf()) + fmt.Printf("table %v key: %v leaf: %v\n", table.Name(), key_str, leaf_str) + } + } +} + +func loadProtoParser(protocol string, update bool) error { + var modPath = "" + + if !update { + if _, ok := protocolParser[protocol]; ok { + fmt.Printf("Found parse function for protocol %s\n", protocol) + return nil + } + } + + client := redisConnect() + + redisResult := client.HGet(ProtoCfg, protocol) + if redisResult.Err() == nil { + if len(redisResult.Val()) > 0 { + modPath = redisResult.Val() + } + } + if len(modPath) == 0 { + if _, ok := defaultModPath[protocol]; ok { + modPath = defaultModPath[protocol] + } else { + return errors.New(fmt.Sprintf("Unable to find module path for protocol %s", protocol)) + } + } + + fmt.Printf("Loading plugin for protocol %v with %v\n", protocol, modPath) + + plug, err := plugin.Open(modPath) + if err != nil { + fmt.Println(err) + return err + } + + symParse, err := plug.Lookup("Parser") + if err != nil { + fmt.Println(err) + return err + } + + var parser Parser + parser, ok := symParse.(Parser) + if !ok { + fmt.Printf("Unexpected type from mod %s symbol parse\n", modPath) + return errors.New(fmt.Sprintf("Wrong type for func parse from %s", modPath)) + } + + protocolParser[protocol] = parser + return nil +} + +func print_network_traces() { + /* + client := redisConnect() + + traces, err := client.HGetAll(traceTable).Result() + if err != nil { + fmt.Printf("Error retriving traces from redis: %v\n", err.Error()) + return + } + */ + /* + structure: + "done": "true", + "traces": array of protocol traces + [0] : "admin": map[string]string + [1] : "ipv4 or ipv6": map[string]string + [2] : "tcp or udp": map[string]string + [3] : "http"... + */ + /* + for key, value := range traces { + traceMap := map[string]interface{} + json.Unmarshal([]byte(value), &traceMap) + if _, ok := traceMap["done"]; ok { + span := tracer.StartSpan(fmt.Sprintf("tracing-%s", key)) + for idx, trace := range traceMap["traces"] { + span.SetTag(fmt.Sprintf("protocol-%d", idx), trace['protocol']) + for tag, tagVal := range trace { + if tag == "protocol" { + continue + } + span.SetTag(tag, tagVal) + } + } + span.Finish() + ret := client.HDel(traceTable, key) + if ret.Err() != nil { + fmt.Printf("Error deleting %v from %v: %v\n", key, traceTable, ret.Err()) + } + } + } + */ + for key, value := range sessionMap { + if value.done { + tracer := tracerMap[value.service] + span := tracer.StartSpan(fmt.Sprintf("tracing-%s", key)) + for genTag, genVal := range value.generalInfo { + fmt.Printf("general info writing %v: %v\n", genTag, genVal) + span.SetTag(genTag, genVal) + } + for idx, trace := range value.traces { + span.SetTag(fmt.Sprintf("protocol-%d", idx), trace["protocol"]) + for tag, tagVal := range trace { + if tag == "protocol" { + continue + } + fmt.Printf("%v writing %v: %v\n", trace["protocol"], tag, tagVal) + span.SetTag(tag, tagVal) + } + } + span.Finish() + delete(sessionMap, key) + } + } +} + +func handle_skb_event(data *[]byte, node_name string, pod_name string, + session_table *bcc.Table, + monitoring_info *monitoring_info_t, + egress_match_list []egress_match_t, + svc_name string) (error) { + //fmt.Printf("monitoring info has %v\n", monitoring_info) + fmt.Printf("\n\nnode[%s] pod[%s]\n%s\n", node_name, pod_name, hex.Dump(*data)) + var ipproto layers.IPProtocol + var src_ip, dst_ip uint32 + var src_port, dst_port uint16 + var session_key, src_ip_str, dst_ip_str string + is_req := false + proto := HTTP + is_ingress:= binary.LittleEndian.Uint32((*data)[0:4]) + packet := gopacket.NewPacket((*data)[4:len(*data)], + layers.LayerTypeEthernet, + gopacket.Default) + if ipv4_layer := packet.Layer(layers.LayerTypeIPv4); ipv4_layer != nil { + ipv4, _ := ipv4_layer.(*layers.IPv4) + src_ip_str = ipv4.SrcIP.String() + dst_ip_str = ipv4.DstIP.String() + ipproto = ipv4.Protocol + fmt.Printf("Source: %s Dest: %s\n", src_ip_str, dst_ip_str) + // Note: the src_ip and dst_ip var here are ONLY being used as + // lookup key to eBPF hash table, hence preserving network + // byte order + src_ip = binary.BigEndian.Uint32(ipv4.SrcIP) + dst_ip = binary.BigEndian.Uint32(ipv4.DstIP) + } + tcp_layer := packet.Layer(layers.LayerTypeTCP) + if tcp_layer != nil { + tcp, _ := tcp_layer.(*layers.TCP) + fmt.Printf("From src port %d to dst port %d [%v]: FIN:%v|SYN:%v|RST:%v|PSH:%v|ACK:%v|URG:%v|ECE:%v|CWR:%v|NS:%v\n", + tcp.SrcPort, tcp.DstPort, tcp.DataOffset, tcp.FIN, tcp.SYN, + tcp.RST, tcp.PSH, tcp.ACK, tcp.URG, tcp.ECE, tcp.CWR, tcp.NS) + //src_port := binary.LittleEndian.Uint16(uint16(tcp.SrcPort)) + //dst_port := binary.LittleEndian.Uint16(uint16(tcp.DstPort)) + src_port = uint16(tcp.SrcPort) + dst_port = uint16(tcp.DstPort) + } else { + fmt.Printf("Non-TCP packet, skip tracing...\n") + return nil + } + fmt.Printf("proto: %d is_ingress: %d data length %v\n", proto, is_ingress, len(*data)) + fmt.Println("dst_port is ", dst_port) + if dst_port == 0 { + return nil + } + // TODO(s3wong): dump table + dumpBPFTable(session_table) + egress_port_req := false + for _, port := range egress_match_list { + if port.dst_port == dst_port { + egress_port_req = true + break + } + } + if dst_port == uint16(monitoring_info.port_num) || egress_port_req { + is_req = true + } + if is_req { + session_key = fmt.Sprintf("%x:%x:%d:%d:%d", src_ip, dst_ip, ipproto, + src_port, dst_port) + } else { + session_key = fmt.Sprintf("%x:%x:%d:%d:%d", dst_ip, src_ip, ipproto, + dst_port, src_port) + } + var sess_map *session_info_t + if _, ok := sessionMap[session_key]; !ok { + sess_map = &session_info_t{} + sess_map.done = false + sess_map.service = svc_name + sess_map.generalInfo = make(map[string]string) + sess_map.traces = []map[string]string{} + sess_map.reqBuf = []byte{} + sess_map.respBuf = []byte{} + sessionMap[session_key] = sess_map + zero := strconv.Itoa(0) + sessionMap[session_key].generalInfo["reqpakcount"] = zero + sessionMap[session_key].generalInfo["reqbytecount"] = zero + sessionMap[session_key].generalInfo["resppakcount"] = zero + sessionMap[session_key].generalInfo["respbytecount"] = zero + sessionMap[session_key].generalInfo["nodename"] = node_name + sessionMap[session_key].generalInfo["podname"] = pod_name + } else { + sess_map = sessionMap[session_key] + } + + curr_pak_count := 0 + curr_byte_count := 0 + map_val := sess_map.generalInfo + if is_req { + curr_pak_count, _ = strconv.Atoi(map_val["reqpakcount"]) + curr_byte_count, _ = strconv.Atoi(map_val["reqbytecount"]) + } else { + curr_pak_count, _ = strconv.Atoi(map_val["resppakcount"]) + curr_byte_count, _ = strconv.Atoi(map_val["respbytecount"]) + } + curr_pak_count++ + curr_byte_count += len(packet.Data()) + if is_req { + map_val["reqpakcount"] = strconv.Itoa(curr_pak_count) + map_val["reqbytecount"] = strconv.Itoa(curr_byte_count) + } else { + map_val["resppakcount"] = strconv.Itoa(curr_pak_count) + map_val["respbytecount"] = strconv.Itoa(curr_byte_count) + } + + if is_req { + // TODO (s3wong): just do IPv4 and TCP without using the plugin for now + // the condition check itself is cheating also... + if len(sess_map.traces) <= 1 { + ipv4Map := make(map[string]string) + ipv4Map["protocol"] = "IPv4" + ipv4Map["srcip"] = src_ip_str + ipv4Map["dstip"] = dst_ip_str + sess_map.traces = append(sess_map.traces, ipv4Map) + tcpMap := make(map[string]string) + tcpMap["protocol"] = "TCP" + tcpMap["srcport"] = fmt.Sprintf("%d", src_port) + tcpMap["dstport"] = fmt.Sprintf("%d", dst_port) + sess_map.traces = append(sess_map.traces, tcpMap) + } + } + + var dataptr []byte + app_layer := packet.ApplicationLayer() + errStr := "" + if app_layer != nil { + if is_req { + dataptr = append(sess_map.reqBuf, app_layer.Payload()...) + } else { + dataptr = append(sess_map.respBuf, app_layer.Payload()...) + } + for _, protocol := range monitoring_info.protocols { + if _, ok := protocolParser[protocol]; ok { + parser := protocolParser[protocol] + new_dataptr, parseMap := parser.Parse(session_key, is_req, + dataptr) + if parseMap != nil { + protocolTag := strings.ToUpper(protocol) + merged := false + for _, existing := range sess_map.traces { + if existing["protocol"] == protocolTag { + for k, v := range parseMap { + existing[k] = v + } + merged = true + break + } + } + if !merged { + parseMap["protocol"] = strings.ToUpper(protocol) + sess_map.traces = append(sess_map.traces, parseMap) + } + dataptr = new_dataptr + } else { + // offset to packet is off, no need to continue + // parsing, return error + errStr = fmt.Sprintf("Error: unable to parse protocol %v", protocol) + fmt.Println(errStr) + //return errors.New(errStr) + break + } + } + } + } else { + fmt.Printf("No application layer, TCP packet\n") + return nil + } + + if len(errStr) > 0 { + // buffer + if is_req { + sess_map.reqBuf = append([]byte(nil), dataptr...) + } else { + sess_map.respBuf = append([]byte(nil), dataptr...) + } + //sessionMap[session_key] = sess_map + } else { + session_key := session_key_t { + src_ip: dst_ip, + dst_ip: src_ip, + src_port: dst_port, + dst_port: src_port, + } + key_buf := &bytes.Buffer{} + err := binary.Write(key_buf, binary.LittleEndian, session_key) + if err != nil { + fmt.Println(err) + return nil + } + key := append([]byte(nil), key_buf.Bytes()...) + if leaf, err := session_table.Get(key); err != nil { + fmt.Printf("Failed to lookup key %v with err %v\n", session_key, err) + return nil + } else { + var duration uint64 = 0 + leaf_buf := bytes.NewBuffer(leaf) + if leaf_buf == nil { + fmt.Println("Error: unable to allocate new byte buffer") + return nil + } + session := session_t{} + if err = binary.Read(leaf_buf, binary.LittleEndian, &session); + err != nil { + fmt.Println(err) + return nil + } + if session.Resp_time == 0 { + fmt.Printf("session response time not set yet\n") + } else { + duration = (session.Resp_time - session.Req_time)/1000 + fmt.Printf("session time : %v\n", session) + fmt.Printf("Duration: %d usec\n", duration) + } + map_val["duration"] = fmt.Sprintf("%v usec", duration) + + node, node_session, err := getNodeIntfSession(session_key) + if err == nil { + map_val["node-interface"] = node + map_val["node-request-ts"] = fmt.Sprintf("%v", node_session.Req_time) + map_val["node-response-ts"] = fmt.Sprintf("%v", node_session.Resp_time) + delNodeIntfSession(node, key) + } else { + fmt.Printf("Session not found in any node interface... posssibly local?") + } + + if duration > 0 { + sess_map.done = true + err := session_table.Delete(key) + if err != nil { + fmt.Printf("Error deleting key %v: %v\n", key, err) + return err + } + } + } + } + + return nil +} + +func setTrafficTable(traffic_table *bcc.Table, port_num int, protocol_id string, dump_table bool) error { + key, _ := traffic_table.KeyStrToBytes(strconv.Itoa(port_num)) + leaf, _ := traffic_table.LeafStrToBytes(strconv.Itoa(protocolMap[protocol_id])) + if err := traffic_table.Set(key, leaf); err != nil { + fmt.Printf("Failed to set traffic table tcpdports: %v\n", err) + return err + } + if dump_table { + dumpBPFTable(traffic_table) + } + return nil +} + +func setEgressTable(egress_table *bcc.Table, + egress_match_list []egress_match_t, + action int, + dump_table bool) error { + for _, egress_match := range egress_match_list { + key_buf := &bytes.Buffer{} + err := binary.Write(key_buf, binary.LittleEndian, egress_match) + if err != nil { + fmt.Printf("Error converting key %v into binary: %v\n", egress_match, err) + continue + } + key := append([]byte(nil), key_buf.Bytes()...) + leaf, _ := egress_table.LeafStrToBytes(strconv.Itoa(action)) + if err := egress_table.Set(key, leaf); err != nil { + fmt.Printf("Failed to add key %v:%v to egress table: %v\n", key,leaf,err) + return err + } + } + if dump_table { + dumpBPFTable(egress_table) + } + return nil +} + +var nodeintfFilterList = [...]string {"lo", "veth", "docker", "flannel"} + +func filterNodeIntf(intf string) bool { + for _, substring := range nodeintfFilterList { + if strings.Contains(intf, substring) { + return false + } + } + return true +} + +type nodeIntf struct { + bpfMod *bcc.Module + ipTrackTable *bcc.Table + sessionTable *bcc.Table +} + +var nodeIntfMap = map[string]*nodeIntf{} + +func setupNodeIntf(ifindex int) (*nodeIntf, error) { + buf, err := ioutil.ReadFile("libclovisor/ebpf/node_interface.c") + if err != nil { + fmt.Println(err) + return nil, err + } + code := string(buf) + + bpf_mod := bcc.NewModule(code, []string{}) + + ingress_fn, err := bpf_mod.Load("handle_ingress", + C.BPF_PROG_TYPE_SCHED_CLS, + 1, 65536) + if err != nil { + fmt.Printf("Failed to load node interface ingress func: %v\n", err) + return nil, err + } + + egress_fn, err := bpf_mod.Load("handle_egress", + C.BPF_PROG_TYPE_SCHED_CLS, + 1, 65536) + if err != nil { + fmt.Printf("Failed to load node interface egress func: %v\n", err) + return nil, err + } + + ip_track_table := bcc.NewTable(bpf_mod.TableId("ip2track"), bpf_mod) + node_sess_table := bcc.NewTable(bpf_mod.TableId("node_sessions"), bpf_mod) + + // check if qdisc clsact filter for this interface already exists + link, err := netlink.LinkByIndex(ifindex) + if err != nil { + fmt.Println(err) + } else { + qdiscs, err := netlink.QdiscList(link) + if err == nil { + for _, qdisc_ := range qdiscs { + if qdisc_.Type() == "clsact" { + netlink.QdiscDel(qdisc_) + break + } + } + } + } + + attrs := netlink.QdiscAttrs { + LinkIndex: ifindex, + Handle: netlink.MakeHandle(0xffff, 0), + Parent: netlink.HANDLE_CLSACT, + } + + qdisc := &netlink.GenericQdisc { + QdiscAttrs: attrs, + QdiscType: "clsact", + } + + if err := netlink.QdiscAdd(qdisc); err != nil { + fmt.Println(err) + return nil, err + } + + ingress_filter_attrs := netlink.FilterAttrs{ + LinkIndex: ifindex, + Parent: netlink.MakeHandle(0xffff, 0xfff3), + Priority: 1, + Protocol: unix.ETH_P_ALL, + } + ingress_filter := &netlink.BpfFilter{ + FilterAttrs: ingress_filter_attrs, + Fd: ingress_fn, + Name: "handle_ingress", + DirectAction: true, + } + if ingress_filter.Fd < 0 { + fmt.Println("Failed to load node interface ingress bpf program") + return nil, err + } + + if err := netlink.FilterAdd(ingress_filter); err != nil { + fmt.Println(err) + return nil, err + } + + egress_filter_attrs := netlink.FilterAttrs{ + LinkIndex: ifindex, + Parent: netlink.MakeHandle(0xffff, 0xfff2), + Priority: 1, + Protocol: unix.ETH_P_ALL, + } + egress_filter := &netlink.BpfFilter{ + FilterAttrs: egress_filter_attrs, + Fd: egress_fn, + Name: "handle_egress", + DirectAction: true, + } + if egress_filter.Fd < 0 { + fmt.Println("Failed to load node interface egress bpf program") + return nil, err + } + + if err := netlink.FilterAdd(egress_filter); err != nil { + fmt.Println(err) + return nil, err + } + + ip_track_table.DeleteAll() + node_sess_table.DeleteAll() + + return &nodeIntf{ + bpfMod: bpf_mod, + ipTrackTable: ip_track_table, + sessionTable: node_sess_table, + }, nil +} + +func ClovisorPhyInfSetup() error { + intfList, err := net.Interfaces() + if err != nil { + fmt.Printf("Failed to get node interfaces: %v\n", err) + return err + } + + for _, f := range intfList { + if !filterNodeIntf(f.Name) { + continue + } + fmt.Printf("Tracking node interface %v w/ index %v\n", f.Name, f.Index) + bpf_node_intf, err := setupNodeIntf(f.Index) + if err != nil { + fmt.Printf("Failed to set up node interface %v: %v\n", f.Name, err) + return err + } + nodeIntfMap[f.Name] = bpf_node_intf + } + return nil +} + +func setIPTrackingTable(table *bcc.Table, ipaddr uint32, action int) error { + key, _ := table.KeyStrToBytes(strconv.Itoa(int(ipaddr))) + leaf, _ := table.LeafStrToBytes(strconv.Itoa(action)) + if err := table.Set(key, leaf); err != nil { + fmt.Printf("Failed to set IP tracking table: %v\n", err) + return err + } + dumpBPFTable(table) + return nil +} + +func setNodeIntfTrackingIP(ipaddr uint32) { + for name, node_intf := range nodeIntfMap { + err := setIPTrackingTable(node_intf.ipTrackTable, ipaddr, 1) + if err != nil { + fmt.Printf("Failed to add ip address %v to node interface %v: %v\n", backtoIP4(int64(ipaddr)), name, err) + } + } +} + +func getNodeIntfSession(session_key session_key_t) (string, *session_t, error) { + key_buf := &bytes.Buffer{} + binary.Write(key_buf, binary.LittleEndian, session_key) + key := append([]byte(nil), key_buf.Bytes()...) + + for node, node_intf := range nodeIntfMap { + fmt.Printf("For node interface %v... ", node) + //dumpBPFTable(node_intf.sessionTable) + if leaf, err := node_intf.sessionTable.Get(key); err == nil { + leaf_buf := bytes.NewBuffer(leaf) + session := session_t{} + binary.Read(leaf_buf, binary.LittleEndian, &session) + return node, &session, nil + } + } + return "", nil, errors.New("session not found") +} + +func delNodeIntfSession(node_iname string, key []byte) error { + nodeIntf := nodeIntfMap[node_iname] + err := nodeIntf.sessionTable.Delete(key) + if err != nil { + fmt.Printf("Error deleting session %v from node interface %v: %v\n", + key, node_iname, err) + } + return err +} + +func ClovisorNewPodInit(k8s_client *ClovisorK8s, + node_name string, + pod_name string, + monitoring_info *monitoring_info_t) (*ClovisorBCC, error) { + + output, err := k8s_client.exec_command(veth_ifidx_command, monitoring_info) + if err != nil { + return nil, err + } + + ifindex , err := strconv.Atoi(output) + if err != nil { + fmt.Printf("Error converting %v to ifindex, error: %v\n", output, err.Error()) + return nil, err + } + + sessionMap = map[string]*session_info_t{}; + + fmt.Printf("Beginning network tracing for pod %v\n", pod_name) + + buf, err := ioutil.ReadFile("libclovisor/ebpf/session_tracking.c") + if err != nil { + fmt.Println(err) + return nil, err + } + code := string(buf) + + bpf_mod := bcc.NewModule(code, []string{}) + //defer bpf_mod.Close() + + ingress_fn, err := bpf_mod.Load("handle_ingress", + C.BPF_PROG_TYPE_SCHED_CLS, + 1, 65536) + if err != nil { + fmt.Println("Failed to load ingress func: %v\n", err) + return nil, err + } + fmt.Println("Loaded Ingress func to structure") + + egress_fn, err := bpf_mod.Load("handle_egress", + C.BPF_PROG_TYPE_SCHED_CLS, + 1, 65536) + if err != nil { + fmt.Println("Failed to load egress func: %v\n", err) + return nil, err + } + + fmt.Println("Loaded Egress func to structure") + + traffic_table := bcc.NewTable(bpf_mod.TableId("dports2proto"), bpf_mod) + if err := setTrafficTable(traffic_table, int(monitoring_info.port_num), + monitoring_info.protocols[0], true); + err != nil { + fmt.Printf("Error on setting traffic port") + return nil, err + } + + egress_match_list := get_egress_match_list(pod_name) + + egress_table := bcc.NewTable(bpf_mod.TableId("egress_lookup_table"), bpf_mod) + if egress_match_list != nil { + if err := setEgressTable(egress_table, egress_match_list, 1, true); err != nil { + return nil, err + } + } + + session_table := bcc.NewTable(bpf_mod.TableId("sessions"), bpf_mod) + + // check if qdisc clsact filter for this interface already exists + link, err := netlink.LinkByIndex(ifindex) + if err != nil { + fmt.Println(err) + } else { + qdiscs, err := netlink.QdiscList(link) + if err == nil { + for _, qdisc := range qdiscs { + if qdisc.Type() == "clsact" { + netlink.QdiscDel(qdisc) + break + } + } + } + } + + attrs := netlink.QdiscAttrs { + LinkIndex: ifindex, + Handle: netlink.MakeHandle(0xffff, 0), + Parent: netlink.HANDLE_CLSACT, + } + + qdisc := &netlink.GenericQdisc { + QdiscAttrs: attrs, + QdiscType: "clsact", + } + + if err := netlink.QdiscAdd(qdisc); err != nil { + fmt.Println(err) + return nil, err + } + + fmt.Printf("Qdisc for clsact added for index %v\n", ifindex) + + ingress_filter_attrs := netlink.FilterAttrs{ + LinkIndex: ifindex, + Parent: netlink.MakeHandle(0xffff, 0xfff3), + Priority: 1, + Protocol: unix.ETH_P_ALL, + } + ingress_filter := &netlink.BpfFilter{ + FilterAttrs: ingress_filter_attrs, + Fd: ingress_fn, + Name: "handle_ingress", + DirectAction: true, + } + if ingress_filter.Fd < 0 { + fmt.Println("Failed to load ingress bpf program") + return nil, err + } + + if err := netlink.FilterAdd(ingress_filter); err != nil { + fmt.Println(err) + return nil, err + } + + egress_filter_attrs := netlink.FilterAttrs{ + LinkIndex: ifindex, + Parent: netlink.MakeHandle(0xffff, 0xfff2), + Priority: 1, + Protocol: unix.ETH_P_ALL, + } + egress_filter := &netlink.BpfFilter{ + FilterAttrs: egress_filter_attrs, + Fd: egress_fn, + Name: "handle_egress", + DirectAction: true, + } + if egress_filter.Fd < 0 { + fmt.Println("Failed to load egress bpf program") + return nil, err + } + + if err := netlink.FilterAdd(egress_filter); err != nil { + fmt.Println(err) + return nil, err + } + + setNodeIntfTrackingIP(ip2Long(monitoring_info.pod_ip)) + + table := bcc.NewTable(bpf_mod.TableId("skb_events"), bpf_mod) + + skb_rev_chan := make(chan []byte) + + perfMap, err := bcc.InitPerfMap(table, skb_rev_chan) + if err != nil { + fmt.Println(err) + return nil, err + } + + stop := make(chan bool) + go func() { + fmt.Printf("Start tracing to Jaeger with service %v\n", monitoring_info.svc_name) + tracer, closer := initJaeger(monitoring_info.svc_name) + tracerMap[monitoring_info.svc_name] = tracer + ticker := time.NewTicker(500 * time.Millisecond) + for { + select { + case <- ticker.C: + print_network_traces() + case data := <-skb_rev_chan: + err = handle_skb_event(&data, node_name, pod_name, session_table, + monitoring_info, egress_match_list, + monitoring_info.svc_name) + if err != nil { + fmt.Printf("failed to decode received data: %s\n", err) + } + case <- stop: + fmt.Printf("Receiving stop for pod %v\n", pod_name) + ticker.Stop() + perfMap.Stop() + closer.Close() + // TODO(s3wong): uncomment remove qdisc del once k8s watcher implemented + //netlink.QdiscDel(qdisc) + bpf_mod.Close() + return + } + } + }() + + perfMap.Start() + return &ClovisorBCC{ + stopChan: stop, + qdisc: qdisc, + }, nil +} + +func (clovBcc *ClovisorBCC) StopPod() { + // TODO(s3wong): remove qdisc del once k8s watcher implemented + netlink.QdiscDel(clovBcc.qdisc) + clovBcc.stopChan <- true +} diff --git a/clover/clovisor/libclovisor/clovisor_cfg.go b/clover/clovisor/libclovisor/clovisor_cfg.go new file mode 100644 index 0000000..ca3200e --- /dev/null +++ b/clover/clovisor/libclovisor/clovisor_cfg.go @@ -0,0 +1,189 @@ +// Copyright (c) Authors of Clover +// +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Apache License, Version 2.0 +// which accompanies this distribution, and is available at +// http://www.apache.org/licenses/LICENSE-2.0 + +package clovisor + +import ( + "bytes" + "encoding/binary" + "fmt" + "io" + "net" + "strconv" + "strings" + + "github.com/go-redis/redis" + opentracing "github.com/opentracing/opentracing-go" + jaeger "github.com/uber/jaeger-client-go" + jaeger_config "github.com/uber/jaeger-client-go/config" +) + +var redisServer string = "redis.clovisor" +var jaegerCollector string = "jaeger-collector.clovisor:14268" +var jaegerAgent string = "jaeger-agent.clovisor:6831" +var ProtoCfg string = "clovisor_proto_cfg" +var protoPluginCfgChan string = "clovisor_proto_plugin_cfg_chan" + +/* + * redisConnect: redis client connecting to redis server + */ +func redisConnect() *redis.Client { + client := redis.NewClient(&redis.Options{ + Addr: fmt.Sprintf("%s:6379", redisServer), + Password: "", + DB: 0, + }) + return client +} + +func get_cfg_labels(node_name string) ([]string, error) { + client := redisConnect() + defer client.Close() + labels_list, err := client.LRange("clovisor_labels", 0, -1).Result() + if err != nil { + fmt.Println(err.Error()) + return nil, err + } + + return labels_list, err +} + +func parse_label_cfg(label_cfg string) (string, string, string) { + label_slice := strings.Split(label_cfg, ":") + if len(label_slice) == 1 { + return label_slice[0], "", "" + } + return label_slice[0], label_slice[1], label_slice[2] +} + +func get_egress_match_list(pod_name string) ([]egress_match_t) { + client := redisConnect() + defer client.Close() + egress_cfg_list, err := client.LRange("clovior_egress_match", 0, -1).Result() + if err != nil { + fmt.Println(err.Error()) + return nil + } + ret_list := make([]egress_match_t, 0, len(egress_cfg_list)) + for _, em_cfg_str := range(egress_cfg_list) { + fmt.Printf("egress match cfg == %v\n", em_cfg_str) + em_cfg_slice := strings.Split(em_cfg_str, ":") + if len(em_cfg_slice) < 2 { + fmt.Printf("egress match config requires at least two fields [%v]\n", em_cfg_slice) + continue + } else if len(em_cfg_slice) == 3 { + if strings.Contains(pod_name, em_cfg_slice[2]) { + fmt.Printf("%v != %v, filtering out this config for pod %v\n", + em_cfg_slice[2], pod_name, pod_name) + continue + } + } + var ip uint32 = 0 + if em_cfg_slice[0] != "0" { + ip = ip2Long(em_cfg_slice[0]) + } + port_32, _ := strconv.Atoi(em_cfg_slice[1]) + port := uint16(port_32) + ret_list = append(ret_list, egress_match_t{ip, port}) + } + return ret_list +} + +// following function comes from +// https://www.socketloop.com/tutorials/golang-convert-ip-address-string-to-long-unsigned-32-bit-integer +func ip2Long(ip string) uint32 { + var long uint32 + //binary.Read(bytes.NewBuffer(net.ParseIP(ip).To4()), binary.LittleEndian, &long) + binary.Read(bytes.NewBuffer(net.ParseIP(ip).To4()), binary.BigEndian, &long) + return long +} + +func backtoIP4(ipInt int64) string { + // need to do two bit shifting and “0xff” masking + b0 := strconv.FormatInt((ipInt>>24)&0xff, 10) + b1 := strconv.FormatInt((ipInt>>16)&0xff, 10) + b2 := strconv.FormatInt((ipInt>>8)&0xff, 10) + b3 := strconv.FormatInt((ipInt & 0xff), 10) + return b0 + "." + b1 + "." + b2 + "." + b3 +} + +func get_cfg_session_match() ([]egress_match_cfg, error) { + var ret_list []egress_match_cfg + client := redisConnect() + defer client.Close() + keys, err := client.HKeys("clovisor_session_match").Result() + if err != nil { + fmt.Println(err.Error()) + return nil, err + } + for _, key := range keys { + value, err := client.HGet("clovisor_session_match", key).Result() + if err != nil { + fmt.Println(err.Error()) + continue + } + match_slice := strings.Split(key, "-") + dst_ip := ip2Long(match_slice[0]) + dst_port, _ := strconv.Atoi(match_slice[1]) + egress_match := egress_match_t{ + dst_ip: dst_ip, + dst_port: uint16(dst_port), + } + // organize into internally understandable struct + ret_list = append(ret_list, egress_match_cfg{ + egress_match: egress_match, + action: value, + }) + } + return ret_list, nil +} + +func initJaeger(service string) (opentracing.Tracer, io.Closer) { + cfg := &jaeger_config.Configuration{ + Sampler: &jaeger_config.SamplerConfig{ + Type: "const", + Param: 1, + }, + Reporter: &jaeger_config.ReporterConfig{ + LogSpans: true, + CollectorEndpoint: fmt.Sprintf("http://%s/api/traces", jaegerCollector), + LocalAgentHostPort: fmt.Sprintf("%s", jaegerAgent), + }, + } + tracer, closer, err := cfg.New(service, jaeger_config.Logger(jaeger.StdLogger)) + if err != nil { + panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err)) + } + return tracer, closer +} + +func get_jaeger_server() (string, error) { + client := redisConnect() + defer client.Close() + return client.Get("clovisor_jaeger_server").Result() +} + +func Monitor_proto_plugin_cfg() { + client := redisConnect() + //defer client.Close() + + pubsub := client.Subscribe(protoPluginCfgChan) + //defer pubsub.Close() + + go func() { + for { + msg, err := pubsub.ReceiveMessage() + if err != nil { + fmt.Printf("Error getting protocol plugin configuration: %v\n", err) + return + } + + fmt.Printf("Update on protocol %v notification received\n", msg.Payload) + loadProtoParser(msg.Payload, true) + } + }() +} diff --git a/clover/clovisor/libclovisor/clovisor_k8s.go b/clover/clovisor/libclovisor/clovisor_k8s.go new file mode 100644 index 0000000..a53e308 --- /dev/null +++ b/clover/clovisor/libclovisor/clovisor_k8s.go @@ -0,0 +1,272 @@ +// Copyright (c) Authors of Clover +// +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Apache License, Version 2.0 +// which accompanies this distribution, and is available at +// http://www.apache.org/licenses/LICENSE-2.0 + +package clovisor + +import ( + "bytes" + "errors" + "fmt" + "strconv" + "strings" + + core_v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/remotecommand" +) + +type ClovisorK8s struct { + client *kubernetes.Clientset + config *rest.Config +} + +type monitoring_info_t struct { + namespace string + svc_name string + pod_name string + container_name string + protocols []string + port_num uint32 + pod_ip string +} + +var DEFAULT_NAMESPACE = "default" +var SUPPORTED_PROTOCOLS = [...]string {"tcp", "http"} + +func K8s_client_init(nodeName string) (*ClovisorK8s, error) { + config, err := rest.InClusterConfig() + if err != nil { + fmt.Println(err.Error()) + return nil, err + } + + client, err := kubernetes.NewForConfig(config) + if err != nil { + fmt.Println(err.Error()) + return nil, err + } + + return &ClovisorK8s{ + client: client, + config: config, + }, nil +} + +func (client *ClovisorK8s) Get_monitoring_info(nodeName string) (map[string]*monitoring_info_t, + error) { + + labels_list, err := get_cfg_labels(nodeName) + if err != nil { + fmt.Printf("Error getting cfg labels: %v\n", err) + return nil, err + } + + mon_info_map := client.get_monitoring_pods(nodeName, labels_list) + if mon_info_map == nil { + return nil, errors.New("monitoring info empty") + } + + return mon_info_map, nil +} + +func (client *ClovisorK8s) getPodsForSvc(svc *core_v1.Service, + namespace string) (*core_v1.PodList, error) { + set := labels.Set(svc.Spec.Selector) + //label := strings.Split(set.AsSelector().String(), ",")[0] + //fmt.Printf("Trying to get pods for service %v with label %v from %v\n", svc.GetName(), label, set.AsSelector().String()) + listOptions := metav1.ListOptions{LabelSelector: set.AsSelector().String()} + //listOptions := metav1.ListOptions{LabelSelector: label} + return client.client.CoreV1().Pods(namespace).List(listOptions) +} + +func (client *ClovisorK8s) get_monitoring_pods(nodeName string, + labels_list []string) (map[string]*monitoring_info_t) { + /* + * Three cases: + * 1.) no configured namespaces, monitoring all pods in default namesapce + * 2.) if any config only has namespace, monitoring all pods in namespace + * 3.) label is configured, only monitor pods with that label + */ + var namespace string + ns_svc_map := make(map[string][]*core_v1.ServiceList) + monitoring_info := make(map[string]*monitoring_info_t) + if len(labels_list) == 0 { + // TODO(s3wong): set it to 'default' + //namespace = "linux-foundation-gke" + namespace = "default" + if svcs_list, err := + client.client.CoreV1().Services(namespace).List(metav1.ListOptions{}); + err != nil { + fmt.Printf("Error fetching service list for namespace %s\n", + namespace) + return nil + } else { + /* + if _, ok := ns_svc_map[namespace]; !ok { + ns_svc_map[namespace] = []*core_v1.ServiceList{} + } + */ + ns_svc_map[namespace] = append(ns_svc_map[namespace], svcs_list) + } + } else { + for _, label_str := range labels_list { + var label_selector string + namespace, key, value := parse_label_cfg(label_str) + if len(namespace) == 0 { + fmt.Printf("Error in config: %s not a valid config\n", label_str) + continue + } + if len(key) == 0 { + fmt.Printf("Namespace only config for %s\n", namespace) + } else { + label_selector = fmt.Sprintf("%s=%s", key, value) + } + if svc_list, err := + client.client.CoreV1().Services(namespace).List(metav1.ListOptions{ + LabelSelector: label_selector, + }); err != nil { + fmt.Printf("Error listing services with label %v:%v:%v - %v\n", + key, value, namespace, err.Error()) + continue + } else { + if _, ok := ns_svc_map[namespace]; !ok { + ns_svc_map[namespace] = []*core_v1.ServiceList{} + } + ns_svc_map[namespace] = append(ns_svc_map[namespace], svc_list) + } + } + } + + for ns, svc_slice := range ns_svc_map { + for _, svc_list_ := range svc_slice { + for _, svc := range svc_list_.Items { + if ns == "default" && svc.GetName() == "kubernetes" { + continue + } + //fmt.Printf("Looking for supported protocols for service %v:%v\n", ns, svc.GetName()) + //var svc_port_map = map[string]core_v1.ServicePort{} + var svc_port_map = map[string][]string{} + for _, svc_port := range svc.Spec.Ports { + if len(svc_port.Name) > 0 { + svc_protos := strings.Split(svc_port.Name, "-") + for _, proto := range svc_protos { + if err := loadProtoParser(proto, false); err == nil { + for _, sp := range SUPPORTED_PROTOCOLS { + if strings.Contains(proto, sp) { + target_port := svc_port.TargetPort.String() + svc_port_map[target_port] = append(svc_port_map[target_port], proto) + } + } + } else { + fmt.Printf("Unsupported protocol: %v\n", proto) + } + } + } + } + if len(svc_port_map) == 0 { + fmt.Printf("Found no port with supported protocol for %v:%v\n", ns, svc.GetName()) + continue + } else { + fmt.Printf("svc_port_map for service %v is %v\n", svc.GetName(), svc_port_map) + } + //fmt.Printf("Fetching pods for namespace %v service: %v\n", ns, svc.GetName()) + pod_list, err := client.getPodsForSvc(&svc, ns) + if err != nil { + fmt.Print("Error fetching pods for %v:%v [%v]\n", ns, svc.GetName(), err) + continue + } + /* + labelSet := labels.Set(svc.Spec.Selector) + pod_list, err := client.client.CoreV1().Pods(ns).List(metav1.ListOptions{}) + if err != nil { + fmt.Print("Error fetching pods for %v:%v [%v]\n", ns, svc.GetName(), err) + continue + } + */ + for _, pod := range pod_list.Items { + if pod.Spec.NodeName == nodeName { + for _, container := range pod.Spec.Containers { + var port_num uint32 + var tp_string string + for _, port := range container.Ports { + port_num = uint32(port.ContainerPort) + tp_string = strconv.Itoa(int(port_num)) + if _, in := svc_port_map[tp_string]; !in { + continue + } + pod_name := pod.GetName() + monitoring_info[pod_name] = &(monitoring_info_t{}) + monitoring_info[pod_name].namespace = ns + monitoring_info[pod_name].svc_name = svc.GetName() + monitoring_info[pod_name].pod_name = pod_name + monitoring_info[pod_name].container_name = container.Name + monitoring_info[pod_name].port_num = port_num + monitoring_info[pod_name].protocols = svc_port_map[tp_string] + monitoring_info[pod_name].pod_ip = pod.Status.PodIP + } + } + } + } + } + } + } + + return monitoring_info +} + +func (client *ClovisorK8s) exec_command(command string, + monitoring_info *monitoring_info_t) (string, error) { + + // Following code based on: + // https://stackoverflow.com/questions/43314689/example-of-exec-in-k8ss-pod-by-using-go-client + // https://github.com/a4abhishek/Client-Go-Examples/blob/master/exec_to_pod/exec_to_pod.go + exec_req := client.client.CoreV1().RESTClient().Post(). + Resource("pods"). + Name(monitoring_info.pod_name). + Namespace(monitoring_info.namespace). + SubResource("exec") + scheme := runtime.NewScheme() + if err := core_v1.AddToScheme(scheme); err != nil { + fmt.Printf("Error in exec pods: %v\n", err.Error()) + return "", err + } + + parameterCodec := runtime.NewParameterCodec(scheme) + exec_req.VersionedParams(&core_v1.PodExecOptions{ + Command: strings.Fields(command), + Container: monitoring_info.container_name, + Stdin: false, + Stdout: true, + Stderr: true, + TTY: false, + }, parameterCodec) + + exec, err := remotecommand.NewSPDYExecutor(client.config, "POST", exec_req.URL()) + if err != nil { + fmt.Printf("Error in remotecommand exec: %v\n", err.Error()) + return "", err + } + + var stdout, stderr bytes.Buffer + err = exec.Stream(remotecommand.StreamOptions{ + Stdin: nil, + Stdout: &stdout, + Stderr: &stderr, + Tty: false, + }) + if err != nil { + fmt.Printf("Error in exec stream: %v\n", err.Error()) + return "", err + } + + stdout_no_newline := strings.TrimSuffix(stdout.String(), "\n") + return stdout_no_newline, nil +} diff --git a/clover/clovisor/libclovisor/ebpf/node_interface.c b/clover/clovisor/libclovisor/ebpf/node_interface.c new file mode 100755 index 0000000..cd14a50 --- /dev/null +++ b/clover/clovisor/libclovisor/ebpf/node_interface.c @@ -0,0 +1,158 @@ +// Copyright (c) Authors of Clover +// +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Apache License, Version 2.0 +// which accompanies this distribution, and is available at +// http://www.apache.org/licenses/LICENSE-2.0 +#include <uapi/linux/if_ether.h> +#include <uapi/linux/in.h> +#include <uapi/linux/ip.h> +#include <uapi/linux/tcp.h> +#include <uapi/linux/pkt_cls.h> +#include <uapi/linux/bpf.h> + +#include <bcc/proto.h> + +#define MAX_SESSION_TABLE_ENTRIES 8192 + +typedef struct session_key_ { + u32 src_ip; + u32 dst_ip; + unsigned short src_port; + unsigned short dst_port; +} session_key_t; + +typedef struct session_ { + u64 req_time; + u64 resp_time; +} session_t; + +BPF_HASH(ip2track, u32, u32); +BPF_HASH(node_sessions, session_key_t, session_t, MAX_SESSION_TABLE_ENTRIES); + +struct eth_hdr { + unsigned char h_dest[ETH_ALEN]; + unsigned char h_source[ETH_ALEN]; + unsigned short h_proto; +}; + +static inline int ipv4_hdrlen(struct iphdr *ip4) +{ + return ip4->ihl * 4; +} + +static inline int tcp_doff(struct tcphdr *tcp_hdr) +{ + return tcp_hdr->doff * 4; +} + +static inline void fill_up_sess_key(session_key_t *key, u32 src_ip, + u32 dst_ip, u16 src_port, u16 dst_port) +{ + key->src_ip = src_ip; + key->dst_ip = dst_ip; + key->src_port = src_port; + key->dst_port = dst_port; +} + +static inline void process_response(u32 src_ip, u32 dst_ip, u16 src_port, + u16 dst_port) +{ + session_key_t sess_key = {}; + session_t *session_ptr = NULL; + fill_up_sess_key(&sess_key, src_ip, dst_ip, src_port, dst_port); + session_ptr = node_sessions.lookup(&sess_key); + if (session_ptr != NULL) { + u64 resp_time = bpf_ktime_get_ns(); + session_t update_session = { + session_ptr->req_time, + resp_time + }; + node_sessions.update(&sess_key, &update_session); + } +} + +static inline void process_request(u32 src_ip, u32 dst_ip, u16 src_port, + u16 dst_port) +{ + session_key_t sess_key = {}; + session_t *session_ptr = NULL; + session_t new_session = { + bpf_ktime_get_ns(), + 0 + }; + fill_up_sess_key(&sess_key, src_ip, dst_ip, src_port, dst_port); + session_ptr = node_sessions.lookup(&sess_key); + if (! session_ptr) { + node_sessions.insert(&sess_key, &new_session); + } +} + +static inline void ingress_parsing(struct tcphdr *tcp_hdr, + struct iphdr *ipv4_hdr) +{ + unsigned int dst_ip = ntohl(ipv4_hdr->daddr); + int ret = 0; + + unsigned int *proto = ip2track.lookup(&dst_ip); + if (proto != NULL) { + process_response(ntohl(ipv4_hdr->daddr), + ntohl(ipv4_hdr->saddr), + ntohs(tcp_hdr->dest), + ntohs(tcp_hdr->source)); + } +} + +static inline void egress_parsing(struct tcphdr *tcp_hdr, + struct iphdr *ipv4_hdr) +{ + unsigned int src_ip = ntohl(ipv4_hdr->saddr); + + unsigned int *proto = ip2track.lookup(&src_ip); + + if (proto != NULL) { + process_request(ntohl(ipv4_hdr->saddr), + ntohl(ipv4_hdr->daddr), + ntohs(tcp_hdr->source), + ntohs(tcp_hdr->dest)); + } +} + +static inline int handle_packet(struct __sk_buff *skb, int is_ingress) +{ + void *data = (void *)(long)skb->data; + void *data_end = (void *)(long)skb->data_end; + struct eth_hdr *eth = data; + struct iphdr *ipv4_hdr = data + sizeof(*eth); + struct tcphdr *tcp_hdr = data + sizeof(*eth) + sizeof(*ipv4_hdr); + + /* TODO(s3wong): assuming TCP only for now */ + /* single length check */ + if (data + sizeof(*eth) + sizeof(*ipv4_hdr) + sizeof(*tcp_hdr) > data_end) + return TC_ACT_OK; + + if (eth->h_proto != htons(ETH_P_IP)) + return TC_ACT_OK; + + // TODO(s3wong): no support for IP options + if (ipv4_hdr->protocol != IPPROTO_TCP || ipv4_hdr->ihl != 5) + return TC_ACT_OK; + + if (is_ingress == 1) { + ingress_parsing(tcp_hdr, ipv4_hdr); + } else{ + egress_parsing(tcp_hdr, ipv4_hdr); + } + + return TC_ACT_OK; +} + +int handle_ingress(struct __sk_buff *skb) +{ + return handle_packet(skb, 1); +} + +int handle_egress(struct __sk_buff *skb) +{ + return handle_packet(skb, 0); +} diff --git a/clover/clovisor/libclovisor/ebpf/session_tracking.c b/clover/clovisor/libclovisor/ebpf/session_tracking.c new file mode 100755 index 0000000..ea68788 --- /dev/null +++ b/clover/clovisor/libclovisor/ebpf/session_tracking.c @@ -0,0 +1,292 @@ +// Copyright (c) Authors of Clover +// +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Apache License, Version 2.0 +// which accompanies this distribution, and is available at +// http://www.apache.org/licenses/LICENSE-2.0 +#include <uapi/linux/if_ether.h> +#include <uapi/linux/in.h> +#include <uapi/linux/ip.h> +#include <uapi/linux/tcp.h> +#include <uapi/linux/pkt_cls.h> +#include <uapi/linux/bpf.h> + +#include <bcc/proto.h> + +#define HTTP_HDR_MIN_LEN 7 +#define MAX_SESSION_TABLE_ENTRIES 8192 + +typedef enum { + UNDEFINED = 0, + HTTP = 1, + HTTP2 = 2, + TCP = 3, + UDP = 4, +} app_proto_t; + +typedef struct session_key_ { + u32 src_ip; + u32 dst_ip; + unsigned short src_port; + unsigned short dst_port; +} session_key_t; + +typedef struct session_ { + u64 req_time; + u64 resp_time; +} session_t; + +typedef struct egress_match_ { + u32 dst_ip; + unsigned short dst_port; +} egress_match_t; + +typedef enum policy_action_ { + RECORD = 1, +} policy_action_t; + +BPF_PERF_OUTPUT(skb_events); +BPF_HASH(dports2proto, u16, u32); +BPF_HASH(egress_lookup_table, egress_match_t, policy_action_t); +BPF_HASH(sessions, session_key_t, session_t, MAX_SESSION_TABLE_ENTRIES); + +struct eth_hdr { + unsigned char h_dest[ETH_ALEN]; + unsigned char h_source[ETH_ALEN]; + unsigned short h_proto; +}; + +static inline int ipv4_hdrlen(struct iphdr *ip4) +{ + return ip4->ihl * 4; +} + +static inline int tcp_doff(struct tcphdr *tcp_hdr) +{ + return tcp_hdr->doff * 4; +} + +static inline int http_parsing(void *data, void *data_end) +{ + + int is_http = 1; + if (data + HTTP_HDR_MIN_LEN > data_end) { + bpf_trace_printk("No HTTP Header in TCP segment"); + return 0; + } + if (strncmp((char*)data, "HTTP", 4)) { + if (strncmp((char*)data, "GET", 3)) { + if (strncmp((char*)data, "POST", 4)) { + if (strncmp((char*)data, "PUT", 3)) { + if (strncmp((char*)data, "HEAD", 4)) { + is_http = 0; + } + } + } + } + } + return is_http; +} + +static inline void fill_up_sess_key(session_key_t *key, u32 src_ip, + u32 dst_ip, u16 src_port, u16 dst_port) +{ + key->src_ip = src_ip; + key->dst_ip = dst_ip; + key->src_port = src_port; + key->dst_port = dst_port; +} + +static inline int process_response(u32 src_ip, u32 dst_ip, u16 src_port, + u16 dst_port) +{ + session_key_t sess_key = {}; + session_t *session_ptr = NULL; + fill_up_sess_key(&sess_key, src_ip, dst_ip, src_port, dst_port); + session_ptr = sessions.lookup(&sess_key); + if (session_ptr != NULL) { + u64 resp_time = bpf_ktime_get_ns(); + session_t update_session = { + session_ptr->req_time, + resp_time + }; + sessions.update(&sess_key, &update_session); + return 1; + } + return 0; +} + +static inline void process_request(u32 src_ip, u32 dst_ip, u16 src_port, + u16 dst_port) +{ + session_key_t sess_key = {}; + session_t *session_ptr = NULL; + session_t new_session = { + bpf_ktime_get_ns(), + 0 + }; + fill_up_sess_key(&sess_key, src_ip, dst_ip, src_port, dst_port); + session_ptr = sessions.lookup(&sess_key); + if (! session_ptr) { + sessions.insert(&sess_key, &new_session); + } + /* + if (session_ptr != NULL) { + sessions.update(&sess_key, &new_session); + } else { + sessions.insert(&sess_key, &new_session); + } + */ +} + +static inline app_proto_t ingress_tcp_parsing(struct tcphdr *tcp_hdr, + struct iphdr *ipv4_hdr, + void *data_end) +{ + unsigned short dest_port = htons(tcp_hdr->dest); + egress_match_t egress_match = {}; + policy_action_t *policy_ptr = NULL; + app_proto_t ret = TCP; + + unsigned int *proto = dports2proto.lookup(&dest_port); + if (proto != NULL) { + /* + if (tcp_hdr->syn && !tcp_hdr->ack) { + return ret; + } + */ + ret = HTTP; + if (tcp_hdr->fin || tcp_hdr->rst) { + process_response(ntohl(ipv4_hdr->saddr), + ntohl(ipv4_hdr->daddr), + ntohs(tcp_hdr->source), + ntohs(tcp_hdr->dest)); + } else { + process_request(ntohl(ipv4_hdr->saddr), + ntohl(ipv4_hdr->daddr), + ntohs(tcp_hdr->source), + ntohs(tcp_hdr->dest)); + } + } else { + dest_port = htons(tcp_hdr->source); + proto = dports2proto.lookup(&dest_port); + if (proto != NULL) { + // clock response receiving time + process_response(ntohl(ipv4_hdr->daddr), + ntohl(ipv4_hdr->saddr), + ntohs(tcp_hdr->dest), + ntohs(tcp_hdr->source)); + } + egress_match.dst_ip = ntohl(ipv4_hdr->saddr); + egress_match.dst_port = ntohs(tcp_hdr->source); + policy_ptr = egress_lookup_table.lookup(&egress_match); + if (policy_ptr == NULL) { + egress_match.dst_ip = 0; + policy_ptr = egress_lookup_table.lookup(&egress_match); + } + + if (policy_ptr != NULL) { + if (*policy_ptr == RECORD) { + ret = HTTP; + if (tcp_hdr->fin || tcp_hdr->rst) { + process_response(ntohl(ipv4_hdr->daddr), + ntohl(ipv4_hdr->saddr), + ntohs(tcp_hdr->dest), + ntohs(tcp_hdr->source)); + } + } + } + } + + // everything else drops to TCP + //return ((void*)tcp_hdr); + return ret; +} + +static inline app_proto_t egress_tcp_parsing(struct tcphdr *tcp_hdr, + struct iphdr *ipv4_hdr, + void *data_end) +{ + unsigned short src_port = htons(tcp_hdr->source); + app_proto_t ret = TCP; + egress_match_t egress_match = {}; + policy_action_t *policy_ptr = NULL; + + unsigned int *proto = dports2proto.lookup(&src_port); + + if (proto != NULL) { + //if (tcp_hdr->fin || tcp_hdr->rst) { + process_response(ntohl(ipv4_hdr->daddr), + ntohl(ipv4_hdr->saddr), + ntohs(tcp_hdr->dest), + ntohs(tcp_hdr->source)); + //} + ret = HTTP; + } else { + + egress_match.dst_ip = ntohl(ipv4_hdr->daddr); + egress_match.dst_port = ntohs(tcp_hdr->dest); + policy_ptr = egress_lookup_table.lookup(&egress_match); + if (policy_ptr == NULL) { + egress_match.dst_ip = 0; + policy_ptr = egress_lookup_table.lookup(&egress_match); + } + + if (policy_ptr != NULL) { + if (*policy_ptr == RECORD) { + process_request(ntohl(ipv4_hdr->saddr), + ntohl(ipv4_hdr->daddr), + ntohs(tcp_hdr->source), + ntohs(tcp_hdr->dest)); + ret = HTTP; + } + } + } + //return(ret_hdr); + return ret; +} + +static inline int handle_packet(struct __sk_buff *skb, int is_ingress) +{ + void *data = (void *)(long)skb->data; + void *data_end = (void *)(long)skb->data_end; + struct eth_hdr *eth = data; + struct iphdr *ipv4_hdr = data + sizeof(*eth); + struct tcphdr *tcp_hdr = data + sizeof(*eth) + sizeof(*ipv4_hdr); + app_proto_t proto = TCP; + + /* TODO(s3wong): assuming TCP only for now */ + /* single length check */ + if (data + sizeof(*eth) + sizeof(*ipv4_hdr) + sizeof(*tcp_hdr) > data_end) + return TC_ACT_OK; + + if (eth->h_proto != htons(ETH_P_IP)) + return TC_ACT_OK; + + // TODO(s3wong): no support for IP options + if (ipv4_hdr->protocol != IPPROTO_TCP || ipv4_hdr->ihl != 5) + return TC_ACT_OK; + + if (is_ingress == 1) { + proto = ingress_tcp_parsing(tcp_hdr, ipv4_hdr, data_end); + } else{ + proto = egress_tcp_parsing(tcp_hdr, ipv4_hdr, data_end); + } + + if (proto == HTTP) { + int offset = is_ingress; + skb_events.perf_submit_skb(skb, skb->len, &offset, sizeof(offset)); + } + + return TC_ACT_OK; +} + +int handle_ingress(struct __sk_buff *skb) +{ + return handle_packet(skb, 1); +} + +int handle_egress(struct __sk_buff *skb) +{ + return handle_packet(skb, 0); +} diff --git a/clover/clovisor/libclovisor/jaeger-all-in-one-template.yml b/clover/clovisor/libclovisor/jaeger-all-in-one-template.yml new file mode 100644 index 0000000..1260ef5 --- /dev/null +++ b/clover/clovisor/libclovisor/jaeger-all-in-one-template.yml @@ -0,0 +1,151 @@ +# +# Copyright 2017-2018 The Jaeger Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +# in compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing permissions and limitations under +# the License. +# + +apiVersion: v1 +kind: List +items: +- apiVersion: extensions/v1beta1 + kind: Deployment + metadata: + name: jaeger-deployment + namespace: clovisor + labels: + app: jaeger + jaeger-infra: jaeger-deployment + spec: + replicas: 1 + strategy: + type: Recreate + template: + metadata: + labels: + app: jaeger + jaeger-infra: jaeger-pod + annotations: + prometheus.io/scrape: "true" + prometheus.io/port: "16686" + spec: + containers: + - env: + - name: COLLECTOR_ZIPKIN_HTTP_PORT + value: "9411" + image: jaegertracing/all-in-one + name: jaeger + ports: + - containerPort: 5775 + protocol: UDP + - containerPort: 6831 + protocol: UDP + - containerPort: 6832 + protocol: UDP + - containerPort: 5778 + protocol: TCP + - containerPort: 16686 + protocol: TCP + - containerPort: 9411 + protocol: TCP + readinessProbe: + httpGet: + path: "/" + port: 14269 + initialDelaySeconds: 5 +- apiVersion: v1 + kind: Service + metadata: + name: jaeger-query + namespace: clovisor + labels: + app: jaeger + jaeger-infra: jaeger-service + spec: + ports: + - name: query-http + port: 80 + protocol: TCP + targetPort: 16686 + selector: + jaeger-infra: jaeger-pod + type: LoadBalancer +- apiVersion: v1 + kind: Service + metadata: + name: jaeger-collector + namespace: clovisor + labels: + app: jaeger + jaeger-infra: collector-service + spec: + ports: + - name: jaeger-collector-tchannel + port: 14267 + protocol: TCP + targetPort: 14267 + - name: jaeger-collector-http + port: 14268 + protocol: TCP + targetPort: 14268 + - name: jaeger-collector-zipkin + port: 9411 + protocol: TCP + targetPort: 9411 + selector: + jaeger-infra: jaeger-pod + type: ClusterIP +- apiVersion: v1 + kind: Service + metadata: + name: jaeger-agent + namespace: clovisor + labels: + app: jaeger + jaeger-infra: agent-service + spec: + ports: + - name: agent-zipkin-thrift + port: 5775 + protocol: UDP + targetPort: 5775 + - name: agent-compact + port: 6831 + protocol: UDP + targetPort: 6831 + - name: agent-binary + port: 6832 + protocol: UDP + targetPort: 6832 + - name: agent-configs + port: 5778 + protocol: TCP + targetPort: 5778 + clusterIP: None + selector: + jaeger-infra: jaeger-pod +- apiVersion: v1 + kind: Service + metadata: + name: zipkin + namespace: clovisor + labels: + app: jaeger + jaeger-infra: zipkin-service + spec: + ports: + - name: jaeger-collector-zipkin + port: 9411 + protocol: TCP + targetPort: 9411 + clusterIP: None + selector: + jaeger-infra: jaeger-pod + diff --git a/clover/clovisor/libclovisor/libproto/build-plugin b/clover/clovisor/libclovisor/libproto/build-plugin new file mode 100755 index 0000000..743071b --- /dev/null +++ b/clover/clovisor/libclovisor/libproto/build-plugin @@ -0,0 +1,9 @@ +#!/bin/bash +# +# Copyright (c) Authors of Clover +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +go build --buildmode=plugin -o ../../proto/http.so clovisor_http.go diff --git a/clover/clovisor/libclovisor/libproto/clovisor_http.go b/clover/clovisor/libclovisor/libproto/clovisor_http.go new file mode 100644 index 0000000..6440d5a --- /dev/null +++ b/clover/clovisor/libclovisor/libproto/clovisor_http.go @@ -0,0 +1,84 @@ +// Copyright (c) Authors of Clover +// +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Apache License, Version 2.0 +// which accompanies this distribution, and is available at +// http://www.apache.org/licenses/LICENSE-2.0 + +package main + +import ( + "bufio" + "bytes" + "fmt" + "io/ioutil" + "net/http" +) + +type httpparser string + +func (p httpparser) Parse(session_key string, + is_req bool, + data []byte) ([]byte, map[string]string) { + map_val := make(map[string]string) + reader := bytes.NewReader(data) + buf := bufio.NewReader(reader) + if is_req == true { + req, err := http.ReadRequest(buf) + if err != nil { + fmt.Printf("Request error: ") + fmt.Println(err) + return nil, nil + } else if req == nil { + fmt.Println("request is nil") + return nil, nil + } else { + fmt.Printf("HTTP Request Method %s url %v proto %v\n", + req.Method, req.URL, req.Proto) + map_val["reqmethod"] = req.Method + map_val["requrl"] = fmt.Sprintf("%v", req.URL) + map_val["reqproto"] = fmt.Sprintf("%v", req.Proto) + if user_agent := req.UserAgent(); len(user_agent) > 0 { + map_val["useragent"] = user_agent + } + if len(req.Host) > 0 { + map_val["host"] = req.Host + } + header := req.Header + if req_id := header.Get("X-Request-Id"); len(req_id) > 0 { + map_val["requestid"] = req_id + } + if envoy_dec := header.Get("X-Envoy-Decorator-Operation"); len(envoy_dec) > 0 { + map_val["envoydecorator"] = envoy_dec + } + if trace_id := header.Get("X-B3-Traceid"); len(trace_id) > 0 { + map_val["traceid"] = trace_id + } + body, err := ioutil.ReadAll(req.Body) + if err != nil { + fmt.Printf("Error reading HTTP Request body %v\n", err.Error()) + } + return body, map_val + } + } else { + // response + resp, err := http.ReadResponse(buf, nil) + if err != nil { + fmt.Printf("Response error: ") + fmt.Println(err) + return nil, nil + } + fmt.Printf("HTTP Response Status %v code %v Proto %v\n", + resp.Status, resp.StatusCode, resp.Proto) + map_val["respstatus"] = resp.Status + map_val["respstatuscode"] = fmt.Sprintf("%v", resp.StatusCode) + map_val["respproto"] = fmt.Sprintf("%v", resp.Proto) + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + fmt.Printf("Error reading HTTP Request body %v\n", err.Error()) + } + return body, map_val + } +} + +var Parser httpparser diff --git a/clover/clovisor/libclovisor/libproto/http_alt.go b/clover/clovisor/libclovisor/libproto/http_alt.go new file mode 100644 index 0000000..f7581fb --- /dev/null +++ b/clover/clovisor/libclovisor/libproto/http_alt.go @@ -0,0 +1,93 @@ +// Copyright (c) Authors of Clover +// +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Apache License, Version 2.0 +// which accompanies this distribution, and is available at +// http://www.apache.org/licenses/LICENSE-2.0 + +package main + +import ( + "bufio" + "bytes" + "fmt" + "io/ioutil" + "net/http" +) + +type httpparser string + +func (p httpparser) Parse(session_key string, + is_req bool, + data []byte) ([]byte, map[string]string) { + map_val := make(map[string]string) + reader := bytes.NewReader(data) + buf := bufio.NewReader(reader) + if is_req == true { + req, err := http.ReadRequest(buf) + if err != nil { + fmt.Printf("Request error: ") + fmt.Println(err) + return nil, nil + } else if req == nil { + fmt.Println("request is nil") + return nil, nil + } else { + fmt.Printf("HTTP Request Method %s url %v proto %v\n", + req.Method, req.URL, req.Proto) + map_val["reqmethod"] = req.Method + map_val["requrl"] = fmt.Sprintf("%v", req.URL) + map_val["reqproto"] = fmt.Sprintf("%v", req.Proto) + if user_agent := req.UserAgent(); len(user_agent) > 0 { + map_val["useragent"] = user_agent + } + if len(req.Host) > 0 { + map_val["host"] = req.Host + } + header := req.Header + if req_id := header.Get("X-Request-Id"); len(req_id) > 0 { + map_val["requestid"] = req_id + } + if envoy_dec := header.Get("X-Envoy-Decorator-Operation"); len(envoy_dec) > 0 { + map_val["envoydecorator"] = envoy_dec + } + if trace_id := header.Get("X-B3-Traceid"); len(trace_id) > 0 { + map_val["traceid"] = trace_id + } + body, err := ioutil.ReadAll(req.Body) + if err != nil { + fmt.Printf("Error reading HTTP Request body %v\n", err.Error()) + } + return body, map_val + } + } else { + // response + resp, err := http.ReadResponse(buf, nil) + if err != nil { + fmt.Printf("Response error: ") + fmt.Println(err) + return nil, nil + } + fmt.Printf("HTTP Response Status %v code %v Proto %v\n", + resp.Status, resp.StatusCode, resp.Proto) + map_val["respstatus"] = resp.Status + map_val["respstatuscode"] = fmt.Sprintf("%v", resp.StatusCode) + map_val["respproto"] = fmt.Sprintf("%v", resp.Proto) + header := resp.Header + //fmt.Printf("Response Header contains %v\n", header) + if contentType := header.Get("Content-Type"); len(contentType) > 0 { + map_val["content-type"] = contentType + } + if lastMod := header.Get("Last-Modified"); len(lastMod) > 0 { + map_val["last-modified"] = lastMod + } + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + fmt.Printf("Error reading HTTP Request body %v\n", err.Error()) + } + return body, map_val + } +} + +var Parser httpparser diff --git a/clover/clovisor/libclovisor/mongo.yaml b/clover/clovisor/libclovisor/mongo.yaml new file mode 100644 index 0000000..5dea26e --- /dev/null +++ b/clover/clovisor/libclovisor/mongo.yaml @@ -0,0 +1,41 @@ +apiVersion: v1 +kind: ReplicationController +metadata: + labels: + name: mongo + name: mongo-controller + namespace: clovisor +spec: + replicas: 1 + template: + metadata: + labels: + name: mongo + spec: + containers: + - image: mongo + name: mongo + ports: + - name: mongo + containerPort: 27017 + hostPort: 27017 + volumeMounts: + - name: mongo-storage + mountPath: /data/db + volumes: + - name: mongo-storage + emptyDir: {} +--- +apiVersion: v1 +kind: Service +metadata: + labels: + name: mongo + name: mongo + namespace: clovisor +spec: + ports: + - port: 27017 + targetPort: 27017 + selector: + name: mongo diff --git a/clover/clovisor/libclovisor/redis.yaml b/clover/clovisor/libclovisor/redis.yaml new file mode 100644 index 0000000..43204ff --- /dev/null +++ b/clover/clovisor/libclovisor/redis.yaml @@ -0,0 +1,53 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: clovisor + labels: + name: clovisor +--- +apiVersion: v1 +kind: Pod +metadata: + labels: + name: redis + redis-sentinel: "true" + role: master + name: redis + namespace: clovisor +spec: + containers: + - name: redis + image: k8s.gcr.io/redis:v1 + env: + - name: MASTER + value: "true" + ports: + - containerPort: 6379 + resources: + limits: + cpu: "0.1" + volumeMounts: + - mountPath: /redis-master-data + name: data + - name: sentinel + image: kubernetes/redis:v1 + env: + - name: SENTINEL + value: "true" + ports: + - containerPort: 26379 + volumes: + - name: data + emptyDir: {} +--- +apiVersion: v1 +kind: Service +metadata: + name: redis + namespace: clovisor +spec: + ports: + - port: 6379 + selector: + name: redis +--- diff --git a/clover/clovisor/proto/http.so b/clover/clovisor/proto/http.so Binary files differnew file mode 100644 index 0000000..1a61fb2 --- /dev/null +++ b/clover/clovisor/proto/http.so |