diff options
Diffstat (limited to 'framework/src/suricata/src/util-logopenfile.c')
-rw-r--r-- | framework/src/suricata/src/util-logopenfile.c | 261 |
1 files changed, 261 insertions, 0 deletions
diff --git a/framework/src/suricata/src/util-logopenfile.c b/framework/src/suricata/src/util-logopenfile.c index b25c4a82..65b80fac 100644 --- a/framework/src/suricata/src/util-logopenfile.c +++ b/framework/src/suricata/src/util-logopenfile.c @@ -34,6 +34,9 @@ #include "util-logopenfile.h" #include "util-logopenfile-tile.h" +const char * redis_push_cmd = "LPUSH"; +const char * redis_publish_cmd = "PUBLISH"; + /** \brief connect to the indicated local stream socket, logging any errors * \param path filesystem path to connect to * \param log_err, non-zero if connect failure should be logged. @@ -330,6 +333,138 @@ int SCConfLogReopen(LogFileCtx *log_ctx) return 0; } + +#ifdef HAVE_LIBHIREDIS + +static void SCLogFileCloseRedis(LogFileCtx *log_ctx) +{ + if (log_ctx->redis) { + redisReply *reply; + int i; + for (i = 0; i < log_ctx->redis_setup.batch_count; i++) { + redisGetReply(log_ctx->redis, (void **)&reply); + if (reply) + freeReplyObject(reply); + } + redisFree(log_ctx->redis); + log_ctx->redis = NULL; + } + log_ctx->redis_setup.tried = 0; + log_ctx->redis_setup.batch_count = 0; +} + +int SCConfLogOpenRedis(ConfNode *redis_node, LogFileCtx *log_ctx) +{ + const char *redis_server = NULL; + const char *redis_port = NULL; + const char *redis_mode = NULL; + const char *redis_key = NULL; + + if (redis_node) { + redis_server = ConfNodeLookupChildValue(redis_node, "server"); + redis_port = ConfNodeLookupChildValue(redis_node, "port"); + redis_mode = ConfNodeLookupChildValue(redis_node, "mode"); + redis_key = ConfNodeLookupChildValue(redis_node, "key"); + } + if (!redis_server) { + redis_server = "127.0.0.1"; + SCLogInfo("Using default redis server (127.0.0.1)"); + } + if (!redis_port) + redis_port = "6379"; + if (!redis_mode) + redis_mode = "list"; + if (!redis_key) + redis_key = "suricata"; + log_ctx->redis_setup.key = SCStrdup(redis_key); + + if (!log_ctx->redis_setup.key) { + SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate redis key name"); + exit(EXIT_FAILURE); + } + + log_ctx->redis_setup.batch_size = 0; + + ConfNode *pipelining = ConfNodeLookupChild(redis_node, "pipelining"); + if (pipelining) { + int enabled = 0; + int ret; + intmax_t val; + ret = ConfGetChildValueBool(pipelining, "enabled", &enabled); + if (ret && enabled) { + ret = ConfGetChildValueInt(pipelining, "batch-size", &val); + if (ret) { + log_ctx->redis_setup.batch_size = val; + } else { + log_ctx->redis_setup.batch_size = 10; + } + } + } + + if (!strcmp(redis_mode, "list")) { + log_ctx->redis_setup.command = redis_push_cmd; + if (!log_ctx->redis_setup.command) { + SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate redis key command"); + exit(EXIT_FAILURE); + } + } else { + log_ctx->redis_setup.command = redis_publish_cmd; + if (!log_ctx->redis_setup.command) { + SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate redis key command"); + exit(EXIT_FAILURE); + } + } + redisContext *c = redisConnect(redis_server, atoi(redis_port)); + if (c != NULL && c->err) { + SCLogError(SC_ERR_SOCKET, "Error connecting to redis server: %s", c->errstr); + exit(EXIT_FAILURE); + } + + /* store server params for reconnection */ + log_ctx->redis_setup.server = SCStrdup(redis_server); + if (!log_ctx->redis_setup.server) { + SCLogError(SC_ERR_MEM_ALLOC, "Error allocating redis server string"); + exit(EXIT_FAILURE); + } + log_ctx->redis_setup.port = atoi(redis_port); + log_ctx->redis_setup.tried = 0; + + log_ctx->redis = c; + + log_ctx->Close = SCLogFileCloseRedis; + + return 0; +} + +int SCConfLogReopenRedis(LogFileCtx *log_ctx) +{ + if (log_ctx->redis != NULL) { + redisFree(log_ctx->redis); + log_ctx->redis = NULL; + } + + /* only try to reconnect once per second */ + if (log_ctx->redis_setup.tried >= time(NULL)) { + return -1; + } + + redisContext *c = redisConnect(log_ctx->redis_setup.server, log_ctx->redis_setup.port); + if (c != NULL && c->err) { + if (log_ctx->redis_setup.tried == 0) { + SCLogError(SC_ERR_SOCKET, "Error connecting to redis server: %s\n", c->errstr); + } + redisFree(c); + log_ctx->redis_setup.tried = time(NULL); + return -1; + } + log_ctx->redis = c; + log_ctx->redis_setup.tried = 0; + log_ctx->redis_setup.batch_count = 0; + return 0; +} + +#endif + /** \brief LogFileNewCtx() Get a new LogFileCtx * \retval LogFileCtx * pointer if succesful, NULL if error * */ @@ -348,6 +483,10 @@ LogFileCtx *LogFileNewCtx(void) lf_ctx->Write = SCLogFileWrite; lf_ctx->Close = SCLogFileClose; +#ifdef HAVE_LIBHIREDIS + lf_ctx->redis_setup.batch_count = 0; +#endif + return lf_ctx; } @@ -367,6 +506,17 @@ int LogFileFreeCtx(LogFileCtx *lf_ctx) SCMutexUnlock(&lf_ctx->fp_mutex); } +#ifdef HAVE_LIBHIREDIS + if (lf_ctx->type == LOGFILE_TYPE_REDIS) { + if (lf_ctx->redis) + redisFree(lf_ctx->redis); + if (lf_ctx->redis_setup.server) + SCFree(lf_ctx->redis_setup.server); + if (lf_ctx->redis_setup.key) + SCFree(lf_ctx->redis_setup.key); + } +#endif + SCMutexDestroy(&lf_ctx->fp_mutex); if (lf_ctx->prefix != NULL) @@ -375,9 +525,120 @@ int LogFileFreeCtx(LogFileCtx *lf_ctx) if(lf_ctx->filename != NULL) SCFree(lf_ctx->filename); + if (lf_ctx->sensor_name) + SCFree(lf_ctx->sensor_name); + OutputUnregisterFileRotationFlag(&lf_ctx->rotation_flag); SCFree(lf_ctx); SCReturnInt(1); } + +#ifdef HAVE_LIBHIREDIS +static int LogFileWriteRedis(LogFileCtx *file_ctx, char *string, size_t string_len) +{ + if (file_ctx->redis == NULL) { + SCConfLogReopenRedis(file_ctx); + if (file_ctx->redis == NULL) { + return -1; + } else { + SCLogInfo("Reconnected to redis server"); + } + } + /* TODO go async here ? */ + if (file_ctx->redis_setup.batch_size) { + redisAppendCommand(file_ctx->redis, "%s %s %s", + file_ctx->redis_setup.command, + file_ctx->redis_setup.key, + string); + if (file_ctx->redis_setup.batch_count == file_ctx->redis_setup.batch_size) { + redisReply *reply; + int i; + file_ctx->redis_setup.batch_count = 0; + for (i = 0; i <= file_ctx->redis_setup.batch_size; i++) { + if (redisGetReply(file_ctx->redis, (void **)&reply) == REDIS_OK) { + freeReplyObject(reply); + } else { + if (file_ctx->redis->err) { + SCLogInfo("Error when fetching reply: %s (%d)", + file_ctx->redis->errstr, + file_ctx->redis->err); + } + switch (file_ctx->redis->err) { + case REDIS_ERR_EOF: + case REDIS_ERR_IO: + SCLogInfo("Reopening connection to redis server"); + SCConfLogReopenRedis(file_ctx); + if (file_ctx->redis) { + SCLogInfo("Reconnected to redis server"); + return 0; + } else { + SCLogInfo("Unable to reconnect to redis server"); + return 0; + } + break; + default: + SCLogWarning(SC_ERR_INVALID_VALUE, + "Unsupported error code %d", + file_ctx->redis->err); + return 0; + } + } + } + } else { + file_ctx->redis_setup.batch_count++; + } + } else { + redisReply *reply = redisCommand(file_ctx->redis, "%s %s %s", + file_ctx->redis_setup.command, + file_ctx->redis_setup.key, + string); + + switch (reply->type) { + case REDIS_REPLY_ERROR: + SCLogWarning(SC_ERR_SOCKET, "Redis error: %s", reply->str); + SCConfLogReopenRedis(file_ctx); + break; + case REDIS_REPLY_INTEGER: + SCLogDebug("Redis integer %lld", reply->integer); + break; + default: + SCLogError(SC_ERR_INVALID_VALUE, + "Redis default triggered with %d", reply->type); + SCConfLogReopenRedis(file_ctx); + break; + } + freeReplyObject(reply); + } + return 0; +} +#endif + +int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer) +{ + if (file_ctx->type == LOGFILE_TYPE_SYSLOG) { + syslog(file_ctx->syslog_setup.alert_syslog_level, "%s", + (const char *)MEMBUFFER_BUFFER(buffer)); + } else if (file_ctx->type == LOGFILE_TYPE_FILE || + file_ctx->type == LOGFILE_TYPE_UNIX_DGRAM || + file_ctx->type == LOGFILE_TYPE_UNIX_STREAM) + { + /* append \n for files only */ + MemBufferWriteString(buffer, "\n"); + SCMutexLock(&file_ctx->fp_mutex); + file_ctx->Write((const char *)MEMBUFFER_BUFFER(buffer), + MEMBUFFER_OFFSET(buffer), file_ctx); + SCMutexUnlock(&file_ctx->fp_mutex); + } +#ifdef HAVE_LIBHIREDIS + else if (file_ctx->type == LOGFILE_TYPE_REDIS) { + SCMutexLock(&file_ctx->fp_mutex); + LogFileWriteRedis(file_ctx, (const char *)MEMBUFFER_BUFFER(buffer), + MEMBUFFER_OFFSET(buffer)); + SCMutexUnlock(&file_ctx->fp_mutex); + } +#endif + + return 0; +} |