From 19d701ddf07d855128ded0cf2b573ce468e3bdd6 Mon Sep 17 00:00:00 2001 From: Ashlee Young Date: Wed, 20 Jan 2016 01:10:01 +0000 Subject: Removing Suricata and Audit from source repo, and updated build.sh to avoid building suricata. Will re-address this in C release via tar balls. Change-Id: I3710076f8b7f3313cb3cb5260c4eb0a6834d4f6e Signed-off-by: Ashlee Young --- framework/src/suricata/src/tm-threads.c | 2203 ------------------------------- 1 file changed, 2203 deletions(-) delete mode 100644 framework/src/suricata/src/tm-threads.c (limited to 'framework/src/suricata/src/tm-threads.c') diff --git a/framework/src/suricata/src/tm-threads.c b/framework/src/suricata/src/tm-threads.c deleted file mode 100644 index 2b26404f..00000000 --- a/framework/src/suricata/src/tm-threads.c +++ /dev/null @@ -1,2203 +0,0 @@ -/* Copyright (C) 2007-2013 Open Information Security Foundation - * - * You can copy, redistribute or modify this Program under the terms of - * the GNU General Public License version 2 as published by the Free - * Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * version 2 along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA - * 02110-1301, USA. - */ - -/** - * \file - * - * \author Victor Julien - * \author Anoop Saldanha - * \author Eric Leblond - * - * Thread management functions. - */ - -#include "suricata-common.h" -#include "suricata.h" -#include "stream.h" -#include "runmodes.h" -#include "threadvars.h" -#include "tm-queues.h" -#include "tm-queuehandlers.h" -#include "tm-threads.h" -#include "tmqh-packetpool.h" -#include "threads.h" -#include "util-debug.h" -#include "util-privs.h" -#include "util-cpu.h" -#include "util-optimize.h" -#include "util-profiling.h" -#include "util-signal.h" -#include "queue.h" - -#ifdef PROFILE_LOCKING -__thread uint64_t mutex_lock_contention; -__thread uint64_t mutex_lock_wait_ticks; -__thread uint64_t mutex_lock_cnt; - -__thread uint64_t spin_lock_contention; -__thread uint64_t spin_lock_wait_ticks; -__thread uint64_t spin_lock_cnt; - -__thread uint64_t rww_lock_contention; -__thread uint64_t rww_lock_wait_ticks; -__thread uint64_t rww_lock_cnt; - -__thread uint64_t rwr_lock_contention; -__thread uint64_t rwr_lock_wait_ticks; -__thread uint64_t rwr_lock_cnt; -#endif - -#ifdef OS_FREEBSD -#include -#include -#include -#include -#include -#define cpu_set_t cpuset_t -#endif /* OS_FREEBSD */ - -/* prototypes */ -static int SetCPUAffinity(uint16_t cpu); - -/* root of the threadvars list */ -ThreadVars *tv_root[TVT_MAX] = { NULL }; - -/* lock to protect tv_root */ -SCMutex tv_root_lock = SCMUTEX_INITIALIZER; - -/* Action On Failure(AOF). Determines how the engine should behave when a - * thread encounters a failure. Defaults to restart the failed thread */ -uint8_t tv_aof = THV_RESTART_THREAD; - -/** - * \brief Check if a thread flag is set. - * - * \retval 1 flag is set. - * \retval 0 flag is not set. - */ -int TmThreadsCheckFlag(ThreadVars *tv, uint16_t flag) -{ - return (SC_ATOMIC_GET(tv->flags) & flag) ? 1 : 0; -} - -/** - * \brief Set a thread flag. - */ -void TmThreadsSetFlag(ThreadVars *tv, uint16_t flag) -{ - SC_ATOMIC_OR(tv->flags, flag); -} - -/** - * \brief Unset a thread flag. - */ -void TmThreadsUnsetFlag(ThreadVars *tv, uint16_t flag) -{ - SC_ATOMIC_AND(tv->flags, ~flag); -} - -/** - * \brief Separate run function so we can call it recursively. - * - * \todo Deal with post_pq for slots beyond the first. - */ -TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, - TmSlot *slot) -{ - TmEcode r; - TmSlot *s; - Packet *extra_p; - - for (s = slot; s != NULL; s = s->slot_next) { - TmSlotFunc SlotFunc = SC_ATOMIC_GET(s->SlotFunc); - PACKET_PROFILING_TMM_START(p, s->tm_id); - - if (unlikely(s->id == 0)) { - r = SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data), &s->slot_pre_pq, &s->slot_post_pq); - } else { - r = SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data), &s->slot_pre_pq, NULL); - } - - PACKET_PROFILING_TMM_END(p, s->tm_id); - - /* handle error */ - if (unlikely(r == TM_ECODE_FAILED)) { - /* Encountered error. Return packets to packetpool and return */ - TmqhReleasePacketsToPacketPool(&s->slot_pre_pq); - - SCMutexLock(&s->slot_post_pq.mutex_q); - TmqhReleasePacketsToPacketPool(&s->slot_post_pq); - SCMutexUnlock(&s->slot_post_pq.mutex_q); - - TmThreadsSetFlag(tv, THV_FAILED); - return TM_ECODE_FAILED; - } - - /* handle new packets */ - while (s->slot_pre_pq.top != NULL) { - extra_p = PacketDequeue(&s->slot_pre_pq); - if (unlikely(extra_p == NULL)) - continue; - - /* see if we need to process the packet */ - if (s->slot_next != NULL) { - r = TmThreadsSlotVarRun(tv, extra_p, s->slot_next); - if (unlikely(r == TM_ECODE_FAILED)) { - TmqhReleasePacketsToPacketPool(&s->slot_pre_pq); - - SCMutexLock(&s->slot_post_pq.mutex_q); - TmqhReleasePacketsToPacketPool(&s->slot_post_pq); - SCMutexUnlock(&s->slot_post_pq.mutex_q); - - TmqhOutputPacketpool(tv, extra_p); - TmThreadsSetFlag(tv, THV_FAILED); - return TM_ECODE_FAILED; - } - } - tv->tmqh_out(tv, extra_p); - } - } - - return TM_ECODE_OK; -} - -/** \internal - * - * \brief Process flow timeout packets - * - * Process flow timeout pseudo packets. During shutdown this loop - * is run until the flow engine kills the thread and the queue is - * empty. - */ -static int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s) -{ - TmSlot *stream_slot = NULL, *slot = NULL; - int run = 1; - int r = TM_ECODE_OK; - - for (slot = s; slot != NULL; slot = slot->slot_next) { - if (slot->tm_id == TMM_STREAMTCP) { - stream_slot = slot; - break; - } - } - - if (tv->stream_pq == NULL || stream_slot == NULL) - return r; - - SCLogDebug("flow end loop starting"); - while(run) { - Packet *p; - if (tv->stream_pq->len != 0) { - SCMutexLock(&tv->stream_pq->mutex_q); - p = PacketDequeue(tv->stream_pq); - SCMutexUnlock(&tv->stream_pq->mutex_q); - BUG_ON(p == NULL); - - if ((r = TmThreadsSlotProcessPkt(tv, stream_slot, p) != TM_ECODE_OK)) { - if (r == TM_ECODE_FAILED) - run = 0; - } - } else { - usleep(1); - } - - if (tv->stream_pq->len == 0 && TmThreadsCheckFlag(tv, THV_KILL)) { - run = 0; - } - } - SCLogDebug("flow end loop complete"); - - return r; -} - -/* - - pcap/nfq - - pkt read - callback - process_pkt - - pfring - - pkt read - process_pkt - - slot: - setup - - pkt_ack_loop(tv, slot_data) - - deinit - - process_pkt: - while(s) - run s; - queue; - - */ - -void *TmThreadsSlotPktAcqLoop(void *td) -{ - /* block usr2. usr2 to be handled by the main thread only */ - UtilSignalBlock(SIGUSR2); - - ThreadVars *tv = (ThreadVars *)td; - TmSlot *s = tv->tm_slots; - char run = 1; - TmEcode r = TM_ECODE_OK; - TmSlot *slot = NULL; - - /* Set the thread name */ - if (SCSetThreadName(tv->name) < 0) { - SCLogWarning(SC_ERR_THREAD_INIT, "Unable to set thread name"); - } - - if (tv->thread_setup_flags != 0) - TmThreadSetupOptions(tv); - - /* Drop the capabilities for this thread */ - SCDropCaps(tv); - - PacketPoolInit(); - - /* check if we are setup properly */ - if (s == NULL || s->PktAcqLoop == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) { - SCLogError(SC_ERR_FATAL, "TmSlot or ThreadVars badly setup: s=%p," - " PktAcqLoop=%p, tmqh_in=%p," - " tmqh_out=%p", - s, s ? s->PktAcqLoop : NULL, tv->tmqh_in, tv->tmqh_out); - EngineKill(); - - TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE); - pthread_exit((void *) -1); - return NULL; - } - - for (slot = s; slot != NULL; slot = slot->slot_next) { - if (slot->SlotThreadInit != NULL) { - void *slot_data = NULL; - r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data); - if (r != TM_ECODE_OK) { - if (r == TM_ECODE_DONE) { - EngineDone(); - TmThreadsSetFlag(tv, THV_CLOSED | THV_INIT_DONE | THV_RUNNING_DONE); - goto error; - } else { - EngineKill(); - TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE); - goto error; - } - } - (void)SC_ATOMIC_SET(slot->slot_data, slot_data); - } - memset(&slot->slot_pre_pq, 0, sizeof(PacketQueue)); - SCMutexInit(&slot->slot_pre_pq.mutex_q, NULL); - memset(&slot->slot_post_pq, 0, sizeof(PacketQueue)); - SCMutexInit(&slot->slot_post_pq.mutex_q, NULL); - - /* get the 'pre qeueue' from module before the stream module */ - if (slot->slot_next != NULL && slot->slot_next->tm_id == TMM_STREAMTCP) { - SCLogDebug("pre-stream packetqueue %p (postq)", &s->slot_post_pq); - tv->stream_pq = &slot->slot_post_pq; - /* if the stream module is the first, get the threads input queue */ - } else if (slot == (TmSlot *)tv->tm_slots && slot->tm_id == TMM_STREAMTCP) { - tv->stream_pq = &trans_q[tv->inq->id]; - SCLogDebug("pre-stream packetqueue %p (inq)", &slot->slot_pre_pq); - } - } - - StatsSetupPrivate(tv); - - TmThreadsSetFlag(tv, THV_INIT_DONE); - - while(run) { - if (TmThreadsCheckFlag(tv, THV_PAUSE)) { - TmThreadsSetFlag(tv, THV_PAUSED); - TmThreadTestThreadUnPaused(tv); - TmThreadsUnsetFlag(tv, THV_PAUSED); - } - - r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s); - - if (r == TM_ECODE_FAILED || TmThreadsCheckFlag(tv, THV_KILL_PKTACQ) - || suricata_ctl_flags) { - run = 0; - } - if (r == TM_ECODE_DONE) { - run = 0; - } - } - StatsSyncCounters(tv); - - TmThreadsSetFlag(tv, THV_FLOW_LOOP); - - /* process all pseudo packets the flow timeout may throw at us */ - TmThreadTimeoutLoop(tv, s); - - TmThreadsSetFlag(tv, THV_RUNNING_DONE); - TmThreadWaitForFlag(tv, THV_DEINIT); - - PacketPoolDestroy(); - - for (slot = s; slot != NULL; slot = slot->slot_next) { - if (slot->SlotThreadExitPrintStats != NULL) { - slot->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(slot->slot_data)); - } - - if (slot->SlotThreadDeinit != NULL) { - r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data)); - if (r != TM_ECODE_OK) { - TmThreadsSetFlag(tv, THV_CLOSED); - goto error; - } - } - - BUG_ON(slot->slot_pre_pq.len); - BUG_ON(slot->slot_post_pq.len); - } - - tv->stream_pq = NULL; - SCLogDebug("%s ending", tv->name); - TmThreadsSetFlag(tv, THV_CLOSED); - pthread_exit((void *) 0); - return NULL; - -error: - tv->stream_pq = NULL; - pthread_exit((void *) -1); - return NULL; -} - - -/** - * \todo Only the first "slot" currently makes the "post_pq" available - * to the thread module. - */ -void *TmThreadsSlotVar(void *td) -{ - /* block usr2. usr2 to be handled by the main thread only */ - UtilSignalBlock(SIGUSR2); - - ThreadVars *tv = (ThreadVars *)td; - TmSlot *s = (TmSlot *)tv->tm_slots; - Packet *p = NULL; - char run = 1; - TmEcode r = TM_ECODE_OK; - - PacketPoolInitEmpty(); - - /* Set the thread name */ - if (SCSetThreadName(tv->name) < 0) { - SCLogWarning(SC_ERR_THREAD_INIT, "Unable to set thread name"); - } - - if (tv->thread_setup_flags != 0) - TmThreadSetupOptions(tv); - - /* Drop the capabilities for this thread */ - SCDropCaps(tv); - - /* check if we are setup properly */ - if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) { - EngineKill(); - - TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE); - pthread_exit((void *) -1); - return NULL; - } - - for (; s != NULL; s = s->slot_next) { - if (s->SlotThreadInit != NULL) { - void *slot_data = NULL; - r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data); - if (r != TM_ECODE_OK) { - EngineKill(); - - TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE); - goto error; - } - (void)SC_ATOMIC_SET(s->slot_data, slot_data); - } - memset(&s->slot_pre_pq, 0, sizeof(PacketQueue)); - SCMutexInit(&s->slot_pre_pq.mutex_q, NULL); - memset(&s->slot_post_pq, 0, sizeof(PacketQueue)); - SCMutexInit(&s->slot_post_pq.mutex_q, NULL); - - /* special case: we need to access the stream queue - * from the flow timeout code */ - - /* get the 'pre qeueue' from module before the stream module */ - if (s->slot_next != NULL && s->slot_next->tm_id == TMM_STREAMTCP) { - SCLogDebug("pre-stream packetqueue %p (preq)", &s->slot_pre_pq); - tv->stream_pq = &s->slot_pre_pq; - /* if the stream module is the first, get the threads input queue */ - } else if (s == (TmSlot *)tv->tm_slots && s->tm_id == TMM_STREAMTCP) { - tv->stream_pq = &trans_q[tv->inq->id]; - SCLogDebug("pre-stream packetqueue %p (inq)", &s->slot_pre_pq); - } - } - - StatsSetupPrivate(tv); - - TmThreadsSetFlag(tv, THV_INIT_DONE); - - s = (TmSlot *)tv->tm_slots; - - while (run) { - if (TmThreadsCheckFlag(tv, THV_PAUSE)) { - TmThreadsSetFlag(tv, THV_PAUSED); - TmThreadTestThreadUnPaused(tv); - TmThreadsUnsetFlag(tv, THV_PAUSED); - } - - /* input a packet */ - p = tv->tmqh_in(tv); - - if (p != NULL) { - /* run the thread module(s) */ - r = TmThreadsSlotVarRun(tv, p, s); - if (r == TM_ECODE_FAILED) { - TmqhOutputPacketpool(tv, p); - TmThreadsSetFlag(tv, THV_FAILED); - break; - } - - /* output the packet */ - tv->tmqh_out(tv, p); - - } /* if (p != NULL) */ - - /* now handle the post_pq packets */ - TmSlot *slot; - for (slot = s; slot != NULL; slot = slot->slot_next) { - if (slot->slot_post_pq.top != NULL) { - while (1) { - SCMutexLock(&slot->slot_post_pq.mutex_q); - Packet *extra_p = PacketDequeue(&slot->slot_post_pq); - SCMutexUnlock(&slot->slot_post_pq.mutex_q); - - if (extra_p == NULL) - break; - - if (slot->slot_next != NULL) { - r = TmThreadsSlotVarRun(tv, extra_p, slot->slot_next); - if (r == TM_ECODE_FAILED) { - SCMutexLock(&slot->slot_post_pq.mutex_q); - TmqhReleasePacketsToPacketPool(&slot->slot_post_pq); - SCMutexUnlock(&slot->slot_post_pq.mutex_q); - - TmqhOutputPacketpool(tv, extra_p); - TmThreadsSetFlag(tv, THV_FAILED); - break; - } - } - /* output the packet */ - tv->tmqh_out(tv, extra_p); - } /* while */ - } /* if */ - } /* for */ - - if (TmThreadsCheckFlag(tv, THV_KILL)) { - run = 0; - } - } /* while (run) */ - StatsSyncCounters(tv); - - TmThreadsSetFlag(tv, THV_RUNNING_DONE); - TmThreadWaitForFlag(tv, THV_DEINIT); - - PacketPoolDestroy(); - - s = (TmSlot *)tv->tm_slots; - - for ( ; s != NULL; s = s->slot_next) { - if (s->SlotThreadExitPrintStats != NULL) { - s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data)); - } - - if (s->SlotThreadDeinit != NULL) { - r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data)); - if (r != TM_ECODE_OK) { - TmThreadsSetFlag(tv, THV_CLOSED); - goto error; - } - } - BUG_ON(s->slot_pre_pq.len); - BUG_ON(s->slot_post_pq.len); - } - - SCLogDebug("%s ending", tv->name); - tv->stream_pq = NULL; - TmThreadsSetFlag(tv, THV_CLOSED); - pthread_exit((void *) 0); - return NULL; - -error: - tv->stream_pq = NULL; - pthread_exit((void *) -1); - return NULL; -} - -static void *TmThreadsManagement(void *td) -{ - /* block usr2. usr2 to be handled by the main thread only */ - UtilSignalBlock(SIGUSR2); - - ThreadVars *tv = (ThreadVars *)td; - TmSlot *s = (TmSlot *)tv->tm_slots; - TmEcode r = TM_ECODE_OK; - - BUG_ON(s == NULL); - - /* Set the thread name */ - if (SCSetThreadName(tv->name) < 0) { - SCLogWarning(SC_ERR_THREAD_INIT, "Unable to set thread name"); - } - - if (tv->thread_setup_flags != 0) - TmThreadSetupOptions(tv); - - /* Drop the capabilities for this thread */ - SCDropCaps(tv); - - SCLogDebug("%s starting", tv->name); - - if (s->SlotThreadInit != NULL) { - void *slot_data = NULL; - r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data); - if (r != TM_ECODE_OK) { - EngineKill(); - - TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE); - pthread_exit((void *) -1); - return NULL; - } - (void)SC_ATOMIC_SET(s->slot_data, slot_data); - } - memset(&s->slot_pre_pq, 0, sizeof(PacketQueue)); - memset(&s->slot_post_pq, 0, sizeof(PacketQueue)); - - StatsSetupPrivate(tv); - - TmThreadsSetFlag(tv, THV_INIT_DONE); - - r = s->Management(tv, SC_ATOMIC_GET(s->slot_data)); - /* handle error */ - if (r == TM_ECODE_FAILED) { - TmThreadsSetFlag(tv, THV_FAILED); - } - - if (TmThreadsCheckFlag(tv, THV_KILL)) { - StatsSyncCounters(tv); - } - - TmThreadsSetFlag(tv, THV_RUNNING_DONE); - TmThreadWaitForFlag(tv, THV_DEINIT); - - if (s->SlotThreadExitPrintStats != NULL) { - s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data)); - } - - if (s->SlotThreadDeinit != NULL) { - r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data)); - if (r != TM_ECODE_OK) { - TmThreadsSetFlag(tv, THV_CLOSED); - pthread_exit((void *) -1); - return NULL; - } - } - - TmThreadsSetFlag(tv, THV_CLOSED); - pthread_exit((void *) 0); - return NULL; -} - -/** - * \brief We set the slot functions. - * - * \param tv Pointer to the TV to set the slot function for. - * \param name Name of the slot variant. - * \param fn_p Pointer to a custom slot function. Used only if slot variant - * "name" is "custom". - * - * \retval TmEcode TM_ECODE_OK on success; TM_ECODE_FAILED on failure. - */ -TmEcode TmThreadSetSlots(ThreadVars *tv, char *name, void *(*fn_p)(void *)) -{ - if (name == NULL) { - if (fn_p == NULL) { - printf("Both slot name and function pointer can't be NULL inside " - "TmThreadSetSlots\n"); - goto error; - } else { - name = "custom"; - } - } - - if (strcmp(name, "varslot") == 0) { - tv->tm_func = TmThreadsSlotVar; - } else if (strcmp(name, "pktacqloop") == 0) { - tv->tm_func = TmThreadsSlotPktAcqLoop; - } else if (strcmp(name, "management") == 0) { - tv->tm_func = TmThreadsManagement; - } else if (strcmp(name, "command") == 0) { - tv->tm_func = TmThreadsManagement; - } else if (strcmp(name, "custom") == 0) { - if (fn_p == NULL) - goto error; - tv->tm_func = fn_p; - } else { - printf("Error: Slot \"%s\" not supported\n", name); - goto error; - } - - return TM_ECODE_OK; - -error: - return TM_ECODE_FAILED; -} - -ThreadVars *TmThreadsGetTVContainingSlot(TmSlot *tm_slot) -{ - ThreadVars *tv; - int i; - - SCMutexLock(&tv_root_lock); - - for (i = 0; i < TVT_MAX; i++) { - tv = tv_root[i]; - - while (tv) { - TmSlot *slots = tv->tm_slots; - while (slots != NULL) { - if (slots == tm_slot) { - SCMutexUnlock(&tv_root_lock); - return tv; - } - slots = slots->slot_next; - } - tv = tv->next; - } - } - - SCMutexUnlock(&tv_root_lock); - - return NULL; -} - -/** - * \brief Appends a new entry to the slots. - * - * \param tv TV the slot is attached to. - * \param tm TM to append. - * \param data Data to be passed on to the slot init function. - * - * \retval The allocated TmSlot or NULL if there is an error - */ -static inline TmSlot * _TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, void *data) -{ - TmSlot *slot = SCMalloc(sizeof(TmSlot)); - if (unlikely(slot == NULL)) - return NULL; - memset(slot, 0, sizeof(TmSlot)); - SC_ATOMIC_INIT(slot->slot_data); - slot->tv = tv; - slot->SlotThreadInit = tm->ThreadInit; - slot->slot_initdata = data; - SC_ATOMIC_INIT(slot->SlotFunc); - (void)SC_ATOMIC_SET(slot->SlotFunc, tm->Func); - slot->PktAcqLoop = tm->PktAcqLoop; - slot->Management = tm->Management; - slot->SlotThreadExitPrintStats = tm->ThreadExitPrintStats; - slot->SlotThreadDeinit = tm->ThreadDeinit; - /* we don't have to check for the return value "-1". We wouldn't have - * received a TM as arg, if it didn't exist */ - slot->tm_id = TmModuleGetIDForTM(tm); - - tv->tmm_flags |= tm->flags; - tv->cap_flags |= tm->cap_flags; - - if (tv->tm_slots == NULL) { - tv->tm_slots = slot; - slot->id = 0; - } else { - TmSlot *a = (TmSlot *)tv->tm_slots, *b = NULL; - - /* get the last slot */ - for ( ; a != NULL; a = a->slot_next) { - b = a; - } - /* append the new slot */ - if (b != NULL) { - b->slot_next = slot; - slot->id = b->id + 1; - } - } - - return slot; -} - -void TmSlotFree(TmSlot *tms) -{ - SC_ATOMIC_DESTROY(tms->slot_data); - SCFree(tms); -} - -/** - * \brief Appends a new entry to the slots. - * - * \param tv TV the slot is attached to. - * \param tm TM to append. - * \param data Data to be passed on to the slot init function. - */ -void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, void *data) -{ - _TmSlotSetFuncAppend(tv, tm, data); -} - -/** - * \brief Returns the slot holding a TM with the particular tm_id. - * - * \param tm_id TM id of the TM whose slot has to be returned. - * - * \retval slots Pointer to the slot. - */ -TmSlot *TmSlotGetSlotForTM(int tm_id) -{ - ThreadVars *tv = NULL; - TmSlot *slots; - int i; - - SCMutexLock(&tv_root_lock); - - for (i = 0; i < TVT_MAX; i++) { - tv = tv_root[i]; - while (tv) { - slots = tv->tm_slots; - while (slots != NULL) { - if (slots->tm_id == tm_id) { - SCMutexUnlock(&tv_root_lock); - return slots; - } - slots = slots->slot_next; - } - tv = tv->next; - } - } - - SCMutexUnlock(&tv_root_lock); - - return NULL; -} - -#if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ -static int SetCPUAffinitySet(cpu_set_t *cs) -{ -#if defined OS_FREEBSD - int r = cpuset_setaffinity(CPU_LEVEL_WHICH, CPU_WHICH_TID, - SCGetThreadIdLong(), sizeof(cpu_set_t),cs); -#elif OS_DARWIN - int r = thread_policy_set(mach_thread_self(), THREAD_AFFINITY_POLICY, - (void*)cs, THREAD_AFFINITY_POLICY_COUNT); -#else - pid_t tid = syscall(SYS_gettid); - int r = sched_setaffinity(tid, sizeof(cpu_set_t), cs); -#endif /* OS_FREEBSD */ - - if (r != 0) { - printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r, - strerror(errno)); - return -1; - } - - return 0; -} -#endif - - -/** - * \brief Set the thread affinity on the calling thread. - * - * \param cpuid Id of the core/cpu to setup the affinity. - * - * \retval 0 If all goes well; -1 if something is wrong. - */ -static int SetCPUAffinity(uint16_t cpuid) -{ -#if defined __OpenBSD__ - return 0; -#else - int cpu = (int)cpuid; - -#if defined OS_WIN32 || defined __CYGWIN__ - DWORD cs = 1 << cpu; - - int r = (0 == SetThreadAffinityMask(GetCurrentThread(), cs)); - if (r != 0) { - printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r, - strerror(errno)); - return -1; - } - SCLogDebug("CPU Affinity for thread %lu set to CPU %" PRId32, - SCGetThreadIdLong(), cpu); - - return 0; - -#else - cpu_set_t cs; - - CPU_ZERO(&cs); - CPU_SET(cpu, &cs); - return SetCPUAffinitySet(&cs); -#endif /* windows */ -#endif /* not supported */ -} - - -/** - * \brief Set the thread options (thread priority). - * - * \param tv Pointer to the ThreadVars to setup the thread priority. - * - * \retval TM_ECODE_OK. - */ -TmEcode TmThreadSetThreadPriority(ThreadVars *tv, int prio) -{ - tv->thread_setup_flags |= THREAD_SET_PRIORITY; - tv->thread_priority = prio; - - return TM_ECODE_OK; -} - -/** - * \brief Adjusting nice value for threads. - */ -void TmThreadSetPrio(ThreadVars *tv) -{ - SCEnter(); -#ifndef __CYGWIN__ -#ifdef OS_WIN32 - if (0 == SetThreadPriority(GetCurrentThread(), tv->thread_priority)) { - SCLogError(SC_ERR_THREAD_NICE_PRIO, "Error setting priority for " - "thread %s: %s", tv->name, strerror(errno)); - } else { - SCLogDebug("Priority set to %"PRId32" for thread %s", - tv->thread_priority, tv->name); - } -#else - int ret = nice(tv->thread_priority); - if (ret == -1) { - SCLogError(SC_ERR_THREAD_NICE_PRIO, "Error setting nice value " - "for thread %s: %s", tv->name, strerror(errno)); - } else { - SCLogDebug("Nice value set to %"PRId32" for thread %s", - tv->thread_priority, tv->name); - } -#endif /* OS_WIN32 */ -#endif - SCReturn; -} - - -/** - * \brief Set the thread options (cpu affinity). - * - * \param tv pointer to the ThreadVars to setup the affinity. - * \param cpu cpu on which affinity is set. - * - * \retval TM_ECODE_OK - */ -TmEcode TmThreadSetCPUAffinity(ThreadVars *tv, uint16_t cpu) -{ - tv->thread_setup_flags |= THREAD_SET_AFFINITY; - tv->cpu_affinity = cpu; - - return TM_ECODE_OK; -} - - -TmEcode TmThreadSetCPU(ThreadVars *tv, uint8_t type) -{ - if (!threading_set_cpu_affinity) - return TM_ECODE_OK; - - if (type > MAX_CPU_SET) { - SCLogError(SC_ERR_INVALID_ARGUMENT, "invalid cpu type family"); - return TM_ECODE_FAILED; - } - - tv->thread_setup_flags |= THREAD_SET_AFFTYPE; - tv->cpu_affinity = type; - - return TM_ECODE_OK; -} - -int TmThreadGetNbThreads(uint8_t type) -{ - if (type >= MAX_CPU_SET) { - SCLogError(SC_ERR_INVALID_ARGUMENT, "invalid cpu type family"); - return 0; - } - - return thread_affinity[type].nb_threads; -} - -/** - * \brief Set the thread options (cpu affinitythread). - * Priority should be already set by pthread_create. - * - * \param tv pointer to the ThreadVars of the calling thread. - */ -TmEcode TmThreadSetupOptions(ThreadVars *tv) -{ - if (tv->thread_setup_flags & THREAD_SET_AFFINITY) { - SCLogInfo("Setting affinity for \"%s\" Module to cpu/core " - "%"PRIu16", thread id %lu", tv->name, tv->cpu_affinity, - SCGetThreadIdLong()); - SetCPUAffinity(tv->cpu_affinity); - } - -#if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ - if (tv->thread_setup_flags & THREAD_SET_PRIORITY) - TmThreadSetPrio(tv); - if (tv->thread_setup_flags & THREAD_SET_AFFTYPE) { - ThreadsAffinityType *taf = &thread_affinity[tv->cpu_affinity]; - if (taf->mode_flag == EXCLUSIVE_AFFINITY) { - int cpu = AffinityGetNextCPU(taf); - SetCPUAffinity(cpu); - /* If CPU is in a set overwrite the default thread prio */ - if (CPU_ISSET(cpu, &taf->lowprio_cpu)) { - tv->thread_priority = PRIO_LOW; - } else if (CPU_ISSET(cpu, &taf->medprio_cpu)) { - tv->thread_priority = PRIO_MEDIUM; - } else if (CPU_ISSET(cpu, &taf->hiprio_cpu)) { - tv->thread_priority = PRIO_HIGH; - } else { - tv->thread_priority = taf->prio; - } - SCLogInfo("Setting prio %d for \"%s\" Module to cpu/core " - "%d, thread id %lu", tv->thread_priority, - tv->name, cpu, SCGetThreadIdLong()); - } else { - SetCPUAffinitySet(&taf->cpu_set); - tv->thread_priority = taf->prio; - SCLogInfo("Setting prio %d for \"%s\" thread " - ", thread id %lu", tv->thread_priority, - tv->name, SCGetThreadIdLong()); - } - TmThreadSetPrio(tv); - } -#endif - - return TM_ECODE_OK; -} - -/** - * \brief Creates and returns the TV instance for a new thread. - * - * \param name Name of this TV instance - * \param inq_name Incoming queue name - * \param inqh_name Incoming queue handler name as set by TmqhSetup() - * \param outq_name Outgoing queue name - * \param outqh_name Outgoing queue handler as set by TmqhSetup() - * \param slots String representation for the slot function to be used - * \param fn_p Pointer to function when \"slots\" is of type \"custom\" - * \param mucond Flag to indicate whether to initialize the condition - * and the mutex variables for this newly created TV. - * - * \retval the newly created TV instance, or NULL on error - */ -ThreadVars *TmThreadCreate(char *name, char *inq_name, char *inqh_name, - char *outq_name, char *outqh_name, char *slots, - void * (*fn_p)(void *), int mucond) -{ - ThreadVars *tv = NULL; - Tmq *tmq = NULL; - Tmqh *tmqh = NULL; - - SCLogDebug("creating thread \"%s\"...", name); - - /* XXX create separate function for this: allocate a thread container */ - tv = SCMalloc(sizeof(ThreadVars)); - if (unlikely(tv == NULL)) - goto error; - memset(tv, 0, sizeof(ThreadVars)); - - SC_ATOMIC_INIT(tv->flags); - SCMutexInit(&tv->perf_public_ctx.m, NULL); - - tv->name = name; - /* default state for every newly created thread */ - TmThreadsSetFlag(tv, THV_PAUSE); - TmThreadsSetFlag(tv, THV_USE); - /* default aof for every newly created thread */ - tv->aof = THV_RESTART_THREAD; - - /* set the incoming queue */ - if (inq_name != NULL && strcmp(inq_name, "packetpool") != 0) { - SCLogDebug("inq_name \"%s\"", inq_name); - - tmq = TmqGetQueueByName(inq_name); - if (tmq == NULL) { - tmq = TmqCreateQueue(inq_name); - if (tmq == NULL) - goto error; - } - SCLogDebug("tmq %p", tmq); - - tv->inq = tmq; - tv->inq->reader_cnt++; - SCLogDebug("tv->inq %p", tv->inq); - } - if (inqh_name != NULL) { - SCLogDebug("inqh_name \"%s\"", inqh_name); - - tmqh = TmqhGetQueueHandlerByName(inqh_name); - if (tmqh == NULL) - goto error; - - tv->tmqh_in = tmqh->InHandler; - tv->InShutdownHandler = tmqh->InShutdownHandler; - SCLogDebug("tv->tmqh_in %p", tv->tmqh_in); - } - - /* set the outgoing queue */ - if (outqh_name != NULL) { - SCLogDebug("outqh_name \"%s\"", outqh_name); - - tmqh = TmqhGetQueueHandlerByName(outqh_name); - if (tmqh == NULL) - goto error; - - tv->tmqh_out = tmqh->OutHandler; - tv->outqh_name = tmqh->name; - - if (outq_name != NULL && strcmp(outq_name, "packetpool") != 0) { - SCLogDebug("outq_name \"%s\"", outq_name); - - if (tmqh->OutHandlerCtxSetup != NULL) { - tv->outctx = tmqh->OutHandlerCtxSetup(outq_name); - if (tv->outctx == NULL) - goto error; - tv->outq = NULL; - } else { - tmq = TmqGetQueueByName(outq_name); - if (tmq == NULL) { - tmq = TmqCreateQueue(outq_name); - if (tmq == NULL) - goto error; - } - SCLogDebug("tmq %p", tmq); - - tv->outq = tmq; - tv->outctx = NULL; - tv->outq->writer_cnt++; - } - } - } - - if (TmThreadSetSlots(tv, slots, fn_p) != TM_ECODE_OK) { - goto error; - } - - if (mucond != 0) - TmThreadInitMC(tv); - - return tv; - -error: - SCLogError(SC_ERR_THREAD_CREATE, "failed to setup a thread"); - - if (tv != NULL) - SCFree(tv); - return NULL; -} - -/** - * \brief Creates and returns a TV instance for a Packet Processing Thread. - * This function doesn't support custom slots, and hence shouldn't be - * supplied \"custom\" as its slot type. All PPT threads are created - * with a mucond(see TmThreadCreate declaration) of 0. Hence the tv - * conditional variables are not used to kill the thread. - * - * \param name Name of this TV instance - * \param inq_name Incoming queue name - * \param inqh_name Incoming queue handler name as set by TmqhSetup() - * \param outq_name Outgoing queue name - * \param outqh_name Outgoing queue handler as set by TmqhSetup() - * \param slots String representation for the slot function to be used - * - * \retval the newly created TV instance, or NULL on error - */ -ThreadVars *TmThreadCreatePacketHandler(char *name, char *inq_name, - char *inqh_name, char *outq_name, - char *outqh_name, char *slots) -{ - ThreadVars *tv = NULL; - - tv = TmThreadCreate(name, inq_name, inqh_name, outq_name, outqh_name, - slots, NULL, 0); - - if (tv != NULL) { - tv->type = TVT_PPT; - tv->id = TmThreadsRegisterThread(tv, tv->type); - } - - - return tv; -} - -/** - * \brief Creates and returns the TV instance for a Management thread(MGMT). - * This function supports only custom slot functions and hence a - * function pointer should be sent as an argument. - * - * \param name Name of this TV instance - * \param fn_p Pointer to function when \"slots\" is of type \"custom\" - * \param mucond Flag to indicate whether to initialize the condition - * and the mutex variables for this newly created TV. - * - * \retval the newly created TV instance, or NULL on error - */ -ThreadVars *TmThreadCreateMgmtThread(char *name, void *(fn_p)(void *), - int mucond) -{ - ThreadVars *tv = NULL; - - tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "custom", fn_p, mucond); - - if (tv != NULL) { - tv->type = TVT_MGMT; - tv->id = TmThreadsRegisterThread(tv, tv->type); - TmThreadSetCPU(tv, MANAGEMENT_CPU_SET); - } - - return tv; -} - -/** - * \brief Creates and returns the TV instance for a Management thread(MGMT). - * This function supports only custom slot functions and hence a - * function pointer should be sent as an argument. - * - * \param name Name of this TV instance - * \param module Name of TmModule with MANAGEMENT flag set. - * \param mucond Flag to indicate whether to initialize the condition - * and the mutex variables for this newly created TV. - * - * \retval the newly created TV instance, or NULL on error - */ -ThreadVars *TmThreadCreateMgmtThreadByName(char *name, char *module, - int mucond) -{ - ThreadVars *tv = NULL; - - tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "management", NULL, mucond); - - if (tv != NULL) { - tv->type = TVT_MGMT; - tv->id = TmThreadsRegisterThread(tv, tv->type); - TmThreadSetCPU(tv, MANAGEMENT_CPU_SET); - - TmModule *m = TmModuleGetByName(module); - if (m) { - TmSlotSetFuncAppend(tv, m, NULL); - } - } - - return tv; -} - -/** - * \brief Creates and returns the TV instance for a Command thread (CMD). - * This function supports only custom slot functions and hence a - * function pointer should be sent as an argument. - * - * \param name Name of this TV instance - * \param module Name of TmModule with COMMAND flag set. - * \param mucond Flag to indicate whether to initialize the condition - * and the mutex variables for this newly created TV. - * - * \retval the newly created TV instance, or NULL on error - */ -ThreadVars *TmThreadCreateCmdThreadByName(char *name, char *module, - int mucond) -{ - ThreadVars *tv = NULL; - - tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "command", NULL, mucond); - - if (tv != NULL) { - tv->type = TVT_CMD; - tv->id = TmThreadsRegisterThread(tv, tv->type); - TmThreadSetCPU(tv, MANAGEMENT_CPU_SET); - - TmModule *m = TmModuleGetByName(module); - if (m) { - TmSlotSetFuncAppend(tv, m, NULL); - } - } - - return tv; -} - -/** - * \brief Appends this TV to tv_root based on its type - * - * \param type holds the type this TV belongs to. - */ -void TmThreadAppend(ThreadVars *tv, int type) -{ - SCMutexLock(&tv_root_lock); - - if (tv_root[type] == NULL) { - tv_root[type] = tv; - tv->next = NULL; - tv->prev = NULL; - - SCMutexUnlock(&tv_root_lock); - - return; - } - - ThreadVars *t = tv_root[type]; - - while (t) { - if (t->next == NULL) { - t->next = tv; - tv->prev = t; - tv->next = NULL; - break; - } - - t = t->next; - } - - SCMutexUnlock(&tv_root_lock); - - return; -} - -/** - * \brief Removes this TV from tv_root based on its type - * - * \param tv The tv instance to remove from the global tv list. - * \param type Holds the type this TV belongs to. - */ -void TmThreadRemove(ThreadVars *tv, int type) -{ - SCMutexLock(&tv_root_lock); - - if (tv_root[type] == NULL) { - SCMutexUnlock(&tv_root_lock); - - return; - } - - ThreadVars *t = tv_root[type]; - while (t != tv) { - t = t->next; - } - - if (t != NULL) { - if (t->prev != NULL) - t->prev->next = t->next; - if (t->next != NULL) - t->next->prev = t->prev; - - if (t == tv_root[type]) - tv_root[type] = t->next;; - } - - SCMutexUnlock(&tv_root_lock); - - return; -} - -/** - * \brief Kill a thread. - * - * \param tv A ThreadVars instance corresponding to the thread that has to be - * killed. - */ -void TmThreadKillThread(ThreadVars *tv) -{ - int i = 0; - - if (tv == NULL) - return; - - if (tv->inq != NULL) { - /* we wait till we dry out all the inq packets, before we - * kill this thread. Do note that you should have disabled - * packet acquire by now using TmThreadDisableReceiveThreads()*/ - if (!(strlen(tv->inq->name) == strlen("packetpool") && - strcasecmp(tv->inq->name, "packetpool") == 0)) { - PacketQueue *q = &trans_q[tv->inq->id]; - while (q->len != 0) { - usleep(1000); - } - } - } - - /* set the thread flag informing the thread that it needs to be - * terminated */ - TmThreadsSetFlag(tv, THV_KILL); - TmThreadsSetFlag(tv, THV_DEINIT); - - /* to be sure, signal more */ - int cnt = 0; - while (1) { - if (TmThreadsCheckFlag(tv, THV_CLOSED)) { - SCLogDebug("signalled the thread %" PRId32 " times", cnt); - break; - } - - cnt++; - - if (tv->InShutdownHandler != NULL) { - tv->InShutdownHandler(tv); - } - if (tv->inq != NULL) { - for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) { - if (tv->inq->q_type == 0) - SCCondSignal(&trans_q[tv->inq->id].cond_q); - else - SCCondSignal(&data_queues[tv->inq->id].cond_q); - } - SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id); - } - - if (tv->ctrl_cond != NULL ) { - pthread_cond_broadcast(tv->ctrl_cond); - } - - usleep(100); - } - - if (tv->outctx != NULL) { - Tmqh *tmqh = TmqhGetQueueHandlerByName(tv->outqh_name); - if (tmqh == NULL) - BUG_ON(1); - - if (tmqh->OutHandlerCtxFree != NULL) { - tmqh->OutHandlerCtxFree(tv->outctx); - } - } - - /* join it */ - pthread_join(tv->t, NULL); - SCLogDebug("thread %s stopped", tv->name); - - return; -} - -/** - * \brief Disable all threads having the specified TMs. - * - * Breaks out of the packet acquisition loop, and bumps - * into the 'flow loop', where it will process packets - * from the flow engine's shutdown handling. - */ -void TmThreadDisableReceiveThreads(void) -{ - /* value in seconds */ -#define THREAD_KILL_MAX_WAIT_TIME 60 - /* value in microseconds */ -#define WAIT_TIME 100 - - double total_wait_time = 0; - - ThreadVars *tv = NULL; - -again: - SCMutexLock(&tv_root_lock); - - /* all receive threads are part of packet processing threads */ - tv = tv_root[TVT_PPT]; - - /* we do have to keep in mind that TVs are arranged in the order - * right from receive to log. The moment we fail to find a - * receive TM amongst the slots in a tv, it indicates we are done - * with all receive threads */ - while (tv) { - int disable = 0; - /* obtain the slots for this TV */ - TmSlot *slots = tv->tm_slots; - while (slots != NULL) { - TmModule *tm = TmModuleGetById(slots->tm_id); - - if (tm->flags & TM_FLAG_RECEIVE_TM) { - disable = 1; - break; - } - - slots = slots->slot_next; - continue; - } - - if (disable) { - if (tv->inq != NULL) { - /* we wait till we dry out all the inq packets, before we - * kill this thread. Do note that you should have disabled - * packet acquire by now using TmThreadDisableReceiveThreads()*/ - if (!(strlen(tv->inq->name) == strlen("packetpool") && - strcasecmp(tv->inq->name, "packetpool") == 0)) { - PacketQueue *q = &trans_q[tv->inq->id]; - if (q->len != 0) { - SCMutexUnlock(&tv_root_lock); - /* don't sleep while holding a lock */ - usleep(1000); - goto again; - } - } - } - - /* we found a receive TV. Send it a KILL_PKTACQ signal. */ - TmThreadsSetFlag(tv, THV_KILL_PKTACQ); - - if (tv->inq != NULL) { - int i; - for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) { - if (tv->inq->q_type == 0) - SCCondSignal(&trans_q[tv->inq->id].cond_q); - else - SCCondSignal(&data_queues[tv->inq->id].cond_q); - } - SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id); - } - - /* wait for it to enter the 'flow loop' stage */ - while (!TmThreadsCheckFlag(tv, THV_FLOW_LOOP)) { - usleep(WAIT_TIME); - total_wait_time += WAIT_TIME / 1000000.0; - if (total_wait_time > THREAD_KILL_MAX_WAIT_TIME) { - SCLogError(SC_ERR_FATAL, "Engine unable to " - "disable detect thread - \"%s\". " - "Killing engine", tv->name); - exit(EXIT_FAILURE); - } - } - } - - tv = tv->next; - } - - SCMutexUnlock(&tv_root_lock); - - return; -} - -/** - * \brief Disable all threads having the specified TMs. - */ -void TmThreadDisablePacketThreads(void) -{ - /* value in seconds */ -#define THREAD_KILL_MAX_WAIT_TIME 60 - /* value in microseconds */ -#define WAIT_TIME 100 - - double total_wait_time = 0; - - ThreadVars *tv = NULL; - -again: - SCMutexLock(&tv_root_lock); - - /* all receive threads are part of packet processing threads */ - tv = tv_root[TVT_PPT]; - - /* we do have to keep in mind that TVs are arranged in the order - * right from receive to log. The moment we fail to find a - * receive TM amongst the slots in a tv, it indicates we are done - * with all receive threads */ - while (tv) { - if (tv->inq != NULL) { - /* we wait till we dry out all the inq packets, before we - * kill this thread. Do note that you should have disabled - * packet acquire by now using TmThreadDisableReceiveThreads()*/ - if (!(strlen(tv->inq->name) == strlen("packetpool") && - strcasecmp(tv->inq->name, "packetpool") == 0)) { - PacketQueue *q = &trans_q[tv->inq->id]; - if (q->len != 0) { - SCMutexUnlock(&tv_root_lock); - /* don't sleep while holding a lock */ - usleep(1000); - goto again; - } - } - } - - /* we found our receive TV. Send it a KILL signal. This is all - * we need to do to kill receive threads */ - TmThreadsSetFlag(tv, THV_KILL); - - if (tv->inq != NULL) { - int i; - for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) { - if (tv->inq->q_type == 0) - SCCondSignal(&trans_q[tv->inq->id].cond_q); - else - SCCondSignal(&data_queues[tv->inq->id].cond_q); - } - SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id); - } - - while (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) { - usleep(WAIT_TIME); - total_wait_time += WAIT_TIME / 1000000.0; - if (total_wait_time > THREAD_KILL_MAX_WAIT_TIME) { - SCLogError(SC_ERR_FATAL, "Engine unable to " - "disable detect thread - \"%s\". " - "Killing engine", tv->name); - exit(EXIT_FAILURE); - } - } - - tv = tv->next; - } - - SCMutexUnlock(&tv_root_lock); - - return; -} - -TmSlot *TmThreadGetFirstTmSlotForPartialPattern(const char *tm_name) -{ - ThreadVars *tv = NULL; - TmSlot *slots = NULL; - - SCMutexLock(&tv_root_lock); - - /* all receive threads are part of packet processing threads */ - tv = tv_root[TVT_PPT]; - - while (tv) { - slots = tv->tm_slots; - - while (slots != NULL) { - TmModule *tm = TmModuleGetById(slots->tm_id); - - char *found = strstr(tm->name, tm_name); - if (found != NULL) - goto end; - - slots = slots->slot_next; - } - - tv = tv->next; - } - - end: - SCMutexUnlock(&tv_root_lock); - return slots; -} - -void TmThreadKillThreadsFamily(int family) -{ - ThreadVars *tv = NULL; - - if ((family < 0) || (family >= TVT_MAX)) - return; - - SCMutexLock(&tv_root_lock); - tv = tv_root[family]; - - while (tv) { - TmThreadKillThread(tv); - - tv = tv->next; - } - SCMutexUnlock(&tv_root_lock); -} - -void TmThreadKillThreads(void) -{ - int i = 0; - - for (i = 0; i < TVT_MAX; i++) { - TmThreadKillThreadsFamily(i); - } - - return; -} - -void TmThreadFree(ThreadVars *tv) -{ - TmSlot *s; - TmSlot *ps; - if (tv == NULL) - return; - - SCLogDebug("Freeing thread '%s'.", tv->name); - - StatsThreadCleanup(tv); - - s = (TmSlot *)tv->tm_slots; - while (s) { - ps = s; - s = s->slot_next; - SCFree(ps); - } - - TmThreadsUnregisterThread(tv->id); - SCFree(tv); -} - -void TmThreadClearThreadsFamily(int family) -{ - ThreadVars *tv = NULL; - ThreadVars *ptv = NULL; - - if ((family < 0) || (family >= TVT_MAX)) - return; - - SCMutexLock(&tv_root_lock); - tv = tv_root[family]; - - while (tv) { - ptv = tv; - tv = tv->next; - TmThreadFree(ptv); - } - tv_root[family] = NULL; - SCMutexUnlock(&tv_root_lock); -} - -/** - * \brief Spawns a thread associated with the ThreadVars instance tv - * - * \retval TM_ECODE_OK on success and TM_ECODE_FAILED on failure - */ -TmEcode TmThreadSpawn(ThreadVars *tv) -{ - pthread_attr_t attr; - if (tv->tm_func == NULL) { - printf("ERROR: no thread function set\n"); - return TM_ECODE_FAILED; - } - - /* Initialize and set thread detached attribute */ - pthread_attr_init(&attr); - - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); - - int rc = pthread_create(&tv->t, &attr, tv->tm_func, (void *)tv); - if (rc) { - printf("ERROR; return code from pthread_create() is %" PRId32 "\n", rc); - return TM_ECODE_FAILED; - } - - TmThreadWaitForFlag(tv, THV_INIT_DONE | THV_RUNNING_DONE); - - TmThreadAppend(tv, tv->type); - return TM_ECODE_OK; -} - -/** - * \brief Sets the thread flags for a thread instance(tv) - * - * \param tv Pointer to the thread instance for which the flag has to be set - * \param flags Holds the thread state this thread instance has to be set to - */ -#if 0 -void TmThreadSetFlags(ThreadVars *tv, uint8_t flags) -{ - if (tv != NULL) - tv->flags = flags; - - return; -} -#endif -/** - * \brief Sets the aof(Action on failure) for a thread instance(tv) - * - * \param tv Pointer to the thread instance for which the aof has to be set - * \param aof Holds the aof this thread instance has to be set to - */ -void TmThreadSetAOF(ThreadVars *tv, uint8_t aof) -{ - if (tv != NULL) - tv->aof = aof; - - return; -} - -/** - * \brief Initializes the mutex and condition variables for this TV - * - * It can be used by a thread to control a wait loop that can also be - * influenced by other threads. - * - * \param tv Pointer to a TV instance - */ -void TmThreadInitMC(ThreadVars *tv) -{ - if ( (tv->ctrl_mutex = SCMalloc(sizeof(*tv->ctrl_mutex))) == NULL) { - SCLogError(SC_ERR_FATAL, "Fatal error encountered in TmThreadInitMC. " - "Exiting..."); - exit(EXIT_FAILURE); - } - - if (SCCtrlMutexInit(tv->ctrl_mutex, NULL) != 0) { - printf("Error initializing the tv->m mutex\n"); - exit(EXIT_FAILURE); - } - - if ( (tv->ctrl_cond = SCMalloc(sizeof(*tv->ctrl_cond))) == NULL) { - SCLogError(SC_ERR_FATAL, "Fatal error encountered in TmThreadInitMC. " - "Exiting..."); - exit(EXIT_FAILURE); - } - - if (SCCtrlCondInit(tv->ctrl_cond, NULL) != 0) { - SCLogError(SC_ERR_FATAL, "Error initializing the tv->cond condition " - "variable"); - exit(EXIT_FAILURE); - } - - return; -} - -/** - * \brief Tests if the thread represented in the arg has been unpaused or not. - * - * The function would return if the thread tv has been unpaused or if the - * kill flag for the thread has been set. - * - * \param tv Pointer to the TV instance. - */ -void TmThreadTestThreadUnPaused(ThreadVars *tv) -{ - while (TmThreadsCheckFlag(tv, THV_PAUSE)) { - usleep(100); - - if (TmThreadsCheckFlag(tv, THV_KILL)) - break; - } - - return; -} - -/** - * \brief Waits till the specified flag(s) is(are) set. We don't bother if - * the kill flag has been set or not on the thread. - * - * \param tv Pointer to the TV instance. - */ -void TmThreadWaitForFlag(ThreadVars *tv, uint16_t flags) -{ - while (!TmThreadsCheckFlag(tv, flags)) { - usleep(100); - } - - return; -} - -/** - * \brief Unpauses a thread - * - * \param tv Pointer to a TV instance that has to be unpaused - */ -void TmThreadContinue(ThreadVars *tv) -{ - TmThreadsUnsetFlag(tv, THV_PAUSE); - - return; -} - -/** - * \brief Unpauses all threads present in tv_root - */ -void TmThreadContinueThreads() -{ - ThreadVars *tv = NULL; - int i = 0; - - SCMutexLock(&tv_root_lock); - for (i = 0; i < TVT_MAX; i++) { - tv = tv_root[i]; - while (tv != NULL) { - TmThreadContinue(tv); - tv = tv->next; - } - } - SCMutexUnlock(&tv_root_lock); - - return; -} - -/** - * \brief Pauses a thread - * - * \param tv Pointer to a TV instance that has to be paused - */ -void TmThreadPause(ThreadVars *tv) -{ - TmThreadsSetFlag(tv, THV_PAUSE); - - return; -} - -/** - * \brief Pauses all threads present in tv_root - */ -void TmThreadPauseThreads() -{ - ThreadVars *tv = NULL; - int i = 0; - - TmThreadsListThreads(); - - SCMutexLock(&tv_root_lock); - for (i = 0; i < TVT_MAX; i++) { - tv = tv_root[i]; - while (tv != NULL) { - TmThreadPause(tv); - tv = tv->next; - } - } - SCMutexUnlock(&tv_root_lock); - - return; -} - -/** - * \brief Restarts the thread sent as the argument - * - * \param tv Pointer to the thread instance(tv) to be restarted - */ -static void TmThreadRestartThread(ThreadVars *tv) -{ - if (tv->restarted >= THV_MAX_RESTARTS) { - SCLogError(SC_ERR_TM_THREADS_ERROR,"thread restarts exceeded " - "threshold limit for thread \"%s\"", tv->name); - exit(EXIT_FAILURE); - } - - TmThreadsUnsetFlag(tv, THV_CLOSED); - TmThreadsUnsetFlag(tv, THV_FAILED); - - if (TmThreadSpawn(tv) != TM_ECODE_OK) { - SCLogError(SC_ERR_THREAD_SPAWN, "thread \"%s\" failed to spawn", tv->name); - exit(EXIT_FAILURE); - } - - tv->restarted++; - SCLogInfo("thread \"%s\" restarted", tv->name); - - return; -} - -/** - * \brief Used to check the thread for certain conditions of failure. If the - * thread has been specified to restart on failure, the thread is - * restarted. If the thread has been specified to gracefully shutdown - * the engine on failure, it does so. The global aof flag, tv_aof - * overrides the thread aof flag, if it holds a THV_ENGINE_EXIT; - */ -void TmThreadCheckThreadState(void) -{ - ThreadVars *tv = NULL; - int i = 0; - - SCMutexLock(&tv_root_lock); - for (i = 0; i < TVT_MAX; i++) { - tv = tv_root[i]; - - while (tv) { - if (TmThreadsCheckFlag(tv, THV_FAILED)) { - TmThreadsSetFlag(tv, THV_DEINIT); - pthread_join(tv->t, NULL); - if ((tv_aof & THV_ENGINE_EXIT) || (tv->aof & THV_ENGINE_EXIT)) { - EngineKill(); - goto end; - } else { - /* if the engine kill-stop has been received by now, chuck - * restarting and return to kill the engine */ - if ((suricata_ctl_flags & SURICATA_KILL) || - (suricata_ctl_flags & SURICATA_STOP)) { - goto end; - } - TmThreadRestartThread(tv); - } - } - tv = tv->next; - } - } -end: - SCMutexUnlock(&tv_root_lock); - return; -} - -/** - * \brief Used to check if all threads have finished their initialization. On - * finding an un-initialized thread, it waits till that thread completes - * its initialization, before proceeding to the next thread. - * - * \retval TM_ECODE_OK all initialized properly - * \retval TM_ECODE_FAILED failure - */ -TmEcode TmThreadWaitOnThreadInit(void) -{ - ThreadVars *tv = NULL; - int i = 0; - uint16_t mgt_num = 0; - uint16_t ppt_num = 0; - - SCMutexLock(&tv_root_lock); - for (i = 0; i < TVT_MAX; i++) { - tv = tv_root[i]; - while (tv != NULL) { - char started = FALSE; - while (started == FALSE) { - if (TmThreadsCheckFlag(tv, THV_INIT_DONE)) { - started = TRUE; - } else { - /* sleep a little to give the thread some - * time to finish initialization */ - usleep(100); - } - - if (TmThreadsCheckFlag(tv, THV_FAILED)) { - SCMutexUnlock(&tv_root_lock); - SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" failed to " - "initialize.", tv->name); - return TM_ECODE_FAILED; - } - if (TmThreadsCheckFlag(tv, THV_CLOSED)) { - SCMutexUnlock(&tv_root_lock); - SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" closed on " - "initialization.", tv->name); - return TM_ECODE_FAILED; - } - } - - if (i == TVT_MGMT) mgt_num++; - else if (i == TVT_PPT) ppt_num++; - - tv = tv->next; - } - } - SCMutexUnlock(&tv_root_lock); - - SCLogNotice("all %"PRIu16" packet processing threads, %"PRIu16" management " - "threads initialized, engine started.", ppt_num, mgt_num); - - return TM_ECODE_OK; -} - -/** - * \brief Returns the TV for the calling thread. - * - * \retval tv Pointer to the ThreadVars instance for the calling thread; - * NULL on no match - */ -ThreadVars *TmThreadsGetCallingThread(void) -{ - pthread_t self = pthread_self(); - ThreadVars *tv = NULL; - int i = 0; - - SCMutexLock(&tv_root_lock); - - for (i = 0; i < TVT_MAX; i++) { - tv = tv_root[i]; - while (tv) { - if (pthread_equal(self, tv->t)) { - SCMutexUnlock(&tv_root_lock); - return tv; - } - tv = tv->next; - } - } - - SCMutexUnlock(&tv_root_lock); - - return NULL; -} - -/** - * \brief returns a count of all the threads that match the flag - */ -uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags) -{ - ThreadVars *tv = NULL; - int i = 0; - uint32_t cnt = 0; - - SCMutexLock(&tv_root_lock); - for (i = 0; i < TVT_MAX; i++) { - tv = tv_root[i]; - while (tv != NULL) { - if ((tv->tmm_flags & flags) == flags) - cnt++; - - tv = tv->next; - } - } - SCMutexUnlock(&tv_root_lock); - return cnt; -} - -typedef struct Thread_ { - ThreadVars *tv; /**< threadvars structure */ - const char *name; - int type; - int in_use; /**< bool to indicate this is in use */ -} Thread; - -typedef struct Threads_ { - Thread *threads; - size_t threads_size; - int threads_cnt; -} Threads; - -static Threads thread_store = { NULL, 0, 0 }; -static SCMutex thread_store_lock = SCMUTEX_INITIALIZER; - -void TmThreadsListThreads(void) -{ - Thread *t; - size_t s; - - SCMutexLock(&thread_store_lock); - - for (s = 0; s < thread_store.threads_size; s++) { - t = &thread_store.threads[s]; - if (t == NULL || t->in_use == 0) - continue; - SCLogInfo("Thread %"PRIuMAX", %s type %d, tv %p", (uintmax_t)s+1, t->name, t->type, t->tv); - } - - SCMutexUnlock(&thread_store_lock); -} - -#define STEP 32 -/** - * \retval id thread id, or 0 if not found - */ -int TmThreadsRegisterThread(ThreadVars *tv, const int type) -{ - SCMutexLock(&thread_store_lock); - if (thread_store.threads == NULL) { - thread_store.threads = SCCalloc(STEP, sizeof(Thread)); - BUG_ON(thread_store.threads == NULL); - thread_store.threads_size = STEP; - } - - size_t s; - for (s = 0; s < thread_store.threads_size; s++) { - if (thread_store.threads[s].in_use == 0) { - Thread *t = &thread_store.threads[s]; - t->name = tv->name; - t->type = type; - t->tv = tv; - t->in_use = 1; - - SCMutexUnlock(&thread_store_lock); - return (int)(s+1); - } - } - - /* if we get here the array is completely filled */ - void *newmem = SCRealloc(thread_store.threads, ((thread_store.threads_size + STEP) * sizeof(Thread))); - BUG_ON(newmem == NULL); - thread_store.threads = newmem; - memset((uint8_t *)thread_store.threads + (thread_store.threads_size * sizeof(Thread)), 0x00, STEP); - - Thread *t = &thread_store.threads[thread_store.threads_size]; - t->name = tv->name; - t->type = type; - t->tv = tv; - t->in_use = 1; - - s = thread_store.threads_size; - thread_store.threads_size += STEP; - - SCMutexUnlock(&thread_store_lock); - return (int)(s+1); -} -#undef STEP - -void TmThreadsUnregisterThread(const int id) -{ - SCMutexLock(&thread_store_lock); - if (id <= 0 || id > (int)thread_store.threads_size) { - SCMutexUnlock(&thread_store_lock); - return; - } - - /* id is one higher than index */ - int idx = id - 1; - - /* reset thread_id, which serves as clearing the record */ - thread_store.threads[idx].in_use = 0; - - /* check if we have at least one registered thread left */ - size_t s; - for (s = 0; s < thread_store.threads_size; s++) { - Thread *t = &thread_store.threads[s]; - if (t->in_use == 1) { - goto end; - } - } - - /* if we get here no threads are registered */ - SCFree(thread_store.threads); - thread_store.threads = NULL; - thread_store.threads_size = 0; - thread_store.threads_cnt = 0; - -end: - SCMutexUnlock(&thread_store_lock); -} - -/** - * \retval r 1 if packet was accepted, 0 otherwise - * \note if packet was not accepted, it's still the responsibility - * of the caller. - */ -int TmThreadsInjectPacketsById(Packet **packets, const int id) -{ - if (id <= 0 || id > (int)thread_store.threads_size) - return 0; - - int idx = id - 1; - - Thread *t = &thread_store.threads[idx]; - ThreadVars *tv = t->tv; - - if (tv == NULL || tv->stream_pq == NULL) - return 0; - - SCMutexLock(&tv->stream_pq->mutex_q); - while (*packets != NULL) { - PacketEnqueue(tv->stream_pq, *packets); - packets++; - } - SCMutexUnlock(&tv->stream_pq->mutex_q); - - /* wake up listening thread(s) if necessary */ - if (tv->inq != NULL) { - SCCondSignal(&trans_q[tv->inq->id].cond_q); - } - return 1; -} -- cgit 1.2.3-korg