diff options
Diffstat (limited to 'framework/src/suricata/src/flow-hash.c')
-rw-r--r-- | framework/src/suricata/src/flow-hash.c | 860 |
1 files changed, 0 insertions, 860 deletions
diff --git a/framework/src/suricata/src/flow-hash.c b/framework/src/suricata/src/flow-hash.c deleted file mode 100644 index 9ddb3713..00000000 --- a/framework/src/suricata/src/flow-hash.c +++ /dev/null @@ -1,860 +0,0 @@ -/* Copyright (C) 2007-2013 Open Information Security Foundation - * - * You can copy, redistribute or modify this Program under the terms of - * the GNU General Public License version 2 as published by the Free - * Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * version 2 along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA - * 02110-1301, USA. - */ - -/** - * \file - * - * \author Victor Julien <victor@inliniac.net> - * \author Pablo Rincon Crespo <pablo.rincon.crespo@gmail.com> - * - * Flow Hashing functions. - */ - -#include "suricata-common.h" -#include "threads.h" - -#include "decode.h" -#include "detect-engine-state.h" - -#include "flow.h" -#include "flow-hash.h" -#include "flow-util.h" -#include "flow-private.h" -#include "flow-manager.h" -#include "flow-storage.h" -#include "app-layer-parser.h" - -#include "util-time.h" -#include "util-debug.h" - -#include "util-hash-lookup3.h" - -#include "conf.h" -#include "output.h" -#include "output-flow.h" - -#define FLOW_DEFAULT_FLOW_PRUNE 5 - -SC_ATOMIC_EXTERN(unsigned int, flow_prune_idx); -SC_ATOMIC_EXTERN(unsigned int, flow_flags); - -static Flow *FlowGetUsedFlow(ThreadVars *tv, DecodeThreadVars *dtv); -static int handle_tcp_reuse = 1; - -#ifdef FLOW_DEBUG_STATS -#define FLOW_DEBUG_STATS_PROTO_ALL 0 -#define FLOW_DEBUG_STATS_PROTO_TCP 1 -#define FLOW_DEBUG_STATS_PROTO_UDP 2 -#define FLOW_DEBUG_STATS_PROTO_ICMP 3 -#define FLOW_DEBUG_STATS_PROTO_OTHER 4 - -static uint64_t flow_hash_count[5] = { 0, 0, 0, 0, 0 }; /* how often are we looking for a hash */ -static uint64_t flow_hash_loop_count[5] = { 0, 0, 0, 0, 0 }; /* how often do we loop through a hash bucket */ -static FILE *flow_hash_count_fp = NULL; -static SCSpinlock flow_hash_count_lock; - -#define FlowHashCountUpdate do { \ - SCSpinLock(&flow_hash_count_lock); \ - flow_hash_count[FLOW_DEBUG_STATS_PROTO_ALL]++; \ - flow_hash_loop_count[FLOW_DEBUG_STATS_PROTO_ALL] += _flow_hash_counter; \ - if (f != NULL) { \ - if (p->proto == IPPROTO_TCP) { \ - flow_hash_count[FLOW_DEBUG_STATS_PROTO_TCP]++; \ - flow_hash_loop_count[FLOW_DEBUG_STATS_PROTO_TCP] += _flow_hash_counter; \ - } else if (p->proto == IPPROTO_UDP) {\ - flow_hash_count[FLOW_DEBUG_STATS_PROTO_UDP]++; \ - flow_hash_loop_count[FLOW_DEBUG_STATS_PROTO_UDP] += _flow_hash_counter; \ - } else if (p->proto == IPPROTO_ICMP) {\ - flow_hash_count[FLOW_DEBUG_STATS_PROTO_ICMP]++; \ - flow_hash_loop_count[FLOW_DEBUG_STATS_PROTO_ICMP] += _flow_hash_counter; \ - } else {\ - flow_hash_count[FLOW_DEBUG_STATS_PROTO_OTHER]++; \ - flow_hash_loop_count[FLOW_DEBUG_STATS_PROTO_OTHER] += _flow_hash_counter; \ - } \ - } \ - SCSpinUnlock(&flow_hash_count_lock); \ -} while(0); - -#define FlowHashCountInit uint64_t _flow_hash_counter = 0 -#define FlowHashCountIncr _flow_hash_counter++; - -void FlowHashDebugInit(void) -{ -#ifdef FLOW_DEBUG_STATS - SCSpinInit(&flow_hash_count_lock, 0); -#endif - flow_hash_count_fp = fopen("flow-debug.log", "w+"); - if (flow_hash_count_fp != NULL) { - fprintf(flow_hash_count_fp, "ts,all,tcp,udp,icmp,other\n"); - } -} - -void FlowHashDebugPrint(uint32_t ts) -{ -#ifdef FLOW_DEBUG_STATS - if (flow_hash_count_fp == NULL) - return; - - float avg_all = 0, avg_tcp = 0, avg_udp = 0, avg_icmp = 0, avg_other = 0; - SCSpinLock(&flow_hash_count_lock); - if (flow_hash_loop_count[FLOW_DEBUG_STATS_PROTO_ALL] != 0) - avg_all = (float)(flow_hash_loop_count[FLOW_DEBUG_STATS_PROTO_ALL]/(float)(flow_hash_count[FLOW_DEBUG_STATS_PROTO_ALL])); - if (flow_hash_loop_count[FLOW_DEBUG_STATS_PROTO_TCP] != 0) - avg_tcp = (float)(flow_hash_loop_count[FLOW_DEBUG_STATS_PROTO_TCP]/(float)(flow_hash_count[FLOW_DEBUG_STATS_PROTO_TCP])); - if (flow_hash_loop_count[FLOW_DEBUG_STATS_PROTO_UDP] != 0) - avg_udp = (float)(flow_hash_loop_count[FLOW_DEBUG_STATS_PROTO_UDP]/(float)(flow_hash_count[FLOW_DEBUG_STATS_PROTO_UDP])); - if (flow_hash_loop_count[FLOW_DEBUG_STATS_PROTO_ICMP] != 0) - avg_icmp= (float)(flow_hash_loop_count[FLOW_DEBUG_STATS_PROTO_ICMP]/(float)(flow_hash_count[FLOW_DEBUG_STATS_PROTO_ICMP])); - if (flow_hash_loop_count[FLOW_DEBUG_STATS_PROTO_OTHER] != 0) - avg_other= (float)(flow_hash_loop_count[FLOW_DEBUG_STATS_PROTO_OTHER]/(float)(flow_hash_count[FLOW_DEBUG_STATS_PROTO_OTHER])); - fprintf(flow_hash_count_fp, "%"PRIu32",%02.3f,%02.3f,%02.3f,%02.3f,%02.3f\n", ts, avg_all, avg_tcp, avg_udp, avg_icmp, avg_other); - fflush(flow_hash_count_fp); - memset(&flow_hash_count, 0, sizeof(flow_hash_count)); - memset(&flow_hash_loop_count, 0, sizeof(flow_hash_loop_count)); - SCSpinUnlock(&flow_hash_count_lock); -#endif -} - -void FlowHashDebugDeinit(void) -{ -#ifdef FLOW_DEBUG_STATS - struct timeval ts; - memset(&ts, 0, sizeof(ts)); - TimeGet(&ts); - FlowHashDebugPrint((uint32_t)ts.tv_sec); - if (flow_hash_count_fp != NULL) - fclose(flow_hash_count_fp); - SCSpinDestroy(&flow_hash_count_lock); -#endif -} - -#else - -#define FlowHashCountUpdate -#define FlowHashCountInit -#define FlowHashCountIncr - -#endif /* FLOW_DEBUG_STATS */ - -void FlowDisableTcpReuseHandling(void) -{ - handle_tcp_reuse = 0; -} - -/** \brief compare two raw ipv6 addrs - * - * \note we don't care about the real ipv6 ip's, this is just - * to consistently fill the FlowHashKey6 struct, without all - * the ntohl calls. - * - * \warning do not use elsewhere unless you know what you're doing. - * detect-engine-address-ipv6.c's AddressIPv6GtU32 is likely - * what you are looking for. - */ -static inline int FlowHashRawAddressIPv6GtU32(const uint32_t *a, const uint32_t *b) -{ - int i; - - for (i = 0; i < 4; i++) { - if (a[i] > b[i]) - return 1; - if (a[i] < b[i]) - break; - } - - return 0; -} - -typedef struct FlowHashKey4_ { - union { - struct { - uint32_t src, dst; - uint16_t sp, dp; - uint16_t proto; /**< u16 so proto and recur add up to u32 */ - uint16_t recur; /**< u16 so proto and recur add up to u32 */ - uint16_t vlan_id[2]; - }; - const uint32_t u32[5]; - }; -} FlowHashKey4; - -typedef struct FlowHashKey6_ { - union { - struct { - uint32_t src[4], dst[4]; - uint16_t sp, dp; - uint16_t proto; /**< u16 so proto and recur add up to u32 */ - uint16_t recur; /**< u16 so proto and recur add up to u32 */ - uint16_t vlan_id[2]; - }; - const uint32_t u32[11]; - }; -} FlowHashKey6; - -/* calculate the hash key for this packet - * - * we're using: - * hash_rand -- set at init time - * source port - * destination port - * source address - * destination address - * recursion level -- for tunnels, make sure different tunnel layers can - * never get mixed up. - * - * For ICMP we only consider UNREACHABLE errors atm. - */ -static inline uint32_t FlowGetKey(const Packet *p) -{ - uint32_t key; - - if (p->ip4h != NULL) { - if (p->tcph != NULL || p->udph != NULL) { - FlowHashKey4 fhk; - if (p->src.addr_data32[0] > p->dst.addr_data32[0]) { - fhk.src = p->src.addr_data32[0]; - fhk.dst = p->dst.addr_data32[0]; - } else { - fhk.src = p->dst.addr_data32[0]; - fhk.dst = p->src.addr_data32[0]; - } - if (p->sp > p->dp) { - fhk.sp = p->sp; - fhk.dp = p->dp; - } else { - fhk.sp = p->dp; - fhk.dp = p->sp; - } - fhk.proto = (uint16_t)p->proto; - fhk.recur = (uint16_t)p->recursion_level; - fhk.vlan_id[0] = p->vlan_id[0]; - fhk.vlan_id[1] = p->vlan_id[1]; - - uint32_t hash = hashword(fhk.u32, 5, flow_config.hash_rand); - key = hash % flow_config.hash_size; - - } else if (ICMPV4_DEST_UNREACH_IS_VALID(p)) { - uint32_t psrc = IPV4_GET_RAW_IPSRC_U32(ICMPV4_GET_EMB_IPV4(p)); - uint32_t pdst = IPV4_GET_RAW_IPDST_U32(ICMPV4_GET_EMB_IPV4(p)); - FlowHashKey4 fhk; - if (psrc > pdst) { - fhk.src = psrc; - fhk.dst = pdst; - } else { - fhk.src = pdst; - fhk.dst = psrc; - } - if (p->icmpv4vars.emb_sport > p->icmpv4vars.emb_dport) { - fhk.sp = p->icmpv4vars.emb_sport; - fhk.dp = p->icmpv4vars.emb_dport; - } else { - fhk.sp = p->icmpv4vars.emb_dport; - fhk.dp = p->icmpv4vars.emb_sport; - } - fhk.proto = (uint16_t)ICMPV4_GET_EMB_PROTO(p); - fhk.recur = (uint16_t)p->recursion_level; - fhk.vlan_id[0] = p->vlan_id[0]; - fhk.vlan_id[1] = p->vlan_id[1]; - - uint32_t hash = hashword(fhk.u32, 5, flow_config.hash_rand); - key = hash % flow_config.hash_size; - - } else { - FlowHashKey4 fhk; - if (p->src.addr_data32[0] > p->dst.addr_data32[0]) { - fhk.src = p->src.addr_data32[0]; - fhk.dst = p->dst.addr_data32[0]; - } else { - fhk.src = p->dst.addr_data32[0]; - fhk.dst = p->src.addr_data32[0]; - } - fhk.sp = 0xfeed; - fhk.dp = 0xbeef; - fhk.proto = (uint16_t)p->proto; - fhk.recur = (uint16_t)p->recursion_level; - fhk.vlan_id[0] = p->vlan_id[0]; - fhk.vlan_id[1] = p->vlan_id[1]; - - uint32_t hash = hashword(fhk.u32, 5, flow_config.hash_rand); - key = hash % flow_config.hash_size; - } - } else if (p->ip6h != NULL) { - FlowHashKey6 fhk; - if (FlowHashRawAddressIPv6GtU32(p->src.addr_data32, p->dst.addr_data32)) { - fhk.src[0] = p->src.addr_data32[0]; - fhk.src[1] = p->src.addr_data32[1]; - fhk.src[2] = p->src.addr_data32[2]; - fhk.src[3] = p->src.addr_data32[3]; - fhk.dst[0] = p->dst.addr_data32[0]; - fhk.dst[1] = p->dst.addr_data32[1]; - fhk.dst[2] = p->dst.addr_data32[2]; - fhk.dst[3] = p->dst.addr_data32[3]; - } else { - fhk.src[0] = p->dst.addr_data32[0]; - fhk.src[1] = p->dst.addr_data32[1]; - fhk.src[2] = p->dst.addr_data32[2]; - fhk.src[3] = p->dst.addr_data32[3]; - fhk.dst[0] = p->src.addr_data32[0]; - fhk.dst[1] = p->src.addr_data32[1]; - fhk.dst[2] = p->src.addr_data32[2]; - fhk.dst[3] = p->src.addr_data32[3]; - } - if (p->sp > p->dp) { - fhk.sp = p->sp; - fhk.dp = p->dp; - } else { - fhk.sp = p->dp; - fhk.dp = p->sp; - } - fhk.proto = (uint16_t)p->proto; - fhk.recur = (uint16_t)p->recursion_level; - fhk.vlan_id[0] = p->vlan_id[0]; - fhk.vlan_id[1] = p->vlan_id[1]; - - uint32_t hash = hashword(fhk.u32, 11, flow_config.hash_rand); - key = hash % flow_config.hash_size; - } else - key = 0; - - return key; -} - -/* Since two or more flows can have the same hash key, we need to compare - * the flow with the current flow key. */ -#define CMP_FLOW(f1,f2) \ - (((CMP_ADDR(&(f1)->src, &(f2)->src) && \ - CMP_ADDR(&(f1)->dst, &(f2)->dst) && \ - CMP_PORT((f1)->sp, (f2)->sp) && CMP_PORT((f1)->dp, (f2)->dp)) || \ - (CMP_ADDR(&(f1)->src, &(f2)->dst) && \ - CMP_ADDR(&(f1)->dst, &(f2)->src) && \ - CMP_PORT((f1)->sp, (f2)->dp) && CMP_PORT((f1)->dp, (f2)->sp))) && \ - (f1)->proto == (f2)->proto && \ - (f1)->recursion_level == (f2)->recursion_level && \ - (f1)->vlan_id[0] == (f2)->vlan_id[0] && \ - (f1)->vlan_id[1] == (f2)->vlan_id[1]) - -/** - * \brief See if a ICMP packet belongs to a flow by comparing the embedded - * packet in the ICMP error packet to the flow. - * - * \param f flow - * \param p ICMP packet - * - * \retval 1 match - * \retval 0 no match - */ -static inline int FlowCompareICMPv4(Flow *f, const Packet *p) -{ - if (ICMPV4_DEST_UNREACH_IS_VALID(p)) { - /* first check the direction of the flow, in other words, the client -> - * server direction as it's most likely the ICMP error will be a - * response to the clients traffic */ - if ((f->src.addr_data32[0] == IPV4_GET_RAW_IPSRC_U32( ICMPV4_GET_EMB_IPV4(p) )) && - (f->dst.addr_data32[0] == IPV4_GET_RAW_IPDST_U32( ICMPV4_GET_EMB_IPV4(p) )) && - f->sp == p->icmpv4vars.emb_sport && - f->dp == p->icmpv4vars.emb_dport && - f->proto == ICMPV4_GET_EMB_PROTO(p) && - f->recursion_level == p->recursion_level && - f->vlan_id[0] == p->vlan_id[0] && - f->vlan_id[1] == p->vlan_id[1]) - { - return 1; - - /* check the less likely case where the ICMP error was a response to - * a packet from the server. */ - } else if ((f->dst.addr_data32[0] == IPV4_GET_RAW_IPSRC_U32( ICMPV4_GET_EMB_IPV4(p) )) && - (f->src.addr_data32[0] == IPV4_GET_RAW_IPDST_U32( ICMPV4_GET_EMB_IPV4(p) )) && - f->dp == p->icmpv4vars.emb_sport && - f->sp == p->icmpv4vars.emb_dport && - f->proto == ICMPV4_GET_EMB_PROTO(p) && - f->recursion_level == p->recursion_level && - f->vlan_id[0] == p->vlan_id[0] && - f->vlan_id[1] == p->vlan_id[1]) - { - return 1; - } - - /* no match, fall through */ - } else { - /* just treat ICMP as a normal proto for now */ - return CMP_FLOW(f, p); - } - - return 0; -} - -int TcpSessionPacketSsnReuse(const Packet *p, const Flow *f, void *tcp_ssn); - -static inline int FlowCompare(Flow *f, const Packet *p) -{ - if (p->proto == IPPROTO_ICMP) { - return FlowCompareICMPv4(f, p); - } else if (p->proto == IPPROTO_TCP) { - if (CMP_FLOW(f, p) == 0) - return 0; - - /* if this session is 'reused', we don't return it anymore, - * so return false on the compare */ - if (f->flags & FLOW_TCP_REUSED) - return 0; - - if (handle_tcp_reuse == 1) { - /* lets see if we need to consider the existing session reuse */ - if (unlikely(TcpSessionPacketSsnReuse(p, f, f->protoctx) == 1)) { - /* okay, we need to setup a new flow for this packet. - * Flag the flow that it's been replaced by a new one */ - f->flags |= FLOW_TCP_REUSED; - SCLogDebug("flow obsolete: TCP reuse will use a new flow " - "starting with packet %"PRIu64, p->pcap_cnt); - return 0; - } - } - return 1; - } else { - return CMP_FLOW(f, p); - } -} - -/** - * \brief Check if we should create a flow based on a packet - * - * We use this check to filter out flow creation based on: - * - ICMP error messages - * - * \param p packet - * \retval 1 true - * \retval 0 false - */ -static inline int FlowCreateCheck(const Packet *p) -{ - if (PKT_IS_ICMPV4(p)) { - if (ICMPV4_IS_ERROR_MSG(p)) { - return 0; - } - } - - return 1; -} - -/** - * \brief Get a new flow - * - * Get a new flow. We're checking memcap first and will try to make room - * if the memcap is reached. - * - * \param tv thread vars - * \param dtv decode thread vars (for flow log api thread data) - * - * \retval f *LOCKED* flow on succes, NULL on error. - */ -static Flow *FlowGetNew(ThreadVars *tv, DecodeThreadVars *dtv, const Packet *p) -{ - Flow *f = NULL; - - if (FlowCreateCheck(p) == 0) { - return NULL; - } - - /* get a flow from the spare queue */ - f = FlowDequeue(&flow_spare_q); - if (f == NULL) { - /* If we reached the max memcap, we get a used flow */ - if (!(FLOW_CHECK_MEMCAP(sizeof(Flow) + FlowStorageSize()))) { - /* declare state of emergency */ - if (!(SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY)) { - SC_ATOMIC_OR(flow_flags, FLOW_EMERGENCY); - - /* under high load, waking up the flow mgr each time leads - * to high cpu usage. Flows are not timed out much faster if - * we check a 1000 times a second. */ - FlowWakeupFlowManagerThread(); - } - - f = FlowGetUsedFlow(tv, dtv); - if (f == NULL) { - /* max memcap reached, so increments the counter */ - if (tv != NULL && dtv != NULL) { - StatsIncr(tv, dtv->counter_flow_memcap); - } - - /* very rare, but we can fail. Just giving up */ - return NULL; - } - - /* freed a flow, but it's unlocked */ - } else { - /* now see if we can alloc a new flow */ - f = FlowAlloc(); - if (f == NULL) { - if (tv != NULL && dtv != NULL) { - StatsIncr(tv, dtv->counter_flow_memcap); - } - return NULL; - } - - /* flow is initialized but *unlocked* */ - } - } else { - /* flow has been recycled before it went into the spare queue */ - - /* flow is initialized (recylced) but *unlocked* */ - } - - FLOWLOCK_WRLOCK(f); - return f; -} - -Flow *FlowGetFlowFromHashByPacket(const Packet *p) -{ - Flow *f = NULL; - - /* get the key to our bucket */ - uint32_t key = FlowGetKey(p); - /* get our hash bucket and lock it */ - FlowBucket *fb = &flow_hash[key]; - FBLOCK_LOCK(fb); - - SCLogDebug("fb %p fb->head %p", fb, fb->head); - - f = FlowGetNew(NULL, NULL, p); - if (f != NULL) { - /* flow is locked */ - if (fb->head == NULL) { - fb->head = f; - fb->tail = f; - } else { - f->hprev = fb->tail; - fb->tail->hnext = f; - fb->tail = f; - } - - /* got one, now lock, initialize and return */ - FlowInit(f, p); - f->fb = fb; - /* update the last seen timestamp of this flow */ - COPY_TIMESTAMP(&p->ts,&f->lastts); - - } - FBLOCK_UNLOCK(fb); - return f; -} - -/** \brief Lookup flow based on packet - * - * Find the flow belonging to this packet. If not found, no new flow - * is set up. - * - * \param p packet to lookup the flow for - * - * \retval f flow or NULL if not found - */ -Flow *FlowLookupFlowFromHash(const Packet *p) -{ - Flow *f = NULL; - - /* get the key to our bucket */ - uint32_t key = FlowGetKey(p); - /* get our hash bucket and lock it */ - FlowBucket *fb = &flow_hash[key]; - FBLOCK_LOCK(fb); - - SCLogDebug("fb %p fb->head %p", fb, fb->head); - - /* see if the bucket already has a flow */ - if (fb->head == NULL) { - FBLOCK_UNLOCK(fb); - return NULL; - } - - /* ok, we have a flow in the bucket. Let's find out if it is our flow */ - f = fb->head; - - /* see if this is the flow we are looking for */ - if (FlowCompare(f, p) == 0) { - while (f) { - FlowHashCountIncr; - - f = f->hnext; - - if (f == NULL) { - FBLOCK_UNLOCK(fb); - return NULL; - } - - if (FlowCompare(f, p) != 0) { - /* we found our flow, lets put it on top of the - * hash list -- this rewards active flows */ - if (f->hnext) { - f->hnext->hprev = f->hprev; - } - if (f->hprev) { - f->hprev->hnext = f->hnext; - } - if (f == fb->tail) { - fb->tail = f->hprev; - } - - f->hnext = fb->head; - f->hprev = NULL; - fb->head->hprev = f; - fb->head = f; - - /* found our flow, lock & return */ - FLOWLOCK_WRLOCK(f); - /* update the last seen timestamp of this flow */ - COPY_TIMESTAMP(&p->ts,&f->lastts); - - FBLOCK_UNLOCK(fb); - return f; - } - } - } - - /* lock & return */ - FLOWLOCK_WRLOCK(f); - /* update the last seen timestamp of this flow */ - COPY_TIMESTAMP(&p->ts,&f->lastts); - - FBLOCK_UNLOCK(fb); - return f; -} - -/** \brief Get Flow for packet - * - * Hash retrieval function for flows. Looks up the hash bucket containing the - * flow pointer. Then compares the packet with the found flow to see if it is - * the flow we need. If it isn't, walk the list until the right flow is found. - * - * If the flow is not found or the bucket was emtpy, a new flow is taken from - * the queue. FlowDequeue() will alloc new flows as long as we stay within our - * memcap limit. - * - * The p->flow pointer is updated to point to the flow. - * - * \param tv thread vars - * \param dtv decode thread vars (for flow log api thread data) - * - * \retval f *LOCKED* flow or NULL - */ -Flow *FlowGetFlowFromHash(ThreadVars *tv, DecodeThreadVars *dtv, const Packet *p) -{ - Flow *f = NULL; - FlowHashCountInit; - - /* get the key to our bucket */ - uint32_t key = FlowGetKey(p); - /* get our hash bucket and lock it */ - FlowBucket *fb = &flow_hash[key]; - FBLOCK_LOCK(fb); - - SCLogDebug("fb %p fb->head %p", fb, fb->head); - - FlowHashCountIncr; - - /* see if the bucket already has a flow */ - if (fb->head == NULL) { - f = FlowGetNew(tv, dtv, p); - if (f == NULL) { - FBLOCK_UNLOCK(fb); - FlowHashCountUpdate; - return NULL; - } - - /* flow is locked */ - fb->head = f; - fb->tail = f; - - /* got one, now lock, initialize and return */ - FlowInit(f, p); - f->fb = fb; - - /* update the last seen timestamp of this flow */ - COPY_TIMESTAMP(&p->ts,&f->lastts); - - FBLOCK_UNLOCK(fb); - FlowHashCountUpdate; - return f; - } - - /* ok, we have a flow in the bucket. Let's find out if it is our flow */ - f = fb->head; - - /* see if this is the flow we are looking for */ - if (FlowCompare(f, p) == 0) { - Flow *pf = NULL; /* previous flow */ - - while (f) { - FlowHashCountIncr; - - pf = f; - f = f->hnext; - - if (f == NULL) { - f = pf->hnext = FlowGetNew(tv, dtv, p); - if (f == NULL) { - FBLOCK_UNLOCK(fb); - FlowHashCountUpdate; - return NULL; - } - fb->tail = f; - - /* flow is locked */ - - f->hprev = pf; - - /* initialize and return */ - FlowInit(f, p); - f->fb = fb; - - /* update the last seen timestamp of this flow */ - COPY_TIMESTAMP(&p->ts,&f->lastts); - - FBLOCK_UNLOCK(fb); - FlowHashCountUpdate; - return f; - } - - if (FlowCompare(f, p) != 0) { - /* we found our flow, lets put it on top of the - * hash list -- this rewards active flows */ - if (f->hnext) { - f->hnext->hprev = f->hprev; - } - if (f->hprev) { - f->hprev->hnext = f->hnext; - } - if (f == fb->tail) { - fb->tail = f->hprev; - } - - f->hnext = fb->head; - f->hprev = NULL; - fb->head->hprev = f; - fb->head = f; - - /* found our flow, lock & return */ - FLOWLOCK_WRLOCK(f); - /* update the last seen timestamp of this flow */ - COPY_TIMESTAMP(&p->ts,&f->lastts); - - FBLOCK_UNLOCK(fb); - FlowHashCountUpdate; - return f; - } - } - } - - /* lock & return */ - FLOWLOCK_WRLOCK(f); - /* update the last seen timestamp of this flow */ - COPY_TIMESTAMP(&p->ts,&f->lastts); - - FBLOCK_UNLOCK(fb); - FlowHashCountUpdate; - return f; -} - -/** \internal - * \brief Get a flow from the hash directly. - * - * Called in conditions where the spare queue is empty and memcap is reached. - * - * Walks the hash until a flow can be freed. Timeouts are disregarded, use_cnt - * is adhered to. "flow_prune_idx" atomic int makes sure we don't start at the - * top each time since that would clear the top of the hash leading to longer - * and longer search times under high pressure (observed). - * - * \param tv thread vars - * \param dtv decode thread vars (for flow log api thread data) - * - * \retval f flow or NULL - */ -static Flow *FlowGetUsedFlow(ThreadVars *tv, DecodeThreadVars *dtv) -{ - uint32_t idx = SC_ATOMIC_GET(flow_prune_idx) % flow_config.hash_size; - uint32_t cnt = flow_config.hash_size; - - while (cnt--) { - if (++idx >= flow_config.hash_size) - idx = 0; - - FlowBucket *fb = &flow_hash[idx]; - - if (FBLOCK_TRYLOCK(fb) != 0) - continue; - - Flow *f = fb->tail; - if (f == NULL) { - FBLOCK_UNLOCK(fb); - continue; - } - - if (FLOWLOCK_TRYWRLOCK(f) != 0) { - FBLOCK_UNLOCK(fb); - continue; - } - - /** never prune a flow that is used by a packet or stream msg - * we are currently processing in one of the threads */ - if (SC_ATOMIC_GET(f->use_cnt) > 0) { - FBLOCK_UNLOCK(fb); - FLOWLOCK_UNLOCK(f); - continue; - } - - /* remove from the hash */ - if (f->hprev != NULL) - f->hprev->hnext = f->hnext; - if (f->hnext != NULL) - f->hnext->hprev = f->hprev; - if (fb->head == f) - fb->head = f->hnext; - if (fb->tail == f) - fb->tail = f->hprev; - - f->hnext = NULL; - f->hprev = NULL; - f->fb = NULL; - FBLOCK_UNLOCK(fb); - - int state = SC_ATOMIC_GET(f->flow_state); - if (state == FLOW_STATE_NEW) - f->flow_end_flags |= FLOW_END_FLAG_STATE_NEW; - else if (state == FLOW_STATE_ESTABLISHED) - f->flow_end_flags |= FLOW_END_FLAG_STATE_ESTABLISHED; - else if (state == FLOW_STATE_CLOSED) - f->flow_end_flags |= FLOW_END_FLAG_STATE_CLOSED; - - f->flow_end_flags |= FLOW_END_FLAG_FORCED; - - if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) - f->flow_end_flags |= FLOW_END_FLAG_EMERGENCY; - - /* invoke flow log api */ - if (dtv && dtv->output_flow_thread_data) - (void)OutputFlowLog(tv, dtv->output_flow_thread_data, f); - - FlowClearMemory(f, f->protomap); - - FLOWLOCK_UNLOCK(f); - - (void) SC_ATOMIC_ADD(flow_prune_idx, (flow_config.hash_size - cnt)); - return f; - } - - return NULL; -} |