summaryrefslogtreecommitdiffstats
path: root/clover/clovisor/libclovisor/clovisor_bcc.go
diff options
context:
space:
mode:
Diffstat (limited to 'clover/clovisor/libclovisor/clovisor_bcc.go')
-rw-r--r--clover/clovisor/libclovisor/clovisor_bcc.go687
1 files changed, 497 insertions, 190 deletions
diff --git a/clover/clovisor/libclovisor/clovisor_bcc.go b/clover/clovisor/libclovisor/clovisor_bcc.go
index ab5bc33..a6c74ef 100644
--- a/clover/clovisor/libclovisor/clovisor_bcc.go
+++ b/clover/clovisor/libclovisor/clovisor_bcc.go
@@ -9,15 +9,20 @@ package clovisor
import (
"encoding/hex"
- "bufio"
+ //"encoding/json"
"bytes"
"encoding/binary"
+ "errors"
"fmt"
"io/ioutil"
- "net/http"
+ "net"
+ //"net/http"
+ "plugin"
"strconv"
+ "strings"
"time"
+ //"github.com/go-redis/redis"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/iovisor/gobpf/bcc"
@@ -30,11 +35,15 @@ import (
/*
#cgo CFLAGS: -I/usr/include/bcc/compat
#cgo LDFLAGS: -lbcc
-#include <bcc/bpf_common.h>
+#include <bcc/bcc_common.h>
#include <bcc/libbpf.h>
*/
import "C"
+type Parser interface {
+ Parse(session_key string, is_req bool, data []byte)([]byte, map[string]string)
+}
+
type ClovisorBCC struct {
stopChan chan bool
// TODO(s3wong): remove once k8s watcher available
@@ -65,8 +74,12 @@ type egress_match_cfg struct {
}
type session_info_t struct {
- session map[string]string
- buf []byte
+ done bool
+ service string
+ generalInfo map[string]string
+ traces []map[string]string
+ reqBuf []byte
+ respBuf []byte
}
const (
@@ -77,7 +90,12 @@ const (
)
//var sessionMap map[string]map[string]string;
-var sessionMap map[string]session_info_t;
+var sessionMap map[string]*session_info_t;
+var protocolParser = map[string]Parser{};
+var defaultModPath = map[string]string{
+ "http": "/proto/http.so",
+}
+var tracerMap = map[string]opentracing.Tracer{};
var veth_ifidx_command = "cat /sys/class/net/eth0/iflink";
@@ -88,6 +106,20 @@ var protocolMap = map[string]int{
"udp": 4,
}
+var traceTable string = "NetworkTraces"
+
+/*
+ * redisConnect: redis client connecting to redis server
+func redisConnect() *redis.Client {
+ client := redis.NewClient(&redis.Options{
+ Addr: fmt.Sprintf("%s:6379", redisServer),
+ Password: "",
+ DB: 0,
+ })
+ return client
+}
+ */
+
func linkSetup(ifname string) netlink.Link {
link, err := netlink.LinkByName(ifname)
netlink.LinkSetUp(link)
@@ -114,43 +146,117 @@ func dumpBPFTable(table *bcc.Table) {
}
}
-func print_network_traces(tracer opentracing.Tracer) {
- for key, sess_info := range sessionMap {
- value := sess_info.session
- if _, ok := value["done"]; ok {
- span := tracer.StartSpan("http-tracing")
- span.SetTag("Node-Name", value["nodename"])
- span.SetTag("Pod-Name", value["podname"])
- span.SetTag("Source-IP", value["srcip"])
- span.SetTag("Destination-IP", value["dstip"])
- span.SetTag("Source-Port", value["srcport"])
- span.SetTag("Destination-Port", value["dstport"])
- span.SetTag("HTTP Request Method", value["reqmethod"])
- span.SetTag("HTTP Request URL", value["requrl"])
- span.SetTag("HTTP Request Protocol", value["reqproto"])
- if _, exist := value["host"]; exist {
- span.SetTag("HTTP Request Host", value["host"])
- }
- if _, exist := value["useragent"]; exist {
- span.SetTag("HTTP Client User Agent", value["useragent"])
+func loadProtoParser(protocol string, update bool) error {
+ var modPath = ""
+
+ if !update {
+ if _, ok := protocolParser[protocol]; ok {
+ fmt.Printf("Found parse function for protocol %s\n", protocol)
+ return nil
+ }
+ }
+
+ client := redisConnect()
+
+ redisResult := client.HGet(ProtoCfg, protocol)
+ if redisResult.Err() == nil {
+ if len(redisResult.Val()) > 0 {
+ modPath = redisResult.Val()
+ }
+ }
+ if len(modPath) == 0 {
+ if _, ok := defaultModPath[protocol]; ok {
+ modPath = defaultModPath[protocol]
+ } else {
+ return errors.New(fmt.Sprintf("Unable to find module path for protocol %s", protocol))
+ }
+ }
+
+ fmt.Printf("Loading plugin for protocol %v with %v\n", protocol, modPath)
+
+ plug, err := plugin.Open(modPath)
+ if err != nil {
+ fmt.Println(err)
+ return err
+ }
+
+ symParse, err := plug.Lookup("Parser")
+ if err != nil {
+ fmt.Println(err)
+ return err
+ }
+
+ var parser Parser
+ parser, ok := symParse.(Parser)
+ if !ok {
+ fmt.Printf("Unexpected type from mod %s symbol parse\n", modPath)
+ return errors.New(fmt.Sprintf("Wrong type for func parse from %s", modPath))
+ }
+
+ protocolParser[protocol] = parser
+ return nil
+}
+
+func print_network_traces() {
+ /*
+ client := redisConnect()
+
+ traces, err := client.HGetAll(traceTable).Result()
+ if err != nil {
+ fmt.Printf("Error retriving traces from redis: %v\n", err.Error())
+ return
+ }
+ */
+ /*
+ structure:
+ "done": "true",
+ "traces": array of protocol traces
+ [0] : "admin": map[string]string
+ [1] : "ipv4 or ipv6": map[string]string
+ [2] : "tcp or udp": map[string]string
+ [3] : "http"...
+ */
+ /*
+ for key, value := range traces {
+ traceMap := map[string]interface{}
+ json.Unmarshal([]byte(value), &traceMap)
+ if _, ok := traceMap["done"]; ok {
+ span := tracer.StartSpan(fmt.Sprintf("tracing-%s", key))
+ for idx, trace := range traceMap["traces"] {
+ span.SetTag(fmt.Sprintf("protocol-%d", idx), trace['protocol'])
+ for tag, tagVal := range trace {
+ if tag == "protocol" {
+ continue
+ }
+ span.SetTag(tag, tagVal)
+ }
}
- if _, exist := value["requestid"]; exist {
- span.SetTag("OpenTracing Request ID", value["requestid"])
+ span.Finish()
+ ret := client.HDel(traceTable, key)
+ if ret.Err() != nil {
+ fmt.Printf("Error deleting %v from %v: %v\n", key, traceTable, ret.Err())
}
- if _, exist := value["envoydecorator"]; exist {
- span.SetTag("Envoy Decorator", value["envoydecorator"])
+ }
+ }
+ */
+ for key, value := range sessionMap {
+ if value.done {
+ tracer := tracerMap[value.service]
+ span := tracer.StartSpan(fmt.Sprintf("tracing-%s", key))
+ for genTag, genVal := range value.generalInfo {
+ fmt.Printf("general info writing %v: %v\n", genTag, genVal)
+ span.SetTag(genTag, genVal)
}
- if _, exist := value["traceid"]; exist {
- span.SetTag("Trace ID", value["traceid"])
+ for idx, trace := range value.traces {
+ span.SetTag(fmt.Sprintf("protocol-%d", idx), trace["protocol"])
+ for tag, tagVal := range trace {
+ if tag == "protocol" {
+ continue
+ }
+ fmt.Printf("%v writing %v: %v\n", trace["protocol"], tag, tagVal)
+ span.SetTag(tag, tagVal)
+ }
}
- span.SetTag("HTTP Request Packet Count", value["reqpakcount"])
- span.SetTag("HTTP Request Byte Count", value["reqbytecount"])
- span.SetTag("HTTP Response Status", value["respstatus"])
- span.SetTag("HTTP Response Status Code", value["respstatuscode"])
- span.SetTag("HTTP Response Protocol", value["respproto"])
- span.SetTag("HTTP Response Packet Count", value["resppakcount"])
- span.SetTag("HTTP Response Byte Count", value["respbytecount"])
- span.SetTag("HTTP Session Duration", value["duration"])
span.Finish()
delete(sessionMap, key)
}
@@ -160,12 +266,15 @@ func print_network_traces(tracer opentracing.Tracer) {
func handle_skb_event(data *[]byte, node_name string, pod_name string,
session_table *bcc.Table,
monitoring_info *monitoring_info_t,
- egress_match_list []egress_match_t) (error) {
+ egress_match_list []egress_match_t,
+ svc_name string) (error) {
//fmt.Printf("monitoring info has %v\n", monitoring_info)
- fmt.Printf("\n\nnode[%s] pod[%s]\n%s", node_name, pod_name, hex.Dump(*data))
+ fmt.Printf("\n\nnode[%s] pod[%s]\n%s\n", node_name, pod_name, hex.Dump(*data))
+ var ipproto layers.IPProtocol
var src_ip, dst_ip uint32
var src_port, dst_port uint16
var session_key, src_ip_str, dst_ip_str string
+ is_req := false
proto := HTTP
is_ingress:= binary.LittleEndian.Uint32((*data)[0:4])
packet := gopacket.NewPacket((*data)[4:len(*data)],
@@ -175,6 +284,7 @@ func handle_skb_event(data *[]byte, node_name string, pod_name string,
ipv4, _ := ipv4_layer.(*layers.IPv4)
src_ip_str = ipv4.SrcIP.String()
dst_ip_str = ipv4.DstIP.String()
+ ipproto = ipv4.Protocol
fmt.Printf("Source: %s Dest: %s\n", src_ip_str, dst_ip_str)
// Note: the src_ip and dst_ip var here are ONLY being used as
// lookup key to eBPF hash table, hence preserving network
@@ -210,79 +320,128 @@ func handle_skb_event(data *[]byte, node_name string, pod_name string,
break
}
}
- app_layer := packet.ApplicationLayer()
- if app_layer == nil {
- fmt.Printf("No application layer, TCP packet\n")
- proto = TCP
- }
if dst_port == uint16(monitoring_info.port_num) || egress_port_req {
- session_key = fmt.Sprintf("%x.%x:%d:%d", src_ip, dst_ip, src_port,
- dst_port)
- if _, ok := sessionMap[session_key]; !ok {
- sessionMap[session_key] = session_info_t{}
- sess_map := sessionMap[session_key]
- sess_map.session = make(map[string]string)
- sess_map.buf = []byte{}
- sessionMap[session_key] = sess_map
- zero := strconv.Itoa(0)
- sessionMap[session_key].session["reqpakcount"] = zero
- sessionMap[session_key].session["reqbytecount"] = zero
- sessionMap[session_key].session["resppakcount"] = zero
- sessionMap[session_key].session["respbytecount"] = zero
+ is_req = true
+ }
+ if is_req {
+ session_key = fmt.Sprintf("%x:%x:%d:%d:%d", src_ip, dst_ip, ipproto,
+ src_port, dst_port)
+ } else {
+ session_key = fmt.Sprintf("%x:%x:%d:%d:%d", dst_ip, src_ip, ipproto,
+ dst_port, src_port)
+ }
+ var sess_map *session_info_t
+ if _, ok := sessionMap[session_key]; !ok {
+ sess_map = &session_info_t{}
+ sess_map.done = false
+ sess_map.service = svc_name
+ sess_map.generalInfo = make(map[string]string)
+ sess_map.traces = []map[string]string{}
+ sess_map.reqBuf = []byte{}
+ sess_map.respBuf = []byte{}
+ sessionMap[session_key] = sess_map
+ zero := strconv.Itoa(0)
+ sessionMap[session_key].generalInfo["reqpakcount"] = zero
+ sessionMap[session_key].generalInfo["reqbytecount"] = zero
+ sessionMap[session_key].generalInfo["resppakcount"] = zero
+ sessionMap[session_key].generalInfo["respbytecount"] = zero
+ sessionMap[session_key].generalInfo["nodename"] = node_name
+ sessionMap[session_key].generalInfo["podname"] = pod_name
+ } else {
+ sess_map = sessionMap[session_key]
+ }
+
+ curr_pak_count := 0
+ curr_byte_count := 0
+ map_val := sess_map.generalInfo
+ if is_req {
+ curr_pak_count, _ = strconv.Atoi(map_val["reqpakcount"])
+ curr_byte_count, _ = strconv.Atoi(map_val["reqbytecount"])
+ } else {
+ curr_pak_count, _ = strconv.Atoi(map_val["resppakcount"])
+ curr_byte_count, _ = strconv.Atoi(map_val["respbytecount"])
+ }
+ curr_pak_count++
+ curr_byte_count += len(packet.Data())
+ if is_req {
+ map_val["reqpakcount"] = strconv.Itoa(curr_pak_count)
+ map_val["reqbytecount"] = strconv.Itoa(curr_byte_count)
+ } else {
+ map_val["resppakcount"] = strconv.Itoa(curr_pak_count)
+ map_val["respbytecount"] = strconv.Itoa(curr_byte_count)
+ }
+
+ if is_req {
+ // TODO (s3wong): just do IPv4 and TCP without using the plugin for now
+ // the condition check itself is cheating also...
+ if len(sess_map.traces) <= 1 {
+ ipv4Map := make(map[string]string)
+ ipv4Map["protocol"] = "IPv4"
+ ipv4Map["srcip"] = src_ip_str
+ ipv4Map["dstip"] = dst_ip_str
+ sess_map.traces = append(sess_map.traces, ipv4Map)
+ tcpMap := make(map[string]string)
+ tcpMap["protocol"] = "TCP"
+ tcpMap["srcport"] = fmt.Sprintf("%d", src_port)
+ tcpMap["dstport"] = fmt.Sprintf("%d", dst_port)
+ sess_map.traces = append(sess_map.traces, tcpMap)
}
- map_val := sessionMap[session_key].session
- map_val["nodename"] = node_name
- map_val["podname"] = pod_name
- map_val["srcip"] = src_ip_str
- map_val["dstip"] = dst_ip_str
- map_val["srcport"] = fmt.Sprintf("%d", src_port)
- map_val["dstport"] = fmt.Sprintf("%d", dst_port)
- curr_pak_count, _ := strconv.Atoi(map_val["reqpakcount"])
- curr_byte_count, _ := strconv.Atoi(map_val["reqbytecount"])
- curr_pak_count++
- if proto == HTTP {
- curr_byte_count += len(app_layer.Payload())
- reader := bytes.NewReader(app_layer.Payload())
- buf := bufio.NewReader(reader)
- req, err := http.ReadRequest(buf)
- if err != nil {
- fmt.Printf("Request error: ")
- fmt.Println(err)
- } else if req == nil {
- fmt.Println("request is nil")
- } else {
- fmt.Printf("HTTP Request Method %s url %v proto %v\n",
- req.Method, req.URL, req.Proto)
- map_val["reqmethod"] = req.Method
- map_val["requrl"] = fmt.Sprintf("%v", req.URL)
- map_val["reqproto"] = fmt.Sprintf("%v", req.Proto)
- if user_agent := req.UserAgent(); len(user_agent) > 0 {
- map_val["useragent"] = user_agent
- }
- if len(req.Host) > 0 {
- map_val["host"] = req.Host
- }
- header := req.Header
- if req_id := header.Get("X-Request-Id"); len(req_id) > 0 {
- map_val["requestid"] = req_id
- }
- if envoy_dec := header.Get("X-Envoy-Decorator-Operation"); len(envoy_dec) > 0 {
- map_val["envoydecorator"] = envoy_dec
- }
- if trace_id := header.Get("X-B3-Traceid"); len(trace_id) > 0 {
- map_val["traceid"] = trace_id
- }
- if _, ok := map_val["respstatus"]; ok {
- map_val["done"] = "true"
+ }
+
+ var dataptr []byte
+ app_layer := packet.ApplicationLayer()
+ errStr := ""
+ if app_layer != nil {
+ if is_req {
+ dataptr = append(sess_map.reqBuf, app_layer.Payload()...)
+ } else {
+ dataptr = append(sess_map.respBuf, app_layer.Payload()...)
+ }
+ for _, protocol := range monitoring_info.protocols {
+ if _, ok := protocolParser[protocol]; ok {
+ parser := protocolParser[protocol]
+ new_dataptr, parseMap := parser.Parse(session_key, is_req,
+ dataptr)
+ if parseMap != nil {
+ protocolTag := strings.ToUpper(protocol)
+ merged := false
+ for _, existing := range sess_map.traces {
+ if existing["protocol"] == protocolTag {
+ for k, v := range parseMap {
+ existing[k] = v
+ }
+ merged = true
+ break
+ }
+ }
+ if !merged {
+ parseMap["protocol"] = strings.ToUpper(protocol)
+ sess_map.traces = append(sess_map.traces, parseMap)
+ }
+ dataptr = new_dataptr
+ } else {
+ // offset to packet is off, no need to continue
+ // parsing, return error
+ errStr = fmt.Sprintf("Error: unable to parse protocol %v", protocol)
+ fmt.Println(errStr)
+ //return errors.New(errStr)
+ break
}
}
+ }
+ } else {
+ fmt.Printf("No application layer, TCP packet\n")
+ return nil
+ }
+
+ if len(errStr) > 0 {
+ // buffer
+ if is_req {
+ sess_map.reqBuf = append([]byte(nil), dataptr...)
} else {
- // TODO(s3wong): TCP assumed for now
- curr_byte_count += (len(*data) - 4)
+ sess_map.respBuf = append([]byte(nil), dataptr...)
}
- map_val["reqpakcount"] = strconv.Itoa(curr_pak_count)
- map_val["reqbytecount"] = strconv.Itoa(curr_byte_count)
- fmt.Printf("Current session packet count %v and byte count %v\n", map_val["reqpakcount"], map_val["reqbytecount"])
+ //sessionMap[session_key] = sess_map
} else {
session_key := session_key_t {
src_ip: dst_ip,
@@ -304,7 +463,7 @@ func handle_skb_event(data *[]byte, node_name string, pod_name string,
var duration uint64 = 0
leaf_buf := bytes.NewBuffer(leaf)
if leaf_buf == nil {
- fmt.Println("Error: leaf is nil")
+ fmt.Println("Error: unable to allocate new byte buffer")
return nil
}
session := session_t{}
@@ -314,89 +473,31 @@ func handle_skb_event(data *[]byte, node_name string, pod_name string,
return nil
}
if session.Resp_time == 0 {
- fmt.Printf("session response time not set?\n")
+ fmt.Printf("session response time not set yet\n")
} else {
duration = (session.Resp_time - session.Req_time)/1000
- fmt.Printf("Leaf %v\n", leaf)
+ fmt.Printf("session time : %v\n", session)
fmt.Printf("Duration: %d usec\n", duration)
}
- sess_key := fmt.Sprintf("%x.%x:%d:%d", dst_ip, src_ip,
- dst_port, src_port)
- if _, ok := sessionMap[sess_key]; !ok {
- //sessionMap[sess_key] = make(map[string]string)
- sessionMap[sess_key] = session_info_t{}
- sess_map := sessionMap[sess_key]
- sess_map.session = make(map[string]string)
- sess_map.buf = []byte{}
- sessionMap[sess_key] = sess_map
- zero := strconv.Itoa(0)
- sessionMap[sess_key].session["reqpakcount"] = zero
- sessionMap[sess_key].session["reqbytecount"] = zero
- sessionMap[sess_key].session["resppakcount"] = zero
- sessionMap[sess_key].session["respbytecount"] = zero
- }
- var map_val = sessionMap[sess_key].session
- map_val["nodename"] = node_name
- map_val["podname"] = pod_name
- map_val["srcip"] = dst_ip_str
- map_val["dstip"] = src_ip_str
- map_val["srcport"] = fmt.Sprintf("%d", dst_port)
- map_val["dstport"] = fmt.Sprintf("%d", src_port)
map_val["duration"] = fmt.Sprintf("%v usec", duration)
- curr_pak_count, _ := strconv.Atoi(map_val["resppakcount"])
- curr_byte_count, _ := strconv.Atoi(map_val["respbytecount"])
- curr_pak_count++
-
- if proto == HTTP {
- curr_byte_count += len(app_layer.Payload())
- reader := bytes.NewReader(app_layer.Payload())
- buf := bufio.NewReader(reader)
- resp, err := http.ReadResponse(buf, nil)
- read_http := true
- if err != nil {
- fmt.Printf("Response error: ")
- fmt.Println(err)
- sess_map := sessionMap[sess_key]
- sess_map.buf = append(sess_map.buf, app_layer.Payload()...)
- reader = bytes.NewReader(sess_map.buf)
- buf = bufio.NewReader(reader)
- resp, err = http.ReadResponse(buf, nil)
- if err != nil || resp == nil {
- if err != nil {
- fmt.Printf("Response error: %v\n", err)
- }
- read_http = false
- }
- sessionMap[sess_key] = sess_map
- } else if resp == nil {
- fmt.Println("response is nil")
- read_http = false
- }
- if read_http {
- fmt.Printf("HTTP Response Status %v code %v Proto %v\n",
- resp.Status, resp.StatusCode, resp.Proto)
- map_val["respstatus"] = resp.Status
- map_val["respstatuscode"] = fmt.Sprintf("%v", resp.StatusCode)
- map_val["respproto"] = fmt.Sprintf("%v", resp.Proto)
- //map_val["duration"] = fmt.Sprintf("%v usec", duration)
- /*
- if _, ok := map_val["reqmethod"]; ok {
- map_val["done"] = "true"
- }
- */
- }
- if resp != nil {
- resp.Body.Close()
- }
+
+ node, node_session, err := getNodeIntfSession(session_key)
+ if err == nil {
+ map_val["node-interface"] = node
+ map_val["node-request-ts"] = fmt.Sprintf("%v", node_session.Req_time)
+ map_val["node-response-ts"] = fmt.Sprintf("%v", node_session.Resp_time)
+ delNodeIntfSession(node, key)
} else {
- // TODO(s3wong): TCP assumed for now
- curr_byte_count += (len(*data) - 4)
+ fmt.Printf("Session not found in any node interface... posssibly local?")
}
- map_val["resppakcount"] = strconv.Itoa(curr_pak_count)
- map_val["respbytecount"] = strconv.Itoa(curr_byte_count)
- fmt.Printf("Current session packet count %v and byte count %v\n", map_val["resppakcount"], map_val["respbytecount"])
+
if duration > 0 {
- map_val["done"] = "true"
+ sess_map.done = true
+ err := session_table.Delete(key)
+ if err != nil {
+ fmt.Printf("Error deleting key %v: %v\n", key, err)
+ return err
+ }
}
}
}
@@ -441,6 +542,207 @@ func setEgressTable(egress_table *bcc.Table,
return nil
}
+var nodeintfFilterList = [...]string {"lo", "veth", "docker", "flannel"}
+
+func filterNodeIntf(intf string) bool {
+ for _, substring := range nodeintfFilterList {
+ if strings.Contains(intf, substring) {
+ return false
+ }
+ }
+ return true
+}
+
+type nodeIntf struct {
+ bpfMod *bcc.Module
+ ipTrackTable *bcc.Table
+ sessionTable *bcc.Table
+}
+
+var nodeIntfMap = map[string]*nodeIntf{}
+
+func setupNodeIntf(ifindex int) (*nodeIntf, error) {
+ buf, err := ioutil.ReadFile("libclovisor/ebpf/node_interface.c")
+ if err != nil {
+ fmt.Println(err)
+ return nil, err
+ }
+ code := string(buf)
+
+ bpf_mod := bcc.NewModule(code, []string{})
+
+ ingress_fn, err := bpf_mod.Load("handle_ingress",
+ C.BPF_PROG_TYPE_SCHED_CLS,
+ 1, 65536)
+ if err != nil {
+ fmt.Printf("Failed to load node interface ingress func: %v\n", err)
+ return nil, err
+ }
+
+ egress_fn, err := bpf_mod.Load("handle_egress",
+ C.BPF_PROG_TYPE_SCHED_CLS,
+ 1, 65536)
+ if err != nil {
+ fmt.Printf("Failed to load node interface egress func: %v\n", err)
+ return nil, err
+ }
+
+ ip_track_table := bcc.NewTable(bpf_mod.TableId("ip2track"), bpf_mod)
+ node_sess_table := bcc.NewTable(bpf_mod.TableId("node_sessions"), bpf_mod)
+
+ // check if qdisc clsact filter for this interface already exists
+ link, err := netlink.LinkByIndex(ifindex)
+ if err != nil {
+ fmt.Println(err)
+ } else {
+ qdiscs, err := netlink.QdiscList(link)
+ if err == nil {
+ for _, qdisc_ := range qdiscs {
+ if qdisc_.Type() == "clsact" {
+ netlink.QdiscDel(qdisc_)
+ break
+ }
+ }
+ }
+ }
+
+ attrs := netlink.QdiscAttrs {
+ LinkIndex: ifindex,
+ Handle: netlink.MakeHandle(0xffff, 0),
+ Parent: netlink.HANDLE_CLSACT,
+ }
+
+ qdisc := &netlink.GenericQdisc {
+ QdiscAttrs: attrs,
+ QdiscType: "clsact",
+ }
+
+ if err := netlink.QdiscAdd(qdisc); err != nil {
+ fmt.Println(err)
+ return nil, err
+ }
+
+ ingress_filter_attrs := netlink.FilterAttrs{
+ LinkIndex: ifindex,
+ Parent: netlink.MakeHandle(0xffff, 0xfff3),
+ Priority: 1,
+ Protocol: unix.ETH_P_ALL,
+ }
+ ingress_filter := &netlink.BpfFilter{
+ FilterAttrs: ingress_filter_attrs,
+ Fd: ingress_fn,
+ Name: "handle_ingress",
+ DirectAction: true,
+ }
+ if ingress_filter.Fd < 0 {
+ fmt.Println("Failed to load node interface ingress bpf program")
+ return nil, err
+ }
+
+ if err := netlink.FilterAdd(ingress_filter); err != nil {
+ fmt.Println(err)
+ return nil, err
+ }
+
+ egress_filter_attrs := netlink.FilterAttrs{
+ LinkIndex: ifindex,
+ Parent: netlink.MakeHandle(0xffff, 0xfff2),
+ Priority: 1,
+ Protocol: unix.ETH_P_ALL,
+ }
+ egress_filter := &netlink.BpfFilter{
+ FilterAttrs: egress_filter_attrs,
+ Fd: egress_fn,
+ Name: "handle_egress",
+ DirectAction: true,
+ }
+ if egress_filter.Fd < 0 {
+ fmt.Println("Failed to load node interface egress bpf program")
+ return nil, err
+ }
+
+ if err := netlink.FilterAdd(egress_filter); err != nil {
+ fmt.Println(err)
+ return nil, err
+ }
+
+ return &nodeIntf{
+ bpfMod: bpf_mod,
+ ipTrackTable: ip_track_table,
+ sessionTable: node_sess_table,
+ }, nil
+}
+
+func ClovisorPhyInfSetup() error {
+ intfList, err := net.Interfaces()
+ if err != nil {
+ fmt.Printf("Failed to get node interfaces: %v\n", err)
+ return err
+ }
+
+ for _, f := range intfList {
+ if !filterNodeIntf(f.Name) {
+ continue
+ }
+ fmt.Printf("Tracking node interface %v w/ index %v\n", f.Name, f.Index)
+ bpf_node_intf, err := setupNodeIntf(f.Index)
+ if err != nil {
+ fmt.Printf("Failed to set up node interface %v: %v\n", f.Name, err)
+ return err
+ }
+ nodeIntfMap[f.Name] = bpf_node_intf
+ }
+ return nil
+}
+
+func setIPTrackingTable(table *bcc.Table, ipaddr uint32, action int) error {
+ key, _ := table.KeyStrToBytes(strconv.Itoa(int(ipaddr)))
+ leaf, _ := table.LeafStrToBytes(strconv.Itoa(action))
+ if err := table.Set(key, leaf); err != nil {
+ fmt.Printf("Failed to set IP tracking table: %v\n", err)
+ return err
+ }
+ dumpBPFTable(table)
+ return nil
+}
+
+func setNodeIntfTrackingIP(ipaddr uint32) {
+ for name, node_intf := range nodeIntfMap {
+ err := setIPTrackingTable(node_intf.ipTrackTable, ipaddr, 1)
+ if err != nil {
+ fmt.Printf("Failed to add ip address %v to node interface %v: %v\n", backtoIP4(int64(ipaddr)), name, err)
+ }
+ }
+}
+
+func getNodeIntfSession(session_key session_key_t) (string, *session_t, error) {
+ key_buf := &bytes.Buffer{}
+ binary.Write(key_buf, binary.LittleEndian, session_key)
+ key := append([]byte(nil), key_buf.Bytes()...)
+
+ for node, node_intf := range nodeIntfMap {
+ fmt.Printf("For node interface %v... ", node)
+ //dumpBPFTable(node_intf.sessionTable)
+ if leaf, err := node_intf.sessionTable.Get(key); err == nil {
+ leaf_buf := bytes.NewBuffer(leaf)
+ session := session_t{}
+ binary.Read(leaf_buf, binary.LittleEndian, &session)
+ return node, &session, nil
+ }
+ }
+ return "", nil, errors.New("session not found")
+}
+
+func delNodeIntfSession(node_iname string, key []byte) error {
+ nodeIntf := nodeIntfMap[node_iname]
+ err := nodeIntf.sessionTable.Delete(key)
+ if err != nil {
+ fmt.Printf("Error deleting session %v from node interface %v: %v\n",
+ key, node_iname, err)
+ }
+ return err
+}
+
func ClovisorNewPodInit(k8s_client *ClovisorK8s,
node_name string,
pod_name string,
@@ -457,7 +759,7 @@ func ClovisorNewPodInit(k8s_client *ClovisorK8s,
return nil, err
}
- sessionMap = map[string]session_info_t{};
+ sessionMap = map[string]*session_info_t{};
fmt.Printf("Beginning network tracing for pod %v\n", pod_name)
@@ -492,7 +794,7 @@ func ClovisorNewPodInit(k8s_client *ClovisorK8s,
traffic_table := bcc.NewTable(bpf_mod.TableId("dports2proto"), bpf_mod)
if err := setTrafficTable(traffic_table, int(monitoring_info.port_num),
- monitoring_info.protocol, true);
+ monitoring_info.protocols[0], true);
err != nil {
fmt.Printf("Error on setting traffic port")
return nil, err
@@ -587,6 +889,8 @@ func ClovisorNewPodInit(k8s_client *ClovisorK8s,
return nil, err
}
+ setNodeIntfTrackingIP(ip2Long(monitoring_info.pod_ip))
+
table := bcc.NewTable(bpf_mod.TableId("skb_events"), bpf_mod)
skb_rev_chan := make(chan []byte)
@@ -597,17 +901,20 @@ func ClovisorNewPodInit(k8s_client *ClovisorK8s,
return nil, err
}
- tracer, closer := initJaeger(monitoring_info.svc_name)
- ticker := time.NewTicker(500 * time.Millisecond)
stop := make(chan bool)
go func() {
+ fmt.Printf("Start tracing to Jaeger with service %v\n", monitoring_info.svc_name)
+ tracer, closer := initJaeger(monitoring_info.svc_name)
+ tracerMap[monitoring_info.svc_name] = tracer
+ ticker := time.NewTicker(500 * time.Millisecond)
for {
select {
case <- ticker.C:
- print_network_traces(tracer)
+ print_network_traces()
case data := <-skb_rev_chan:
err = handle_skb_event(&data, node_name, pod_name, session_table,
- monitoring_info, egress_match_list)
+ monitoring_info, egress_match_list,
+ monitoring_info.svc_name)
if err != nil {
fmt.Printf("failed to decode received data: %s\n", err)
}