aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/suricata/src/flow-hash.c
diff options
context:
space:
mode:
authorAshlee Young <ashlee@onosfw.com>2015-09-09 22:21:41 -0700
committerAshlee Young <ashlee@onosfw.com>2015-09-09 22:21:41 -0700
commit8879b125d26e8db1a5633de5a9c692eb2d1c4f83 (patch)
treec7259d85a991b83dfa85ab2e339360669fc1f58e /framework/src/suricata/src/flow-hash.c
parent13d05bc8458758ee39cb829098241e89616717ee (diff)
suricata checkin based on commit id a4bce14770beee46a537eda3c3f6e8e8565d5d0a
Change-Id: I9a214fa0ee95e58fc640e50bd604dac7f42db48f
Diffstat (limited to 'framework/src/suricata/src/flow-hash.c')
-rw-r--r--framework/src/suricata/src/flow-hash.c851
1 files changed, 851 insertions, 0 deletions
diff --git a/framework/src/suricata/src/flow-hash.c b/framework/src/suricata/src/flow-hash.c
new file mode 100644
index 00000000..7a151199
--- /dev/null
+++ b/framework/src/suricata/src/flow-hash.c
@@ -0,0 +1,851 @@
+/* Copyright (C) 2007-2013 Open Information Security Foundation
+ *
+ * You can copy, redistribute or modify this Program under the terms of
+ * the GNU General Public License version 2 as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+/**
+ * \file
+ *
+ * \author Victor Julien <victor@inliniac.net>
+ * \author Pablo Rincon Crespo <pablo.rincon.crespo@gmail.com>
+ *
+ * Flow Hashing functions.
+ */
+
+#include "suricata-common.h"
+#include "threads.h"
+
+#include "decode.h"
+#include "detect-engine-state.h"
+
+#include "flow.h"
+#include "flow-hash.h"
+#include "flow-util.h"
+#include "flow-private.h"
+#include "flow-manager.h"
+#include "app-layer-parser.h"
+
+#include "util-time.h"
+#include "util-debug.h"
+
+#include "util-hash-lookup3.h"
+
+#include "conf.h"
+#include "output.h"
+#include "output-flow.h"
+
+#define FLOW_DEFAULT_FLOW_PRUNE 5
+
+SC_ATOMIC_EXTERN(unsigned int, flow_prune_idx);
+SC_ATOMIC_EXTERN(unsigned int, flow_flags);
+
+static Flow *FlowGetUsedFlow(ThreadVars *tv, DecodeThreadVars *dtv);
+static int handle_tcp_reuse = 1;
+
+#ifdef FLOW_DEBUG_STATS
+#define FLOW_DEBUG_STATS_PROTO_ALL 0
+#define FLOW_DEBUG_STATS_PROTO_TCP 1
+#define FLOW_DEBUG_STATS_PROTO_UDP 2
+#define FLOW_DEBUG_STATS_PROTO_ICMP 3
+#define FLOW_DEBUG_STATS_PROTO_OTHER 4
+
+static uint64_t flow_hash_count[5] = { 0, 0, 0, 0, 0 }; /* how often are we looking for a hash */
+static uint64_t flow_hash_loop_count[5] = { 0, 0, 0, 0, 0 }; /* how often do we loop through a hash bucket */
+static FILE *flow_hash_count_fp = NULL;
+static SCSpinlock flow_hash_count_lock;
+
+#define FlowHashCountUpdate do { \
+ SCSpinLock(&flow_hash_count_lock); \
+ flow_hash_count[FLOW_DEBUG_STATS_PROTO_ALL]++; \
+ flow_hash_loop_count[FLOW_DEBUG_STATS_PROTO_ALL] += _flow_hash_counter; \
+ if (f != NULL) { \
+ if (p->proto == IPPROTO_TCP) { \
+ flow_hash_count[FLOW_DEBUG_STATS_PROTO_TCP]++; \
+ flow_hash_loop_count[FLOW_DEBUG_STATS_PROTO_TCP] += _flow_hash_counter; \
+ } else if (p->proto == IPPROTO_UDP) {\
+ flow_hash_count[FLOW_DEBUG_STATS_PROTO_UDP]++; \
+ flow_hash_loop_count[FLOW_DEBUG_STATS_PROTO_UDP] += _flow_hash_counter; \
+ } else if (p->proto == IPPROTO_ICMP) {\
+ flow_hash_count[FLOW_DEBUG_STATS_PROTO_ICMP]++; \
+ flow_hash_loop_count[FLOW_DEBUG_STATS_PROTO_ICMP] += _flow_hash_counter; \
+ } else {\
+ flow_hash_count[FLOW_DEBUG_STATS_PROTO_OTHER]++; \
+ flow_hash_loop_count[FLOW_DEBUG_STATS_PROTO_OTHER] += _flow_hash_counter; \
+ } \
+ } \
+ SCSpinUnlock(&flow_hash_count_lock); \
+} while(0);
+
+#define FlowHashCountInit uint64_t _flow_hash_counter = 0
+#define FlowHashCountIncr _flow_hash_counter++;
+
+void FlowHashDebugInit(void)
+{
+#ifdef FLOW_DEBUG_STATS
+ SCSpinInit(&flow_hash_count_lock, 0);
+#endif
+ flow_hash_count_fp = fopen("flow-debug.log", "w+");
+ if (flow_hash_count_fp != NULL) {
+ fprintf(flow_hash_count_fp, "ts,all,tcp,udp,icmp,other\n");
+ }
+}
+
+void FlowHashDebugPrint(uint32_t ts)
+{
+#ifdef FLOW_DEBUG_STATS
+ if (flow_hash_count_fp == NULL)
+ return;
+
+ float avg_all = 0, avg_tcp = 0, avg_udp = 0, avg_icmp = 0, avg_other = 0;
+ SCSpinLock(&flow_hash_count_lock);
+ if (flow_hash_loop_count[FLOW_DEBUG_STATS_PROTO_ALL] != 0)
+ avg_all = (float)(flow_hash_loop_count[FLOW_DEBUG_STATS_PROTO_ALL]/(float)(flow_hash_count[FLOW_DEBUG_STATS_PROTO_ALL]));
+ if (flow_hash_loop_count[FLOW_DEBUG_STATS_PROTO_TCP] != 0)
+ avg_tcp = (float)(flow_hash_loop_count[FLOW_DEBUG_STATS_PROTO_TCP]/(float)(flow_hash_count[FLOW_DEBUG_STATS_PROTO_TCP]));
+ if (flow_hash_loop_count[FLOW_DEBUG_STATS_PROTO_UDP] != 0)
+ avg_udp = (float)(flow_hash_loop_count[FLOW_DEBUG_STATS_PROTO_UDP]/(float)(flow_hash_count[FLOW_DEBUG_STATS_PROTO_UDP]));
+ if (flow_hash_loop_count[FLOW_DEBUG_STATS_PROTO_ICMP] != 0)
+ avg_icmp= (float)(flow_hash_loop_count[FLOW_DEBUG_STATS_PROTO_ICMP]/(float)(flow_hash_count[FLOW_DEBUG_STATS_PROTO_ICMP]));
+ if (flow_hash_loop_count[FLOW_DEBUG_STATS_PROTO_OTHER] != 0)
+ avg_other= (float)(flow_hash_loop_count[FLOW_DEBUG_STATS_PROTO_OTHER]/(float)(flow_hash_count[FLOW_DEBUG_STATS_PROTO_OTHER]));
+ fprintf(flow_hash_count_fp, "%"PRIu32",%02.3f,%02.3f,%02.3f,%02.3f,%02.3f\n", ts, avg_all, avg_tcp, avg_udp, avg_icmp, avg_other);
+ fflush(flow_hash_count_fp);
+ memset(&flow_hash_count, 0, sizeof(flow_hash_count));
+ memset(&flow_hash_loop_count, 0, sizeof(flow_hash_loop_count));
+ SCSpinUnlock(&flow_hash_count_lock);
+#endif
+}
+
+void FlowHashDebugDeinit(void)
+{
+#ifdef FLOW_DEBUG_STATS
+ struct timeval ts;
+ memset(&ts, 0, sizeof(ts));
+ TimeGet(&ts);
+ FlowHashDebugPrint((uint32_t)ts.tv_sec);
+ if (flow_hash_count_fp != NULL)
+ fclose(flow_hash_count_fp);
+ SCSpinDestroy(&flow_hash_count_lock);
+#endif
+}
+
+#else
+
+#define FlowHashCountUpdate
+#define FlowHashCountInit
+#define FlowHashCountIncr
+
+#endif /* FLOW_DEBUG_STATS */
+
+void FlowDisableTcpReuseHandling(void)
+{
+ handle_tcp_reuse = 0;
+}
+
+/** \brief compare two raw ipv6 addrs
+ *
+ * \note we don't care about the real ipv6 ip's, this is just
+ * to consistently fill the FlowHashKey6 struct, without all
+ * the ntohl calls.
+ *
+ * \warning do not use elsewhere unless you know what you're doing.
+ * detect-engine-address-ipv6.c's AddressIPv6GtU32 is likely
+ * what you are looking for.
+ */
+static inline int FlowHashRawAddressIPv6GtU32(const uint32_t *a, const uint32_t *b)
+{
+ int i;
+
+ for (i = 0; i < 4; i++) {
+ if (a[i] > b[i])
+ return 1;
+ if (a[i] < b[i])
+ break;
+ }
+
+ return 0;
+}
+
+typedef struct FlowHashKey4_ {
+ union {
+ struct {
+ uint32_t src, dst;
+ uint16_t sp, dp;
+ uint16_t proto; /**< u16 so proto and recur add up to u32 */
+ uint16_t recur; /**< u16 so proto and recur add up to u32 */
+ uint16_t vlan_id[2];
+ };
+ const uint32_t u32[5];
+ };
+} FlowHashKey4;
+
+typedef struct FlowHashKey6_ {
+ union {
+ struct {
+ uint32_t src[4], dst[4];
+ uint16_t sp, dp;
+ uint16_t proto; /**< u16 so proto and recur add up to u32 */
+ uint16_t recur; /**< u16 so proto and recur add up to u32 */
+ uint16_t vlan_id[2];
+ };
+ const uint32_t u32[11];
+ };
+} FlowHashKey6;
+
+/* calculate the hash key for this packet
+ *
+ * we're using:
+ * hash_rand -- set at init time
+ * source port
+ * destination port
+ * source address
+ * destination address
+ * recursion level -- for tunnels, make sure different tunnel layers can
+ * never get mixed up.
+ *
+ * For ICMP we only consider UNREACHABLE errors atm.
+ */
+static inline uint32_t FlowGetKey(const Packet *p)
+{
+ uint32_t key;
+
+ if (p->ip4h != NULL) {
+ if (p->tcph != NULL || p->udph != NULL) {
+ FlowHashKey4 fhk;
+ if (p->src.addr_data32[0] > p->dst.addr_data32[0]) {
+ fhk.src = p->src.addr_data32[0];
+ fhk.dst = p->dst.addr_data32[0];
+ } else {
+ fhk.src = p->dst.addr_data32[0];
+ fhk.dst = p->src.addr_data32[0];
+ }
+ if (p->sp > p->dp) {
+ fhk.sp = p->sp;
+ fhk.dp = p->dp;
+ } else {
+ fhk.sp = p->dp;
+ fhk.dp = p->sp;
+ }
+ fhk.proto = (uint16_t)p->proto;
+ fhk.recur = (uint16_t)p->recursion_level;
+ fhk.vlan_id[0] = p->vlan_id[0];
+ fhk.vlan_id[1] = p->vlan_id[1];
+
+ uint32_t hash = hashword(fhk.u32, 5, flow_config.hash_rand);
+ key = hash % flow_config.hash_size;
+
+ } else if (ICMPV4_DEST_UNREACH_IS_VALID(p)) {
+ uint32_t psrc = IPV4_GET_RAW_IPSRC_U32(ICMPV4_GET_EMB_IPV4(p));
+ uint32_t pdst = IPV4_GET_RAW_IPDST_U32(ICMPV4_GET_EMB_IPV4(p));
+ FlowHashKey4 fhk;
+ if (psrc > pdst) {
+ fhk.src = psrc;
+ fhk.dst = pdst;
+ } else {
+ fhk.src = pdst;
+ fhk.dst = psrc;
+ }
+ if (p->icmpv4vars.emb_sport > p->icmpv4vars.emb_dport) {
+ fhk.sp = p->icmpv4vars.emb_sport;
+ fhk.dp = p->icmpv4vars.emb_dport;
+ } else {
+ fhk.sp = p->icmpv4vars.emb_dport;
+ fhk.dp = p->icmpv4vars.emb_sport;
+ }
+ fhk.proto = (uint16_t)ICMPV4_GET_EMB_PROTO(p);
+ fhk.recur = (uint16_t)p->recursion_level;
+ fhk.vlan_id[0] = p->vlan_id[0];
+ fhk.vlan_id[1] = p->vlan_id[1];
+
+ uint32_t hash = hashword(fhk.u32, 5, flow_config.hash_rand);
+ key = hash % flow_config.hash_size;
+
+ } else {
+ FlowHashKey4 fhk;
+ if (p->src.addr_data32[0] > p->dst.addr_data32[0]) {
+ fhk.src = p->src.addr_data32[0];
+ fhk.dst = p->dst.addr_data32[0];
+ } else {
+ fhk.src = p->dst.addr_data32[0];
+ fhk.dst = p->src.addr_data32[0];
+ }
+ fhk.sp = 0xfeed;
+ fhk.dp = 0xbeef;
+ fhk.proto = (uint16_t)p->proto;
+ fhk.recur = (uint16_t)p->recursion_level;
+ fhk.vlan_id[0] = p->vlan_id[0];
+ fhk.vlan_id[1] = p->vlan_id[1];
+
+ uint32_t hash = hashword(fhk.u32, 5, flow_config.hash_rand);
+ key = hash % flow_config.hash_size;
+ }
+ } else if (p->ip6h != NULL) {
+ FlowHashKey6 fhk;
+ if (FlowHashRawAddressIPv6GtU32(p->src.addr_data32, p->dst.addr_data32)) {
+ fhk.src[0] = p->src.addr_data32[0];
+ fhk.src[1] = p->src.addr_data32[1];
+ fhk.src[2] = p->src.addr_data32[2];
+ fhk.src[3] = p->src.addr_data32[3];
+ fhk.dst[0] = p->dst.addr_data32[0];
+ fhk.dst[1] = p->dst.addr_data32[1];
+ fhk.dst[2] = p->dst.addr_data32[2];
+ fhk.dst[3] = p->dst.addr_data32[3];
+ } else {
+ fhk.src[0] = p->dst.addr_data32[0];
+ fhk.src[1] = p->dst.addr_data32[1];
+ fhk.src[2] = p->dst.addr_data32[2];
+ fhk.src[3] = p->dst.addr_data32[3];
+ fhk.dst[0] = p->src.addr_data32[0];
+ fhk.dst[1] = p->src.addr_data32[1];
+ fhk.dst[2] = p->src.addr_data32[2];
+ fhk.dst[3] = p->src.addr_data32[3];
+ }
+ if (p->sp > p->dp) {
+ fhk.sp = p->sp;
+ fhk.dp = p->dp;
+ } else {
+ fhk.sp = p->dp;
+ fhk.dp = p->sp;
+ }
+ fhk.proto = (uint16_t)p->proto;
+ fhk.recur = (uint16_t)p->recursion_level;
+ fhk.vlan_id[0] = p->vlan_id[0];
+ fhk.vlan_id[1] = p->vlan_id[1];
+
+ uint32_t hash = hashword(fhk.u32, 11, flow_config.hash_rand);
+ key = hash % flow_config.hash_size;
+ } else
+ key = 0;
+
+ return key;
+}
+
+/* Since two or more flows can have the same hash key, we need to compare
+ * the flow with the current flow key. */
+#define CMP_FLOW(f1,f2) \
+ (((CMP_ADDR(&(f1)->src, &(f2)->src) && \
+ CMP_ADDR(&(f1)->dst, &(f2)->dst) && \
+ CMP_PORT((f1)->sp, (f2)->sp) && CMP_PORT((f1)->dp, (f2)->dp)) || \
+ (CMP_ADDR(&(f1)->src, &(f2)->dst) && \
+ CMP_ADDR(&(f1)->dst, &(f2)->src) && \
+ CMP_PORT((f1)->sp, (f2)->dp) && CMP_PORT((f1)->dp, (f2)->sp))) && \
+ (f1)->proto == (f2)->proto && \
+ (f1)->recursion_level == (f2)->recursion_level && \
+ (f1)->vlan_id[0] == (f2)->vlan_id[0] && \
+ (f1)->vlan_id[1] == (f2)->vlan_id[1])
+
+/**
+ * \brief See if a ICMP packet belongs to a flow by comparing the embedded
+ * packet in the ICMP error packet to the flow.
+ *
+ * \param f flow
+ * \param p ICMP packet
+ *
+ * \retval 1 match
+ * \retval 0 no match
+ */
+static inline int FlowCompareICMPv4(Flow *f, const Packet *p)
+{
+ if (ICMPV4_DEST_UNREACH_IS_VALID(p)) {
+ /* first check the direction of the flow, in other words, the client ->
+ * server direction as it's most likely the ICMP error will be a
+ * response to the clients traffic */
+ if ((f->src.addr_data32[0] == IPV4_GET_RAW_IPSRC_U32( ICMPV4_GET_EMB_IPV4(p) )) &&
+ (f->dst.addr_data32[0] == IPV4_GET_RAW_IPDST_U32( ICMPV4_GET_EMB_IPV4(p) )) &&
+ f->sp == p->icmpv4vars.emb_sport &&
+ f->dp == p->icmpv4vars.emb_dport &&
+ f->proto == ICMPV4_GET_EMB_PROTO(p) &&
+ f->recursion_level == p->recursion_level &&
+ f->vlan_id[0] == p->vlan_id[0] &&
+ f->vlan_id[1] == p->vlan_id[1])
+ {
+ return 1;
+
+ /* check the less likely case where the ICMP error was a response to
+ * a packet from the server. */
+ } else if ((f->dst.addr_data32[0] == IPV4_GET_RAW_IPSRC_U32( ICMPV4_GET_EMB_IPV4(p) )) &&
+ (f->src.addr_data32[0] == IPV4_GET_RAW_IPDST_U32( ICMPV4_GET_EMB_IPV4(p) )) &&
+ f->dp == p->icmpv4vars.emb_sport &&
+ f->sp == p->icmpv4vars.emb_dport &&
+ f->proto == ICMPV4_GET_EMB_PROTO(p) &&
+ f->recursion_level == p->recursion_level &&
+ f->vlan_id[0] == p->vlan_id[0] &&
+ f->vlan_id[1] == p->vlan_id[1])
+ {
+ return 1;
+ }
+
+ /* no match, fall through */
+ } else {
+ /* just treat ICMP as a normal proto for now */
+ return CMP_FLOW(f, p);
+ }
+
+ return 0;
+}
+
+int TcpSessionPacketSsnReuse(const Packet *p, const Flow *f, void *tcp_ssn);
+
+static inline int FlowCompare(Flow *f, const Packet *p)
+{
+ if (p->proto == IPPROTO_ICMP) {
+ return FlowCompareICMPv4(f, p);
+ } else if (p->proto == IPPROTO_TCP) {
+ if (CMP_FLOW(f, p) == 0)
+ return 0;
+
+ /* if this session is 'reused', we don't return it anymore,
+ * so return false on the compare */
+ if (f->flags & FLOW_TCP_REUSED)
+ return 0;
+
+ if (handle_tcp_reuse == 1) {
+ /* lets see if we need to consider the existing session reuse */
+ if (unlikely(TcpSessionPacketSsnReuse(p, f, f->protoctx) == 1)) {
+ /* okay, we need to setup a new flow for this packet.
+ * Flag the flow that it's been replaced by a new one */
+ f->flags |= FLOW_TCP_REUSED;
+ SCLogDebug("flow obsolete: TCP reuse will use a new flow "
+ "starting with packet %"PRIu64, p->pcap_cnt);
+ return 0;
+ }
+ }
+ return 1;
+ } else {
+ return CMP_FLOW(f, p);
+ }
+}
+
+/**
+ * \brief Check if we should create a flow based on a packet
+ *
+ * We use this check to filter out flow creation based on:
+ * - ICMP error messages
+ *
+ * \param p packet
+ * \retval 1 true
+ * \retval 0 false
+ */
+static inline int FlowCreateCheck(const Packet *p)
+{
+ if (PKT_IS_ICMPV4(p)) {
+ if (ICMPV4_IS_ERROR_MSG(p)) {
+ return 0;
+ }
+ }
+
+ return 1;
+}
+
+/**
+ * \brief Get a new flow
+ *
+ * Get a new flow. We're checking memcap first and will try to make room
+ * if the memcap is reached.
+ *
+ * \param tv thread vars
+ * \param dtv decode thread vars (for flow log api thread data)
+ *
+ * \retval f *LOCKED* flow on succes, NULL on error.
+ */
+static Flow *FlowGetNew(ThreadVars *tv, DecodeThreadVars *dtv, const Packet *p)
+{
+ Flow *f = NULL;
+
+ if (FlowCreateCheck(p) == 0) {
+ return NULL;
+ }
+
+ /* get a flow from the spare queue */
+ f = FlowDequeue(&flow_spare_q);
+ if (f == NULL) {
+ /* If we reached the max memcap, we get a used flow */
+ if (!(FLOW_CHECK_MEMCAP(sizeof(Flow)))) {
+ /* declare state of emergency */
+ if (!(SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY)) {
+ SC_ATOMIC_OR(flow_flags, FLOW_EMERGENCY);
+
+ /* under high load, waking up the flow mgr each time leads
+ * to high cpu usage. Flows are not timed out much faster if
+ * we check a 1000 times a second. */
+ FlowWakeupFlowManagerThread();
+ }
+
+ f = FlowGetUsedFlow(tv, dtv);
+ if (f == NULL) {
+ /* very rare, but we can fail. Just giving up */
+ return NULL;
+ }
+
+ /* freed a flow, but it's unlocked */
+ } else {
+ /* now see if we can alloc a new flow */
+ f = FlowAlloc();
+ if (f == NULL) {
+ return NULL;
+ }
+
+ /* flow is initialized but *unlocked* */
+ }
+ } else {
+ /* flow has been recycled before it went into the spare queue */
+
+ /* flow is initialized (recylced) but *unlocked* */
+ }
+
+ FLOWLOCK_WRLOCK(f);
+ return f;
+}
+
+Flow *FlowGetFlowFromHashByPacket(const Packet *p)
+{
+ Flow *f = NULL;
+
+ /* get the key to our bucket */
+ uint32_t key = FlowGetKey(p);
+ /* get our hash bucket and lock it */
+ FlowBucket *fb = &flow_hash[key];
+ FBLOCK_LOCK(fb);
+
+ SCLogDebug("fb %p fb->head %p", fb, fb->head);
+
+ f = FlowGetNew(NULL, NULL, p);
+ if (f != NULL) {
+ /* flow is locked */
+ if (fb->head == NULL) {
+ fb->head = f;
+ fb->tail = f;
+ } else {
+ f->hprev = fb->tail;
+ fb->tail->hnext = f;
+ fb->tail = f;
+ }
+
+ /* got one, now lock, initialize and return */
+ FlowInit(f, p);
+ f->fb = fb;
+ /* update the last seen timestamp of this flow */
+ COPY_TIMESTAMP(&p->ts,&f->lastts);
+
+ }
+ FBLOCK_UNLOCK(fb);
+ return f;
+}
+
+/** \brief Lookup flow based on packet
+ *
+ * Find the flow belonging to this packet. If not found, no new flow
+ * is set up.
+ *
+ * \param p packet to lookup the flow for
+ *
+ * \retval f flow or NULL if not found
+ */
+Flow *FlowLookupFlowFromHash(const Packet *p)
+{
+ Flow *f = NULL;
+
+ /* get the key to our bucket */
+ uint32_t key = FlowGetKey(p);
+ /* get our hash bucket and lock it */
+ FlowBucket *fb = &flow_hash[key];
+ FBLOCK_LOCK(fb);
+
+ SCLogDebug("fb %p fb->head %p", fb, fb->head);
+
+ /* see if the bucket already has a flow */
+ if (fb->head == NULL) {
+ FBLOCK_UNLOCK(fb);
+ return NULL;
+ }
+
+ /* ok, we have a flow in the bucket. Let's find out if it is our flow */
+ f = fb->head;
+
+ /* see if this is the flow we are looking for */
+ if (FlowCompare(f, p) == 0) {
+ while (f) {
+ FlowHashCountIncr;
+
+ f = f->hnext;
+
+ if (f == NULL) {
+ FBLOCK_UNLOCK(fb);
+ return NULL;
+ }
+
+ if (FlowCompare(f, p) != 0) {
+ /* we found our flow, lets put it on top of the
+ * hash list -- this rewards active flows */
+ if (f->hnext) {
+ f->hnext->hprev = f->hprev;
+ }
+ if (f->hprev) {
+ f->hprev->hnext = f->hnext;
+ }
+ if (f == fb->tail) {
+ fb->tail = f->hprev;
+ }
+
+ f->hnext = fb->head;
+ f->hprev = NULL;
+ fb->head->hprev = f;
+ fb->head = f;
+
+ /* found our flow, lock & return */
+ FLOWLOCK_WRLOCK(f);
+ /* update the last seen timestamp of this flow */
+ COPY_TIMESTAMP(&p->ts,&f->lastts);
+
+ FBLOCK_UNLOCK(fb);
+ return f;
+ }
+ }
+ }
+
+ /* lock & return */
+ FLOWLOCK_WRLOCK(f);
+ /* update the last seen timestamp of this flow */
+ COPY_TIMESTAMP(&p->ts,&f->lastts);
+
+ FBLOCK_UNLOCK(fb);
+ return f;
+}
+
+/** \brief Get Flow for packet
+ *
+ * Hash retrieval function for flows. Looks up the hash bucket containing the
+ * flow pointer. Then compares the packet with the found flow to see if it is
+ * the flow we need. If it isn't, walk the list until the right flow is found.
+ *
+ * If the flow is not found or the bucket was emtpy, a new flow is taken from
+ * the queue. FlowDequeue() will alloc new flows as long as we stay within our
+ * memcap limit.
+ *
+ * The p->flow pointer is updated to point to the flow.
+ *
+ * \param tv thread vars
+ * \param dtv decode thread vars (for flow log api thread data)
+ *
+ * \retval f *LOCKED* flow or NULL
+ */
+Flow *FlowGetFlowFromHash(ThreadVars *tv, DecodeThreadVars *dtv, const Packet *p)
+{
+ Flow *f = NULL;
+ FlowHashCountInit;
+
+ /* get the key to our bucket */
+ uint32_t key = FlowGetKey(p);
+ /* get our hash bucket and lock it */
+ FlowBucket *fb = &flow_hash[key];
+ FBLOCK_LOCK(fb);
+
+ SCLogDebug("fb %p fb->head %p", fb, fb->head);
+
+ FlowHashCountIncr;
+
+ /* see if the bucket already has a flow */
+ if (fb->head == NULL) {
+ f = FlowGetNew(tv, dtv, p);
+ if (f == NULL) {
+ FBLOCK_UNLOCK(fb);
+ FlowHashCountUpdate;
+ return NULL;
+ }
+
+ /* flow is locked */
+ fb->head = f;
+ fb->tail = f;
+
+ /* got one, now lock, initialize and return */
+ FlowInit(f, p);
+ f->fb = fb;
+
+ /* update the last seen timestamp of this flow */
+ COPY_TIMESTAMP(&p->ts,&f->lastts);
+
+ FBLOCK_UNLOCK(fb);
+ FlowHashCountUpdate;
+ return f;
+ }
+
+ /* ok, we have a flow in the bucket. Let's find out if it is our flow */
+ f = fb->head;
+
+ /* see if this is the flow we are looking for */
+ if (FlowCompare(f, p) == 0) {
+ Flow *pf = NULL; /* previous flow */
+
+ while (f) {
+ FlowHashCountIncr;
+
+ pf = f;
+ f = f->hnext;
+
+ if (f == NULL) {
+ f = pf->hnext = FlowGetNew(tv, dtv, p);
+ if (f == NULL) {
+ FBLOCK_UNLOCK(fb);
+ FlowHashCountUpdate;
+ return NULL;
+ }
+ fb->tail = f;
+
+ /* flow is locked */
+
+ f->hprev = pf;
+
+ /* initialize and return */
+ FlowInit(f, p);
+ f->fb = fb;
+
+ /* update the last seen timestamp of this flow */
+ COPY_TIMESTAMP(&p->ts,&f->lastts);
+
+ FBLOCK_UNLOCK(fb);
+ FlowHashCountUpdate;
+ return f;
+ }
+
+ if (FlowCompare(f, p) != 0) {
+ /* we found our flow, lets put it on top of the
+ * hash list -- this rewards active flows */
+ if (f->hnext) {
+ f->hnext->hprev = f->hprev;
+ }
+ if (f->hprev) {
+ f->hprev->hnext = f->hnext;
+ }
+ if (f == fb->tail) {
+ fb->tail = f->hprev;
+ }
+
+ f->hnext = fb->head;
+ f->hprev = NULL;
+ fb->head->hprev = f;
+ fb->head = f;
+
+ /* found our flow, lock & return */
+ FLOWLOCK_WRLOCK(f);
+ /* update the last seen timestamp of this flow */
+ COPY_TIMESTAMP(&p->ts,&f->lastts);
+
+ FBLOCK_UNLOCK(fb);
+ FlowHashCountUpdate;
+ return f;
+ }
+ }
+ }
+
+ /* lock & return */
+ FLOWLOCK_WRLOCK(f);
+ /* update the last seen timestamp of this flow */
+ COPY_TIMESTAMP(&p->ts,&f->lastts);
+
+ FBLOCK_UNLOCK(fb);
+ FlowHashCountUpdate;
+ return f;
+}
+
+/** \internal
+ * \brief Get a flow from the hash directly.
+ *
+ * Called in conditions where the spare queue is empty and memcap is reached.
+ *
+ * Walks the hash until a flow can be freed. Timeouts are disregarded, use_cnt
+ * is adhered to. "flow_prune_idx" atomic int makes sure we don't start at the
+ * top each time since that would clear the top of the hash leading to longer
+ * and longer search times under high pressure (observed).
+ *
+ * \param tv thread vars
+ * \param dtv decode thread vars (for flow log api thread data)
+ *
+ * \retval f flow or NULL
+ */
+static Flow *FlowGetUsedFlow(ThreadVars *tv, DecodeThreadVars *dtv)
+{
+ uint32_t idx = SC_ATOMIC_GET(flow_prune_idx) % flow_config.hash_size;
+ uint32_t cnt = flow_config.hash_size;
+
+ while (cnt--) {
+ if (++idx >= flow_config.hash_size)
+ idx = 0;
+
+ FlowBucket *fb = &flow_hash[idx];
+
+ if (FBLOCK_TRYLOCK(fb) != 0)
+ continue;
+
+ Flow *f = fb->tail;
+ if (f == NULL) {
+ FBLOCK_UNLOCK(fb);
+ continue;
+ }
+
+ if (FLOWLOCK_TRYWRLOCK(f) != 0) {
+ FBLOCK_UNLOCK(fb);
+ continue;
+ }
+
+ /** never prune a flow that is used by a packet or stream msg
+ * we are currently processing in one of the threads */
+ if (SC_ATOMIC_GET(f->use_cnt) > 0) {
+ FBLOCK_UNLOCK(fb);
+ FLOWLOCK_UNLOCK(f);
+ continue;
+ }
+
+ /* remove from the hash */
+ if (f->hprev != NULL)
+ f->hprev->hnext = f->hnext;
+ if (f->hnext != NULL)
+ f->hnext->hprev = f->hprev;
+ if (fb->head == f)
+ fb->head = f->hnext;
+ if (fb->tail == f)
+ fb->tail = f->hprev;
+
+ f->hnext = NULL;
+ f->hprev = NULL;
+ f->fb = NULL;
+ FBLOCK_UNLOCK(fb);
+
+ int state = SC_ATOMIC_GET(f->flow_state);
+ if (state == FLOW_STATE_NEW)
+ f->flow_end_flags |= FLOW_END_FLAG_STATE_NEW;
+ else if (state == FLOW_STATE_ESTABLISHED)
+ f->flow_end_flags |= FLOW_END_FLAG_STATE_ESTABLISHED;
+ else if (state == FLOW_STATE_CLOSED)
+ f->flow_end_flags |= FLOW_END_FLAG_STATE_CLOSED;
+
+ f->flow_end_flags |= FLOW_END_FLAG_FORCED;
+
+ if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY)
+ f->flow_end_flags |= FLOW_END_FLAG_EMERGENCY;
+
+ /* invoke flow log api */
+ if (dtv && dtv->output_flow_thread_data)
+ (void)OutputFlowLog(tv, dtv->output_flow_thread_data, f);
+
+ FlowClearMemory(f, f->protomap);
+
+ FLOWLOCK_UNLOCK(f);
+
+ (void) SC_ATOMIC_ADD(flow_prune_idx, (flow_config.hash_size - cnt));
+ return f;
+ }
+
+ return NULL;
+}