diff options
Diffstat (limited to 'framework/src/suricata/src/flow-manager.c')
-rw-r--r-- | framework/src/suricata/src/flow-manager.c | 1285 |
1 files changed, 1285 insertions, 0 deletions
diff --git a/framework/src/suricata/src/flow-manager.c b/framework/src/suricata/src/flow-manager.c new file mode 100644 index 00000000..15ad6162 --- /dev/null +++ b/framework/src/suricata/src/flow-manager.c @@ -0,0 +1,1285 @@ +/* Copyright (C) 2007-2013 Open Information Security Foundation + * + * You can copy, redistribute or modify this Program under the terms of + * the GNU General Public License version 2 as published by the Free + * Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301, USA. + */ + +/** + * \file + * + * \author Anoop Saldanha <anoopsaldanha@gmail.com> + * \author Victor Julien <victor@inliniac.net> + */ + +#include "suricata-common.h" +#include "suricata.h" +#include "decode.h" +#include "conf.h" +#include "threadvars.h" +#include "tm-threads.h" +#include "runmodes.h" + +#include "util-random.h" +#include "util-time.h" + +#include "flow.h" +#include "flow-queue.h" +#include "flow-hash.h" +#include "flow-util.h" +#include "flow-var.h" +#include "flow-private.h" +#include "flow-timeout.h" +#include "flow-manager.h" + +#include "stream-tcp-private.h" +#include "stream-tcp-reassemble.h" +#include "stream-tcp.h" + +#include "util-unittest.h" +#include "util-unittest-helper.h" +#include "util-byte.h" + +#include "util-debug.h" +#include "util-privs.h" +#include "util-signal.h" + +#include "threads.h" +#include "detect.h" +#include "detect-engine-state.h" +#include "stream.h" + +#include "app-layer-parser.h" + +#include "host-timeout.h" +#include "defrag-timeout.h" +#include "ippair-timeout.h" + +#include "output-flow.h" + +/* Run mode selected at suricata.c */ +extern int run_mode; + +/* multi flow mananger support */ +static uint32_t flowmgr_number = 1; +/* atomic counter for flow managers, to assign instance id */ +SC_ATOMIC_DECLARE(uint32_t, flowmgr_cnt); + +/* multi flow recycler support */ +static uint32_t flowrec_number = 1; +/* atomic counter for flow recyclers, to assign instance id */ +SC_ATOMIC_DECLARE(uint32_t, flowrec_cnt); + +SC_ATOMIC_EXTERN(unsigned int, flow_flags); + +/* 1 seconds */ +#define FLOW_NORMAL_MODE_UPDATE_DELAY_SEC 1 +#define FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC 0 +/* 0.1 seconds */ +#define FLOW_EMERG_MODE_UPDATE_DELAY_SEC 0 +#define FLOW_EMERG_MODE_UPDATE_DELAY_NSEC 100000 +#define NEW_FLOW_COUNT_COND 10 + +typedef struct FlowTimeoutCounters_ { + uint32_t new; + uint32_t est; + uint32_t clo; + uint32_t tcp_reuse; +} FlowTimeoutCounters; + +/** + * \brief Used to disable flow manager thread(s). + * + * \todo Kinda hackish since it uses the tv name to identify flow manager + * thread. We need an all weather identification scheme. + */ +void FlowDisableFlowManagerThread(void) +{ + ThreadVars *tv = NULL; + int cnt = 0; + + /* wake up threads */ + uint32_t u; + for (u = 0; u < flowmgr_number; u++) + SCCtrlCondSignal(&flow_manager_ctrl_cond); + + SCMutexLock(&tv_root_lock); + + /* flow manager thread(s) is/are a part of mgmt threads */ + tv = tv_root[TVT_MGMT]; + + while (tv != NULL) { + if (strcasecmp(tv->name, "FlowManagerThread") == 0) { + TmThreadsSetFlag(tv, THV_KILL); + cnt++; + + /* value in seconds */ +#define THREAD_KILL_MAX_WAIT_TIME 60 + /* value in microseconds */ +#define WAIT_TIME 100 + + double total_wait_time = 0; + while (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) { + usleep(WAIT_TIME); + total_wait_time += WAIT_TIME / 1000000.0; + if (total_wait_time > THREAD_KILL_MAX_WAIT_TIME) { + SCLogError(SC_ERR_FATAL, "Engine unable to " + "disable detect thread - \"%s\". " + "Killing engine", tv->name); + exit(EXIT_FAILURE); + } + } + } + tv = tv->next; + } + SCMutexUnlock(&tv_root_lock); + + /* wake up threads, another try */ + for (u = 0; u < flowmgr_number; u++) + SCCtrlCondSignal(&flow_manager_ctrl_cond); + + /* reset count, so we can kill and respawn (unix socket) */ + SC_ATOMIC_SET(flowmgr_cnt, 0); + return; +} + +/** \internal + * \brief get timeout for flow + * + * \param f flow + * \param state flow state + * \param emergency bool indicating emergency mode 1 yes, 0 no + * + * \retval timeout timeout in seconds + */ +static inline uint32_t FlowGetFlowTimeout(const Flow *f, int state, int emergency) +{ + uint32_t timeout; + + if (emergency) { + switch(state) { + default: + case FLOW_STATE_NEW: + timeout = flow_proto[f->protomap].emerg_new_timeout; + break; + case FLOW_STATE_ESTABLISHED: + timeout = flow_proto[f->protomap].emerg_est_timeout; + break; + case FLOW_STATE_CLOSED: + timeout = flow_proto[f->protomap].emerg_closed_timeout; + break; + } + } else { /* implies no emergency */ + switch(state) { + default: + case FLOW_STATE_NEW: + timeout = flow_proto[f->protomap].new_timeout; + break; + case FLOW_STATE_ESTABLISHED: + timeout = flow_proto[f->protomap].est_timeout; + break; + case FLOW_STATE_CLOSED: + timeout = flow_proto[f->protomap].closed_timeout; + break; + } + } + + return timeout; +} + +/** \internal + * \brief check if a flow is timed out + * + * \param f flow + * \param ts timestamp + * \param emergency bool indicating emergency mode + * + * \retval 0 not timed out + * \retval 1 timed out + */ +static int FlowManagerFlowTimeout(const Flow *f, int state, struct timeval *ts, int emergency) +{ + /* set the timeout value according to the flow operating mode, + * flow's state and protocol.*/ + uint32_t timeout = FlowGetFlowTimeout(f, state, emergency); + + /* do the timeout check */ + if ((int32_t)(f->lastts.tv_sec + timeout) >= ts->tv_sec) { + return 0; + } + + return 1; +} + +/** \internal + * \brief See if we can really discard this flow. Check use_cnt reference + * counter and force reassembly if necessary. + * + * \param f flow + * \param ts timestamp + * \param emergency bool indicating emergency mode + * + * \retval 0 not timed out just yet + * \retval 1 fully timed out, lets kill it + */ +static int FlowManagerFlowTimedOut(Flow *f, struct timeval *ts) +{ + /** never prune a flow that is used by a packet or stream msg + * we are currently processing in one of the threads */ + if (SC_ATOMIC_GET(f->use_cnt) > 0) { + return 0; + } + + int server = 0, client = 0; + if (!(f->flags & FLOW_TIMEOUT_REASSEMBLY_DONE) && + FlowForceReassemblyNeedReassembly(f, &server, &client) == 1) { + FlowForceReassemblyForFlow(f, server, client); + return 0; + } +#ifdef DEBUG + /* this should not be possible */ + BUG_ON(SC_ATOMIC_GET(f->use_cnt) > 0); +#endif + + return 1; +} + +/** + * \internal + * + * \brief check all flows in a hash row for timing out + * + * \param f last flow in the hash row + * \param ts timestamp + * \param emergency bool indicating emergency mode + * \param counters ptr to FlowTimeoutCounters structure + * + * \retval cnt timed out flows + */ +static uint32_t FlowManagerHashRowTimeout(Flow *f, struct timeval *ts, + int emergency, FlowTimeoutCounters *counters) +{ + uint32_t cnt = 0; + + do { + /* check flow timeout based on lastts and state. Both can be + * accessed w/o Flow lock as we do have the hash row lock (so flow + * can't disappear) and flow_state is atomic. lastts can only + * be modified when we have both the flow and hash row lock */ + + int state = SC_ATOMIC_GET(f->flow_state); + + /* timeout logic goes here */ + if (FlowManagerFlowTimeout(f, state, ts, emergency) == 0) { + f = f->hprev; + continue; + } + + /* before grabbing the flow lock, make sure we have at least + * 3 packets in the pool */ + PacketPoolWaitForN(3); + + FLOWLOCK_WRLOCK(f); + + Flow *next_flow = f->hprev; + + /* check if the flow is fully timed out and + * ready to be discarded. */ + if (FlowManagerFlowTimedOut(f, ts) == 1) { + /* remove from the hash */ + if (f->hprev != NULL) + f->hprev->hnext = f->hnext; + if (f->hnext != NULL) + f->hnext->hprev = f->hprev; + if (f->fb->head == f) + f->fb->head = f->hnext; + if (f->fb->tail == f) + f->fb->tail = f->hprev; + + f->hnext = NULL; + f->hprev = NULL; + + if (f->flags & FLOW_TCP_REUSED) + counters->tcp_reuse++; + + if (state == FLOW_STATE_NEW) + f->flow_end_flags |= FLOW_END_FLAG_STATE_NEW; + else if (state == FLOW_STATE_ESTABLISHED) + f->flow_end_flags |= FLOW_END_FLAG_STATE_ESTABLISHED; + else if (state == FLOW_STATE_CLOSED) + f->flow_end_flags |= FLOW_END_FLAG_STATE_CLOSED; + + if (emergency) + f->flow_end_flags |= FLOW_END_FLAG_EMERGENCY; + f->flow_end_flags |= FLOW_END_FLAG_TIMEOUT; + +// FlowClearMemory (f, f->protomap); + + /* no one is referring to this flow, use_cnt 0, removed from hash + * so we can unlock it and move it back to the spare queue. */ + FLOWLOCK_UNLOCK(f); + FlowEnqueue(&flow_recycle_q, f); + /* move to spare list */ +// FlowMoveToSpare(f); + + cnt++; + + switch (state) { + case FLOW_STATE_NEW: + default: + counters->new++; + break; + case FLOW_STATE_ESTABLISHED: + counters->est++; + break; + case FLOW_STATE_CLOSED: + counters->clo++; + break; + } + } else { + FLOWLOCK_UNLOCK(f); + } + + f = next_flow; + } while (f != NULL); + + return cnt; +} + +/** + * \brief time out flows from the hash + * + * \param ts timestamp + * \param try_cnt number of flows to time out max (0 is unlimited) + * \param hash_min min hash index to consider + * \param hash_max max hash index to consider + * \param counters ptr to FlowTimeoutCounters structure + * + * \retval cnt number of timed out flow + */ +static uint32_t FlowTimeoutHash(struct timeval *ts, uint32_t try_cnt, + uint32_t hash_min, uint32_t hash_max, + FlowTimeoutCounters *counters) +{ + uint32_t idx = 0; + uint32_t cnt = 0; + int emergency = 0; + + if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) + emergency = 1; + + for (idx = hash_min; idx < hash_max; idx++) { + FlowBucket *fb = &flow_hash[idx]; + + /* before grabbing the row lock, make sure we have at least + * 9 packets in the pool */ + PacketPoolWaitForN(9); + + if (FBLOCK_TRYLOCK(fb) != 0) + continue; + + /* flow hash bucket is now locked */ + + if (fb->tail == NULL) + goto next; + + /* we have a flow, or more than one */ + cnt += FlowManagerHashRowTimeout(fb->tail, ts, emergency, counters); + +next: + FBLOCK_UNLOCK(fb); + + if (try_cnt > 0 && cnt >= try_cnt) + break; + } + + return cnt; +} + +/** + * \internal + * + * \brief move all flows out of a hash row + * + * \param f last flow in the hash row + * + * \retval cnt removed out flows + */ +static uint32_t FlowManagerHashRowCleanup(Flow *f) +{ + uint32_t cnt = 0; + + do { + FLOWLOCK_WRLOCK(f); + + Flow *next_flow = f->hprev; + + int state = SC_ATOMIC_GET(f->flow_state); + + /* remove from the hash */ + if (f->hprev != NULL) + f->hprev->hnext = f->hnext; + if (f->hnext != NULL) + f->hnext->hprev = f->hprev; + if (f->fb->head == f) + f->fb->head = f->hnext; + if (f->fb->tail == f) + f->fb->tail = f->hprev; + + f->hnext = NULL; + f->hprev = NULL; + + if (state == FLOW_STATE_NEW) + f->flow_end_flags |= FLOW_END_FLAG_STATE_NEW; + else if (state == FLOW_STATE_ESTABLISHED) + f->flow_end_flags |= FLOW_END_FLAG_STATE_ESTABLISHED; + else if (state == FLOW_STATE_CLOSED) + f->flow_end_flags |= FLOW_END_FLAG_STATE_CLOSED; + + f->flow_end_flags |= FLOW_END_FLAG_SHUTDOWN; + + /* no one is referring to this flow, use_cnt 0, removed from hash + * so we can unlock it and move it to the recycle queue. */ + FLOWLOCK_UNLOCK(f); + + FlowEnqueue(&flow_recycle_q, f); + + cnt++; + + f = next_flow; + } while (f != NULL); + + return cnt; +} + +/** + * \brief remove all flows from the hash + * + * \retval cnt number of removes out flows + */ +static uint32_t FlowCleanupHash(void){ + uint32_t idx = 0; + uint32_t cnt = 0; + + for (idx = 0; idx < flow_config.hash_size; idx++) { + FlowBucket *fb = &flow_hash[idx]; + + FBLOCK_LOCK(fb); + + if (fb->tail != NULL) { + /* we have a flow, or more than one */ + cnt += FlowManagerHashRowCleanup(fb->tail); + } + + FBLOCK_UNLOCK(fb); + } + + return cnt; +} + +extern int g_detect_disabled; + +typedef struct FlowManagerThreadData_ { + uint32_t instance; + uint32_t min; + uint32_t max; + + uint16_t flow_mgr_cnt_clo; + uint16_t flow_mgr_cnt_new; + uint16_t flow_mgr_cnt_est; + uint16_t flow_mgr_spare; + uint16_t flow_emerg_mode_enter; + uint16_t flow_emerg_mode_over; + uint16_t flow_tcp_reuse; +} FlowManagerThreadData; + +static TmEcode FlowManagerThreadInit(ThreadVars *t, void *initdata, void **data) +{ + FlowManagerThreadData *ftd = SCCalloc(1, sizeof(FlowManagerThreadData)); + if (ftd == NULL) + return TM_ECODE_FAILED; + + ftd->instance = SC_ATOMIC_ADD(flowmgr_cnt, 1); + SCLogDebug("flow manager instance %u", ftd->instance); + + /* set the min and max value used for hash row walking + * each thread has it's own section of the flow hash */ + uint32_t range = flow_config.hash_size / flowmgr_number; + if (ftd->instance == 1) + ftd->max = range; + else if (ftd->instance == flowmgr_number) { + ftd->min = (range * (ftd->instance - 1)); + ftd->max = flow_config.hash_size; + } else { + ftd->min = (range * (ftd->instance - 1)); + ftd->max = (range * ftd->instance); + } + BUG_ON(ftd->min > flow_config.hash_size || ftd->max > flow_config.hash_size); + + SCLogDebug("instance %u hash range %u %u", ftd->instance, ftd->min, ftd->max); + + /* pass thread data back to caller */ + *data = ftd; + + ftd->flow_mgr_cnt_clo = StatsRegisterCounter("flow_mgr.closed_pruned", t); + ftd->flow_mgr_cnt_new = StatsRegisterCounter("flow_mgr.new_pruned", t); + ftd->flow_mgr_cnt_est = StatsRegisterCounter("flow_mgr.est_pruned", t); + ftd->flow_mgr_spare = StatsRegisterCounter("flow.spare", t); + ftd->flow_emerg_mode_enter = StatsRegisterCounter("flow.emerg_mode_entered", t); + ftd->flow_emerg_mode_over = StatsRegisterCounter("flow.emerg_mode_over", t); + ftd->flow_tcp_reuse = StatsRegisterCounter("flow.tcp_reuse", t); + + PacketPoolInit(); + return TM_ECODE_OK; +} + +static TmEcode FlowManagerThreadDeinit(ThreadVars *t, void *data) +{ + PacketPoolDestroy(); + SCFree(data); + return TM_ECODE_OK; +} + + +/** \brief Thread that manages the flow table and times out flows. + * + * \param td ThreadVars casted to void ptr + * + * Keeps an eye on the spare list, alloc flows if needed... + */ +static TmEcode FlowManager(ThreadVars *th_v, void *thread_data) +{ + /* block usr2. usr1 to be handled by the main thread only */ + UtilSignalBlock(SIGUSR2); + + FlowManagerThreadData *ftd = thread_data; + struct timeval ts; + uint32_t established_cnt = 0, new_cnt = 0, closing_cnt = 0; + int emerg = FALSE; + int prev_emerg = FALSE; + uint32_t last_sec = 0; + struct timespec cond_time; + int flow_update_delay_sec = FLOW_NORMAL_MODE_UPDATE_DELAY_SEC; + int flow_update_delay_nsec = FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC; +/* VJ leaving disabled for now, as hosts are only used by tags and the numbers + * are really low. Might confuse ppl + uint16_t flow_mgr_host_prune = StatsRegisterCounter("hosts.pruned", th_v); + uint16_t flow_mgr_host_active = StatsRegisterCounter("hosts.active", th_v); + uint16_t flow_mgr_host_spare = StatsRegisterCounter("hosts.spare", th_v); +*/ + memset(&ts, 0, sizeof(ts)); + + FlowHashDebugInit(); + + while (1) + { + if (TmThreadsCheckFlag(th_v, THV_PAUSE)) { + TmThreadsSetFlag(th_v, THV_PAUSED); + TmThreadTestThreadUnPaused(th_v); + TmThreadsUnsetFlag(th_v, THV_PAUSED); + } + + if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) { + emerg = TRUE; + + if (emerg == TRUE && prev_emerg == FALSE) { + prev_emerg = TRUE; + + SCLogDebug("Flow emergency mode entered..."); + + StatsIncr(th_v, ftd->flow_emerg_mode_enter); + } + } + + /* Get the time */ + memset(&ts, 0, sizeof(ts)); + TimeGet(&ts); + SCLogDebug("ts %" PRIdMAX "", (intmax_t)ts.tv_sec); + + if (((uint32_t)ts.tv_sec - last_sec) > 600) { + FlowHashDebugPrint((uint32_t)ts.tv_sec); + last_sec = (uint32_t)ts.tv_sec; + } + + /* see if we still have enough spare flows */ + if (ftd->instance == 1) + FlowUpdateSpareFlows(); + + /* try to time out flows */ + FlowTimeoutCounters counters = { 0, 0, 0, 0, }; + FlowTimeoutHash(&ts, 0 /* check all */, ftd->min, ftd->max, &counters); + + + if (ftd->instance == 1) { + DefragTimeoutHash(&ts); + //uint32_t hosts_pruned = + HostTimeoutHash(&ts); + IPPairTimeoutHash(&ts); + } +/* + StatsAddUI64(th_v, flow_mgr_host_prune, (uint64_t)hosts_pruned); + uint32_t hosts_active = HostGetActiveCount(); + StatsSetUI64(th_v, flow_mgr_host_active, (uint64_t)hosts_active); + uint32_t hosts_spare = HostGetSpareCount(); + StatsSetUI64(th_v, flow_mgr_host_spare, (uint64_t)hosts_spare); +*/ + StatsAddUI64(th_v, ftd->flow_mgr_cnt_clo, (uint64_t)counters.clo); + StatsAddUI64(th_v, ftd->flow_mgr_cnt_new, (uint64_t)counters.new); + StatsAddUI64(th_v, ftd->flow_mgr_cnt_est, (uint64_t)counters.est); + StatsAddUI64(th_v, ftd->flow_tcp_reuse, (uint64_t)counters.tcp_reuse); + + uint32_t len = 0; + FQLOCK_LOCK(&flow_spare_q); + len = flow_spare_q.len; + FQLOCK_UNLOCK(&flow_spare_q); + StatsSetUI64(th_v, ftd->flow_mgr_spare, (uint64_t)len); + + /* Don't fear, FlowManagerThread is here... + * clear emergency bit if we have at least xx flows pruned. */ + if (emerg == TRUE) { + SCLogDebug("flow_sparse_q.len = %"PRIu32" prealloc: %"PRIu32 + "flow_spare_q status: %"PRIu32"%% flows at the queue", + len, flow_config.prealloc, len * 100 / flow_config.prealloc); + /* only if we have pruned this "emergency_recovery" percentage + * of flows, we will unset the emergency bit */ + if (len * 100 / flow_config.prealloc > flow_config.emergency_recovery) { + SC_ATOMIC_AND(flow_flags, ~FLOW_EMERGENCY); + + emerg = FALSE; + prev_emerg = FALSE; + + flow_update_delay_sec = FLOW_NORMAL_MODE_UPDATE_DELAY_SEC; + flow_update_delay_nsec = FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC; + SCLogInfo("Flow emergency mode over, back to normal... unsetting" + " FLOW_EMERGENCY bit (ts.tv_sec: %"PRIuMAX", " + "ts.tv_usec:%"PRIuMAX") flow_spare_q status(): %"PRIu32 + "%% flows at the queue", (uintmax_t)ts.tv_sec, + (uintmax_t)ts.tv_usec, len * 100 / flow_config.prealloc); + + StatsIncr(th_v, ftd->flow_emerg_mode_over); + } else { + flow_update_delay_sec = FLOW_EMERG_MODE_UPDATE_DELAY_SEC; + flow_update_delay_nsec = FLOW_EMERG_MODE_UPDATE_DELAY_NSEC; + } + } + + if (TmThreadsCheckFlag(th_v, THV_KILL)) { + StatsSyncCounters(th_v); + break; + } + + cond_time.tv_sec = time(NULL) + flow_update_delay_sec; + cond_time.tv_nsec = flow_update_delay_nsec; + SCCtrlMutexLock(&flow_manager_ctrl_mutex); + SCCtrlCondTimedwait(&flow_manager_ctrl_cond, &flow_manager_ctrl_mutex, + &cond_time); + SCCtrlMutexUnlock(&flow_manager_ctrl_mutex); + + SCLogDebug("woke up... %s", SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY ? "emergency":""); + + StatsSyncCountersIfSignalled(th_v); + } + + FlowHashDebugDeinit(); + + SCLogInfo("%" PRIu32 " new flows, %" PRIu32 " established flows were " + "timed out, %"PRIu32" flows in closed state", new_cnt, + established_cnt, closing_cnt); + + return TM_ECODE_OK; +} + +static uint64_t FlowGetMemuse(void) +{ + uint64_t flow_memuse = SC_ATOMIC_GET(flow_memuse); + return flow_memuse; +} + +/** \brief spawn the flow manager thread */ +void FlowManagerThreadSpawn() +{ + intmax_t setting = 1; + (void)ConfGetInt("flow.managers", &setting); + + if (setting < 1 || setting > 1024) { + SCLogError(SC_ERR_INVALID_ARGUMENTS, + "invalid flow.managers setting %"PRIdMAX, setting); + exit(EXIT_FAILURE); + } + flowmgr_number = (uint32_t)setting; + + SCLogInfo("using %u flow manager threads", flowmgr_number); + SCCtrlCondInit(&flow_manager_ctrl_cond, NULL); + SCCtrlMutexInit(&flow_manager_ctrl_mutex, NULL); + + StatsRegisterGlobalCounter("flow.memuse", FlowGetMemuse); + + uint32_t u; + for (u = 0; u < flowmgr_number; u++) { + ThreadVars *tv_flowmgr = NULL; + + char name[32] = ""; + snprintf(name, sizeof(name), "FlowManagerThread%02u", u+1); + + tv_flowmgr = TmThreadCreateMgmtThreadByName("FlowManagerThread", + "FlowManager", 0); + BUG_ON(tv_flowmgr == NULL); + + if (tv_flowmgr == NULL) { + printf("ERROR: TmThreadsCreate failed\n"); + exit(1); + } + if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) { + printf("ERROR: TmThreadSpawn failed\n"); + exit(1); + } + } + return; +} + +typedef struct FlowRecyclerThreadData_ { + void *output_thread_data; +} FlowRecyclerThreadData; + +static TmEcode FlowRecyclerThreadInit(ThreadVars *t, void *initdata, void **data) +{ + FlowRecyclerThreadData *ftd = SCCalloc(1, sizeof(FlowRecyclerThreadData)); + if (ftd == NULL) + return TM_ECODE_FAILED; + + if (OutputFlowLogThreadInit(t, NULL, &ftd->output_thread_data) != TM_ECODE_OK) { + SCLogError(SC_ERR_THREAD_INIT, "initializing flow log API for thread failed"); + SCFree(ftd); + return TM_ECODE_FAILED; + } + SCLogDebug("output_thread_data %p", ftd->output_thread_data); + + *data = ftd; + return TM_ECODE_OK; +} + +static TmEcode FlowRecyclerThreadDeinit(ThreadVars *t, void *data) +{ + FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)data; + if (ftd->output_thread_data != NULL) + OutputFlowLogThreadDeinit(t, ftd->output_thread_data); + + SCFree(data); + return TM_ECODE_OK; +} + +/** \brief Thread that manages timed out flows. + * + * \param td ThreadVars casted to void ptr + */ +static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data) +{ + /* block usr2. usr2 to be handled by the main thread only */ + UtilSignalBlock(SIGUSR2); + + struct timeval ts; + struct timespec cond_time; + int flow_update_delay_sec = FLOW_NORMAL_MODE_UPDATE_DELAY_SEC; + int flow_update_delay_nsec = FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC; + uint64_t recycled_cnt = 0; + FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)thread_data; + BUG_ON(ftd == NULL); + + memset(&ts, 0, sizeof(ts)); + + while (1) + { + if (TmThreadsCheckFlag(th_v, THV_PAUSE)) { + TmThreadsSetFlag(th_v, THV_PAUSED); + TmThreadTestThreadUnPaused(th_v); + TmThreadsUnsetFlag(th_v, THV_PAUSED); + } + + /* Get the time */ + memset(&ts, 0, sizeof(ts)); + TimeGet(&ts); + SCLogDebug("ts %" PRIdMAX "", (intmax_t)ts.tv_sec); + + uint32_t len = 0; + FQLOCK_LOCK(&flow_recycle_q); + len = flow_recycle_q.len; + FQLOCK_UNLOCK(&flow_recycle_q); + + /* Loop through the queue and clean up all flows in it */ + if (len) { + Flow *f; + + while ((f = FlowDequeue(&flow_recycle_q)) != NULL) { + FLOWLOCK_WRLOCK(f); + + (void)OutputFlowLog(th_v, ftd->output_thread_data, f); + + FlowClearMemory (f, f->protomap); + FLOWLOCK_UNLOCK(f); + FlowMoveToSpare(f); + recycled_cnt++; + } + } + + SCLogDebug("%u flows to recycle", len); + + if (TmThreadsCheckFlag(th_v, THV_KILL)) { + StatsSyncCounters(th_v); + break; + } + + cond_time.tv_sec = time(NULL) + flow_update_delay_sec; + cond_time.tv_nsec = flow_update_delay_nsec; + SCCtrlMutexLock(&flow_recycler_ctrl_mutex); + SCCtrlCondTimedwait(&flow_recycler_ctrl_cond, + &flow_recycler_ctrl_mutex, &cond_time); + SCCtrlMutexUnlock(&flow_recycler_ctrl_mutex); + + SCLogDebug("woke up..."); + + StatsSyncCountersIfSignalled(th_v); + } + + SCLogInfo("%"PRIu64" flows processed", recycled_cnt); + + return TM_ECODE_OK; +} + +int FlowRecyclerReadyToShutdown(void) +{ + uint32_t len = 0; + FQLOCK_LOCK(&flow_recycle_q); + len = flow_recycle_q.len; + FQLOCK_UNLOCK(&flow_recycle_q); + + return ((len == 0)); +} + +/** \brief spawn the flow recycler thread */ +void FlowRecyclerThreadSpawn() +{ + intmax_t setting = 1; + (void)ConfGetInt("flow.recyclers", &setting); + + if (setting < 1 || setting > 1024) { + SCLogError(SC_ERR_INVALID_ARGUMENTS, + "invalid flow.recyclers setting %"PRIdMAX, setting); + exit(EXIT_FAILURE); + } + flowrec_number = (uint32_t)setting; + + SCLogInfo("using %u flow recycler threads", flowrec_number); + + SCCtrlCondInit(&flow_recycler_ctrl_cond, NULL); + SCCtrlMutexInit(&flow_recycler_ctrl_mutex, NULL); + + + uint32_t u; + for (u = 0; u < flowrec_number; u++) { + ThreadVars *tv_flowmgr = NULL; + + char name[32] = ""; + snprintf(name, sizeof(name), "FlowRecyclerThread%02u", u+1); + + tv_flowmgr = TmThreadCreateMgmtThreadByName("FlowRecyclerThread", + "FlowRecycler", 0); + BUG_ON(tv_flowmgr == NULL); + + if (tv_flowmgr == NULL) { + printf("ERROR: TmThreadsCreate failed\n"); + exit(1); + } + if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) { + printf("ERROR: TmThreadSpawn failed\n"); + exit(1); + } + } + return; +} + +/** + * \brief Used to disable flow recycler thread(s). + * + * \note this should only be called when the flow manager is already gone + * + * \todo Kinda hackish since it uses the tv name to identify flow recycler + * thread. We need an all weather identification scheme. + */ +void FlowDisableFlowRecyclerThread(void) +{ + ThreadVars *tv = NULL; + int cnt = 0; + + /* move all flows still in the hash to the recycler queue */ + FlowCleanupHash(); + + /* make sure all flows are processed */ + do { + SCCtrlCondSignal(&flow_recycler_ctrl_cond); + usleep(10); + } while (FlowRecyclerReadyToShutdown() == 0); + + /* wake up threads */ + uint32_t u; + for (u = 0; u < flowrec_number; u++) + SCCtrlCondSignal(&flow_recycler_ctrl_cond); + + SCMutexLock(&tv_root_lock); + + /* flow recycler thread(s) is/are a part of mgmt threads */ + tv = tv_root[TVT_MGMT]; + + while (tv != NULL) { + if (strcasecmp(tv->name, "FlowRecyclerThread") == 0) { + TmThreadsSetFlag(tv, THV_KILL); + cnt++; + + /* value in seconds */ +#define THREAD_KILL_MAX_WAIT_TIME 60 + /* value in microseconds */ +#define WAIT_TIME 100 + + double total_wait_time = 0; + while (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) { + usleep(WAIT_TIME); + total_wait_time += WAIT_TIME / 1000000.0; + if (total_wait_time > THREAD_KILL_MAX_WAIT_TIME) { + SCLogError(SC_ERR_FATAL, "Engine unable to " + "disable detect thread - \"%s\". " + "Killing engine", tv->name); + exit(EXIT_FAILURE); + } + } + } + tv = tv->next; + } + + /* wake up threads, another try */ + for (u = 0; u < flowrec_number; u++) + SCCtrlCondSignal(&flow_recycler_ctrl_cond); + + SCMutexUnlock(&tv_root_lock); + + /* reset count, so we can kill and respawn (unix socket) */ + SC_ATOMIC_SET(flowrec_cnt, 0); + return; +} + +void TmModuleFlowManagerRegister (void) +{ + tmm_modules[TMM_FLOWMANAGER].name = "FlowManager"; + tmm_modules[TMM_FLOWMANAGER].ThreadInit = FlowManagerThreadInit; + tmm_modules[TMM_FLOWMANAGER].ThreadDeinit = FlowManagerThreadDeinit; +// tmm_modules[TMM_FLOWMANAGER].RegisterTests = FlowManagerRegisterTests; + tmm_modules[TMM_FLOWMANAGER].Management = FlowManager; + tmm_modules[TMM_FLOWMANAGER].cap_flags = 0; + tmm_modules[TMM_FLOWMANAGER].flags = TM_FLAG_MANAGEMENT_TM; + SCLogDebug("%s registered", tmm_modules[TMM_FLOWMANAGER].name); + + SC_ATOMIC_INIT(flowmgr_cnt); +} + +void TmModuleFlowRecyclerRegister (void) +{ + tmm_modules[TMM_FLOWRECYCLER].name = "FlowRecycler"; + tmm_modules[TMM_FLOWRECYCLER].ThreadInit = FlowRecyclerThreadInit; + tmm_modules[TMM_FLOWRECYCLER].ThreadDeinit = FlowRecyclerThreadDeinit; +// tmm_modules[TMM_FLOWRECYCLER].RegisterTests = FlowRecyclerRegisterTests; + tmm_modules[TMM_FLOWRECYCLER].Management = FlowRecycler; + tmm_modules[TMM_FLOWRECYCLER].cap_flags = 0; + tmm_modules[TMM_FLOWRECYCLER].flags = TM_FLAG_MANAGEMENT_TM; + SCLogDebug("%s registered", tmm_modules[TMM_FLOWRECYCLER].name); + + SC_ATOMIC_INIT(flowrec_cnt); +} + +#ifdef UNITTESTS + +/** + * \test Test the timing out of a flow with a fresh TcpSession + * (just initialized, no data segments) in normal mode. + * + * \retval On success it returns 1 and on failure 0. + */ + +static int FlowMgrTest01 (void) +{ + TcpSession ssn; + Flow f; + FlowBucket fb; + struct timeval ts; + + FlowQueueInit(&flow_spare_q); + + memset(&ssn, 0, sizeof(TcpSession)); + memset(&f, 0, sizeof(Flow)); + memset(&ts, 0, sizeof(ts)); + memset(&fb, 0, sizeof(FlowBucket)); + + FBLOCK_INIT(&fb); + + FLOW_INITIALIZE(&f); + f.flags |= FLOW_TIMEOUT_REASSEMBLY_DONE; + + TimeGet(&ts); + f.lastts.tv_sec = ts.tv_sec - 5000; + f.protoctx = &ssn; + f.fb = &fb; + + f.proto = IPPROTO_TCP; + + int state = SC_ATOMIC_GET(f.flow_state); + if (FlowManagerFlowTimeout(&f, state, &ts, 0) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) { + FBLOCK_DESTROY(&fb); + FLOW_DESTROY(&f); + FlowQueueDestroy(&flow_spare_q); + return 0; + } + + FBLOCK_DESTROY(&fb); + FLOW_DESTROY(&f); + + FlowQueueDestroy(&flow_spare_q); + return 1; +} + +/** + * \test Test the timing out of a flow with a TcpSession + * (with data segments) in normal mode. + * + * \retval On success it returns 1 and on failure 0. + */ + +static int FlowMgrTest02 (void) +{ + TcpSession ssn; + Flow f; + FlowBucket fb; + struct timeval ts; + TcpSegment seg; + TcpStream client; + uint8_t payload[3] = {0x41, 0x41, 0x41}; + + FlowQueueInit(&flow_spare_q); + + memset(&ssn, 0, sizeof(TcpSession)); + memset(&f, 0, sizeof(Flow)); + memset(&fb, 0, sizeof(FlowBucket)); + memset(&ts, 0, sizeof(ts)); + memset(&seg, 0, sizeof(TcpSegment)); + memset(&client, 0, sizeof(TcpSegment)); + + FBLOCK_INIT(&fb); + FLOW_INITIALIZE(&f); + f.flags |= FLOW_TIMEOUT_REASSEMBLY_DONE; + + TimeGet(&ts); + seg.payload = payload; + seg.payload_len = 3; + seg.next = NULL; + seg.prev = NULL; + client.seg_list = &seg; + ssn.client = client; + ssn.server = client; + ssn.state = TCP_ESTABLISHED; + f.lastts.tv_sec = ts.tv_sec - 5000; + f.protoctx = &ssn; + f.fb = &fb; + f.proto = IPPROTO_TCP; + + int state = SC_ATOMIC_GET(f.flow_state); + if (FlowManagerFlowTimeout(&f, state, &ts, 0) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) { + FBLOCK_DESTROY(&fb); + FLOW_DESTROY(&f); + FlowQueueDestroy(&flow_spare_q); + return 0; + } + FBLOCK_DESTROY(&fb); + FLOW_DESTROY(&f); + FlowQueueDestroy(&flow_spare_q); + return 1; + +} + +/** + * \test Test the timing out of a flow with a fresh TcpSession + * (just initialized, no data segments) in emergency mode. + * + * \retval On success it returns 1 and on failure 0. + */ + +static int FlowMgrTest03 (void) +{ + TcpSession ssn; + Flow f; + FlowBucket fb; + struct timeval ts; + + FlowQueueInit(&flow_spare_q); + + memset(&ssn, 0, sizeof(TcpSession)); + memset(&f, 0, sizeof(Flow)); + memset(&ts, 0, sizeof(ts)); + memset(&fb, 0, sizeof(FlowBucket)); + + FBLOCK_INIT(&fb); + FLOW_INITIALIZE(&f); + f.flags |= FLOW_TIMEOUT_REASSEMBLY_DONE; + + TimeGet(&ts); + ssn.state = TCP_SYN_SENT; + f.lastts.tv_sec = ts.tv_sec - 300; + f.protoctx = &ssn; + f.fb = &fb; + f.proto = IPPROTO_TCP; + f.flags |= FLOW_EMERGENCY; + + int state = SC_ATOMIC_GET(f.flow_state); + if (FlowManagerFlowTimeout(&f, state, &ts, 0) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) { + FBLOCK_DESTROY(&fb); + FLOW_DESTROY(&f); + FlowQueueDestroy(&flow_spare_q); + return 0; + } + + FBLOCK_DESTROY(&fb); + FLOW_DESTROY(&f); + FlowQueueDestroy(&flow_spare_q); + return 1; +} + +/** + * \test Test the timing out of a flow with a TcpSession + * (with data segments) in emergency mode. + * + * \retval On success it returns 1 and on failure 0. + */ + +static int FlowMgrTest04 (void) +{ + + TcpSession ssn; + Flow f; + FlowBucket fb; + struct timeval ts; + TcpSegment seg; + TcpStream client; + uint8_t payload[3] = {0x41, 0x41, 0x41}; + + FlowQueueInit(&flow_spare_q); + + memset(&ssn, 0, sizeof(TcpSession)); + memset(&f, 0, sizeof(Flow)); + memset(&fb, 0, sizeof(FlowBucket)); + memset(&ts, 0, sizeof(ts)); + memset(&seg, 0, sizeof(TcpSegment)); + memset(&client, 0, sizeof(TcpSegment)); + + FBLOCK_INIT(&fb); + FLOW_INITIALIZE(&f); + f.flags |= FLOW_TIMEOUT_REASSEMBLY_DONE; + + TimeGet(&ts); + seg.payload = payload; + seg.payload_len = 3; + seg.next = NULL; + seg.prev = NULL; + client.seg_list = &seg; + ssn.client = client; + ssn.server = client; + ssn.state = TCP_ESTABLISHED; + f.lastts.tv_sec = ts.tv_sec - 5000; + f.protoctx = &ssn; + f.fb = &fb; + f.proto = IPPROTO_TCP; + f.flags |= FLOW_EMERGENCY; + + int state = SC_ATOMIC_GET(f.flow_state); + if (FlowManagerFlowTimeout(&f, state, &ts, 0) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) { + FBLOCK_DESTROY(&fb); + FLOW_DESTROY(&f); + FlowQueueDestroy(&flow_spare_q); + return 0; + } + + FBLOCK_DESTROY(&fb); + FLOW_DESTROY(&f); + FlowQueueDestroy(&flow_spare_q); + return 1; +} + +/** + * \test Test flow allocations when it reach memcap + * + * + * \retval On success it returns 1 and on failure 0. + */ + +static int FlowMgrTest05 (void) +{ + int result = 0; + + FlowInitConfig(FLOW_QUIET); + FlowConfig backup; + memcpy(&backup, &flow_config, sizeof(FlowConfig)); + + uint32_t ini = 0; + uint32_t end = flow_spare_q.len; + flow_config.memcap = 10000; + flow_config.prealloc = 100; + + /* Let's get the flow_spare_q empty */ + UTHBuildPacketOfFlows(ini, end, 0); + + /* And now let's try to reach the memcap val */ + while (FLOW_CHECK_MEMCAP(sizeof(Flow))) { + ini = end + 1; + end = end + 2; + UTHBuildPacketOfFlows(ini, end, 0); + } + + /* should time out normal */ + TimeSetIncrementTime(2000); + ini = end + 1; + end = end + 2;; + UTHBuildPacketOfFlows(ini, end, 0); + + struct timeval ts; + TimeGet(&ts); + /* try to time out flows */ + FlowTimeoutCounters counters = { 0, 0, 0, 0, }; + FlowTimeoutHash(&ts, 0 /* check all */, 0, flow_config.hash_size, &counters); + + if (flow_recycle_q.len > 0) { + result = 1; + } + + memcpy(&flow_config, &backup, sizeof(FlowConfig)); + FlowShutdown(); + return result; +} +#endif /* UNITTESTS */ + +/** + * \brief Function to register the Flow Unitests. + */ +void FlowMgrRegisterTests (void) +{ +#ifdef UNITTESTS + UtRegisterTest("FlowMgrTest01 -- Timeout a flow having fresh TcpSession", FlowMgrTest01, 1); + UtRegisterTest("FlowMgrTest02 -- Timeout a flow having TcpSession with segments", FlowMgrTest02, 1); + UtRegisterTest("FlowMgrTest03 -- Timeout a flow in emergency having fresh TcpSession", FlowMgrTest03, 1); + UtRegisterTest("FlowMgrTest04 -- Timeout a flow in emergency having TcpSession with segments", FlowMgrTest04, 1); + UtRegisterTest("FlowMgrTest05 -- Test flow Allocations when it reach memcap", FlowMgrTest05, 1); +#endif /* UNITTESTS */ +} |