diff options
author | Ashlee Young <ashlee@onosfw.com> | 2015-09-09 22:21:41 -0700 |
---|---|---|
committer | Ashlee Young <ashlee@onosfw.com> | 2015-09-09 22:21:41 -0700 |
commit | 8879b125d26e8db1a5633de5a9c692eb2d1c4f83 (patch) | |
tree | c7259d85a991b83dfa85ab2e339360669fc1f58e /framework/src/suricata/src/flow-hash.c | |
parent | 13d05bc8458758ee39cb829098241e89616717ee (diff) |
suricata checkin based on commit id a4bce14770beee46a537eda3c3f6e8e8565d5d0a
Change-Id: I9a214fa0ee95e58fc640e50bd604dac7f42db48f
Diffstat (limited to 'framework/src/suricata/src/flow-hash.c')
-rw-r--r-- | framework/src/suricata/src/flow-hash.c | 851 |
1 file changed, 851 insertions(+), 0 deletions(-)
diff --git a/framework/src/suricata/src/flow-hash.c b/framework/src/suricata/src/flow-hash.c new file mode 100644 index 00000000..7a151199 --- /dev/null +++ b/framework/src/suricata/src/flow-hash.c @@ -0,0 +1,851 @@ +/* Copyright (C) 2007-2013 Open Information Security Foundation + * + * You can copy, redistribute or modify this Program under the terms of + * the GNU General Public License version 2 as published by the Free + * Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301, USA. + */ + +/** + * \file + * + * \author Victor Julien <victor@inliniac.net> + * \author Pablo Rincon Crespo <pablo.rincon.crespo@gmail.com> + * + * Flow Hashing functions. 
+ */ + +#include "suricata-common.h" +#include "threads.h" + +#include "decode.h" +#include "detect-engine-state.h" + +#include "flow.h" +#include "flow-hash.h" +#include "flow-util.h" +#include "flow-private.h" +#include "flow-manager.h" +#include "app-layer-parser.h" + +#include "util-time.h" +#include "util-debug.h" + +#include "util-hash-lookup3.h" + +#include "conf.h" +#include "output.h" +#include "output-flow.h" + +#define FLOW_DEFAULT_FLOW_PRUNE 5 + +SC_ATOMIC_EXTERN(unsigned int, flow_prune_idx); +SC_ATOMIC_EXTERN(unsigned int, flow_flags); + +static Flow *FlowGetUsedFlow(ThreadVars *tv, DecodeThreadVars *dtv); +static int handle_tcp_reuse = 1; + +#ifdef FLOW_DEBUG_STATS +#define FLOW_DEBUG_STATS_PROTO_ALL 0 +#define FLOW_DEBUG_STATS_PROTO_TCP 1 +#define FLOW_DEBUG_STATS_PROTO_UDP 2 +#define FLOW_DEBUG_STATS_PROTO_ICMP 3 +#define FLOW_DEBUG_STATS_PROTO_OTHER 4 + +static uint64_t flow_hash_count[5] = { 0, 0, 0, 0, 0 }; /* how often are we looking for a hash */ +static uint64_t flow_hash_loop_count[5] = { 0, 0, 0, 0, 0 }; /* how often do we loop through a hash bucket */ +static FILE *flow_hash_count_fp = NULL; +static SCSpinlock flow_hash_count_lock; + +#define FlowHashCountUpdate do { \ + SCSpinLock(&flow_hash_count_lock); \ + flow_hash_count[FLOW_DEBUG_STATS_PROTO_ALL]++; \ + flow_hash_loop_count[FLOW_DEBUG_STATS_PROTO_ALL] += _flow_hash_counter; \ + if (f != NULL) { \ + if (p->proto == IPPROTO_TCP) { \ + flow_hash_count[FLOW_DEBUG_STATS_PROTO_TCP]++; \ + flow_hash_loop_count[FLOW_DEBUG_STATS_PROTO_TCP] += _flow_hash_counter; \ + } else if (p->proto == IPPROTO_UDP) {\ + flow_hash_count[FLOW_DEBUG_STATS_PROTO_UDP]++; \ + flow_hash_loop_count[FLOW_DEBUG_STATS_PROTO_UDP] += _flow_hash_counter; \ + } else if (p->proto == IPPROTO_ICMP) {\ + flow_hash_count[FLOW_DEBUG_STATS_PROTO_ICMP]++; \ + flow_hash_loop_count[FLOW_DEBUG_STATS_PROTO_ICMP] += _flow_hash_counter; \ + } else {\ + flow_hash_count[FLOW_DEBUG_STATS_PROTO_OTHER]++; \ + 
flow_hash_loop_count[FLOW_DEBUG_STATS_PROTO_OTHER] += _flow_hash_counter; \ + } \ + } \ + SCSpinUnlock(&flow_hash_count_lock); \ +} while(0); + +#define FlowHashCountInit uint64_t _flow_hash_counter = 0 +#define FlowHashCountIncr _flow_hash_counter++; + +void FlowHashDebugInit(void) +{ +#ifdef FLOW_DEBUG_STATS + SCSpinInit(&flow_hash_count_lock, 0); +#endif + flow_hash_count_fp = fopen("flow-debug.log", "w+"); + if (flow_hash_count_fp != NULL) { + fprintf(flow_hash_count_fp, "ts,all,tcp,udp,icmp,other\n"); + } +} + +void FlowHashDebugPrint(uint32_t ts) +{ +#ifdef FLOW_DEBUG_STATS + if (flow_hash_count_fp == NULL) + return; + + float avg_all = 0, avg_tcp = 0, avg_udp = 0, avg_icmp = 0, avg_other = 0; + SCSpinLock(&flow_hash_count_lock); + if (flow_hash_loop_count[FLOW_DEBUG_STATS_PROTO_ALL] != 0) + avg_all = (float)(flow_hash_loop_count[FLOW_DEBUG_STATS_PROTO_ALL]/(float)(flow_hash_count[FLOW_DEBUG_STATS_PROTO_ALL])); + if (flow_hash_loop_count[FLOW_DEBUG_STATS_PROTO_TCP] != 0) + avg_tcp = (float)(flow_hash_loop_count[FLOW_DEBUG_STATS_PROTO_TCP]/(float)(flow_hash_count[FLOW_DEBUG_STATS_PROTO_TCP])); + if (flow_hash_loop_count[FLOW_DEBUG_STATS_PROTO_UDP] != 0) + avg_udp = (float)(flow_hash_loop_count[FLOW_DEBUG_STATS_PROTO_UDP]/(float)(flow_hash_count[FLOW_DEBUG_STATS_PROTO_UDP])); + if (flow_hash_loop_count[FLOW_DEBUG_STATS_PROTO_ICMP] != 0) + avg_icmp= (float)(flow_hash_loop_count[FLOW_DEBUG_STATS_PROTO_ICMP]/(float)(flow_hash_count[FLOW_DEBUG_STATS_PROTO_ICMP])); + if (flow_hash_loop_count[FLOW_DEBUG_STATS_PROTO_OTHER] != 0) + avg_other= (float)(flow_hash_loop_count[FLOW_DEBUG_STATS_PROTO_OTHER]/(float)(flow_hash_count[FLOW_DEBUG_STATS_PROTO_OTHER])); + fprintf(flow_hash_count_fp, "%"PRIu32",%02.3f,%02.3f,%02.3f,%02.3f,%02.3f\n", ts, avg_all, avg_tcp, avg_udp, avg_icmp, avg_other); + fflush(flow_hash_count_fp); + memset(&flow_hash_count, 0, sizeof(flow_hash_count)); + memset(&flow_hash_loop_count, 0, sizeof(flow_hash_loop_count)); + 
SCSpinUnlock(&flow_hash_count_lock); +#endif +} + +void FlowHashDebugDeinit(void) +{ +#ifdef FLOW_DEBUG_STATS + struct timeval ts; + memset(&ts, 0, sizeof(ts)); + TimeGet(&ts); + FlowHashDebugPrint((uint32_t)ts.tv_sec); + if (flow_hash_count_fp != NULL) + fclose(flow_hash_count_fp); + SCSpinDestroy(&flow_hash_count_lock); +#endif +} + +#else + +#define FlowHashCountUpdate +#define FlowHashCountInit +#define FlowHashCountIncr + +#endif /* FLOW_DEBUG_STATS */ + +void FlowDisableTcpReuseHandling(void) +{ + handle_tcp_reuse = 0; +} + +/** \brief compare two raw ipv6 addrs + * + * \note we don't care about the real ipv6 ip's, this is just + * to consistently fill the FlowHashKey6 struct, without all + * the ntohl calls. + * + * \warning do not use elsewhere unless you know what you're doing. + * detect-engine-address-ipv6.c's AddressIPv6GtU32 is likely + * what you are looking for. + */ +static inline int FlowHashRawAddressIPv6GtU32(const uint32_t *a, const uint32_t *b) +{ + int i; + + for (i = 0; i < 4; i++) { + if (a[i] > b[i]) + return 1; + if (a[i] < b[i]) + break; + } + + return 0; +} + +typedef struct FlowHashKey4_ { + union { + struct { + uint32_t src, dst; + uint16_t sp, dp; + uint16_t proto; /**< u16 so proto and recur add up to u32 */ + uint16_t recur; /**< u16 so proto and recur add up to u32 */ + uint16_t vlan_id[2]; + }; + const uint32_t u32[5]; + }; +} FlowHashKey4; + +typedef struct FlowHashKey6_ { + union { + struct { + uint32_t src[4], dst[4]; + uint16_t sp, dp; + uint16_t proto; /**< u16 so proto and recur add up to u32 */ + uint16_t recur; /**< u16 so proto and recur add up to u32 */ + uint16_t vlan_id[2]; + }; + const uint32_t u32[11]; + }; +} FlowHashKey6; + +/* calculate the hash key for this packet + * + * we're using: + * hash_rand -- set at init time + * source port + * destination port + * source address + * destination address + * recursion level -- for tunnels, make sure different tunnel layers can + * never get mixed up. 
+ * + * For ICMP we only consider UNREACHABLE errors atm. + */ +static inline uint32_t FlowGetKey(const Packet *p) +{ + uint32_t key; + + if (p->ip4h != NULL) { + if (p->tcph != NULL || p->udph != NULL) { + FlowHashKey4 fhk; + if (p->src.addr_data32[0] > p->dst.addr_data32[0]) { + fhk.src = p->src.addr_data32[0]; + fhk.dst = p->dst.addr_data32[0]; + } else { + fhk.src = p->dst.addr_data32[0]; + fhk.dst = p->src.addr_data32[0]; + } + if (p->sp > p->dp) { + fhk.sp = p->sp; + fhk.dp = p->dp; + } else { + fhk.sp = p->dp; + fhk.dp = p->sp; + } + fhk.proto = (uint16_t)p->proto; + fhk.recur = (uint16_t)p->recursion_level; + fhk.vlan_id[0] = p->vlan_id[0]; + fhk.vlan_id[1] = p->vlan_id[1]; + + uint32_t hash = hashword(fhk.u32, 5, flow_config.hash_rand); + key = hash % flow_config.hash_size; + + } else if (ICMPV4_DEST_UNREACH_IS_VALID(p)) { + uint32_t psrc = IPV4_GET_RAW_IPSRC_U32(ICMPV4_GET_EMB_IPV4(p)); + uint32_t pdst = IPV4_GET_RAW_IPDST_U32(ICMPV4_GET_EMB_IPV4(p)); + FlowHashKey4 fhk; + if (psrc > pdst) { + fhk.src = psrc; + fhk.dst = pdst; + } else { + fhk.src = pdst; + fhk.dst = psrc; + } + if (p->icmpv4vars.emb_sport > p->icmpv4vars.emb_dport) { + fhk.sp = p->icmpv4vars.emb_sport; + fhk.dp = p->icmpv4vars.emb_dport; + } else { + fhk.sp = p->icmpv4vars.emb_dport; + fhk.dp = p->icmpv4vars.emb_sport; + } + fhk.proto = (uint16_t)ICMPV4_GET_EMB_PROTO(p); + fhk.recur = (uint16_t)p->recursion_level; + fhk.vlan_id[0] = p->vlan_id[0]; + fhk.vlan_id[1] = p->vlan_id[1]; + + uint32_t hash = hashword(fhk.u32, 5, flow_config.hash_rand); + key = hash % flow_config.hash_size; + + } else { + FlowHashKey4 fhk; + if (p->src.addr_data32[0] > p->dst.addr_data32[0]) { + fhk.src = p->src.addr_data32[0]; + fhk.dst = p->dst.addr_data32[0]; + } else { + fhk.src = p->dst.addr_data32[0]; + fhk.dst = p->src.addr_data32[0]; + } + fhk.sp = 0xfeed; + fhk.dp = 0xbeef; + fhk.proto = (uint16_t)p->proto; + fhk.recur = (uint16_t)p->recursion_level; + fhk.vlan_id[0] = p->vlan_id[0]; + fhk.vlan_id[1] = 
p->vlan_id[1]; + + uint32_t hash = hashword(fhk.u32, 5, flow_config.hash_rand); + key = hash % flow_config.hash_size; + } + } else if (p->ip6h != NULL) { + FlowHashKey6 fhk; + if (FlowHashRawAddressIPv6GtU32(p->src.addr_data32, p->dst.addr_data32)) { + fhk.src[0] = p->src.addr_data32[0]; + fhk.src[1] = p->src.addr_data32[1]; + fhk.src[2] = p->src.addr_data32[2]; + fhk.src[3] = p->src.addr_data32[3]; + fhk.dst[0] = p->dst.addr_data32[0]; + fhk.dst[1] = p->dst.addr_data32[1]; + fhk.dst[2] = p->dst.addr_data32[2]; + fhk.dst[3] = p->dst.addr_data32[3]; + } else { + fhk.src[0] = p->dst.addr_data32[0]; + fhk.src[1] = p->dst.addr_data32[1]; + fhk.src[2] = p->dst.addr_data32[2]; + fhk.src[3] = p->dst.addr_data32[3]; + fhk.dst[0] = p->src.addr_data32[0]; + fhk.dst[1] = p->src.addr_data32[1]; + fhk.dst[2] = p->src.addr_data32[2]; + fhk.dst[3] = p->src.addr_data32[3]; + } + if (p->sp > p->dp) { + fhk.sp = p->sp; + fhk.dp = p->dp; + } else { + fhk.sp = p->dp; + fhk.dp = p->sp; + } + fhk.proto = (uint16_t)p->proto; + fhk.recur = (uint16_t)p->recursion_level; + fhk.vlan_id[0] = p->vlan_id[0]; + fhk.vlan_id[1] = p->vlan_id[1]; + + uint32_t hash = hashword(fhk.u32, 11, flow_config.hash_rand); + key = hash % flow_config.hash_size; + } else + key = 0; + + return key; +} + +/* Since two or more flows can have the same hash key, we need to compare + * the flow with the current flow key. 
*/ +#define CMP_FLOW(f1,f2) \ + (((CMP_ADDR(&(f1)->src, &(f2)->src) && \ + CMP_ADDR(&(f1)->dst, &(f2)->dst) && \ + CMP_PORT((f1)->sp, (f2)->sp) && CMP_PORT((f1)->dp, (f2)->dp)) || \ + (CMP_ADDR(&(f1)->src, &(f2)->dst) && \ + CMP_ADDR(&(f1)->dst, &(f2)->src) && \ + CMP_PORT((f1)->sp, (f2)->dp) && CMP_PORT((f1)->dp, (f2)->sp))) && \ + (f1)->proto == (f2)->proto && \ + (f1)->recursion_level == (f2)->recursion_level && \ + (f1)->vlan_id[0] == (f2)->vlan_id[0] && \ + (f1)->vlan_id[1] == (f2)->vlan_id[1]) + +/** + * \brief See if a ICMP packet belongs to a flow by comparing the embedded + * packet in the ICMP error packet to the flow. + * + * \param f flow + * \param p ICMP packet + * + * \retval 1 match + * \retval 0 no match + */ +static inline int FlowCompareICMPv4(Flow *f, const Packet *p) +{ + if (ICMPV4_DEST_UNREACH_IS_VALID(p)) { + /* first check the direction of the flow, in other words, the client -> + * server direction as it's most likely the ICMP error will be a + * response to the clients traffic */ + if ((f->src.addr_data32[0] == IPV4_GET_RAW_IPSRC_U32( ICMPV4_GET_EMB_IPV4(p) )) && + (f->dst.addr_data32[0] == IPV4_GET_RAW_IPDST_U32( ICMPV4_GET_EMB_IPV4(p) )) && + f->sp == p->icmpv4vars.emb_sport && + f->dp == p->icmpv4vars.emb_dport && + f->proto == ICMPV4_GET_EMB_PROTO(p) && + f->recursion_level == p->recursion_level && + f->vlan_id[0] == p->vlan_id[0] && + f->vlan_id[1] == p->vlan_id[1]) + { + return 1; + + /* check the less likely case where the ICMP error was a response to + * a packet from the server. 
*/ + } else if ((f->dst.addr_data32[0] == IPV4_GET_RAW_IPSRC_U32( ICMPV4_GET_EMB_IPV4(p) )) && + (f->src.addr_data32[0] == IPV4_GET_RAW_IPDST_U32( ICMPV4_GET_EMB_IPV4(p) )) && + f->dp == p->icmpv4vars.emb_sport && + f->sp == p->icmpv4vars.emb_dport && + f->proto == ICMPV4_GET_EMB_PROTO(p) && + f->recursion_level == p->recursion_level && + f->vlan_id[0] == p->vlan_id[0] && + f->vlan_id[1] == p->vlan_id[1]) + { + return 1; + } + + /* no match, fall through */ + } else { + /* just treat ICMP as a normal proto for now */ + return CMP_FLOW(f, p); + } + + return 0; +} + +int TcpSessionPacketSsnReuse(const Packet *p, const Flow *f, void *tcp_ssn); + +static inline int FlowCompare(Flow *f, const Packet *p) +{ + if (p->proto == IPPROTO_ICMP) { + return FlowCompareICMPv4(f, p); + } else if (p->proto == IPPROTO_TCP) { + if (CMP_FLOW(f, p) == 0) + return 0; + + /* if this session is 'reused', we don't return it anymore, + * so return false on the compare */ + if (f->flags & FLOW_TCP_REUSED) + return 0; + + if (handle_tcp_reuse == 1) { + /* lets see if we need to consider the existing session reuse */ + if (unlikely(TcpSessionPacketSsnReuse(p, f, f->protoctx) == 1)) { + /* okay, we need to setup a new flow for this packet. + * Flag the flow that it's been replaced by a new one */ + f->flags |= FLOW_TCP_REUSED; + SCLogDebug("flow obsolete: TCP reuse will use a new flow " + "starting with packet %"PRIu64, p->pcap_cnt); + return 0; + } + } + return 1; + } else { + return CMP_FLOW(f, p); + } +} + +/** + * \brief Check if we should create a flow based on a packet + * + * We use this check to filter out flow creation based on: + * - ICMP error messages + * + * \param p packet + * \retval 1 true + * \retval 0 false + */ +static inline int FlowCreateCheck(const Packet *p) +{ + if (PKT_IS_ICMPV4(p)) { + if (ICMPV4_IS_ERROR_MSG(p)) { + return 0; + } + } + + return 1; +} + +/** + * \brief Get a new flow + * + * Get a new flow. 
We're checking memcap first and will try to make room + * if the memcap is reached. + * + * \param tv thread vars + * \param dtv decode thread vars (for flow log api thread data) + * + * \retval f *LOCKED* flow on succes, NULL on error. + */ +static Flow *FlowGetNew(ThreadVars *tv, DecodeThreadVars *dtv, const Packet *p) +{ + Flow *f = NULL; + + if (FlowCreateCheck(p) == 0) { + return NULL; + } + + /* get a flow from the spare queue */ + f = FlowDequeue(&flow_spare_q); + if (f == NULL) { + /* If we reached the max memcap, we get a used flow */ + if (!(FLOW_CHECK_MEMCAP(sizeof(Flow)))) { + /* declare state of emergency */ + if (!(SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY)) { + SC_ATOMIC_OR(flow_flags, FLOW_EMERGENCY); + + /* under high load, waking up the flow mgr each time leads + * to high cpu usage. Flows are not timed out much faster if + * we check a 1000 times a second. */ + FlowWakeupFlowManagerThread(); + } + + f = FlowGetUsedFlow(tv, dtv); + if (f == NULL) { + /* very rare, but we can fail. 
Just giving up */ + return NULL; + } + + /* freed a flow, but it's unlocked */ + } else { + /* now see if we can alloc a new flow */ + f = FlowAlloc(); + if (f == NULL) { + return NULL; + } + + /* flow is initialized but *unlocked* */ + } + } else { + /* flow has been recycled before it went into the spare queue */ + + /* flow is initialized (recylced) but *unlocked* */ + } + + FLOWLOCK_WRLOCK(f); + return f; +} + +Flow *FlowGetFlowFromHashByPacket(const Packet *p) +{ + Flow *f = NULL; + + /* get the key to our bucket */ + uint32_t key = FlowGetKey(p); + /* get our hash bucket and lock it */ + FlowBucket *fb = &flow_hash[key]; + FBLOCK_LOCK(fb); + + SCLogDebug("fb %p fb->head %p", fb, fb->head); + + f = FlowGetNew(NULL, NULL, p); + if (f != NULL) { + /* flow is locked */ + if (fb->head == NULL) { + fb->head = f; + fb->tail = f; + } else { + f->hprev = fb->tail; + fb->tail->hnext = f; + fb->tail = f; + } + + /* got one, now lock, initialize and return */ + FlowInit(f, p); + f->fb = fb; + /* update the last seen timestamp of this flow */ + COPY_TIMESTAMP(&p->ts,&f->lastts); + + } + FBLOCK_UNLOCK(fb); + return f; +} + +/** \brief Lookup flow based on packet + * + * Find the flow belonging to this packet. If not found, no new flow + * is set up. + * + * \param p packet to lookup the flow for + * + * \retval f flow or NULL if not found + */ +Flow *FlowLookupFlowFromHash(const Packet *p) +{ + Flow *f = NULL; + + /* get the key to our bucket */ + uint32_t key = FlowGetKey(p); + /* get our hash bucket and lock it */ + FlowBucket *fb = &flow_hash[key]; + FBLOCK_LOCK(fb); + + SCLogDebug("fb %p fb->head %p", fb, fb->head); + + /* see if the bucket already has a flow */ + if (fb->head == NULL) { + FBLOCK_UNLOCK(fb); + return NULL; + } + + /* ok, we have a flow in the bucket. 
Let's find out if it is our flow */ + f = fb->head; + + /* see if this is the flow we are looking for */ + if (FlowCompare(f, p) == 0) { + while (f) { + FlowHashCountIncr; + + f = f->hnext; + + if (f == NULL) { + FBLOCK_UNLOCK(fb); + return NULL; + } + + if (FlowCompare(f, p) != 0) { + /* we found our flow, lets put it on top of the + * hash list -- this rewards active flows */ + if (f->hnext) { + f->hnext->hprev = f->hprev; + } + if (f->hprev) { + f->hprev->hnext = f->hnext; + } + if (f == fb->tail) { + fb->tail = f->hprev; + } + + f->hnext = fb->head; + f->hprev = NULL; + fb->head->hprev = f; + fb->head = f; + + /* found our flow, lock & return */ + FLOWLOCK_WRLOCK(f); + /* update the last seen timestamp of this flow */ + COPY_TIMESTAMP(&p->ts,&f->lastts); + + FBLOCK_UNLOCK(fb); + return f; + } + } + } + + /* lock & return */ + FLOWLOCK_WRLOCK(f); + /* update the last seen timestamp of this flow */ + COPY_TIMESTAMP(&p->ts,&f->lastts); + + FBLOCK_UNLOCK(fb); + return f; +} + +/** \brief Get Flow for packet + * + * Hash retrieval function for flows. Looks up the hash bucket containing the + * flow pointer. Then compares the packet with the found flow to see if it is + * the flow we need. If it isn't, walk the list until the right flow is found. + * + * If the flow is not found or the bucket was emtpy, a new flow is taken from + * the queue. FlowDequeue() will alloc new flows as long as we stay within our + * memcap limit. + * + * The p->flow pointer is updated to point to the flow. 
+ * + * \param tv thread vars + * \param dtv decode thread vars (for flow log api thread data) + * + * \retval f *LOCKED* flow or NULL + */ +Flow *FlowGetFlowFromHash(ThreadVars *tv, DecodeThreadVars *dtv, const Packet *p) +{ + Flow *f = NULL; + FlowHashCountInit; + + /* get the key to our bucket */ + uint32_t key = FlowGetKey(p); + /* get our hash bucket and lock it */ + FlowBucket *fb = &flow_hash[key]; + FBLOCK_LOCK(fb); + + SCLogDebug("fb %p fb->head %p", fb, fb->head); + + FlowHashCountIncr; + + /* see if the bucket already has a flow */ + if (fb->head == NULL) { + f = FlowGetNew(tv, dtv, p); + if (f == NULL) { + FBLOCK_UNLOCK(fb); + FlowHashCountUpdate; + return NULL; + } + + /* flow is locked */ + fb->head = f; + fb->tail = f; + + /* got one, now lock, initialize and return */ + FlowInit(f, p); + f->fb = fb; + + /* update the last seen timestamp of this flow */ + COPY_TIMESTAMP(&p->ts,&f->lastts); + + FBLOCK_UNLOCK(fb); + FlowHashCountUpdate; + return f; + } + + /* ok, we have a flow in the bucket. 
Let's find out if it is our flow */ + f = fb->head; + + /* see if this is the flow we are looking for */ + if (FlowCompare(f, p) == 0) { + Flow *pf = NULL; /* previous flow */ + + while (f) { + FlowHashCountIncr; + + pf = f; + f = f->hnext; + + if (f == NULL) { + f = pf->hnext = FlowGetNew(tv, dtv, p); + if (f == NULL) { + FBLOCK_UNLOCK(fb); + FlowHashCountUpdate; + return NULL; + } + fb->tail = f; + + /* flow is locked */ + + f->hprev = pf; + + /* initialize and return */ + FlowInit(f, p); + f->fb = fb; + + /* update the last seen timestamp of this flow */ + COPY_TIMESTAMP(&p->ts,&f->lastts); + + FBLOCK_UNLOCK(fb); + FlowHashCountUpdate; + return f; + } + + if (FlowCompare(f, p) != 0) { + /* we found our flow, lets put it on top of the + * hash list -- this rewards active flows */ + if (f->hnext) { + f->hnext->hprev = f->hprev; + } + if (f->hprev) { + f->hprev->hnext = f->hnext; + } + if (f == fb->tail) { + fb->tail = f->hprev; + } + + f->hnext = fb->head; + f->hprev = NULL; + fb->head->hprev = f; + fb->head = f; + + /* found our flow, lock & return */ + FLOWLOCK_WRLOCK(f); + /* update the last seen timestamp of this flow */ + COPY_TIMESTAMP(&p->ts,&f->lastts); + + FBLOCK_UNLOCK(fb); + FlowHashCountUpdate; + return f; + } + } + } + + /* lock & return */ + FLOWLOCK_WRLOCK(f); + /* update the last seen timestamp of this flow */ + COPY_TIMESTAMP(&p->ts,&f->lastts); + + FBLOCK_UNLOCK(fb); + FlowHashCountUpdate; + return f; +} + +/** \internal + * \brief Get a flow from the hash directly. + * + * Called in conditions where the spare queue is empty and memcap is reached. + * + * Walks the hash until a flow can be freed. Timeouts are disregarded, use_cnt + * is adhered to. "flow_prune_idx" atomic int makes sure we don't start at the + * top each time since that would clear the top of the hash leading to longer + * and longer search times under high pressure (observed). 
+ * + * \param tv thread vars + * \param dtv decode thread vars (for flow log api thread data) + * + * \retval f flow or NULL + */ +static Flow *FlowGetUsedFlow(ThreadVars *tv, DecodeThreadVars *dtv) +{ + uint32_t idx = SC_ATOMIC_GET(flow_prune_idx) % flow_config.hash_size; + uint32_t cnt = flow_config.hash_size; + + while (cnt--) { + if (++idx >= flow_config.hash_size) + idx = 0; + + FlowBucket *fb = &flow_hash[idx]; + + if (FBLOCK_TRYLOCK(fb) != 0) + continue; + + Flow *f = fb->tail; + if (f == NULL) { + FBLOCK_UNLOCK(fb); + continue; + } + + if (FLOWLOCK_TRYWRLOCK(f) != 0) { + FBLOCK_UNLOCK(fb); + continue; + } + + /** never prune a flow that is used by a packet or stream msg + * we are currently processing in one of the threads */ + if (SC_ATOMIC_GET(f->use_cnt) > 0) { + FBLOCK_UNLOCK(fb); + FLOWLOCK_UNLOCK(f); + continue; + } + + /* remove from the hash */ + if (f->hprev != NULL) + f->hprev->hnext = f->hnext; + if (f->hnext != NULL) + f->hnext->hprev = f->hprev; + if (fb->head == f) + fb->head = f->hnext; + if (fb->tail == f) + fb->tail = f->hprev; + + f->hnext = NULL; + f->hprev = NULL; + f->fb = NULL; + FBLOCK_UNLOCK(fb); + + int state = SC_ATOMIC_GET(f->flow_state); + if (state == FLOW_STATE_NEW) + f->flow_end_flags |= FLOW_END_FLAG_STATE_NEW; + else if (state == FLOW_STATE_ESTABLISHED) + f->flow_end_flags |= FLOW_END_FLAG_STATE_ESTABLISHED; + else if (state == FLOW_STATE_CLOSED) + f->flow_end_flags |= FLOW_END_FLAG_STATE_CLOSED; + + f->flow_end_flags |= FLOW_END_FLAG_FORCED; + + if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) + f->flow_end_flags |= FLOW_END_FLAG_EMERGENCY; + + /* invoke flow log api */ + if (dtv && dtv->output_flow_thread_data) + (void)OutputFlowLog(tv, dtv->output_flow_thread_data, f); + + FlowClearMemory(f, f->protomap); + + FLOWLOCK_UNLOCK(f); + + (void) SC_ATOMIC_ADD(flow_prune_idx, (flow_config.hash_size - cnt)); + return f; + } + + return NULL; +} |