summaryrefslogtreecommitdiffstats
path: root/framework/src/suricata/src/util-ringbuffer.c
diff options
context:
space:
mode:
authorAshlee Young <ashlee@onosfw.com>2015-09-09 22:21:41 -0700
committerAshlee Young <ashlee@onosfw.com>2015-09-09 22:21:41 -0700
commit8879b125d26e8db1a5633de5a9c692eb2d1c4f83 (patch)
treec7259d85a991b83dfa85ab2e339360669fc1f58e /framework/src/suricata/src/util-ringbuffer.c
parent13d05bc8458758ee39cb829098241e89616717ee (diff)
suricata checkin based on commit id a4bce14770beee46a537eda3c3f6e8e8565d5d0a
Change-Id: I9a214fa0ee95e58fc640e50bd604dac7f42db48f
Diffstat (limited to 'framework/src/suricata/src/util-ringbuffer.c')
-rw-r--r--framework/src/suricata/src/util-ringbuffer.c1088
1 files changed, 1088 insertions, 0 deletions
diff --git a/framework/src/suricata/src/util-ringbuffer.c b/framework/src/suricata/src/util-ringbuffer.c
new file mode 100644
index 00000000..4566bf9f
--- /dev/null
+++ b/framework/src/suricata/src/util-ringbuffer.c
@@ -0,0 +1,1088 @@
+/* Copyright (C) 2007-2010 Open Information Security Foundation
+ *
+ * You can copy, redistribute or modify this Program under the terms of
+ * the GNU General Public License version 2 as published by the Free
+ * Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
+/**
+ * \file
+ *
+ * \author Victor Julien <victor@inliniac.net>
+ *
+ * Ringbuffer implementation that is lockless for the most part IF atomic
+ * operations are available.
+ *
+ * Two sizes are implemented currently: 256 and 65536. Those sizes are chosen
+ * for simplicity when working with the read and write indexes. Both can just
+ * wrap around.
+ *
+ * Implemented are:
+ * Single reader, single writer (lockless)
+ * Single reader, multi writer (partly locked)
+ * Multi reader, single writer (lockless)
+ * Multi reader, multi writer (partly locked)
+ */
+#include "suricata-common.h"
+#include "suricata.h"
+#include "util-ringbuffer.h"
+#include "util-atomic.h"
+#include "util-unittest.h"
+
+#define USLEEP_TIME 5
+
+/** \brief wait function for condition where ringbuffer is either
+ * full or empty.
+ *
+ * \param rb ringbuffer
+ *
+ * Based on RINGBUFFER_MUTEX_WAIT define, we either sleep and spin
+ * or use thread condition to wait.
+ */
+static inline void RingBuffer8DoWait(RingBuffer8 *rb)
+{
+#ifdef RINGBUFFER_MUTEX_WAIT
+ SCMutexLock(&rb->wait_mutex);
+ SCCondWait(&rb->wait_cond, &rb->wait_mutex);
+ SCMutexUnlock(&rb->wait_mutex);
+#else
+ usleep(USLEEP_TIME);
+#endif
+}
+
+/** \brief wait function for condition where ringbuffer is either
+ * full or empty.
+ *
+ * \param rb ringbuffer
+ *
+ * Based on RINGBUFFER_MUTEX_WAIT define, we either sleep and spin
+ * or use thread condition to wait.
+ */
+static inline void RingBufferDoWait(RingBuffer16 *rb)
+{
+#ifdef RINGBUFFER_MUTEX_WAIT
+ SCMutexLock(&rb->wait_mutex);
+ SCCondWait(&rb->wait_cond, &rb->wait_mutex);
+ SCMutexUnlock(&rb->wait_mutex);
+#else
+ usleep(USLEEP_TIME);
+#endif
+}
+
+/** \brief wait function for condition where ringbuffer is either
+ * full or empty.
+ *
+ * \param rb ringbuffer
+ *
+ * Based on RINGBUFFER_MUTEX_WAIT define, we either sleep and spin
+ * or use thread condition to wait.
+ */
+void RingBufferWait(RingBuffer16 *rb)
+{
+ RingBufferDoWait(rb);
+}
+
+/** \brief tell the ringbuffer to shut down
+ *
+ * \param rb ringbuffer
+ */
+void RingBuffer8Shutdown(RingBuffer8 *rb)
+{
+ rb->shutdown = 1;
+#ifdef RINGBUFFER_MUTEX_WAIT
+ SCCondSignal(&rb->wait_cond);
+#endif
+}
+
+/** \brief check the ringbuffer is empty (no data in it)
+ *
+ * \param rb ringbuffer
+ *
+ * \retval 1 empty
+ * \retval 0 not empty
+ */
+int RingBuffer8IsEmpty(RingBuffer8 *rb)
+{
+ if (SC_ATOMIC_GET(rb->write) == SC_ATOMIC_GET(rb->read)) {
+ return 1;
+ }
+
+ return 0;
+}
+
+/** \brief check the ringbuffer is full (no more data will fit)
+ *
+ * \param rb ringbuffer
+ *
+ * \retval 1 empty
+ * \retval 0 not empty
+ */
+int RingBuffer8IsFull(RingBuffer8 *rb)
+{
+ if ((unsigned char)(SC_ATOMIC_GET(rb->write) + 1) == SC_ATOMIC_GET(rb->read)) {
+ return 1;
+ }
+
+ return 0;
+}
+
+/** \brief tell the ringbuffer to shut down
+ *
+ * \param rb ringbuffer
+ */
+void RingBufferShutdown(RingBuffer16 *rb)
+{
+ rb->shutdown = 1;
+#ifdef RINGBUFFER_MUTEX_WAIT
+ SCCondSignal(&rb->wait_cond);
+#endif
+}
+
+/** \brief get number of items in the ringbuffer */
+uint16_t RingBufferSize(RingBuffer16 *rb)
+{
+ SCEnter();
+ uint16_t size = (uint16_t)(SC_ATOMIC_GET(rb->write) - SC_ATOMIC_GET(rb->read));
+ SCReturnUInt(size);
+}
+
+/** \brief check the ringbuffer is empty (no data in it)
+ *
+ * \param rb ringbuffer
+ *
+ * \retval 1 empty
+ * \retval 0 not empty
+ */
+int RingBufferIsEmpty(RingBuffer16 *rb)
+{
+ if (SC_ATOMIC_GET(rb->write) == SC_ATOMIC_GET(rb->read)) {
+ return 1;
+ }
+
+ return 0;
+}
+
+/** \brief check the ringbuffer is full (no more data will fit)
+ *
+ * \param rb ringbuffer
+ *
+ * \retval 1 empty
+ * \retval 0 not empty
+ */
+int RingBufferIsFull(RingBuffer16 *rb)
+{
+ if ((unsigned short)(SC_ATOMIC_GET(rb->write) + 1) == SC_ATOMIC_GET(rb->read)) {
+ return 1;
+ }
+
+ return 0;
+}
+
+/* Single Reader, Single Writer, 8 bits */
+
+void *RingBufferSrSw8Get(RingBuffer8 *rb)
+{
+ void *ptr = NULL;
+
+ /* buffer is empty, wait... */
+ while (SC_ATOMIC_GET(rb->write) == SC_ATOMIC_GET(rb->read)) {
+ /* break out if the engine wants to shutdown */
+ if (rb->shutdown != 0)
+ return NULL;
+
+ RingBuffer8DoWait(rb);
+ }
+
+ ptr = rb->array[SC_ATOMIC_GET(rb->read)];
+ (void) SC_ATOMIC_ADD(rb->read, 1);
+
+#ifdef RINGBUFFER_MUTEX_WAIT
+ SCCondSignal(&rb->wait_cond);
+#endif
+ return ptr;
+}
+
+int RingBufferSrSw8Put(RingBuffer8 *rb, void *ptr)
+{
+ /* buffer is full, wait... */
+ while ((unsigned char)(SC_ATOMIC_GET(rb->write) + 1) == SC_ATOMIC_GET(rb->read)) {
+ /* break out if the engine wants to shutdown */
+ if (rb->shutdown != 0)
+ return -1;
+
+ RingBuffer8DoWait(rb);
+ }
+
+ rb->array[SC_ATOMIC_GET(rb->write)] = ptr;
+ (void) SC_ATOMIC_ADD(rb->write, 1);
+
+#ifdef RINGBUFFER_MUTEX_WAIT
+ SCCondSignal(&rb->wait_cond);
+#endif
+ return 0;
+}
+
+/* Single Reader, Multi Writer, 8 bites */
+
+void *RingBufferSrMw8Get(RingBuffer8 *rb)
+{
+ void *ptr = NULL;
+
+ /* buffer is empty, wait... */
+ while (SC_ATOMIC_GET(rb->write) == SC_ATOMIC_GET(rb->read)) {
+ /* break out if the engine wants to shutdown */
+ if (rb->shutdown != 0)
+ return NULL;
+
+ RingBuffer8DoWait(rb);
+ }
+
+ ptr = rb->array[SC_ATOMIC_GET(rb->read)];
+ (void) SC_ATOMIC_ADD(rb->read, 1);
+
+#ifdef RINGBUFFER_MUTEX_WAIT
+ SCCondSignal(&rb->wait_cond);
+#endif
+ return ptr;
+}
+
+/**
+ * \brief put a ptr in the RingBuffer.
+ *
+ * As we support multiple writers we need to protect 2 things:
+ * 1. writing the ptr to the array
+ * 2. incrementing the rb->write idx
+ *
+ * We can't do both at the same time in one atomic operation, so
+ * we need to (spin) lock it. We do increment rb->write atomically
+ * after that, so that we don't need to use the lock in our *Get
+ * function.
+ *
+ * \param rb the ringbuffer
+ * \param ptr ptr to store
+ *
+ * \retval 0 ok
+ * \retval -1 wait loop interrupted because of engine flags
+ */
+int RingBufferSrMw8Put(RingBuffer8 *rb, void *ptr)
+{
+ SCLogDebug("ptr %p", ptr);
+
+ /* buffer is full, wait... */
+retry:
+ while ((unsigned char)(SC_ATOMIC_GET(rb->write) + 1) == SC_ATOMIC_GET(rb->read)) {
+ /* break out if the engine wants to shutdown */
+ if (rb->shutdown != 0)
+ return -1;
+
+ RingBuffer8DoWait(rb);
+ }
+
+ /* get our lock */
+ SCSpinLock(&rb->spin);
+ /* if while we got our lock the buffer changed, we need to retry */
+ if ((unsigned char)(SC_ATOMIC_GET(rb->write) + 1) == SC_ATOMIC_GET(rb->read)) {
+ SCSpinUnlock(&rb->spin);
+ goto retry;
+ }
+
+ SCLogDebug("rb->write %u, ptr %p", SC_ATOMIC_GET(rb->write), ptr);
+
+ /* update the ring buffer */
+ rb->array[SC_ATOMIC_GET(rb->write)] = ptr;
+ (void) SC_ATOMIC_ADD(rb->write, 1);
+ SCSpinUnlock(&rb->spin);
+ SCLogDebug("ptr %p, done", ptr);
+
+#ifdef RINGBUFFER_MUTEX_WAIT
+ SCCondSignal(&rb->wait_cond);
+#endif
+ return 0;
+}
+
+/* Multi Reader, Single Writer, 8 bits */
+
+/**
+ * \brief get the next ptr from the ring buffer
+ *
+ * Because we allow for multiple readers we take great care in making sure
+ * that the threads don't interfere with one another.
+ *
+ */
+void *RingBufferMrSw8Get(RingBuffer8 *rb)
+{
+ void *ptr;
+ /** local pointer for data races. If SCAtomicCompareAndSwap (CAS)
+ * fails we increase our local array idx to try the next array member
+ * until we succeed. Or when the buffer is empty again we jump back
+ * to the waiting loop. */
+ unsigned char readp;
+
+ /* buffer is empty, wait... */
+retry:
+ while (SC_ATOMIC_GET(rb->write) == SC_ATOMIC_GET(rb->read)) {
+ /* break out if the engine wants to shutdown */
+ if (rb->shutdown != 0)
+ return NULL;
+
+ RingBuffer8DoWait(rb);
+ }
+
+ /* atomically update rb->read */
+ readp = SC_ATOMIC_GET(rb->read) - 1;
+ do {
+ /* with multiple readers we can get in the situation that we exitted
+ * from the wait loop but the rb is empty again once we get here. */
+ if (SC_ATOMIC_GET(rb->write) == SC_ATOMIC_GET(rb->read))
+ goto retry;
+
+ readp++;
+ ptr = rb->array[readp];
+ } while (!(SC_ATOMIC_CAS(&rb->read, readp, (readp + 1))));
+
+ SCLogDebug("ptr %p", ptr);
+
+#ifdef RINGBUFFER_MUTEX_WAIT
+ SCCondSignal(&rb->wait_cond);
+#endif
+ return ptr;
+}
+
+/**
+ * \brief put a ptr in the RingBuffer
+ */
+int RingBufferMrSw8Put(RingBuffer8 *rb, void *ptr)
+{
+ SCLogDebug("ptr %p", ptr);
+
+ /* buffer is full, wait... */
+ while ((unsigned char)(SC_ATOMIC_GET(rb->write) + 1) == SC_ATOMIC_GET(rb->read)) {
+ /* break out if the engine wants to shutdown */
+ if (rb->shutdown != 0)
+ return -1;
+
+ RingBuffer8DoWait(rb);
+ }
+
+ rb->array[SC_ATOMIC_GET(rb->write)] = ptr;
+ (void) SC_ATOMIC_ADD(rb->write, 1);
+
+#ifdef RINGBUFFER_MUTEX_WAIT
+ SCCondSignal(&rb->wait_cond);
+#endif
+ return 0;
+}
+
+
+/* Multi Reader, Single Writer */
+
+/**
+ * \brief get the next ptr from the ring buffer
+ *
+ * Because we allow for multiple readers we take great care in making sure
+ * that the threads don't interfere with one another.
+ *
+ */
+void *RingBufferMrSwGet(RingBuffer16 *rb)
+{
+ void *ptr;
+ /** local pointer for data races. If SCAtomicCompareAndSwap (CAS)
+ * fails we increase our local array idx to try the next array member
+ * until we succeed. Or when the buffer is empty again we jump back
+ * to the waiting loop. */
+ unsigned short readp;
+
+ /* buffer is empty, wait... */
+retry:
+ while (SC_ATOMIC_GET(rb->write) == SC_ATOMIC_GET(rb->read)) {
+ /* break out if the engine wants to shutdown */
+ if (rb->shutdown != 0)
+ return NULL;
+
+ RingBufferDoWait(rb);
+ }
+
+ /* atomically update rb->read */
+ readp = SC_ATOMIC_GET(rb->read) - 1;
+ do {
+ /* with multiple readers we can get in the situation that we exitted
+ * from the wait loop but the rb is empty again once we get here. */
+ if (SC_ATOMIC_GET(rb->write) == SC_ATOMIC_GET(rb->read))
+ goto retry;
+
+ readp++;
+ ptr = rb->array[readp];
+ } while (!(SC_ATOMIC_CAS(&rb->read, readp, (readp + 1))));
+
+ SCLogDebug("ptr %p", ptr);
+
+#ifdef RINGBUFFER_MUTEX_WAIT
+ SCCondSignal(&rb->wait_cond);
+#endif
+ return ptr;
+}
+
+/**
+ * \brief put a ptr in the RingBuffer
+ */
+int RingBufferMrSwPut(RingBuffer16 *rb, void *ptr)
+{
+ SCLogDebug("ptr %p", ptr);
+
+ /* buffer is full, wait... */
+ while ((unsigned short)(SC_ATOMIC_GET(rb->write) + 1) == SC_ATOMIC_GET(rb->read)) {
+ /* break out if the engine wants to shutdown */
+ if (rb->shutdown != 0)
+ return -1;
+
+ RingBufferDoWait(rb);
+ }
+
+ rb->array[SC_ATOMIC_GET(rb->write)] = ptr;
+ (void) SC_ATOMIC_ADD(rb->write, 1);
+
+#ifdef RINGBUFFER_MUTEX_WAIT
+ SCCondSignal(&rb->wait_cond);
+#endif
+ return 0;
+}
+
+
+/* Single Reader, Single Writer */
+
+void *RingBufferSrSwGet(RingBuffer16 *rb)
+{
+ void *ptr = NULL;
+
+ /* buffer is empty, wait... */
+ while (SC_ATOMIC_GET(rb->write) == SC_ATOMIC_GET(rb->read)) {
+ /* break out if the engine wants to shutdown */
+ if (rb->shutdown != 0)
+ return NULL;
+
+ RingBufferDoWait(rb);
+ }
+
+ ptr = rb->array[SC_ATOMIC_GET(rb->read)];
+ (void) SC_ATOMIC_ADD(rb->read, 1);
+
+#ifdef RINGBUFFER_MUTEX_WAIT
+ SCCondSignal(&rb->wait_cond);
+#endif
+ return ptr;
+}
+
+int RingBufferSrSwPut(RingBuffer16 *rb, void *ptr)
+{
+ /* buffer is full, wait... */
+ while ((unsigned short)(SC_ATOMIC_GET(rb->write) + 1) == SC_ATOMIC_GET(rb->read)) {
+ /* break out if the engine wants to shutdown */
+ if (rb->shutdown != 0)
+ return -1;
+
+ RingBufferDoWait(rb);
+ }
+
+ rb->array[SC_ATOMIC_GET(rb->write)] = ptr;
+ (void) SC_ATOMIC_ADD(rb->write, 1);
+
+#ifdef RINGBUFFER_MUTEX_WAIT
+ SCCondSignal(&rb->wait_cond);
+#endif
+ return 0;
+}
+
+/* Multi Reader, Multi Writer, 8 bits */
+
+RingBuffer8 *RingBuffer8Init(void)
+{
+ RingBuffer8 *rb = SCMalloc(sizeof(RingBuffer8));
+ if (unlikely(rb == NULL)) {
+ return NULL;
+ }
+
+ memset(rb, 0x00, sizeof(RingBuffer8));
+
+ SC_ATOMIC_INIT(rb->write);
+ SC_ATOMIC_INIT(rb->read);
+
+ SCSpinInit(&rb->spin, 0);
+#ifdef RINGBUFFER_MUTEX_WAIT
+ SCMutexInit(&rb->wait_mutex, NULL);
+ SCCondInit(&rb->wait_cond, NULL);
+#endif
+ return rb;
+}
+
+void RingBuffer8Destroy(RingBuffer8 *rb)
+{
+ if (rb != NULL) {
+ SC_ATOMIC_DESTROY(rb->write);
+ SC_ATOMIC_DESTROY(rb->read);
+
+ SCSpinDestroy(&rb->spin);
+
+#ifdef RINGBUFFER_MUTEX_WAIT
+ SCMutexDestroy(&rb->wait_mutex);
+ SCCondDestroy(&rb->wait_cond);
+#endif
+ SCFree(rb);
+ }
+}
+
+/**
+ * \brief get the next ptr from the ring buffer
+ *
+ * Because we allow for multiple readers we take great care in making sure
+ * that the threads don't interfere with one another.
+ *
+ */
+void *RingBufferMrMw8Get(RingBuffer8 *rb)
+{
+ void *ptr;
+ /** local pointer for data races. If SCAtomicCompareAndSwap (CAS)
+ * fails we increase our local array idx to try the next array member
+ * until we succeed. Or when the buffer is empty again we jump back
+ * to the waiting loop. */
+ unsigned char readp;
+
+ /* buffer is empty, wait... */
+retry:
+ while (SC_ATOMIC_GET(rb->write) == SC_ATOMIC_GET(rb->read)) {
+ /* break out if the engine wants to shutdown */
+ if (rb->shutdown != 0)
+ return NULL;
+
+ RingBuffer8DoWait(rb);
+ }
+
+ /* atomically update rb->read */
+ readp = SC_ATOMIC_GET(rb->read) - 1;
+ do {
+ /* with multiple readers we can get in the situation that we exitted
+ * from the wait loop but the rb is empty again once we get here. */
+ if (SC_ATOMIC_GET(rb->write) == SC_ATOMIC_GET(rb->read))
+ goto retry;
+
+ readp++;
+ ptr = rb->array[readp];
+ } while (!(SC_ATOMIC_CAS(&rb->read, readp, (readp + 1))));
+
+ SCLogDebug("ptr %p", ptr);
+#ifdef RINGBUFFER_MUTEX_WAIT
+ SCCondSignal(&rb->wait_cond);
+#endif
+ return ptr;
+}
+
+/**
+ * \brief put a ptr in the RingBuffer.
+ *
+ * As we support multiple writers we need to protect 2 things:
+ * 1. writing the ptr to the array
+ * 2. incrementing the rb->write idx
+ *
+ * We can't do both at the same time in one atomic operation, so
+ * we need to (spin) lock it. We do increment rb->write atomically
+ * after that, so that we don't need to use the lock in our *Get
+ * function.
+ *
+ * \param rb the ringbuffer
+ * \param ptr ptr to store
+ *
+ * \retval 0 ok
+ * \retval -1 wait loop interrupted because of engine flags
+ */
+int RingBufferMrMw8Put(RingBuffer8 *rb, void *ptr)
+{
+ SCLogDebug("ptr %p", ptr);
+
+ /* buffer is full, wait... */
+retry:
+ while ((unsigned char)(SC_ATOMIC_GET(rb->write) + 1) == SC_ATOMIC_GET(rb->read)) {
+ /* break out if the engine wants to shutdown */
+ if (rb->shutdown != 0)
+ return -1;
+
+ RingBuffer8DoWait(rb);
+ }
+
+ /* get our lock */
+ SCSpinLock(&rb->spin);
+ /* if while we got our lock the buffer changed, we need to retry */
+ if ((unsigned char)(SC_ATOMIC_GET(rb->write) + 1) == SC_ATOMIC_GET(rb->read)) {
+ SCSpinUnlock(&rb->spin);
+ goto retry;
+ }
+
+ SCLogDebug("rb->write %u, ptr %p", SC_ATOMIC_GET(rb->write), ptr);
+
+ /* update the ring buffer */
+ rb->array[SC_ATOMIC_GET(rb->write)] = ptr;
+ (void) SC_ATOMIC_ADD(rb->write, 1);
+ SCSpinUnlock(&rb->spin);
+ SCLogDebug("ptr %p, done", ptr);
+
+#ifdef RINGBUFFER_MUTEX_WAIT
+ SCCondSignal(&rb->wait_cond);
+#endif
+ return 0;
+}
+
+/* Multi Reader, Multi Writer, 16 bits */
+
+RingBuffer16 *RingBufferInit(void)
+{
+ RingBuffer16 *rb = SCMalloc(sizeof(RingBuffer16));
+ if (unlikely(rb == NULL)) {
+ return NULL;
+ }
+
+ memset(rb, 0x00, sizeof(RingBuffer16));
+
+ SC_ATOMIC_INIT(rb->write);
+ SC_ATOMIC_INIT(rb->read);
+
+ SCSpinInit(&rb->spin, 0);
+#ifdef RINGBUFFER_MUTEX_WAIT
+ SCMutexInit(&rb->wait_mutex, NULL);
+ SCCondInit(&rb->wait_cond, NULL);
+#endif
+ return rb;
+}
+
+void RingBufferDestroy(RingBuffer16 *rb)
+{
+ if (rb != NULL) {
+ SC_ATOMIC_DESTROY(rb->write);
+ SC_ATOMIC_DESTROY(rb->read);
+
+ SCSpinDestroy(&rb->spin);
+
+#ifdef RINGBUFFER_MUTEX_WAIT
+ SCMutexDestroy(&rb->wait_mutex);
+ SCCondDestroy(&rb->wait_cond);
+#endif
+
+ SCFree(rb);
+ }
+}
+
+/**
+ * \brief get the next ptr from the ring buffer
+ *
+ * Because we allow for multiple readers we take great care in making sure
+ * that the threads don't interfere with one another.
+ *
+ */
+void *RingBufferMrMwGet(RingBuffer16 *rb)
+{
+ void *ptr;
+ /** local pointer for data races. If SCAtomicCompareAndSwap (CAS)
+ * fails we increase our local array idx to try the next array member
+ * until we succeed. Or when the buffer is empty again we jump back
+ * to the waiting loop. */
+ unsigned short readp;
+
+ /* buffer is empty, wait... */
+retry:
+ while (SC_ATOMIC_GET(rb->write) == SC_ATOMIC_GET(rb->read)) {
+ /* break out if the engine wants to shutdown */
+ if (rb->shutdown != 0)
+ return NULL;
+
+ RingBufferDoWait(rb);
+ }
+
+ /* atomically update rb->read */
+ readp = SC_ATOMIC_GET(rb->read) - 1;
+ do {
+ /* with multiple readers we can get in the situation that we exitted
+ * from the wait loop but the rb is empty again once we get here. */
+ if (SC_ATOMIC_GET(rb->write) == SC_ATOMIC_GET(rb->read))
+ goto retry;
+
+ readp++;
+ ptr = rb->array[readp];
+ } while (!(SC_ATOMIC_CAS(&rb->read, readp, (readp + 1))));
+
+ SCLogDebug("ptr %p", ptr);
+
+#ifdef RINGBUFFER_MUTEX_WAIT
+ SCCondSignal(&rb->wait_cond);
+#endif
+ return ptr;
+}
+
+/**
+ * \brief get the next ptr from the ring buffer
+ *
+ * Because we allow for multiple readers we take great care in making sure
+ * that the threads don't interfere with one another.
+ *
+ * This version does NOT enter a wait if the buffer is empty loop.
+ *
+ * \retval ptr pointer to the data, or NULL if buffer is empty
+ */
+void *RingBufferMrMwGetNoWait(RingBuffer16 *rb)
+{
+ void *ptr;
+ /** local pointer for data races. If SCAtomicCompareAndSwap (CAS)
+ * fails we increase our local array idx to try the next array member
+ * until we succeed. Or when the buffer is empty again we jump back
+ * to the waiting loop. */
+ unsigned short readp;
+
+ /* buffer is empty, wait... */
+retry:
+ while (SC_ATOMIC_GET(rb->write) == SC_ATOMIC_GET(rb->read)) {
+ /* break if buffer is empty */
+ return NULL;
+ }
+
+ /* atomically update rb->read */
+ readp = SC_ATOMIC_GET(rb->read) - 1;
+ do {
+ /* with multiple readers we can get in the situation that we exitted
+ * from the wait loop but the rb is empty again once we get here. */
+ if (SC_ATOMIC_GET(rb->write) == SC_ATOMIC_GET(rb->read))
+ goto retry;
+
+ readp++;
+ ptr = rb->array[readp];
+ } while (!(SC_ATOMIC_CAS(&rb->read, readp, (readp + 1))));
+
+ SCLogDebug("ptr %p", ptr);
+
+#ifdef RINGBUFFER_MUTEX_WAIT
+ SCCondSignal(&rb->wait_cond);
+#endif
+ return ptr;
+}
+
+/**
+ * \brief put a ptr in the RingBuffer.
+ *
+ * As we support multiple writers we need to protect 2 things:
+ * 1. writing the ptr to the array
+ * 2. incrementing the rb->write idx
+ *
+ * We can't do both at the same time in one atomic operation, so
+ * we need to (spin) lock it. We do increment rb->write atomically
+ * after that, so that we don't need to use the lock in our *Get
+ * function.
+ *
+ * \param rb the ringbuffer
+ * \param ptr ptr to store
+ *
+ * \retval 0 ok
+ * \retval -1 wait loop interrupted because of engine flags
+ */
+int RingBufferMrMwPut(RingBuffer16 *rb, void *ptr)
+{
+ SCLogDebug("ptr %p", ptr);
+
+ /* buffer is full, wait... */
+retry:
+ while ((unsigned short)(SC_ATOMIC_GET(rb->write) + 1) == SC_ATOMIC_GET(rb->read)) {
+ /* break out if the engine wants to shutdown */
+ if (rb->shutdown != 0)
+ return -1;
+
+ RingBufferDoWait(rb);
+ }
+
+ /* get our lock */
+ SCSpinLock(&rb->spin);
+ /* if while we got our lock the buffer changed, we need to retry */
+ if ((unsigned short)(SC_ATOMIC_GET(rb->write) + 1) == SC_ATOMIC_GET(rb->read)) {
+ SCSpinUnlock(&rb->spin);
+ goto retry;
+ }
+
+ SCLogDebug("rb->write %u, ptr %p", SC_ATOMIC_GET(rb->write), ptr);
+
+ /* update the ring buffer */
+ rb->array[SC_ATOMIC_GET(rb->write)] = ptr;
+ (void) SC_ATOMIC_ADD(rb->write, 1);
+ SCSpinUnlock(&rb->spin);
+ SCLogDebug("ptr %p, done", ptr);
+
+#ifdef RINGBUFFER_MUTEX_WAIT
+ SCCondSignal(&rb->wait_cond);
+#endif
+ return 0;
+}
+
+#ifdef UNITTESTS
+static int RingBuffer8SrSwInit01 (void)
+{
+ int result = 0;
+
+ RingBuffer8 *rb = NULL;
+
+ rb = RingBuffer8Init();
+ if (rb == NULL) {
+ printf("rb == NULL: ");
+ goto end;
+ }
+
+ int r = SCSpinLock(&rb->spin);
+ if (r != 0) {
+ printf("r = %d, expected %d: ", r, 0);
+ goto end;
+ }
+ SCSpinUnlock(&rb->spin);
+
+ if (SC_ATOMIC_GET(rb->read) != 0) {
+ printf("read %u, expected 0: ", SC_ATOMIC_GET(rb->read));
+ goto end;
+ }
+
+ if (SC_ATOMIC_GET(rb->write) != 0) {
+ printf("write %u, expected 0: ", SC_ATOMIC_GET(rb->write));
+ goto end;
+ }
+
+ result = 1;
+end:
+ if (rb != NULL) {
+ RingBuffer8Destroy(rb);
+ }
+ return result;
+}
+
+static int RingBuffer8SrSwPut01 (void)
+{
+ int result = 0;
+
+ RingBuffer8 *rb = NULL;
+
+ rb = RingBuffer8Init();
+ if (rb == NULL) {
+ printf("rb == NULL: ");
+ goto end;
+ }
+
+ if (SC_ATOMIC_GET(rb->read) != 0) {
+ printf("read %u, expected 0: ", SC_ATOMIC_GET(rb->read));
+ goto end;
+ }
+
+ if (SC_ATOMIC_GET(rb->write) != 0) {
+ printf("write %u, expected 0: ", SC_ATOMIC_GET(rb->write));
+ goto end;
+ }
+
+ void *ptr = &result;
+
+ RingBufferSrSw8Put(rb, ptr);
+
+ if (SC_ATOMIC_GET(rb->read) != 0) {
+ printf("read %u, expected 0: ", SC_ATOMIC_GET(rb->read));
+ goto end;
+ }
+
+ if (SC_ATOMIC_GET(rb->write) != 1) {
+ printf("write %u, expected 1: ", SC_ATOMIC_GET(rb->write));
+ goto end;
+ }
+
+ if (rb->array[0] != ptr) {
+ printf("ptr is %p, expected %p: ", rb->array[0], ptr);
+ goto end;
+ }
+
+ result = 1;
+end:
+ if (rb != NULL) {
+ RingBuffer8Destroy(rb);
+ }
+ return result;
+}
+
+static int RingBuffer8SrSwPut02 (void)
+{
+ int result = 0;
+ RingBuffer8 *rb = NULL;
+
+ int array[255];
+ int cnt = 0;
+ for (cnt = 0; cnt < 255; cnt++) {
+ array[cnt] = cnt;
+ }
+
+ rb = RingBuffer8Init();
+ if (rb == NULL) {
+ printf("rb == NULL: ");
+ goto end;
+ }
+
+ for (cnt = 0; cnt < 255; cnt++) {
+ RingBufferSrSw8Put(rb, (void *)&array[cnt]);
+
+ if (SC_ATOMIC_GET(rb->read) != 0) {
+ printf("read %u, expected 0: ", SC_ATOMIC_GET(rb->read));
+ goto end;
+ }
+
+ if (SC_ATOMIC_GET(rb->write) != (unsigned char)(cnt+1)) {
+ printf("write %u, expected %u: ", SC_ATOMIC_GET(rb->write), (unsigned char)(cnt+1));
+ goto end;
+ }
+
+ if (rb->array[cnt] != (void *)&array[cnt]) {
+ printf("ptr is %p, expected %p: ", rb->array[cnt], (void *)&array[cnt]);
+ goto end;
+ }
+ }
+
+ if (!(RingBuffer8IsFull(rb))) {
+ printf("ringbuffer should be full, isn't: ");
+ goto end;
+ }
+
+ result = 1;
+end:
+ if (rb != NULL) {
+ RingBuffer8Destroy(rb);
+ }
+ return result;
+}
+
+static int RingBuffer8SrSwGet01 (void)
+{
+ int result = 0;
+
+ RingBuffer8 *rb = NULL;
+
+ rb = RingBuffer8Init();
+ if (rb == NULL) {
+ printf("rb == NULL: ");
+ goto end;
+ }
+
+ void *ptr = &result;
+
+ RingBufferSrSw8Put(rb, ptr);
+ void *ptr2 = RingBufferSrSw8Get(rb);
+
+ if (ptr != ptr2) {
+ printf("ptr %p != ptr2 %p: ", ptr, ptr2);
+ goto end;
+ }
+
+ if (SC_ATOMIC_GET(rb->read) != 1) {
+ printf("read %u, expected 1: ", SC_ATOMIC_GET(rb->read));
+ goto end;
+ }
+
+ if (SC_ATOMIC_GET(rb->write) != 1) {
+ printf("write %u, expected 1: ", SC_ATOMIC_GET(rb->write));
+ goto end;
+ }
+
+ result = 1;
+end:
+ if (rb != NULL) {
+ RingBuffer8Destroy(rb);
+ }
+ return result;
+}
+
+static int RingBuffer8SrSwGet02 (void)
+{
+ int result = 0;
+ RingBuffer8 *rb = NULL;
+
+ int array[255];
+ int cnt = 0;
+ for (cnt = 0; cnt < 255; cnt++) {
+ array[cnt] = cnt;
+ }
+
+ rb = RingBuffer8Init();
+ if (rb == NULL) {
+ printf("rb == NULL: ");
+ goto end;
+ }
+
+ for (cnt = 0; cnt < 255; cnt++) {
+ RingBufferSrSw8Put(rb, (void *)&array[cnt]);
+
+ if (SC_ATOMIC_GET(rb->read) != 0) {
+ printf("read %u, expected 0: ", SC_ATOMIC_GET(rb->read));
+ goto end;
+ }
+
+ if (SC_ATOMIC_GET(rb->write) != (unsigned char)(cnt+1)) {
+ printf("write %u, expected %u: ", SC_ATOMIC_GET(rb->write), (unsigned char)(cnt+1));
+ goto end;
+ }
+
+ if (rb->array[cnt] != (void *)&array[cnt]) {
+ printf("ptr is %p, expected %p: ", rb->array[cnt], (void *)&array[cnt]);
+ goto end;
+ }
+ }
+
+ if (!(RingBuffer8IsFull(rb))) {
+ printf("ringbuffer should be full, isn't: ");
+ goto end;
+ }
+
+ for (cnt = 0; cnt < 255; cnt++) {
+ void *ptr = RingBufferSrSw8Get(rb);
+
+ if (SC_ATOMIC_GET(rb->read) != (unsigned char)(cnt+1)) {
+ printf("read %u, expected %u: ", SC_ATOMIC_GET(rb->read), (unsigned char)(cnt+1));
+ goto end;
+ }
+
+ if (SC_ATOMIC_GET(rb->write) != 255) {
+ printf("write %u, expected %u: ", SC_ATOMIC_GET(rb->write), 255);
+ goto end;
+ }
+
+ if (ptr != (void *)&array[cnt]) {
+ printf("ptr is %p, expected %p: ", ptr, (void *)&array[cnt]);
+ goto end;
+ }
+ }
+
+ if (!(RingBuffer8IsEmpty(rb))) {
+ printf("ringbuffer should be empty, isn't: ");
+ goto end;
+ }
+
+ result = 1;
+end:
+ if (rb != NULL) {
+ RingBuffer8Destroy(rb);
+ }
+ return result;
+}
+
+#endif /* UNITTESTS */
+
+void DetectRingBufferRegisterTests(void)
+{
+#ifdef UNITTESTS /* UNITTESTS */
+ UtRegisterTest("RingBuffer8SrSwInit01", RingBuffer8SrSwInit01, 1);
+ UtRegisterTest("RingBuffer8SrSwPut01", RingBuffer8SrSwPut01, 1);
+ UtRegisterTest("RingBuffer8SrSwPut02", RingBuffer8SrSwPut02, 1);
+ UtRegisterTest("RingBuffer8SrSwGet01", RingBuffer8SrSwGet01, 1);
+ UtRegisterTest("RingBuffer8SrSwGet02", RingBuffer8SrSwGet02, 1);
+#endif /* UNITTESTS */
+}
+