summaryrefslogtreecommitdiffstats
path: root/clover/clovisor/libclovisor
diff options
context:
space:
mode:
Diffstat (limited to 'clover/clovisor/libclovisor')
-rw-r--r--clover/clovisor/libclovisor/clovisor_bcc.go948
-rw-r--r--clover/clovisor/libclovisor/clovisor_cfg.go189
-rw-r--r--clover/clovisor/libclovisor/clovisor_k8s.go272
-rwxr-xr-xclover/clovisor/libclovisor/ebpf/node_interface.c158
-rwxr-xr-xclover/clovisor/libclovisor/ebpf/session_tracking.c292
-rw-r--r--clover/clovisor/libclovisor/jaeger-all-in-one-template.yml151
-rwxr-xr-xclover/clovisor/libclovisor/libproto/build-plugin9
-rw-r--r--clover/clovisor/libclovisor/libproto/clovisor_http.go84
-rw-r--r--clover/clovisor/libclovisor/libproto/http_alt.go93
-rw-r--r--clover/clovisor/libclovisor/mongo.yaml41
-rw-r--r--clover/clovisor/libclovisor/redis.yaml53
11 files changed, 2290 insertions, 0 deletions
diff --git a/clover/clovisor/libclovisor/clovisor_bcc.go b/clover/clovisor/libclovisor/clovisor_bcc.go
new file mode 100644
index 0000000..8f9a7bf
--- /dev/null
+++ b/clover/clovisor/libclovisor/clovisor_bcc.go
@@ -0,0 +1,948 @@
+// Copyright (c) Authors of Clover
+//
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Apache License, Version 2.0
+// which accompanies this distribution, and is available at
+// http://www.apache.org/licenses/LICENSE-2.0
+
+package clovisor
+
+import (
+ "encoding/hex"
+ //"encoding/json"
+ "bytes"
+ "encoding/binary"
+ "errors"
+ "fmt"
+ "io/ioutil"
+ "net"
+ //"net/http"
+ "plugin"
+ "strconv"
+ "strings"
+ "time"
+
+ //"github.com/go-redis/redis"
+ "github.com/google/gopacket"
+ "github.com/google/gopacket/layers"
+ "github.com/iovisor/gobpf/bcc"
+ opentracing "github.com/opentracing/opentracing-go"
+ "github.com/vishvananda/netlink"
+
+ "golang.org/x/sys/unix"
+)
+
+/*
+#cgo CFLAGS: -I/usr/include/bcc/compat
+#cgo LDFLAGS: -lbcc
+#include <bcc/bcc_common.h>
+#include <bcc/libbpf.h>
+*/
+import "C"
+
+type Parser interface {
+ Parse(session_key string, is_req bool, data []byte)([]byte, map[string]string)
+}
+
+type ClovisorBCC struct {
+ stopChan chan bool
+ // TODO(s3wong): remove once k8s watcher available
+ qdisc *netlink.GenericQdisc
+}
+
+type session_key_t struct {
+ src_ip uint32
+ dst_ip uint32
+ src_port uint16
+ dst_port uint16
+}
+
+type session_t struct {
+ Req_time uint64
+ Resp_time uint64
+}
+
+type egress_match_t struct {
+ dst_ip uint32
+ dst_port uint16
+}
+
+type egress_match_cfg struct {
+ egress_match egress_match_t
+ action string
+ params string
+}
+
+type session_info_t struct {
+ done bool
+ service string
+ generalInfo map[string]string
+ traces []map[string]string
+ reqBuf []byte
+ respBuf []byte
+}
+
+const (
+ HTTP = 1 << iota
+ HTTP2 = 1 << iota
+ TCP = 1 << iota
+ UDP = 1 << iota
+)
+
+//var sessionMap map[string]map[string]string;
+var sessionMap map[string]*session_info_t;
+var protocolParser = map[string]Parser{};
+var defaultModPath = map[string]string{
+ "http": "/proto/http.so",
+}
+var tracerMap = map[string]opentracing.Tracer{};
+
+var veth_ifidx_command = "cat /sys/class/net/eth0/iflink";
+
+var protocolMap = map[string]int{
+ "http": 1,
+ "http2": 2,
+ "tcp": 3,
+ "udp": 4,
+}
+
+var traceTable string = "NetworkTraces"
+
+/*
+ * redisConnect: redis client connecting to redis server
+func redisConnect() *redis.Client {
+ client := redis.NewClient(&redis.Options{
+ Addr: fmt.Sprintf("%s:6379", redisServer),
+ Password: "",
+ DB: 0,
+ })
+ return client
+}
+ */
+
+func linkSetup(ifname string) netlink.Link {
+ link, err := netlink.LinkByName(ifname)
+ netlink.LinkSetUp(link)
+ if err != nil {
+ fmt.Println(err)
+ return nil
+ }
+ return link
+}
+
+/*
+ * dumpBPFTable: for debug purpose
+ */
+func dumpBPFTable(table *bcc.Table) {
+ iterator := table.Iter()
+ if iterator == nil {
+ fmt.Printf("Table %v does not exist\n", table.Name())
+ } else {
+ for iterator.Next() {
+ key_str, _ := table.KeyBytesToStr(iterator.Key())
+ leaf_str, _ := table.LeafBytesToStr(iterator.Leaf())
+ fmt.Printf("table %v key: %v leaf: %v\n", table.Name(), key_str, leaf_str)
+ }
+ }
+}
+
+func loadProtoParser(protocol string, update bool) error {
+ var modPath = ""
+
+ if !update {
+ if _, ok := protocolParser[protocol]; ok {
+ fmt.Printf("Found parse function for protocol %s\n", protocol)
+ return nil
+ }
+ }
+
+ client := redisConnect()
+
+ redisResult := client.HGet(ProtoCfg, protocol)
+ if redisResult.Err() == nil {
+ if len(redisResult.Val()) > 0 {
+ modPath = redisResult.Val()
+ }
+ }
+ if len(modPath) == 0 {
+ if _, ok := defaultModPath[protocol]; ok {
+ modPath = defaultModPath[protocol]
+ } else {
+ return errors.New(fmt.Sprintf("Unable to find module path for protocol %s", protocol))
+ }
+ }
+
+ fmt.Printf("Loading plugin for protocol %v with %v\n", protocol, modPath)
+
+ plug, err := plugin.Open(modPath)
+ if err != nil {
+ fmt.Println(err)
+ return err
+ }
+
+ symParse, err := plug.Lookup("Parser")
+ if err != nil {
+ fmt.Println(err)
+ return err
+ }
+
+ var parser Parser
+ parser, ok := symParse.(Parser)
+ if !ok {
+ fmt.Printf("Unexpected type from mod %s symbol parse\n", modPath)
+ return errors.New(fmt.Sprintf("Wrong type for func parse from %s", modPath))
+ }
+
+ protocolParser[protocol] = parser
+ return nil
+}
+
+func print_network_traces() {
+ /*
+ client := redisConnect()
+
+ traces, err := client.HGetAll(traceTable).Result()
+ if err != nil {
+ fmt.Printf("Error retriving traces from redis: %v\n", err.Error())
+ return
+ }
+ */
+ /*
+ structure:
+ "done": "true",
+ "traces": array of protocol traces
+ [0] : "admin": map[string]string
+ [1] : "ipv4 or ipv6": map[string]string
+ [2] : "tcp or udp": map[string]string
+ [3] : "http"...
+ */
+ /*
+ for key, value := range traces {
+ traceMap := map[string]interface{}
+ json.Unmarshal([]byte(value), &traceMap)
+ if _, ok := traceMap["done"]; ok {
+ span := tracer.StartSpan(fmt.Sprintf("tracing-%s", key))
+ for idx, trace := range traceMap["traces"] {
+ span.SetTag(fmt.Sprintf("protocol-%d", idx), trace['protocol'])
+ for tag, tagVal := range trace {
+ if tag == "protocol" {
+ continue
+ }
+ span.SetTag(tag, tagVal)
+ }
+ }
+ span.Finish()
+ ret := client.HDel(traceTable, key)
+ if ret.Err() != nil {
+ fmt.Printf("Error deleting %v from %v: %v\n", key, traceTable, ret.Err())
+ }
+ }
+ }
+ */
+ for key, value := range sessionMap {
+ if value.done {
+ tracer := tracerMap[value.service]
+ span := tracer.StartSpan(fmt.Sprintf("tracing-%s", key))
+ for genTag, genVal := range value.generalInfo {
+ fmt.Printf("general info writing %v: %v\n", genTag, genVal)
+ span.SetTag(genTag, genVal)
+ }
+ for idx, trace := range value.traces {
+ span.SetTag(fmt.Sprintf("protocol-%d", idx), trace["protocol"])
+ for tag, tagVal := range trace {
+ if tag == "protocol" {
+ continue
+ }
+ fmt.Printf("%v writing %v: %v\n", trace["protocol"], tag, tagVal)
+ span.SetTag(tag, tagVal)
+ }
+ }
+ span.Finish()
+ delete(sessionMap, key)
+ }
+ }
+}
+
+func handle_skb_event(data *[]byte, node_name string, pod_name string,
+ session_table *bcc.Table,
+ monitoring_info *monitoring_info_t,
+ egress_match_list []egress_match_t,
+ svc_name string) (error) {
+ //fmt.Printf("monitoring info has %v\n", monitoring_info)
+ fmt.Printf("\n\nnode[%s] pod[%s]\n%s\n", node_name, pod_name, hex.Dump(*data))
+ var ipproto layers.IPProtocol
+ var src_ip, dst_ip uint32
+ var src_port, dst_port uint16
+ var session_key, src_ip_str, dst_ip_str string
+ is_req := false
+ proto := HTTP
+ is_ingress:= binary.LittleEndian.Uint32((*data)[0:4])
+ packet := gopacket.NewPacket((*data)[4:len(*data)],
+ layers.LayerTypeEthernet,
+ gopacket.Default)
+ if ipv4_layer := packet.Layer(layers.LayerTypeIPv4); ipv4_layer != nil {
+ ipv4, _ := ipv4_layer.(*layers.IPv4)
+ src_ip_str = ipv4.SrcIP.String()
+ dst_ip_str = ipv4.DstIP.String()
+ ipproto = ipv4.Protocol
+ fmt.Printf("Source: %s Dest: %s\n", src_ip_str, dst_ip_str)
+ // Note: the src_ip and dst_ip var here are ONLY being used as
+ // lookup key to eBPF hash table, hence preserving network
+ // byte order
+ src_ip = binary.BigEndian.Uint32(ipv4.SrcIP)
+ dst_ip = binary.BigEndian.Uint32(ipv4.DstIP)
+ }
+ tcp_layer := packet.Layer(layers.LayerTypeTCP)
+ if tcp_layer != nil {
+ tcp, _ := tcp_layer.(*layers.TCP)
+ fmt.Printf("From src port %d to dst port %d [%v]: FIN:%v|SYN:%v|RST:%v|PSH:%v|ACK:%v|URG:%v|ECE:%v|CWR:%v|NS:%v\n",
+ tcp.SrcPort, tcp.DstPort, tcp.DataOffset, tcp.FIN, tcp.SYN,
+ tcp.RST, tcp.PSH, tcp.ACK, tcp.URG, tcp.ECE, tcp.CWR, tcp.NS)
+ //src_port := binary.LittleEndian.Uint16(uint16(tcp.SrcPort))
+ //dst_port := binary.LittleEndian.Uint16(uint16(tcp.DstPort))
+ src_port = uint16(tcp.SrcPort)
+ dst_port = uint16(tcp.DstPort)
+ } else {
+ fmt.Printf("Non-TCP packet, skip tracing...\n")
+ return nil
+ }
+ fmt.Printf("proto: %d is_ingress: %d data length %v\n", proto, is_ingress, len(*data))
+ fmt.Println("dst_port is ", dst_port)
+ if dst_port == 0 {
+ return nil
+ }
+ // TODO(s3wong): dump table
+ dumpBPFTable(session_table)
+ egress_port_req := false
+ for _, port := range egress_match_list {
+ if port.dst_port == dst_port {
+ egress_port_req = true
+ break
+ }
+ }
+ if dst_port == uint16(monitoring_info.port_num) || egress_port_req {
+ is_req = true
+ }
+ if is_req {
+ session_key = fmt.Sprintf("%x:%x:%d:%d:%d", src_ip, dst_ip, ipproto,
+ src_port, dst_port)
+ } else {
+ session_key = fmt.Sprintf("%x:%x:%d:%d:%d", dst_ip, src_ip, ipproto,
+ dst_port, src_port)
+ }
+ var sess_map *session_info_t
+ if _, ok := sessionMap[session_key]; !ok {
+ sess_map = &session_info_t{}
+ sess_map.done = false
+ sess_map.service = svc_name
+ sess_map.generalInfo = make(map[string]string)
+ sess_map.traces = []map[string]string{}
+ sess_map.reqBuf = []byte{}
+ sess_map.respBuf = []byte{}
+ sessionMap[session_key] = sess_map
+ zero := strconv.Itoa(0)
+ sessionMap[session_key].generalInfo["reqpakcount"] = zero
+ sessionMap[session_key].generalInfo["reqbytecount"] = zero
+ sessionMap[session_key].generalInfo["resppakcount"] = zero
+ sessionMap[session_key].generalInfo["respbytecount"] = zero
+ sessionMap[session_key].generalInfo["nodename"] = node_name
+ sessionMap[session_key].generalInfo["podname"] = pod_name
+ } else {
+ sess_map = sessionMap[session_key]
+ }
+
+ curr_pak_count := 0
+ curr_byte_count := 0
+ map_val := sess_map.generalInfo
+ if is_req {
+ curr_pak_count, _ = strconv.Atoi(map_val["reqpakcount"])
+ curr_byte_count, _ = strconv.Atoi(map_val["reqbytecount"])
+ } else {
+ curr_pak_count, _ = strconv.Atoi(map_val["resppakcount"])
+ curr_byte_count, _ = strconv.Atoi(map_val["respbytecount"])
+ }
+ curr_pak_count++
+ curr_byte_count += len(packet.Data())
+ if is_req {
+ map_val["reqpakcount"] = strconv.Itoa(curr_pak_count)
+ map_val["reqbytecount"] = strconv.Itoa(curr_byte_count)
+ } else {
+ map_val["resppakcount"] = strconv.Itoa(curr_pak_count)
+ map_val["respbytecount"] = strconv.Itoa(curr_byte_count)
+ }
+
+ if is_req {
+ // TODO (s3wong): just do IPv4 and TCP without using the plugin for now
+ // the condition check itself is cheating also...
+ if len(sess_map.traces) <= 1 {
+ ipv4Map := make(map[string]string)
+ ipv4Map["protocol"] = "IPv4"
+ ipv4Map["srcip"] = src_ip_str
+ ipv4Map["dstip"] = dst_ip_str
+ sess_map.traces = append(sess_map.traces, ipv4Map)
+ tcpMap := make(map[string]string)
+ tcpMap["protocol"] = "TCP"
+ tcpMap["srcport"] = fmt.Sprintf("%d", src_port)
+ tcpMap["dstport"] = fmt.Sprintf("%d", dst_port)
+ sess_map.traces = append(sess_map.traces, tcpMap)
+ }
+ }
+
+ var dataptr []byte
+ app_layer := packet.ApplicationLayer()
+ errStr := ""
+ if app_layer != nil {
+ if is_req {
+ dataptr = append(sess_map.reqBuf, app_layer.Payload()...)
+ } else {
+ dataptr = append(sess_map.respBuf, app_layer.Payload()...)
+ }
+ for _, protocol := range monitoring_info.protocols {
+ if _, ok := protocolParser[protocol]; ok {
+ parser := protocolParser[protocol]
+ new_dataptr, parseMap := parser.Parse(session_key, is_req,
+ dataptr)
+ if parseMap != nil {
+ protocolTag := strings.ToUpper(protocol)
+ merged := false
+ for _, existing := range sess_map.traces {
+ if existing["protocol"] == protocolTag {
+ for k, v := range parseMap {
+ existing[k] = v
+ }
+ merged = true
+ break
+ }
+ }
+ if !merged {
+ parseMap["protocol"] = strings.ToUpper(protocol)
+ sess_map.traces = append(sess_map.traces, parseMap)
+ }
+ dataptr = new_dataptr
+ } else {
+ // offset to packet is off, no need to continue
+ // parsing, return error
+ errStr = fmt.Sprintf("Error: unable to parse protocol %v", protocol)
+ fmt.Println(errStr)
+ //return errors.New(errStr)
+ break
+ }
+ }
+ }
+ } else {
+ fmt.Printf("No application layer, TCP packet\n")
+ return nil
+ }
+
+ if len(errStr) > 0 {
+ // buffer
+ if is_req {
+ sess_map.reqBuf = append([]byte(nil), dataptr...)
+ } else {
+ sess_map.respBuf = append([]byte(nil), dataptr...)
+ }
+ //sessionMap[session_key] = sess_map
+ } else {
+ session_key := session_key_t {
+ src_ip: dst_ip,
+ dst_ip: src_ip,
+ src_port: dst_port,
+ dst_port: src_port,
+ }
+ key_buf := &bytes.Buffer{}
+ err := binary.Write(key_buf, binary.LittleEndian, session_key)
+ if err != nil {
+ fmt.Println(err)
+ return nil
+ }
+ key := append([]byte(nil), key_buf.Bytes()...)
+ if leaf, err := session_table.Get(key); err != nil {
+ fmt.Printf("Failed to lookup key %v with err %v\n", session_key, err)
+ return nil
+ } else {
+ var duration uint64 = 0
+ leaf_buf := bytes.NewBuffer(leaf)
+ if leaf_buf == nil {
+ fmt.Println("Error: unable to allocate new byte buffer")
+ return nil
+ }
+ session := session_t{}
+ if err = binary.Read(leaf_buf, binary.LittleEndian, &session);
+ err != nil {
+ fmt.Println(err)
+ return nil
+ }
+ if session.Resp_time == 0 {
+ fmt.Printf("session response time not set yet\n")
+ } else {
+ duration = (session.Resp_time - session.Req_time)/1000
+ fmt.Printf("session time : %v\n", session)
+ fmt.Printf("Duration: %d usec\n", duration)
+ }
+ map_val["duration"] = fmt.Sprintf("%v usec", duration)
+
+ node, node_session, err := getNodeIntfSession(session_key)
+ if err == nil {
+ map_val["node-interface"] = node
+ map_val["node-request-ts"] = fmt.Sprintf("%v", node_session.Req_time)
+ map_val["node-response-ts"] = fmt.Sprintf("%v", node_session.Resp_time)
+ delNodeIntfSession(node, key)
+ } else {
+ fmt.Printf("Session not found in any node interface... posssibly local?")
+ }
+
+ if duration > 0 {
+ sess_map.done = true
+ err := session_table.Delete(key)
+ if err != nil {
+ fmt.Printf("Error deleting key %v: %v\n", key, err)
+ return err
+ }
+ }
+ }
+ }
+
+ return nil
+}
+
+func setTrafficTable(traffic_table *bcc.Table, port_num int, protocol_id string, dump_table bool) error {
+ key, _ := traffic_table.KeyStrToBytes(strconv.Itoa(port_num))
+ leaf, _ := traffic_table.LeafStrToBytes(strconv.Itoa(protocolMap[protocol_id]))
+ if err := traffic_table.Set(key, leaf); err != nil {
+ fmt.Printf("Failed to set traffic table tcpdports: %v\n", err)
+ return err
+ }
+ if dump_table {
+ dumpBPFTable(traffic_table)
+ }
+ return nil
+}
+
+func setEgressTable(egress_table *bcc.Table,
+ egress_match_list []egress_match_t,
+ action int,
+ dump_table bool) error {
+ for _, egress_match := range egress_match_list {
+ key_buf := &bytes.Buffer{}
+ err := binary.Write(key_buf, binary.LittleEndian, egress_match)
+ if err != nil {
+ fmt.Printf("Error converting key %v into binary: %v\n", egress_match, err)
+ continue
+ }
+ key := append([]byte(nil), key_buf.Bytes()...)
+ leaf, _ := egress_table.LeafStrToBytes(strconv.Itoa(action))
+ if err := egress_table.Set(key, leaf); err != nil {
+ fmt.Printf("Failed to add key %v:%v to egress table: %v\n", key,leaf,err)
+ return err
+ }
+ }
+ if dump_table {
+ dumpBPFTable(egress_table)
+ }
+ return nil
+}
+
+var nodeintfFilterList = [...]string {"lo", "veth", "docker", "flannel"}
+
+func filterNodeIntf(intf string) bool {
+ for _, substring := range nodeintfFilterList {
+ if strings.Contains(intf, substring) {
+ return false
+ }
+ }
+ return true
+}
+
+type nodeIntf struct {
+ bpfMod *bcc.Module
+ ipTrackTable *bcc.Table
+ sessionTable *bcc.Table
+}
+
+var nodeIntfMap = map[string]*nodeIntf{}
+
+func setupNodeIntf(ifindex int) (*nodeIntf, error) {
+ buf, err := ioutil.ReadFile("libclovisor/ebpf/node_interface.c")
+ if err != nil {
+ fmt.Println(err)
+ return nil, err
+ }
+ code := string(buf)
+
+ bpf_mod := bcc.NewModule(code, []string{})
+
+ ingress_fn, err := bpf_mod.Load("handle_ingress",
+ C.BPF_PROG_TYPE_SCHED_CLS,
+ 1, 65536)
+ if err != nil {
+ fmt.Printf("Failed to load node interface ingress func: %v\n", err)
+ return nil, err
+ }
+
+ egress_fn, err := bpf_mod.Load("handle_egress",
+ C.BPF_PROG_TYPE_SCHED_CLS,
+ 1, 65536)
+ if err != nil {
+ fmt.Printf("Failed to load node interface egress func: %v\n", err)
+ return nil, err
+ }
+
+ ip_track_table := bcc.NewTable(bpf_mod.TableId("ip2track"), bpf_mod)
+ node_sess_table := bcc.NewTable(bpf_mod.TableId("node_sessions"), bpf_mod)
+
+ // check if qdisc clsact filter for this interface already exists
+ link, err := netlink.LinkByIndex(ifindex)
+ if err != nil {
+ fmt.Println(err)
+ } else {
+ qdiscs, err := netlink.QdiscList(link)
+ if err == nil {
+ for _, qdisc_ := range qdiscs {
+ if qdisc_.Type() == "clsact" {
+ netlink.QdiscDel(qdisc_)
+ break
+ }
+ }
+ }
+ }
+
+ attrs := netlink.QdiscAttrs {
+ LinkIndex: ifindex,
+ Handle: netlink.MakeHandle(0xffff, 0),
+ Parent: netlink.HANDLE_CLSACT,
+ }
+
+ qdisc := &netlink.GenericQdisc {
+ QdiscAttrs: attrs,
+ QdiscType: "clsact",
+ }
+
+ if err := netlink.QdiscAdd(qdisc); err != nil {
+ fmt.Println(err)
+ return nil, err
+ }
+
+ ingress_filter_attrs := netlink.FilterAttrs{
+ LinkIndex: ifindex,
+ Parent: netlink.MakeHandle(0xffff, 0xfff3),
+ Priority: 1,
+ Protocol: unix.ETH_P_ALL,
+ }
+ ingress_filter := &netlink.BpfFilter{
+ FilterAttrs: ingress_filter_attrs,
+ Fd: ingress_fn,
+ Name: "handle_ingress",
+ DirectAction: true,
+ }
+ if ingress_filter.Fd < 0 {
+ fmt.Println("Failed to load node interface ingress bpf program")
+ return nil, err
+ }
+
+ if err := netlink.FilterAdd(ingress_filter); err != nil {
+ fmt.Println(err)
+ return nil, err
+ }
+
+ egress_filter_attrs := netlink.FilterAttrs{
+ LinkIndex: ifindex,
+ Parent: netlink.MakeHandle(0xffff, 0xfff2),
+ Priority: 1,
+ Protocol: unix.ETH_P_ALL,
+ }
+ egress_filter := &netlink.BpfFilter{
+ FilterAttrs: egress_filter_attrs,
+ Fd: egress_fn,
+ Name: "handle_egress",
+ DirectAction: true,
+ }
+ if egress_filter.Fd < 0 {
+ fmt.Println("Failed to load node interface egress bpf program")
+ return nil, err
+ }
+
+ if err := netlink.FilterAdd(egress_filter); err != nil {
+ fmt.Println(err)
+ return nil, err
+ }
+
+ ip_track_table.DeleteAll()
+ node_sess_table.DeleteAll()
+
+ return &nodeIntf{
+ bpfMod: bpf_mod,
+ ipTrackTable: ip_track_table,
+ sessionTable: node_sess_table,
+ }, nil
+}
+
+func ClovisorPhyInfSetup() error {
+ intfList, err := net.Interfaces()
+ if err != nil {
+ fmt.Printf("Failed to get node interfaces: %v\n", err)
+ return err
+ }
+
+ for _, f := range intfList {
+ if !filterNodeIntf(f.Name) {
+ continue
+ }
+ fmt.Printf("Tracking node interface %v w/ index %v\n", f.Name, f.Index)
+ bpf_node_intf, err := setupNodeIntf(f.Index)
+ if err != nil {
+ fmt.Printf("Failed to set up node interface %v: %v\n", f.Name, err)
+ return err
+ }
+ nodeIntfMap[f.Name] = bpf_node_intf
+ }
+ return nil
+}
+
+func setIPTrackingTable(table *bcc.Table, ipaddr uint32, action int) error {
+ key, _ := table.KeyStrToBytes(strconv.Itoa(int(ipaddr)))
+ leaf, _ := table.LeafStrToBytes(strconv.Itoa(action))
+ if err := table.Set(key, leaf); err != nil {
+ fmt.Printf("Failed to set IP tracking table: %v\n", err)
+ return err
+ }
+ dumpBPFTable(table)
+ return nil
+}
+
+func setNodeIntfTrackingIP(ipaddr uint32) {
+ for name, node_intf := range nodeIntfMap {
+ err := setIPTrackingTable(node_intf.ipTrackTable, ipaddr, 1)
+ if err != nil {
+ fmt.Printf("Failed to add ip address %v to node interface %v: %v\n", backtoIP4(int64(ipaddr)), name, err)
+ }
+ }
+}
+
+func getNodeIntfSession(session_key session_key_t) (string, *session_t, error) {
+ key_buf := &bytes.Buffer{}
+ binary.Write(key_buf, binary.LittleEndian, session_key)
+ key := append([]byte(nil), key_buf.Bytes()...)
+
+ for node, node_intf := range nodeIntfMap {
+ fmt.Printf("For node interface %v... ", node)
+ //dumpBPFTable(node_intf.sessionTable)
+ if leaf, err := node_intf.sessionTable.Get(key); err == nil {
+ leaf_buf := bytes.NewBuffer(leaf)
+ session := session_t{}
+ binary.Read(leaf_buf, binary.LittleEndian, &session)
+ return node, &session, nil
+ }
+ }
+ return "", nil, errors.New("session not found")
+}
+
+func delNodeIntfSession(node_iname string, key []byte) error {
+ nodeIntf := nodeIntfMap[node_iname]
+ err := nodeIntf.sessionTable.Delete(key)
+ if err != nil {
+ fmt.Printf("Error deleting session %v from node interface %v: %v\n",
+ key, node_iname, err)
+ }
+ return err
+}
+
+func ClovisorNewPodInit(k8s_client *ClovisorK8s,
+ node_name string,
+ pod_name string,
+ monitoring_info *monitoring_info_t) (*ClovisorBCC, error) {
+
+ output, err := k8s_client.exec_command(veth_ifidx_command, monitoring_info)
+ if err != nil {
+ return nil, err
+ }
+
+ ifindex , err := strconv.Atoi(output)
+ if err != nil {
+ fmt.Printf("Error converting %v to ifindex, error: %v\n", output, err.Error())
+ return nil, err
+ }
+
+ sessionMap = map[string]*session_info_t{};
+
+ fmt.Printf("Beginning network tracing for pod %v\n", pod_name)
+
+ buf, err := ioutil.ReadFile("libclovisor/ebpf/session_tracking.c")
+ if err != nil {
+ fmt.Println(err)
+ return nil, err
+ }
+ code := string(buf)
+
+ bpf_mod := bcc.NewModule(code, []string{})
+ //defer bpf_mod.Close()
+
+ ingress_fn, err := bpf_mod.Load("handle_ingress",
+ C.BPF_PROG_TYPE_SCHED_CLS,
+ 1, 65536)
+ if err != nil {
+ fmt.Println("Failed to load ingress func: %v\n", err)
+ return nil, err
+ }
+ fmt.Println("Loaded Ingress func to structure")
+
+ egress_fn, err := bpf_mod.Load("handle_egress",
+ C.BPF_PROG_TYPE_SCHED_CLS,
+ 1, 65536)
+ if err != nil {
+ fmt.Println("Failed to load egress func: %v\n", err)
+ return nil, err
+ }
+
+ fmt.Println("Loaded Egress func to structure")
+
+ traffic_table := bcc.NewTable(bpf_mod.TableId("dports2proto"), bpf_mod)
+ if err := setTrafficTable(traffic_table, int(monitoring_info.port_num),
+ monitoring_info.protocols[0], true);
+ err != nil {
+ fmt.Printf("Error on setting traffic port")
+ return nil, err
+ }
+
+ egress_match_list := get_egress_match_list(pod_name)
+
+ egress_table := bcc.NewTable(bpf_mod.TableId("egress_lookup_table"), bpf_mod)
+ if egress_match_list != nil {
+ if err := setEgressTable(egress_table, egress_match_list, 1, true); err != nil {
+ return nil, err
+ }
+ }
+
+ session_table := bcc.NewTable(bpf_mod.TableId("sessions"), bpf_mod)
+
+ // check if qdisc clsact filter for this interface already exists
+ link, err := netlink.LinkByIndex(ifindex)
+ if err != nil {
+ fmt.Println(err)
+ } else {
+ qdiscs, err := netlink.QdiscList(link)
+ if err == nil {
+ for _, qdisc := range qdiscs {
+ if qdisc.Type() == "clsact" {
+ netlink.QdiscDel(qdisc)
+ break
+ }
+ }
+ }
+ }
+
+ attrs := netlink.QdiscAttrs {
+ LinkIndex: ifindex,
+ Handle: netlink.MakeHandle(0xffff, 0),
+ Parent: netlink.HANDLE_CLSACT,
+ }
+
+ qdisc := &netlink.GenericQdisc {
+ QdiscAttrs: attrs,
+ QdiscType: "clsact",
+ }
+
+ if err := netlink.QdiscAdd(qdisc); err != nil {
+ fmt.Println(err)
+ return nil, err
+ }
+
+ fmt.Printf("Qdisc for clsact added for index %v\n", ifindex)
+
+ ingress_filter_attrs := netlink.FilterAttrs{
+ LinkIndex: ifindex,
+ Parent: netlink.MakeHandle(0xffff, 0xfff3),
+ Priority: 1,
+ Protocol: unix.ETH_P_ALL,
+ }
+ ingress_filter := &netlink.BpfFilter{
+ FilterAttrs: ingress_filter_attrs,
+ Fd: ingress_fn,
+ Name: "handle_ingress",
+ DirectAction: true,
+ }
+ if ingress_filter.Fd < 0 {
+ fmt.Println("Failed to load ingress bpf program")
+ return nil, err
+ }
+
+ if err := netlink.FilterAdd(ingress_filter); err != nil {
+ fmt.Println(err)
+ return nil, err
+ }
+
+ egress_filter_attrs := netlink.FilterAttrs{
+ LinkIndex: ifindex,
+ Parent: netlink.MakeHandle(0xffff, 0xfff2),
+ Priority: 1,
+ Protocol: unix.ETH_P_ALL,
+ }
+ egress_filter := &netlink.BpfFilter{
+ FilterAttrs: egress_filter_attrs,
+ Fd: egress_fn,
+ Name: "handle_egress",
+ DirectAction: true,
+ }
+ if egress_filter.Fd < 0 {
+ fmt.Println("Failed to load egress bpf program")
+ return nil, err
+ }
+
+ if err := netlink.FilterAdd(egress_filter); err != nil {
+ fmt.Println(err)
+ return nil, err
+ }
+
+ setNodeIntfTrackingIP(ip2Long(monitoring_info.pod_ip))
+
+ table := bcc.NewTable(bpf_mod.TableId("skb_events"), bpf_mod)
+
+ skb_rev_chan := make(chan []byte)
+
+ perfMap, err := bcc.InitPerfMap(table, skb_rev_chan)
+ if err != nil {
+ fmt.Println(err)
+ return nil, err
+ }
+
+ stop := make(chan bool)
+ go func() {
+ fmt.Printf("Start tracing to Jaeger with service %v\n", monitoring_info.svc_name)
+ tracer, closer := initJaeger(monitoring_info.svc_name)
+ tracerMap[monitoring_info.svc_name] = tracer
+ ticker := time.NewTicker(500 * time.Millisecond)
+ for {
+ select {
+ case <- ticker.C:
+ print_network_traces()
+ case data := <-skb_rev_chan:
+ err = handle_skb_event(&data, node_name, pod_name, session_table,
+ monitoring_info, egress_match_list,
+ monitoring_info.svc_name)
+ if err != nil {
+ fmt.Printf("failed to decode received data: %s\n", err)
+ }
+ case <- stop:
+ fmt.Printf("Receiving stop for pod %v\n", pod_name)
+ ticker.Stop()
+ perfMap.Stop()
+ closer.Close()
+ // TODO(s3wong): uncomment remove qdisc del once k8s watcher implemented
+ //netlink.QdiscDel(qdisc)
+ bpf_mod.Close()
+ return
+ }
+ }
+ }()
+
+ perfMap.Start()
+ return &ClovisorBCC{
+ stopChan: stop,
+ qdisc: qdisc,
+ }, nil
+}
+
+func (clovBcc *ClovisorBCC) StopPod() {
+ // TODO(s3wong): remove qdisc del once k8s watcher implemented
+ netlink.QdiscDel(clovBcc.qdisc)
+ clovBcc.stopChan <- true
+}
diff --git a/clover/clovisor/libclovisor/clovisor_cfg.go b/clover/clovisor/libclovisor/clovisor_cfg.go
new file mode 100644
index 0000000..ca3200e
--- /dev/null
+++ b/clover/clovisor/libclovisor/clovisor_cfg.go
@@ -0,0 +1,189 @@
+// Copyright (c) Authors of Clover
+//
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Apache License, Version 2.0
+// which accompanies this distribution, and is available at
+// http://www.apache.org/licenses/LICENSE-2.0
+
+package clovisor
+
+import (
+ "bytes"
+ "encoding/binary"
+ "fmt"
+ "io"
+ "net"
+ "strconv"
+ "strings"
+
+ "github.com/go-redis/redis"
+ opentracing "github.com/opentracing/opentracing-go"
+ jaeger "github.com/uber/jaeger-client-go"
+ jaeger_config "github.com/uber/jaeger-client-go/config"
+)
+
+var redisServer string = "redis.clovisor"
+var jaegerCollector string = "jaeger-collector.clovisor:14268"
+var jaegerAgent string = "jaeger-agent.clovisor:6831"
+var ProtoCfg string = "clovisor_proto_cfg"
+var protoPluginCfgChan string = "clovisor_proto_plugin_cfg_chan"
+
+/*
+ * redisConnect: redis client connecting to redis server
+ */
+func redisConnect() *redis.Client {
+ client := redis.NewClient(&redis.Options{
+ Addr: fmt.Sprintf("%s:6379", redisServer),
+ Password: "",
+ DB: 0,
+ })
+ return client
+}
+
+func get_cfg_labels(node_name string) ([]string, error) {
+ client := redisConnect()
+ defer client.Close()
+ labels_list, err := client.LRange("clovisor_labels", 0, -1).Result()
+ if err != nil {
+ fmt.Println(err.Error())
+ return nil, err
+ }
+
+ return labels_list, err
+}
+
+func parse_label_cfg(label_cfg string) (string, string, string) {
+ label_slice := strings.Split(label_cfg, ":")
+ if len(label_slice) == 1 {
+ return label_slice[0], "", ""
+ }
+ return label_slice[0], label_slice[1], label_slice[2]
+}
+
+func get_egress_match_list(pod_name string) ([]egress_match_t) {
+ client := redisConnect()
+ defer client.Close()
+ egress_cfg_list, err := client.LRange("clovior_egress_match", 0, -1).Result()
+ if err != nil {
+ fmt.Println(err.Error())
+ return nil
+ }
+ ret_list := make([]egress_match_t, 0, len(egress_cfg_list))
+ for _, em_cfg_str := range(egress_cfg_list) {
+ fmt.Printf("egress match cfg == %v\n", em_cfg_str)
+ em_cfg_slice := strings.Split(em_cfg_str, ":")
+ if len(em_cfg_slice) < 2 {
+ fmt.Printf("egress match config requires at least two fields [%v]\n", em_cfg_slice)
+ continue
+ } else if len(em_cfg_slice) == 3 {
+ if strings.Contains(pod_name, em_cfg_slice[2]) {
+ fmt.Printf("%v != %v, filtering out this config for pod %v\n",
+ em_cfg_slice[2], pod_name, pod_name)
+ continue
+ }
+ }
+ var ip uint32 = 0
+ if em_cfg_slice[0] != "0" {
+ ip = ip2Long(em_cfg_slice[0])
+ }
+ port_32, _ := strconv.Atoi(em_cfg_slice[1])
+ port := uint16(port_32)
+ ret_list = append(ret_list, egress_match_t{ip, port})
+ }
+ return ret_list
+}
+
+// following function comes from
+// https://www.socketloop.com/tutorials/golang-convert-ip-address-string-to-long-unsigned-32-bit-integer
+func ip2Long(ip string) uint32 {
+ var long uint32
+ //binary.Read(bytes.NewBuffer(net.ParseIP(ip).To4()), binary.LittleEndian, &long)
+ binary.Read(bytes.NewBuffer(net.ParseIP(ip).To4()), binary.BigEndian, &long)
+ return long
+}
+
+func backtoIP4(ipInt int64) string {
+ // need to do two bit shifting and “0xff” masking
+ b0 := strconv.FormatInt((ipInt>>24)&0xff, 10)
+ b1 := strconv.FormatInt((ipInt>>16)&0xff, 10)
+ b2 := strconv.FormatInt((ipInt>>8)&0xff, 10)
+ b3 := strconv.FormatInt((ipInt & 0xff), 10)
+ return b0 + "." + b1 + "." + b2 + "." + b3
+}
+
+func get_cfg_session_match() ([]egress_match_cfg, error) {
+ var ret_list []egress_match_cfg
+ client := redisConnect()
+ defer client.Close()
+ keys, err := client.HKeys("clovisor_session_match").Result()
+ if err != nil {
+ fmt.Println(err.Error())
+ return nil, err
+ }
+ for _, key := range keys {
+ value, err := client.HGet("clovisor_session_match", key).Result()
+ if err != nil {
+ fmt.Println(err.Error())
+ continue
+ }
+ match_slice := strings.Split(key, "-")
+ dst_ip := ip2Long(match_slice[0])
+ dst_port, _ := strconv.Atoi(match_slice[1])
+ egress_match := egress_match_t{
+ dst_ip: dst_ip,
+ dst_port: uint16(dst_port),
+ }
+ // organize into internally understandable struct
+ ret_list = append(ret_list, egress_match_cfg{
+ egress_match: egress_match,
+ action: value,
+ })
+ }
+ return ret_list, nil
+}
+
+func initJaeger(service string) (opentracing.Tracer, io.Closer) {
+ cfg := &jaeger_config.Configuration{
+ Sampler: &jaeger_config.SamplerConfig{
+ Type: "const",
+ Param: 1,
+ },
+ Reporter: &jaeger_config.ReporterConfig{
+ LogSpans: true,
+ CollectorEndpoint: fmt.Sprintf("http://%s/api/traces", jaegerCollector),
+ LocalAgentHostPort: fmt.Sprintf("%s", jaegerAgent),
+ },
+ }
+ tracer, closer, err := cfg.New(service, jaeger_config.Logger(jaeger.StdLogger))
+ if err != nil {
+ panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
+ }
+ return tracer, closer
+}
+
+func get_jaeger_server() (string, error) {
+ client := redisConnect()
+ defer client.Close()
+ return client.Get("clovisor_jaeger_server").Result()
+}
+
+func Monitor_proto_plugin_cfg() {
+ client := redisConnect()
+ //defer client.Close()
+
+ pubsub := client.Subscribe(protoPluginCfgChan)
+ //defer pubsub.Close()
+
+ go func() {
+ for {
+ msg, err := pubsub.ReceiveMessage()
+ if err != nil {
+ fmt.Printf("Error getting protocol plugin configuration: %v\n", err)
+ return
+ }
+
+ fmt.Printf("Update on protocol %v notification received\n", msg.Payload)
+ loadProtoParser(msg.Payload, true)
+ }
+ }()
+}
diff --git a/clover/clovisor/libclovisor/clovisor_k8s.go b/clover/clovisor/libclovisor/clovisor_k8s.go
new file mode 100644
index 0000000..a53e308
--- /dev/null
+++ b/clover/clovisor/libclovisor/clovisor_k8s.go
@@ -0,0 +1,272 @@
+// Copyright (c) Authors of Clover
+//
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Apache License, Version 2.0
+// which accompanies this distribution, and is available at
+// http://www.apache.org/licenses/LICENSE-2.0
+
+package clovisor
+
+import (
+ "bytes"
+ "errors"
+ "fmt"
+ "strconv"
+ "strings"
+
+ core_v1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/labels"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/rest"
+ "k8s.io/client-go/tools/remotecommand"
+)
+
+type ClovisorK8s struct {
+ client *kubernetes.Clientset
+ config *rest.Config
+}
+
+type monitoring_info_t struct {
+ namespace string
+ svc_name string
+ pod_name string
+ container_name string
+ protocols []string
+ port_num uint32
+ pod_ip string
+}
+
+var DEFAULT_NAMESPACE = "default"
+var SUPPORTED_PROTOCOLS = [...]string {"tcp", "http"}
+
+func K8s_client_init(nodeName string) (*ClovisorK8s, error) {
+ config, err := rest.InClusterConfig()
+ if err != nil {
+ fmt.Println(err.Error())
+ return nil, err
+ }
+
+ client, err := kubernetes.NewForConfig(config)
+ if err != nil {
+ fmt.Println(err.Error())
+ return nil, err
+ }
+
+ return &ClovisorK8s{
+ client: client,
+ config: config,
+ }, nil
+}
+
+func (client *ClovisorK8s) Get_monitoring_info(nodeName string) (map[string]*monitoring_info_t,
+ error) {
+
+ labels_list, err := get_cfg_labels(nodeName)
+ if err != nil {
+ fmt.Printf("Error getting cfg labels: %v\n", err)
+ return nil, err
+ }
+
+ mon_info_map := client.get_monitoring_pods(nodeName, labels_list)
+ if mon_info_map == nil {
+ return nil, errors.New("monitoring info empty")
+ }
+
+ return mon_info_map, nil
+}
+
+func (client *ClovisorK8s) getPodsForSvc(svc *core_v1.Service,
+ namespace string) (*core_v1.PodList, error) {
+ set := labels.Set(svc.Spec.Selector)
+ //label := strings.Split(set.AsSelector().String(), ",")[0]
+ //fmt.Printf("Trying to get pods for service %v with label %v from %v\n", svc.GetName(), label, set.AsSelector().String())
+ listOptions := metav1.ListOptions{LabelSelector: set.AsSelector().String()}
+ //listOptions := metav1.ListOptions{LabelSelector: label}
+ return client.client.CoreV1().Pods(namespace).List(listOptions)
+}
+
+func (client *ClovisorK8s) get_monitoring_pods(nodeName string,
+ labels_list []string) (map[string]*monitoring_info_t) {
+ /*
+ * Three cases:
+ * 1.) no configured namespaces, monitoring all pods in default namesapce
+ * 2.) if any config only has namespace, monitoring all pods in namespace
+ * 3.) label is configured, only monitor pods with that label
+ */
+ var namespace string
+ ns_svc_map := make(map[string][]*core_v1.ServiceList)
+ monitoring_info := make(map[string]*monitoring_info_t)
+ if len(labels_list) == 0 {
+ // TODO(s3wong): set it to 'default'
+ //namespace = "linux-foundation-gke"
+ namespace = "default"
+ if svcs_list, err :=
+ client.client.CoreV1().Services(namespace).List(metav1.ListOptions{});
+ err != nil {
+ fmt.Printf("Error fetching service list for namespace %s\n",
+ namespace)
+ return nil
+ } else {
+ /*
+ if _, ok := ns_svc_map[namespace]; !ok {
+ ns_svc_map[namespace] = []*core_v1.ServiceList{}
+ }
+ */
+ ns_svc_map[namespace] = append(ns_svc_map[namespace], svcs_list)
+ }
+ } else {
+ for _, label_str := range labels_list {
+ var label_selector string
+ namespace, key, value := parse_label_cfg(label_str)
+ if len(namespace) == 0 {
+ fmt.Printf("Error in config: %s not a valid config\n", label_str)
+ continue
+ }
+ if len(key) == 0 {
+ fmt.Printf("Namespace only config for %s\n", namespace)
+ } else {
+ label_selector = fmt.Sprintf("%s=%s", key, value)
+ }
+ if svc_list, err :=
+ client.client.CoreV1().Services(namespace).List(metav1.ListOptions{
+ LabelSelector: label_selector,
+ }); err != nil {
+ fmt.Printf("Error listing services with label %v:%v:%v - %v\n",
+ key, value, namespace, err.Error())
+ continue
+ } else {
+ if _, ok := ns_svc_map[namespace]; !ok {
+ ns_svc_map[namespace] = []*core_v1.ServiceList{}
+ }
+ ns_svc_map[namespace] = append(ns_svc_map[namespace], svc_list)
+ }
+ }
+ }
+
+ for ns, svc_slice := range ns_svc_map {
+ for _, svc_list_ := range svc_slice {
+ for _, svc := range svc_list_.Items {
+ if ns == "default" && svc.GetName() == "kubernetes" {
+ continue
+ }
+ //fmt.Printf("Looking for supported protocols for service %v:%v\n", ns, svc.GetName())
+ //var svc_port_map = map[string]core_v1.ServicePort{}
+ var svc_port_map = map[string][]string{}
+ for _, svc_port := range svc.Spec.Ports {
+ if len(svc_port.Name) > 0 {
+ svc_protos := strings.Split(svc_port.Name, "-")
+ for _, proto := range svc_protos {
+ if err := loadProtoParser(proto, false); err == nil {
+ for _, sp := range SUPPORTED_PROTOCOLS {
+ if strings.Contains(proto, sp) {
+ target_port := svc_port.TargetPort.String()
+ svc_port_map[target_port] = append(svc_port_map[target_port], proto)
+ }
+ }
+ } else {
+ fmt.Printf("Unsupported protocol: %v\n", proto)
+ }
+ }
+ }
+ }
+ if len(svc_port_map) == 0 {
+ fmt.Printf("Found no port with supported protocol for %v:%v\n", ns, svc.GetName())
+ continue
+ } else {
+ fmt.Printf("svc_port_map for service %v is %v\n", svc.GetName(), svc_port_map)
+ }
+ //fmt.Printf("Fetching pods for namespace %v service: %v\n", ns, svc.GetName())
+ pod_list, err := client.getPodsForSvc(&svc, ns)
+ if err != nil {
+ fmt.Print("Error fetching pods for %v:%v [%v]\n", ns, svc.GetName(), err)
+ continue
+ }
+ /*
+ labelSet := labels.Set(svc.Spec.Selector)
+ pod_list, err := client.client.CoreV1().Pods(ns).List(metav1.ListOptions{})
+ if err != nil {
+ fmt.Print("Error fetching pods for %v:%v [%v]\n", ns, svc.GetName(), err)
+ continue
+ }
+ */
+ for _, pod := range pod_list.Items {
+ if pod.Spec.NodeName == nodeName {
+ for _, container := range pod.Spec.Containers {
+ var port_num uint32
+ var tp_string string
+ for _, port := range container.Ports {
+ port_num = uint32(port.ContainerPort)
+ tp_string = strconv.Itoa(int(port_num))
+ if _, in := svc_port_map[tp_string]; !in {
+ continue
+ }
+ pod_name := pod.GetName()
+ monitoring_info[pod_name] = &(monitoring_info_t{})
+ monitoring_info[pod_name].namespace = ns
+ monitoring_info[pod_name].svc_name = svc.GetName()
+ monitoring_info[pod_name].pod_name = pod_name
+ monitoring_info[pod_name].container_name = container.Name
+ monitoring_info[pod_name].port_num = port_num
+ monitoring_info[pod_name].protocols = svc_port_map[tp_string]
+ monitoring_info[pod_name].pod_ip = pod.Status.PodIP
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ return monitoring_info
+}
+
+func (client *ClovisorK8s) exec_command(command string,
+ monitoring_info *monitoring_info_t) (string, error) {
+
+ // Following code based on:
+ // https://stackoverflow.com/questions/43314689/example-of-exec-in-k8ss-pod-by-using-go-client
+ // https://github.com/a4abhishek/Client-Go-Examples/blob/master/exec_to_pod/exec_to_pod.go
+ exec_req := client.client.CoreV1().RESTClient().Post().
+ Resource("pods").
+ Name(monitoring_info.pod_name).
+ Namespace(monitoring_info.namespace).
+ SubResource("exec")
+ scheme := runtime.NewScheme()
+ if err := core_v1.AddToScheme(scheme); err != nil {
+ fmt.Printf("Error in exec pods: %v\n", err.Error())
+ return "", err
+ }
+
+ parameterCodec := runtime.NewParameterCodec(scheme)
+ exec_req.VersionedParams(&core_v1.PodExecOptions{
+ Command: strings.Fields(command),
+ Container: monitoring_info.container_name,
+ Stdin: false,
+ Stdout: true,
+ Stderr: true,
+ TTY: false,
+ }, parameterCodec)
+
+ exec, err := remotecommand.NewSPDYExecutor(client.config, "POST", exec_req.URL())
+ if err != nil {
+ fmt.Printf("Error in remotecommand exec: %v\n", err.Error())
+ return "", err
+ }
+
+ var stdout, stderr bytes.Buffer
+ err = exec.Stream(remotecommand.StreamOptions{
+ Stdin: nil,
+ Stdout: &stdout,
+ Stderr: &stderr,
+ Tty: false,
+ })
+ if err != nil {
+ fmt.Printf("Error in exec stream: %v\n", err.Error())
+ return "", err
+ }
+
+ stdout_no_newline := strings.TrimSuffix(stdout.String(), "\n")
+ return stdout_no_newline, nil
+}
diff --git a/clover/clovisor/libclovisor/ebpf/node_interface.c b/clover/clovisor/libclovisor/ebpf/node_interface.c
new file mode 100755
index 0000000..cd14a50
--- /dev/null
+++ b/clover/clovisor/libclovisor/ebpf/node_interface.c
@@ -0,0 +1,158 @@
+// Copyright (c) Authors of Clover
+//
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Apache License, Version 2.0
+// which accompanies this distribution, and is available at
+// http://www.apache.org/licenses/LICENSE-2.0
+#include <uapi/linux/if_ether.h>
+#include <uapi/linux/in.h>
+#include <uapi/linux/ip.h>
+#include <uapi/linux/tcp.h>
+#include <uapi/linux/pkt_cls.h>
+#include <uapi/linux/bpf.h>
+
+#include <bcc/proto.h>
+
+#define MAX_SESSION_TABLE_ENTRIES 8192
+
+typedef struct session_key_ {
+ u32 src_ip;
+ u32 dst_ip;
+ unsigned short src_port;
+ unsigned short dst_port;
+} session_key_t;
+
+typedef struct session_ {
+ u64 req_time;
+ u64 resp_time;
+} session_t;
+
+BPF_HASH(ip2track, u32, u32);
+BPF_HASH(node_sessions, session_key_t, session_t, MAX_SESSION_TABLE_ENTRIES);
+
+struct eth_hdr {
+ unsigned char h_dest[ETH_ALEN];
+ unsigned char h_source[ETH_ALEN];
+ unsigned short h_proto;
+};
+
+static inline int ipv4_hdrlen(struct iphdr *ip4)
+{
+ return ip4->ihl * 4;
+}
+
+static inline int tcp_doff(struct tcphdr *tcp_hdr)
+{
+ return tcp_hdr->doff * 4;
+}
+
+static inline void fill_up_sess_key(session_key_t *key, u32 src_ip,
+ u32 dst_ip, u16 src_port, u16 dst_port)
+{
+ key->src_ip = src_ip;
+ key->dst_ip = dst_ip;
+ key->src_port = src_port;
+ key->dst_port = dst_port;
+}
+
+static inline void process_response(u32 src_ip, u32 dst_ip, u16 src_port,
+ u16 dst_port)
+{
+ session_key_t sess_key = {};
+ session_t *session_ptr = NULL;
+ fill_up_sess_key(&sess_key, src_ip, dst_ip, src_port, dst_port);
+ session_ptr = node_sessions.lookup(&sess_key);
+ if (session_ptr != NULL) {
+ u64 resp_time = bpf_ktime_get_ns();
+ session_t update_session = {
+ session_ptr->req_time,
+ resp_time
+ };
+ node_sessions.update(&sess_key, &update_session);
+ }
+}
+
+static inline void process_request(u32 src_ip, u32 dst_ip, u16 src_port,
+ u16 dst_port)
+{
+ session_key_t sess_key = {};
+ session_t *session_ptr = NULL;
+ session_t new_session = {
+ bpf_ktime_get_ns(),
+ 0
+ };
+ fill_up_sess_key(&sess_key, src_ip, dst_ip, src_port, dst_port);
+ session_ptr = node_sessions.lookup(&sess_key);
+ if (! session_ptr) {
+ node_sessions.insert(&sess_key, &new_session);
+ }
+}
+
+static inline void ingress_parsing(struct tcphdr *tcp_hdr,
+ struct iphdr *ipv4_hdr)
+{
+ unsigned int dst_ip = ntohl(ipv4_hdr->daddr);
+ int ret = 0;
+
+ unsigned int *proto = ip2track.lookup(&dst_ip);
+ if (proto != NULL) {
+ process_response(ntohl(ipv4_hdr->daddr),
+ ntohl(ipv4_hdr->saddr),
+ ntohs(tcp_hdr->dest),
+ ntohs(tcp_hdr->source));
+ }
+}
+
+static inline void egress_parsing(struct tcphdr *tcp_hdr,
+ struct iphdr *ipv4_hdr)
+{
+ unsigned int src_ip = ntohl(ipv4_hdr->saddr);
+
+ unsigned int *proto = ip2track.lookup(&src_ip);
+
+ if (proto != NULL) {
+ process_request(ntohl(ipv4_hdr->saddr),
+ ntohl(ipv4_hdr->daddr),
+ ntohs(tcp_hdr->source),
+ ntohs(tcp_hdr->dest));
+ }
+}
+
+static inline int handle_packet(struct __sk_buff *skb, int is_ingress)
+{
+ void *data = (void *)(long)skb->data;
+ void *data_end = (void *)(long)skb->data_end;
+ struct eth_hdr *eth = data;
+ struct iphdr *ipv4_hdr = data + sizeof(*eth);
+ struct tcphdr *tcp_hdr = data + sizeof(*eth) + sizeof(*ipv4_hdr);
+
+ /* TODO(s3wong): assuming TCP only for now */
+ /* single length check */
+ if (data + sizeof(*eth) + sizeof(*ipv4_hdr) + sizeof(*tcp_hdr) > data_end)
+ return TC_ACT_OK;
+
+ if (eth->h_proto != htons(ETH_P_IP))
+ return TC_ACT_OK;
+
+ // TODO(s3wong): no support for IP options
+ if (ipv4_hdr->protocol != IPPROTO_TCP || ipv4_hdr->ihl != 5)
+ return TC_ACT_OK;
+
+ if (is_ingress == 1) {
+ ingress_parsing(tcp_hdr, ipv4_hdr);
+ } else{
+ egress_parsing(tcp_hdr, ipv4_hdr);
+ }
+
+ return TC_ACT_OK;
+}
+
+int handle_ingress(struct __sk_buff *skb)
+{
+ return handle_packet(skb, 1);
+}
+
+int handle_egress(struct __sk_buff *skb)
+{
+ return handle_packet(skb, 0);
+}
diff --git a/clover/clovisor/libclovisor/ebpf/session_tracking.c b/clover/clovisor/libclovisor/ebpf/session_tracking.c
new file mode 100755
index 0000000..ea68788
--- /dev/null
+++ b/clover/clovisor/libclovisor/ebpf/session_tracking.c
@@ -0,0 +1,292 @@
+// Copyright (c) Authors of Clover
+//
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Apache License, Version 2.0
+// which accompanies this distribution, and is available at
+// http://www.apache.org/licenses/LICENSE-2.0
+#include <uapi/linux/if_ether.h>
+#include <uapi/linux/in.h>
+#include <uapi/linux/ip.h>
+#include <uapi/linux/tcp.h>
+#include <uapi/linux/pkt_cls.h>
+#include <uapi/linux/bpf.h>
+
+#include <bcc/proto.h>
+
+#define HTTP_HDR_MIN_LEN 7
+#define MAX_SESSION_TABLE_ENTRIES 8192
+
+typedef enum {
+ UNDEFINED = 0,
+ HTTP = 1,
+ HTTP2 = 2,
+ TCP = 3,
+ UDP = 4,
+} app_proto_t;
+
+typedef struct session_key_ {
+ u32 src_ip;
+ u32 dst_ip;
+ unsigned short src_port;
+ unsigned short dst_port;
+} session_key_t;
+
+typedef struct session_ {
+ u64 req_time;
+ u64 resp_time;
+} session_t;
+
+typedef struct egress_match_ {
+ u32 dst_ip;
+ unsigned short dst_port;
+} egress_match_t;
+
+typedef enum policy_action_ {
+ RECORD = 1,
+} policy_action_t;
+
+BPF_PERF_OUTPUT(skb_events);
+BPF_HASH(dports2proto, u16, u32);
+BPF_HASH(egress_lookup_table, egress_match_t, policy_action_t);
+BPF_HASH(sessions, session_key_t, session_t, MAX_SESSION_TABLE_ENTRIES);
+
+struct eth_hdr {
+ unsigned char h_dest[ETH_ALEN];
+ unsigned char h_source[ETH_ALEN];
+ unsigned short h_proto;
+};
+
+static inline int ipv4_hdrlen(struct iphdr *ip4)
+{
+ return ip4->ihl * 4;
+}
+
+static inline int tcp_doff(struct tcphdr *tcp_hdr)
+{
+ return tcp_hdr->doff * 4;
+}
+
+static inline int http_parsing(void *data, void *data_end)
+{
+
+ int is_http = 1;
+ if (data + HTTP_HDR_MIN_LEN > data_end) {
+ bpf_trace_printk("No HTTP Header in TCP segment");
+ return 0;
+ }
+ if (strncmp((char*)data, "HTTP", 4)) {
+ if (strncmp((char*)data, "GET", 3)) {
+ if (strncmp((char*)data, "POST", 4)) {
+ if (strncmp((char*)data, "PUT", 3)) {
+ if (strncmp((char*)data, "HEAD", 4)) {
+ is_http = 0;
+ }
+ }
+ }
+ }
+ }
+ return is_http;
+}
+
+static inline void fill_up_sess_key(session_key_t *key, u32 src_ip,
+ u32 dst_ip, u16 src_port, u16 dst_port)
+{
+ key->src_ip = src_ip;
+ key->dst_ip = dst_ip;
+ key->src_port = src_port;
+ key->dst_port = dst_port;
+}
+
+static inline int process_response(u32 src_ip, u32 dst_ip, u16 src_port,
+ u16 dst_port)
+{
+ session_key_t sess_key = {};
+ session_t *session_ptr = NULL;
+ fill_up_sess_key(&sess_key, src_ip, dst_ip, src_port, dst_port);
+ session_ptr = sessions.lookup(&sess_key);
+ if (session_ptr != NULL) {
+ u64 resp_time = bpf_ktime_get_ns();
+ session_t update_session = {
+ session_ptr->req_time,
+ resp_time
+ };
+ sessions.update(&sess_key, &update_session);
+ return 1;
+ }
+ return 0;
+}
+
+static inline void process_request(u32 src_ip, u32 dst_ip, u16 src_port,
+ u16 dst_port)
+{
+ session_key_t sess_key = {};
+ session_t *session_ptr = NULL;
+ session_t new_session = {
+ bpf_ktime_get_ns(),
+ 0
+ };
+ fill_up_sess_key(&sess_key, src_ip, dst_ip, src_port, dst_port);
+ session_ptr = sessions.lookup(&sess_key);
+ if (! session_ptr) {
+ sessions.insert(&sess_key, &new_session);
+ }
+ /*
+ if (session_ptr != NULL) {
+ sessions.update(&sess_key, &new_session);
+ } else {
+ sessions.insert(&sess_key, &new_session);
+ }
+ */
+}
+
+static inline app_proto_t ingress_tcp_parsing(struct tcphdr *tcp_hdr,
+ struct iphdr *ipv4_hdr,
+ void *data_end)
+{
+ unsigned short dest_port = htons(tcp_hdr->dest);
+ egress_match_t egress_match = {};
+ policy_action_t *policy_ptr = NULL;
+ app_proto_t ret = TCP;
+
+ unsigned int *proto = dports2proto.lookup(&dest_port);
+ if (proto != NULL) {
+ /*
+ if (tcp_hdr->syn && !tcp_hdr->ack) {
+ return ret;
+ }
+ */
+ ret = HTTP;
+ if (tcp_hdr->fin || tcp_hdr->rst) {
+ process_response(ntohl(ipv4_hdr->saddr),
+ ntohl(ipv4_hdr->daddr),
+ ntohs(tcp_hdr->source),
+ ntohs(tcp_hdr->dest));
+ } else {
+ process_request(ntohl(ipv4_hdr->saddr),
+ ntohl(ipv4_hdr->daddr),
+ ntohs(tcp_hdr->source),
+ ntohs(tcp_hdr->dest));
+ }
+ } else {
+ dest_port = htons(tcp_hdr->source);
+ proto = dports2proto.lookup(&dest_port);
+ if (proto != NULL) {
+ // clock response receiving time
+ process_response(ntohl(ipv4_hdr->daddr),
+ ntohl(ipv4_hdr->saddr),
+ ntohs(tcp_hdr->dest),
+ ntohs(tcp_hdr->source));
+ }
+ egress_match.dst_ip = ntohl(ipv4_hdr->saddr);
+ egress_match.dst_port = ntohs(tcp_hdr->source);
+ policy_ptr = egress_lookup_table.lookup(&egress_match);
+ if (policy_ptr == NULL) {
+ egress_match.dst_ip = 0;
+ policy_ptr = egress_lookup_table.lookup(&egress_match);
+ }
+
+ if (policy_ptr != NULL) {
+ if (*policy_ptr == RECORD) {
+ ret = HTTP;
+ if (tcp_hdr->fin || tcp_hdr->rst) {
+ process_response(ntohl(ipv4_hdr->daddr),
+ ntohl(ipv4_hdr->saddr),
+ ntohs(tcp_hdr->dest),
+ ntohs(tcp_hdr->source));
+ }
+ }
+ }
+ }
+
+ // everything else drops to TCP
+ //return ((void*)tcp_hdr);
+ return ret;
+}
+
+static inline app_proto_t egress_tcp_parsing(struct tcphdr *tcp_hdr,
+ struct iphdr *ipv4_hdr,
+ void *data_end)
+{
+ unsigned short src_port = htons(tcp_hdr->source);
+ app_proto_t ret = TCP;
+ egress_match_t egress_match = {};
+ policy_action_t *policy_ptr = NULL;
+
+ unsigned int *proto = dports2proto.lookup(&src_port);
+
+ if (proto != NULL) {
+ //if (tcp_hdr->fin || tcp_hdr->rst) {
+ process_response(ntohl(ipv4_hdr->daddr),
+ ntohl(ipv4_hdr->saddr),
+ ntohs(tcp_hdr->dest),
+ ntohs(tcp_hdr->source));
+ //}
+ ret = HTTP;
+ } else {
+
+ egress_match.dst_ip = ntohl(ipv4_hdr->daddr);
+ egress_match.dst_port = ntohs(tcp_hdr->dest);
+ policy_ptr = egress_lookup_table.lookup(&egress_match);
+ if (policy_ptr == NULL) {
+ egress_match.dst_ip = 0;
+ policy_ptr = egress_lookup_table.lookup(&egress_match);
+ }
+
+ if (policy_ptr != NULL) {
+ if (*policy_ptr == RECORD) {
+ process_request(ntohl(ipv4_hdr->saddr),
+ ntohl(ipv4_hdr->daddr),
+ ntohs(tcp_hdr->source),
+ ntohs(tcp_hdr->dest));
+ ret = HTTP;
+ }
+ }
+ }
+ //return(ret_hdr);
+ return ret;
+}
+
+static inline int handle_packet(struct __sk_buff *skb, int is_ingress)
+{
+ void *data = (void *)(long)skb->data;
+ void *data_end = (void *)(long)skb->data_end;
+ struct eth_hdr *eth = data;
+ struct iphdr *ipv4_hdr = data + sizeof(*eth);
+ struct tcphdr *tcp_hdr = data + sizeof(*eth) + sizeof(*ipv4_hdr);
+ app_proto_t proto = TCP;
+
+ /* TODO(s3wong): assuming TCP only for now */
+ /* single length check */
+ if (data + sizeof(*eth) + sizeof(*ipv4_hdr) + sizeof(*tcp_hdr) > data_end)
+ return TC_ACT_OK;
+
+ if (eth->h_proto != htons(ETH_P_IP))
+ return TC_ACT_OK;
+
+ // TODO(s3wong): no support for IP options
+ if (ipv4_hdr->protocol != IPPROTO_TCP || ipv4_hdr->ihl != 5)
+ return TC_ACT_OK;
+
+ if (is_ingress == 1) {
+ proto = ingress_tcp_parsing(tcp_hdr, ipv4_hdr, data_end);
+ } else{
+ proto = egress_tcp_parsing(tcp_hdr, ipv4_hdr, data_end);
+ }
+
+ if (proto == HTTP) {
+ int offset = is_ingress;
+ skb_events.perf_submit_skb(skb, skb->len, &offset, sizeof(offset));
+ }
+
+ return TC_ACT_OK;
+}
+
+int handle_ingress(struct __sk_buff *skb)
+{
+ return handle_packet(skb, 1);
+}
+
+int handle_egress(struct __sk_buff *skb)
+{
+ return handle_packet(skb, 0);
+}
diff --git a/clover/clovisor/libclovisor/jaeger-all-in-one-template.yml b/clover/clovisor/libclovisor/jaeger-all-in-one-template.yml
new file mode 100644
index 0000000..1260ef5
--- /dev/null
+++ b/clover/clovisor/libclovisor/jaeger-all-in-one-template.yml
@@ -0,0 +1,151 @@
+#
+# Copyright 2017-2018 The Jaeger Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+# in compliance with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License
+# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+# or implied. See the License for the specific language governing permissions and limitations under
+# the License.
+#
+
+apiVersion: v1
+kind: List
+items:
+- apiVersion: extensions/v1beta1
+ kind: Deployment
+ metadata:
+ name: jaeger-deployment
+ namespace: clovisor
+ labels:
+ app: jaeger
+ jaeger-infra: jaeger-deployment
+ spec:
+ replicas: 1
+ strategy:
+ type: Recreate
+ template:
+ metadata:
+ labels:
+ app: jaeger
+ jaeger-infra: jaeger-pod
+ annotations:
+ prometheus.io/scrape: "true"
+ prometheus.io/port: "16686"
+ spec:
+ containers:
+ - env:
+ - name: COLLECTOR_ZIPKIN_HTTP_PORT
+ value: "9411"
+ image: jaegertracing/all-in-one
+ name: jaeger
+ ports:
+ - containerPort: 5775
+ protocol: UDP
+ - containerPort: 6831
+ protocol: UDP
+ - containerPort: 6832
+ protocol: UDP
+ - containerPort: 5778
+ protocol: TCP
+ - containerPort: 16686
+ protocol: TCP
+ - containerPort: 9411
+ protocol: TCP
+ readinessProbe:
+ httpGet:
+ path: "/"
+ port: 14269
+ initialDelaySeconds: 5
+- apiVersion: v1
+ kind: Service
+ metadata:
+ name: jaeger-query
+ namespace: clovisor
+ labels:
+ app: jaeger
+ jaeger-infra: jaeger-service
+ spec:
+ ports:
+ - name: query-http
+ port: 80
+ protocol: TCP
+ targetPort: 16686
+ selector:
+ jaeger-infra: jaeger-pod
+ type: LoadBalancer
+- apiVersion: v1
+ kind: Service
+ metadata:
+ name: jaeger-collector
+ namespace: clovisor
+ labels:
+ app: jaeger
+ jaeger-infra: collector-service
+ spec:
+ ports:
+ - name: jaeger-collector-tchannel
+ port: 14267
+ protocol: TCP
+ targetPort: 14267
+ - name: jaeger-collector-http
+ port: 14268
+ protocol: TCP
+ targetPort: 14268
+ - name: jaeger-collector-zipkin
+ port: 9411
+ protocol: TCP
+ targetPort: 9411
+ selector:
+ jaeger-infra: jaeger-pod
+ type: ClusterIP
+- apiVersion: v1
+ kind: Service
+ metadata:
+ name: jaeger-agent
+ namespace: clovisor
+ labels:
+ app: jaeger
+ jaeger-infra: agent-service
+ spec:
+ ports:
+ - name: agent-zipkin-thrift
+ port: 5775
+ protocol: UDP
+ targetPort: 5775
+ - name: agent-compact
+ port: 6831
+ protocol: UDP
+ targetPort: 6831
+ - name: agent-binary
+ port: 6832
+ protocol: UDP
+ targetPort: 6832
+ - name: agent-configs
+ port: 5778
+ protocol: TCP
+ targetPort: 5778
+ clusterIP: None
+ selector:
+ jaeger-infra: jaeger-pod
+- apiVersion: v1
+ kind: Service
+ metadata:
+ name: zipkin
+ namespace: clovisor
+ labels:
+ app: jaeger
+ jaeger-infra: zipkin-service
+ spec:
+ ports:
+ - name: jaeger-collector-zipkin
+ port: 9411
+ protocol: TCP
+ targetPort: 9411
+ clusterIP: None
+ selector:
+ jaeger-infra: jaeger-pod
+
diff --git a/clover/clovisor/libclovisor/libproto/build-plugin b/clover/clovisor/libclovisor/libproto/build-plugin
new file mode 100755
index 0000000..743071b
--- /dev/null
+++ b/clover/clovisor/libclovisor/libproto/build-plugin
@@ -0,0 +1,9 @@
+#!/bin/bash
+#
+# Copyright (c) Authors of Clover
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+go build --buildmode=plugin -o ../../proto/http.so clovisor_http.go
diff --git a/clover/clovisor/libclovisor/libproto/clovisor_http.go b/clover/clovisor/libclovisor/libproto/clovisor_http.go
new file mode 100644
index 0000000..6440d5a
--- /dev/null
+++ b/clover/clovisor/libclovisor/libproto/clovisor_http.go
@@ -0,0 +1,84 @@
+// Copyright (c) Authors of Clover
+//
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Apache License, Version 2.0
+// which accompanies this distribution, and is available at
+// http://www.apache.org/licenses/LICENSE-2.0
+
+package main
+
+import (
+ "bufio"
+ "bytes"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+)
+
+type httpparser string
+
+func (p httpparser) Parse(session_key string,
+ is_req bool,
+ data []byte) ([]byte, map[string]string) {
+ map_val := make(map[string]string)
+ reader := bytes.NewReader(data)
+ buf := bufio.NewReader(reader)
+ if is_req == true {
+ req, err := http.ReadRequest(buf)
+ if err != nil {
+ fmt.Printf("Request error: ")
+ fmt.Println(err)
+ return nil, nil
+ } else if req == nil {
+ fmt.Println("request is nil")
+ return nil, nil
+ } else {
+ fmt.Printf("HTTP Request Method %s url %v proto %v\n",
+ req.Method, req.URL, req.Proto)
+ map_val["reqmethod"] = req.Method
+ map_val["requrl"] = fmt.Sprintf("%v", req.URL)
+ map_val["reqproto"] = fmt.Sprintf("%v", req.Proto)
+ if user_agent := req.UserAgent(); len(user_agent) > 0 {
+ map_val["useragent"] = user_agent
+ }
+ if len(req.Host) > 0 {
+ map_val["host"] = req.Host
+ }
+ header := req.Header
+ if req_id := header.Get("X-Request-Id"); len(req_id) > 0 {
+ map_val["requestid"] = req_id
+ }
+ if envoy_dec := header.Get("X-Envoy-Decorator-Operation"); len(envoy_dec) > 0 {
+ map_val["envoydecorator"] = envoy_dec
+ }
+ if trace_id := header.Get("X-B3-Traceid"); len(trace_id) > 0 {
+ map_val["traceid"] = trace_id
+ }
+ body, err := ioutil.ReadAll(req.Body)
+ if err != nil {
+ fmt.Printf("Error reading HTTP Request body %v\n", err.Error())
+ }
+ return body, map_val
+ }
+ } else {
+ // response
+ resp, err := http.ReadResponse(buf, nil)
+ if err != nil {
+ fmt.Printf("Response error: ")
+ fmt.Println(err)
+ return nil, nil
+ }
+ fmt.Printf("HTTP Response Status %v code %v Proto %v\n",
+ resp.Status, resp.StatusCode, resp.Proto)
+ map_val["respstatus"] = resp.Status
+ map_val["respstatuscode"] = fmt.Sprintf("%v", resp.StatusCode)
+ map_val["respproto"] = fmt.Sprintf("%v", resp.Proto)
+ body, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ fmt.Printf("Error reading HTTP Request body %v\n", err.Error())
+ }
+ return body, map_val
+ }
+}
+
+var Parser httpparser
diff --git a/clover/clovisor/libclovisor/libproto/http_alt.go b/clover/clovisor/libclovisor/libproto/http_alt.go
new file mode 100644
index 0000000..f7581fb
--- /dev/null
+++ b/clover/clovisor/libclovisor/libproto/http_alt.go
@@ -0,0 +1,93 @@
+// Copyright (c) Authors of Clover
+//
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Apache License, Version 2.0
+// which accompanies this distribution, and is available at
+// http://www.apache.org/licenses/LICENSE-2.0
+
+package main
+
+import (
+ "bufio"
+ "bytes"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+)
+
+type httpparser string
+
+func (p httpparser) Parse(session_key string,
+ is_req bool,
+ data []byte) ([]byte, map[string]string) {
+ map_val := make(map[string]string)
+ reader := bytes.NewReader(data)
+ buf := bufio.NewReader(reader)
+ if is_req == true {
+ req, err := http.ReadRequest(buf)
+ if err != nil {
+ fmt.Printf("Request error: ")
+ fmt.Println(err)
+ return nil, nil
+ } else if req == nil {
+ fmt.Println("request is nil")
+ return nil, nil
+ } else {
+ fmt.Printf("HTTP Request Method %s url %v proto %v\n",
+ req.Method, req.URL, req.Proto)
+ map_val["reqmethod"] = req.Method
+ map_val["requrl"] = fmt.Sprintf("%v", req.URL)
+ map_val["reqproto"] = fmt.Sprintf("%v", req.Proto)
+ if user_agent := req.UserAgent(); len(user_agent) > 0 {
+ map_val["useragent"] = user_agent
+ }
+ if len(req.Host) > 0 {
+ map_val["host"] = req.Host
+ }
+ header := req.Header
+ if req_id := header.Get("X-Request-Id"); len(req_id) > 0 {
+ map_val["requestid"] = req_id
+ }
+ if envoy_dec := header.Get("X-Envoy-Decorator-Operation"); len(envoy_dec) > 0 {
+ map_val["envoydecorator"] = envoy_dec
+ }
+ if trace_id := header.Get("X-B3-Traceid"); len(trace_id) > 0 {
+ map_val["traceid"] = trace_id
+ }
+ body, err := ioutil.ReadAll(req.Body)
+ if err != nil {
+ fmt.Printf("Error reading HTTP Request body %v\n", err.Error())
+ }
+ return body, map_val
+ }
+ } else {
+ // response
+ resp, err := http.ReadResponse(buf, nil)
+ if err != nil {
+ fmt.Printf("Response error: ")
+ fmt.Println(err)
+ return nil, nil
+ }
+ fmt.Printf("HTTP Response Status %v code %v Proto %v\n",
+ resp.Status, resp.StatusCode, resp.Proto)
+ map_val["respstatus"] = resp.Status
+ map_val["respstatuscode"] = fmt.Sprintf("%v", resp.StatusCode)
+ map_val["respproto"] = fmt.Sprintf("%v", resp.Proto)
+ header := resp.Header
+ //fmt.Printf("Response Header contains %v\n", header)
+ if contentType := header.Get("Content-Type"); len(contentType) > 0 {
+ map_val["content-type"] = contentType
+ }
+ if lastMod := header.Get("Last-Modified"); len(lastMod) > 0 {
+ map_val["last-modified"] = lastMod
+ }
+
+ body, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ fmt.Printf("Error reading HTTP Request body %v\n", err.Error())
+ }
+ return body, map_val
+ }
+}
+
+var Parser httpparser
diff --git a/clover/clovisor/libclovisor/mongo.yaml b/clover/clovisor/libclovisor/mongo.yaml
new file mode 100644
index 0000000..5dea26e
--- /dev/null
+++ b/clover/clovisor/libclovisor/mongo.yaml
@@ -0,0 +1,41 @@
+apiVersion: v1
+kind: ReplicationController
+metadata:
+ labels:
+ name: mongo
+ name: mongo-controller
+ namespace: clovisor
+spec:
+ replicas: 1
+ template:
+ metadata:
+ labels:
+ name: mongo
+ spec:
+ containers:
+ - image: mongo
+ name: mongo
+ ports:
+ - name: mongo
+ containerPort: 27017
+ hostPort: 27017
+ volumeMounts:
+ - name: mongo-storage
+ mountPath: /data/db
+ volumes:
+ - name: mongo-storage
+ emptyDir: {}
+---
+apiVersion: v1
+kind: Service
+metadata:
+ labels:
+ name: mongo
+ name: mongo
+ namespace: clovisor
+spec:
+ ports:
+ - port: 27017
+ targetPort: 27017
+ selector:
+ name: mongo
diff --git a/clover/clovisor/libclovisor/redis.yaml b/clover/clovisor/libclovisor/redis.yaml
new file mode 100644
index 0000000..43204ff
--- /dev/null
+++ b/clover/clovisor/libclovisor/redis.yaml
@@ -0,0 +1,53 @@
+apiVersion: v1
+kind: Namespace
+metadata:
+ name: clovisor
+ labels:
+ name: clovisor
+---
+apiVersion: v1
+kind: Pod
+metadata:
+ labels:
+ name: redis
+ redis-sentinel: "true"
+ role: master
+ name: redis
+ namespace: clovisor
+spec:
+ containers:
+ - name: redis
+ image: k8s.gcr.io/redis:v1
+ env:
+ - name: MASTER
+ value: "true"
+ ports:
+ - containerPort: 6379
+ resources:
+ limits:
+ cpu: "0.1"
+ volumeMounts:
+ - mountPath: /redis-master-data
+ name: data
+ - name: sentinel
+ image: kubernetes/redis:v1
+ env:
+ - name: SENTINEL
+ value: "true"
+ ports:
+ - containerPort: 26379
+ volumes:
+ - name: data
+ emptyDir: {}
+---
+apiVersion: v1
+kind: Service
+metadata:
+ name: redis
+ namespace: clovisor
+spec:
+ ports:
+ - port: 6379
+ selector:
+ name: redis
+---