diff options
Diffstat (limited to 'framework/src/suricata/src/source-af-packet.c')
-rw-r--r-- | framework/src/suricata/src/source-af-packet.c | 1919 |
1 files changed, 1919 insertions, 0 deletions
diff --git a/framework/src/suricata/src/source-af-packet.c b/framework/src/suricata/src/source-af-packet.c new file mode 100644 index 00000000..3f1f44e1 --- /dev/null +++ b/framework/src/suricata/src/source-af-packet.c @@ -0,0 +1,1919 @@ +/* Copyright (C) 2011-2014 Open Information Security Foundation + * + * You can copy, redistribute or modify this Program under the terms of + * the GNU General Public License version 2 as published by the Free + * Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301, USA. + */ + +/** + * \defgroup afppacket AF_PACKET running mode + * + * @{ + */ + +/** + * \file + * + * \author Eric Leblond <eric@regit.org> + * + * AF_PACKET socket acquisition support + * + * \todo watch other interface event to detect suppression of the monitored + * interface + */ + +#include "suricata-common.h" +#include "config.h" +#include "suricata.h" +#include "decode.h" +#include "packet-queue.h" +#include "threads.h" +#include "threadvars.h" +#include "tm-queuehandlers.h" +#include "tm-modules.h" +#include "tm-threads.h" +#include "tm-threads-common.h" +#include "conf.h" +#include "util-debug.h" +#include "util-device.h" +#include "util-error.h" +#include "util-privs.h" +#include "util-optimize.h" +#include "util-checksum.h" +#include "util-ioctl.h" +#include "util-host-info.h" +#include "tmqh-packetpool.h" +#include "source-af-packet.h" +#include "runmodes.h" + +#ifdef __SC_CUDA_SUPPORT__ + +#include "util-cuda.h" +#include "util-cuda-buffer.h" +#include "util-mpm-ac.h" +#include "util-cuda-handlers.h" +#include "detect-engine.h" +#include "detect-engine-mpm.h" +#include "util-cuda-vars.h" + +#endif /* __SC_CUDA_SUPPORT__ */ + +#ifdef HAVE_AF_PACKET + +#if HAVE_SYS_IOCTL_H +#include <sys/ioctl.h> +#endif + +#if HAVE_LINUX_IF_ETHER_H +#include <linux/if_ether.h> +#endif + +#if HAVE_LINUX_IF_PACKET_H +#include <linux/if_packet.h> +#endif + +#if HAVE_LINUX_IF_ARP_H +#include <linux/if_arp.h> +#endif + +#if HAVE_LINUX_FILTER_H +#include <linux/filter.h> +#endif + +#if HAVE_SYS_MMAN_H +#include <sys/mman.h> +#endif + +#endif /* HAVE_AF_PACKET */ + +extern int max_pending_packets; + +#ifndef HAVE_AF_PACKET + +TmEcode NoAFPSupportExit(ThreadVars *, void *, void **); + +void TmModuleReceiveAFPRegister (void) +{ + tmm_modules[TMM_RECEIVEAFP].name = "ReceiveAFP"; + tmm_modules[TMM_RECEIVEAFP].ThreadInit = NoAFPSupportExit; + tmm_modules[TMM_RECEIVEAFP].Func = NULL; + tmm_modules[TMM_RECEIVEAFP].ThreadExitPrintStats = NULL; + tmm_modules[TMM_RECEIVEAFP].ThreadDeinit = NULL; + tmm_modules[TMM_RECEIVEAFP].RegisterTests = NULL; + tmm_modules[TMM_RECEIVEAFP].cap_flags = 0; + tmm_modules[TMM_RECEIVEAFP].flags = TM_FLAG_RECEIVE_TM; +} + +/** + * \brief Registration Function for DecodeAFP. + * \todo Unit tests are needed for this module. + */ +void TmModuleDecodeAFPRegister (void) +{ + tmm_modules[TMM_DECODEAFP].name = "DecodeAFP"; + tmm_modules[TMM_DECODEAFP].ThreadInit = NoAFPSupportExit; + tmm_modules[TMM_DECODEAFP].Func = NULL; + tmm_modules[TMM_DECODEAFP].ThreadExitPrintStats = NULL; + tmm_modules[TMM_DECODEAFP].ThreadDeinit = NULL; + tmm_modules[TMM_DECODEAFP].RegisterTests = NULL; + tmm_modules[TMM_DECODEAFP].cap_flags = 0; + tmm_modules[TMM_DECODEAFP].flags = TM_FLAG_DECODE_TM; +} + +/** + * \brief this function prints an error message and exits. + */ +TmEcode NoAFPSupportExit(ThreadVars *tv, void *initdata, void **data) +{ + SCLogError(SC_ERR_NO_AF_PACKET,"Error creating thread %s: you do not have " + "support for AF_PACKET enabled, on Linux host please recompile " + "with --enable-af-packet", tv->name); + exit(EXIT_FAILURE); +} + +#else /* We have AF_PACKET support */ + +#define AFP_IFACE_NAME_LENGTH 48 + +#define AFP_STATE_DOWN 0 +#define AFP_STATE_UP 1 + +#define AFP_RECONNECT_TIMEOUT 500000 +#define AFP_DOWN_COUNTER_INTERVAL 40 + +#define POLL_TIMEOUT 100 + +#ifndef TP_STATUS_USER_BUSY +/* for new use latest bit available in tp_status */ +#define TP_STATUS_USER_BUSY (1 << 31) +#endif + +#ifndef TP_STATUS_VLAN_VALID +#define TP_STATUS_VLAN_VALID (1 << 4) +#endif + +/** protect pfring_set_bpf_filter, as it is not thread safe */ +static SCMutex afpacket_bpf_set_filter_lock = SCMUTEX_INITIALIZER; + +enum { + AFP_READ_OK, + AFP_READ_FAILURE, + AFP_FAILURE, + AFP_KERNEL_DROP, +}; + +enum { + AFP_FATAL_ERROR = 1, + AFP_RECOVERABLE_ERROR, +}; + +union thdr { + struct tpacket2_hdr *h2; + void *raw; +}; + +/** + * \brief Structure to hold thread specific variables. + */ +typedef struct AFPThreadVars_ +{ + /* thread specific socket */ + int socket; + /* handle state */ + unsigned char afp_state; + + /* data link type for the thread */ + int datalink; + int cooked; + + /* counters */ + uint64_t pkts; + uint64_t bytes; + uint64_t errs; + + ThreadVars *tv; + TmSlot *slot; + + uint8_t *data; /** Per function and thread data */ + int datalen; /** Length of per function and thread data */ + + int vlan_disabled; + + char iface[AFP_IFACE_NAME_LENGTH]; + LiveDevice *livedev; + int down_count; + + /* Filter */ + char *bpf_filter; + + /* socket buffer size */ + int buffer_size; + int promisc; + ChecksumValidationMode checksum_mode; + + /* IPS stuff */ + char out_iface[AFP_IFACE_NAME_LENGTH]; + AFPPeer *mpeer; + + int flags; + uint16_t capture_kernel_packets; + uint16_t capture_kernel_drops; + + int cluster_id; + int cluster_type; + + int threads; + int copy_mode; + + struct tpacket_req req; + unsigned int tp_hdrlen; + unsigned int ring_buflen; + char *ring_buf; + char *frame_buf; + unsigned int frame_offset; + int ring_size; + +} AFPThreadVars; + +TmEcode ReceiveAFP(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); +TmEcode ReceiveAFPThreadInit(ThreadVars *, void *, void **); +void ReceiveAFPThreadExitStats(ThreadVars *, void *); +TmEcode ReceiveAFPThreadDeinit(ThreadVars *, void *); +TmEcode ReceiveAFPLoop(ThreadVars *tv, void *data, void *slot); + +TmEcode DecodeAFPThreadInit(ThreadVars *, void *, void **); +TmEcode DecodeAFPThreadDeinit(ThreadVars *tv, void *data); +TmEcode DecodeAFP(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); + +TmEcode AFPSetBPFFilter(AFPThreadVars *ptv); +static int AFPGetIfnumByDev(int fd, const char *ifname, int verbose); +static int AFPGetDevFlags(int fd, const char *ifname); +static int AFPDerefSocket(AFPPeer* peer); +static int AFPRefSocket(AFPPeer* peer); + +/** + * \brief Registration Function for RecieveAFP. + * \todo Unit tests are needed for this module. + */ +void TmModuleReceiveAFPRegister (void) +{ + tmm_modules[TMM_RECEIVEAFP].name = "ReceiveAFP"; + tmm_modules[TMM_RECEIVEAFP].ThreadInit = ReceiveAFPThreadInit; + tmm_modules[TMM_RECEIVEAFP].Func = NULL; + tmm_modules[TMM_RECEIVEAFP].PktAcqLoop = ReceiveAFPLoop; + tmm_modules[TMM_RECEIVEAFP].ThreadExitPrintStats = ReceiveAFPThreadExitStats; + tmm_modules[TMM_RECEIVEAFP].ThreadDeinit = NULL; + tmm_modules[TMM_RECEIVEAFP].RegisterTests = NULL; + tmm_modules[TMM_RECEIVEAFP].cap_flags = SC_CAP_NET_RAW; + tmm_modules[TMM_RECEIVEAFP].flags = TM_FLAG_RECEIVE_TM; +} + + +/** + * \defgroup afppeers AFP peers list + * + * AF_PACKET has an IPS mode were interface are peered: packet from + * on interface are sent the peered interface and the other way. The ::AFPPeer + * list is maitaining the list of peers. Each ::AFPPeer is storing the needed + * information to be able to send packet on the interface. + * A element of the list must not be destroyed during the run of Suricata as it + * is used by ::Packet and other threads. + * + * @{ + */ + +typedef struct AFPPeersList_ { + TAILQ_HEAD(, AFPPeer_) peers; /**< Head of list of fragments. */ + int cnt; + int peered; + int turn; /**< Next value for initialisation order */ + SC_ATOMIC_DECLARE(int, reached); /**< Counter used to synchronize start */ +} AFPPeersList; + +/** + * \brief Update the peer. + * + * Update the AFPPeer of a thread ie set new state, socket number + * or iface index. + * + */ +void AFPPeerUpdate(AFPThreadVars *ptv) +{ + if (ptv->mpeer == NULL) { + return; + } + (void)SC_ATOMIC_SET(ptv->mpeer->if_idx, AFPGetIfnumByDev(ptv->socket, ptv->iface, 0)); + (void)SC_ATOMIC_SET(ptv->mpeer->socket, ptv->socket); + (void)SC_ATOMIC_SET(ptv->mpeer->state, ptv->afp_state); +} + +/** + * \brief Clean and free ressource used by an ::AFPPeer + */ +void AFPPeerClean(AFPPeer *peer) +{ + if (peer->flags & AFP_SOCK_PROTECT) + SCMutexDestroy(&peer->sock_protect); + SC_ATOMIC_DESTROY(peer->socket); + SC_ATOMIC_DESTROY(peer->if_idx); + SC_ATOMIC_DESTROY(peer->state); + SCFree(peer); +} + +AFPPeersList peerslist; + + +/** + * \brief Init the global list of ::AFPPeer + */ +TmEcode AFPPeersListInit() +{ + SCEnter(); + TAILQ_INIT(&peerslist.peers); + peerslist.peered = 0; + peerslist.cnt = 0; + peerslist.turn = 0; + SC_ATOMIC_INIT(peerslist.reached); + (void) SC_ATOMIC_SET(peerslist.reached, 0); + SCReturnInt(TM_ECODE_OK); +} + +/** + * \brief Check that all ::AFPPeer got a peer + * + * \retval TM_ECODE_FAILED if some threads are not peered or TM_ECODE_OK else. + */ +TmEcode AFPPeersListCheck() +{ +#define AFP_PEERS_MAX_TRY 4 +#define AFP_PEERS_WAIT 20000 + int try = 0; + SCEnter(); + while (try < AFP_PEERS_MAX_TRY) { + if (peerslist.cnt != peerslist.peered) { + usleep(AFP_PEERS_WAIT); + } else { + SCReturnInt(TM_ECODE_OK); + } + try++; + } + SCLogError(SC_ERR_AFP_CREATE, "Threads number not equals"); + SCReturnInt(TM_ECODE_FAILED); +} + +/** + * \brief Declare a new AFP thread to AFP peers list. + */ +TmEcode AFPPeersListAdd(AFPThreadVars *ptv) +{ + SCEnter(); + AFPPeer *peer = SCMalloc(sizeof(AFPPeer)); + AFPPeer *pitem; + int mtu, out_mtu; + + if (unlikely(peer == NULL)) { + SCReturnInt(TM_ECODE_FAILED); + } + memset(peer, 0, sizeof(AFPPeer)); + SC_ATOMIC_INIT(peer->socket); + SC_ATOMIC_INIT(peer->sock_usage); + SC_ATOMIC_INIT(peer->if_idx); + SC_ATOMIC_INIT(peer->state); + peer->flags = ptv->flags; + peer->turn = peerslist.turn++; + + if (peer->flags & AFP_SOCK_PROTECT) { + SCMutexInit(&peer->sock_protect, NULL); + } + + (void)SC_ATOMIC_SET(peer->sock_usage, 0); + (void)SC_ATOMIC_SET(peer->state, AFP_STATE_DOWN); + strlcpy(peer->iface, ptv->iface, AFP_IFACE_NAME_LENGTH); + ptv->mpeer = peer; + /* add element to iface list */ + TAILQ_INSERT_TAIL(&peerslist.peers, peer, next); + + if (ptv->copy_mode != AFP_COPY_MODE_NONE) { + peerslist.cnt++; + + /* Iter to find a peer */ + TAILQ_FOREACH(pitem, &peerslist.peers, next) { + if (pitem->peer) + continue; + if (strcmp(pitem->iface, ptv->out_iface)) + continue; + peer->peer = pitem; + pitem->peer = peer; + mtu = GetIfaceMTU(ptv->iface); + out_mtu = GetIfaceMTU(ptv->out_iface); + if (mtu != out_mtu) { + SCLogError(SC_ERR_AFP_CREATE, + "MTU on %s (%d) and %s (%d) are not equal, " + "transmission of packets bigger than %d will fail.", + ptv->iface, mtu, + ptv->out_iface, out_mtu, + (out_mtu > mtu) ? mtu : out_mtu); + } + peerslist.peered += 2; + break; + } + } + + AFPPeerUpdate(ptv); + + SCReturnInt(TM_ECODE_OK); +} + +int AFPPeersListWaitTurn(AFPPeer *peer) +{ + /* If turn is zero, we already have started threads once */ + if (peerslist.turn == 0) + return 0; + + if (peer->turn == SC_ATOMIC_GET(peerslist.reached)) + return 0; + return 1; +} + +void AFPPeersListReachedInc() +{ + if (peerslist.turn == 0) + return; + + if (SC_ATOMIC_ADD(peerslist.reached, 1) == peerslist.turn) { + SCLogInfo("All AFP capture threads are running."); + (void)SC_ATOMIC_SET(peerslist.reached, 0); + /* Set turn to 0 to skip syncrhonization when ReceiveAFPLoop is + * restarted. + */ + peerslist.turn = 0; + } +} + +static int AFPPeersListStarted() +{ + return !peerslist.turn; +} + +/** + * \brief Clean the global peers list. + */ +void AFPPeersListClean() +{ + AFPPeer *pitem; + + while ((pitem = TAILQ_FIRST(&peerslist.peers))) { + TAILQ_REMOVE(&peerslist.peers, pitem, next); + AFPPeerClean(pitem); + } +} + +/** + * @} + */ + +/** + * \brief Registration Function for DecodeAFP. + * \todo Unit tests are needed for this module. + */ +void TmModuleDecodeAFPRegister (void) +{ + tmm_modules[TMM_DECODEAFP].name = "DecodeAFP"; + tmm_modules[TMM_DECODEAFP].ThreadInit = DecodeAFPThreadInit; + tmm_modules[TMM_DECODEAFP].Func = DecodeAFP; + tmm_modules[TMM_DECODEAFP].ThreadExitPrintStats = NULL; + tmm_modules[TMM_DECODEAFP].ThreadDeinit = DecodeAFPThreadDeinit; + tmm_modules[TMM_DECODEAFP].RegisterTests = NULL; + tmm_modules[TMM_DECODEAFP].cap_flags = 0; + tmm_modules[TMM_DECODEAFP].flags = TM_FLAG_DECODE_TM; +} + + +static int AFPCreateSocket(AFPThreadVars *ptv, char *devname, int verbose); + +static inline void AFPDumpCounters(AFPThreadVars *ptv) +{ +#ifdef PACKET_STATISTICS + struct tpacket_stats kstats; + socklen_t len = sizeof (struct tpacket_stats); + if (getsockopt(ptv->socket, SOL_PACKET, PACKET_STATISTICS, + &kstats, &len) > -1) { + SCLogDebug("(%s) Kernel: Packets %" PRIu32 ", dropped %" PRIu32 "", + ptv->tv->name, + kstats.tp_packets, kstats.tp_drops); + StatsAddUI64(ptv->tv, ptv->capture_kernel_packets, kstats.tp_packets); + StatsAddUI64(ptv->tv, ptv->capture_kernel_drops, kstats.tp_drops); + (void) SC_ATOMIC_ADD(ptv->livedev->drop, (uint64_t) kstats.tp_drops); + (void) SC_ATOMIC_ADD(ptv->livedev->pkts, (uint64_t) kstats.tp_packets); + } +#endif +} + +/** + * \brief AF packet read function. + * + * This function fills + * From here the packets are picked up by the DecodeAFP thread. + * + * \param user pointer to AFPThreadVars + * \retval TM_ECODE_FAILED on failure and TM_ECODE_OK on success + */ +int AFPRead(AFPThreadVars *ptv) +{ + Packet *p = NULL; + /* XXX should try to use read that get directly to packet */ + int offset = 0; + int caplen; + struct sockaddr_ll from; + struct iovec iov; + struct msghdr msg; + struct cmsghdr *cmsg; + union { + struct cmsghdr cmsg; + char buf[CMSG_SPACE(sizeof(struct tpacket_auxdata))]; + } cmsg_buf; + unsigned char aux_checksum = 0; + + msg.msg_name = &from; + msg.msg_namelen = sizeof(from); + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + msg.msg_control = &cmsg_buf; + msg.msg_controllen = sizeof(cmsg_buf); + msg.msg_flags = 0; + + if (ptv->cooked) + offset = SLL_HEADER_LEN; + else + offset = 0; + iov.iov_len = ptv->datalen - offset; + iov.iov_base = ptv->data + offset; + + caplen = recvmsg(ptv->socket, &msg, MSG_TRUNC); + + if (caplen < 0) { + SCLogWarning(SC_ERR_AFP_READ, "recvmsg failed with error code %" PRId32, + errno); + SCReturnInt(AFP_READ_FAILURE); + } + + p = PacketGetFromQueueOrAlloc(); + if (p == NULL) { + SCReturnInt(AFP_FAILURE); + } + PKT_SET_SRC(p, PKT_SRC_WIRE); + + /* get timestamp of packet via ioctl */ + if (ioctl(ptv->socket, SIOCGSTAMP, &p->ts) == -1) { + SCLogWarning(SC_ERR_AFP_READ, "recvmsg failed with error code %" PRId32, + errno); + TmqhOutputPacketpool(ptv->tv, p); + SCReturnInt(AFP_READ_FAILURE); + } + + ptv->pkts++; + ptv->bytes += caplen + offset; + p->livedev = ptv->livedev; + + /* add forged header */ + if (ptv->cooked) { + SllHdr * hdrp = (SllHdr *)ptv->data; + /* XXX this is minimalist, but this seems enough */ + hdrp->sll_protocol = from.sll_protocol; + } + + p->datalink = ptv->datalink; + SET_PKT_LEN(p, caplen + offset); + if (PacketCopyData(p, ptv->data, GET_PKT_LEN(p)) == -1) { + TmqhOutputPacketpool(ptv->tv, p); + SCReturnInt(AFP_FAILURE); + } + SCLogDebug("pktlen: %" PRIu32 " (pkt %p, pkt data %p)", + GET_PKT_LEN(p), p, GET_PKT_DATA(p)); + + /* We only check for checksum disable */ + if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE) { + p->flags |= PKT_IGNORE_CHECKSUM; + } else if (ptv->checksum_mode == CHECKSUM_VALIDATION_AUTO) { + if (ptv->livedev->ignore_checksum) { + p->flags |= PKT_IGNORE_CHECKSUM; + } else if (ChecksumAutoModeCheck(ptv->pkts, + SC_ATOMIC_GET(ptv->livedev->pkts), + SC_ATOMIC_GET(ptv->livedev->invalid_checksums))) { + ptv->livedev->ignore_checksum = 1; + p->flags |= PKT_IGNORE_CHECKSUM; + } + } else { + aux_checksum = 1; + } + + /* List is NULL if we don't have activated auxiliary data */ + for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) { + struct tpacket_auxdata *aux; + + if (cmsg->cmsg_len < CMSG_LEN(sizeof(struct tpacket_auxdata)) || + cmsg->cmsg_level != SOL_PACKET || + cmsg->cmsg_type != PACKET_AUXDATA) + continue; + + aux = (struct tpacket_auxdata *)CMSG_DATA(cmsg); + + if (aux_checksum && (aux->tp_status & TP_STATUS_CSUMNOTREADY)) { + p->flags |= PKT_IGNORE_CHECKSUM; + } + break; + } + + if (TmThreadsSlotProcessPkt(ptv->tv, ptv->slot, p) != TM_ECODE_OK) { + TmqhOutputPacketpool(ptv->tv, p); + SCReturnInt(AFP_FAILURE); + } + SCReturnInt(AFP_READ_OK); +} + +TmEcode AFPWritePacket(Packet *p) +{ + struct sockaddr_ll socket_address; + int socket; + + if (p->afp_v.copy_mode == AFP_COPY_MODE_IPS) { + if (PACKET_TEST_ACTION(p, ACTION_DROP)) { + return TM_ECODE_OK; + } + } + + if (SC_ATOMIC_GET(p->afp_v.peer->state) == AFP_STATE_DOWN) + return TM_ECODE_OK; + + if (p->ethh == NULL) { + SCLogWarning(SC_ERR_INVALID_VALUE, "Should have an Ethernet header"); + return TM_ECODE_FAILED; + } + /* Index of the network device */ + socket_address.sll_ifindex = SC_ATOMIC_GET(p->afp_v.peer->if_idx); + /* Address length*/ + socket_address.sll_halen = ETH_ALEN; + /* Destination MAC */ + memcpy(socket_address.sll_addr, p->ethh, 6); + + /* Send packet, locking the socket if necessary */ + if (p->afp_v.peer->flags & AFP_SOCK_PROTECT) + SCMutexLock(&p->afp_v.peer->sock_protect); + socket = SC_ATOMIC_GET(p->afp_v.peer->socket); + if (sendto(socket, GET_PKT_DATA(p), GET_PKT_LEN(p), 0, + (struct sockaddr*) &socket_address, + sizeof(struct sockaddr_ll)) < 0) { + SCLogWarning(SC_ERR_SOCKET, "Sending packet failed on socket %d: %s", + socket, + strerror(errno)); + if (p->afp_v.peer->flags & AFP_SOCK_PROTECT) + SCMutexUnlock(&p->afp_v.peer->sock_protect); + return TM_ECODE_FAILED; + } + if (p->afp_v.peer->flags & AFP_SOCK_PROTECT) + SCMutexUnlock(&p->afp_v.peer->sock_protect); + + return TM_ECODE_OK; +} + +void AFPReleaseDataFromRing(Packet *p) +{ + /* Need to be in copy mode and need to detect early release + where Ethernet header could not be set (and pseudo packet) */ + if ((p->afp_v.copy_mode != AFP_COPY_MODE_NONE) && !PKT_IS_PSEUDOPKT(p)) { + AFPWritePacket(p); + } + + if (AFPDerefSocket(p->afp_v.mpeer) == 0) + goto cleanup; + + if (p->afp_v.relptr) { + union thdr h; + h.raw = p->afp_v.relptr; + h.h2->tp_status = TP_STATUS_KERNEL; + } + +cleanup: + AFPV_CLEANUP(&p->afp_v); +} + +void AFPReleasePacket(Packet *p) +{ + AFPReleaseDataFromRing(p); + PacketFreeOrRelease(p); +} + +/** + * \brief AF packet read function for ring + * + * This function fills + * From here the packets are picked up by the DecodeAFP thread. + * + * \param user pointer to AFPThreadVars + * \retval TM_ECODE_FAILED on failure and TM_ECODE_OK on success + */ +int AFPReadFromRing(AFPThreadVars *ptv) +{ + Packet *p = NULL; + union thdr h; + struct sockaddr_ll *from; + uint8_t emergency_flush = 0; + int read_pkts = 0; + int loop_start = -1; + + + /* Loop till we have packets available */ + while (1) { + if (unlikely(suricata_ctl_flags != 0)) { + break; + } + + /* Read packet from ring */ + h.raw = (((union thdr **)ptv->frame_buf)[ptv->frame_offset]); + if (h.raw == NULL) { + SCReturnInt(AFP_FAILURE); + } + + if ((! h.h2->tp_status) || (h.h2->tp_status & TP_STATUS_USER_BUSY)) { + if (read_pkts == 0) { + if (loop_start == -1) { + loop_start = ptv->frame_offset; + } else if (unlikely(loop_start == (int)ptv->frame_offset)) { + SCReturnInt(AFP_READ_OK); + } + if (++ptv->frame_offset >= ptv->req.tp_frame_nr) { + ptv->frame_offset = 0; + } + continue; + } + if ((emergency_flush) && (ptv->flags & AFP_EMERGENCY_MODE)) { + SCReturnInt(AFP_KERNEL_DROP); + } else { + SCReturnInt(AFP_READ_OK); + } + } + + read_pkts++; + loop_start = -1; + + /* Our packet is still used by suricata, we exit read loop to + * gain some time */ + if (h.h2->tp_status & TP_STATUS_USER_BUSY) { + SCReturnInt(AFP_READ_OK); + } + + if ((ptv->flags & AFP_EMERGENCY_MODE) && (emergency_flush == 1)) { + h.h2->tp_status = TP_STATUS_KERNEL; + goto next_frame; + } + + p = PacketGetFromQueueOrAlloc(); + if (p == NULL) { + SCReturnInt(AFP_FAILURE); + } + PKT_SET_SRC(p, PKT_SRC_WIRE); + + /* Suricata will treat packet so telling it is busy, this + * status will be reset to 0 (ie TP_STATUS_KERNEL) in the release + * function. */ + h.h2->tp_status |= TP_STATUS_USER_BUSY; + + from = (void *)h.raw + TPACKET_ALIGN(ptv->tp_hdrlen); + + ptv->pkts++; + ptv->bytes += h.h2->tp_len; + p->livedev = ptv->livedev; + + /* add forged header */ + if (ptv->cooked) { + SllHdr * hdrp = (SllHdr *)ptv->data; + /* XXX this is minimalist, but this seems enough */ + hdrp->sll_protocol = from->sll_protocol; + } + + p->datalink = ptv->datalink; + if (h.h2->tp_len > h.h2->tp_snaplen) { + SCLogDebug("Packet length (%d) > snaplen (%d), truncating", + h.h2->tp_len, h.h2->tp_snaplen); + } + + /* get vlan id from header */ + if ((!ptv->vlan_disabled) && + (h.h2->tp_status & TP_STATUS_VLAN_VALID || h.h2->tp_vlan_tci)) { + p->vlan_id[0] = h.h2->tp_vlan_tci; + p->vlan_idx = 1; + p->vlanh[0] = NULL; + } + + if (ptv->flags & AFP_ZERO_COPY) { + if (PacketSetData(p, (unsigned char*)h.raw + h.h2->tp_mac, h.h2->tp_snaplen) == -1) { + TmqhOutputPacketpool(ptv->tv, p); + SCReturnInt(AFP_FAILURE); + } else { + p->afp_v.relptr = h.raw; + p->ReleasePacket = AFPReleasePacket; + p->afp_v.mpeer = ptv->mpeer; + AFPRefSocket(ptv->mpeer); + + p->afp_v.copy_mode = ptv->copy_mode; + if (p->afp_v.copy_mode != AFP_COPY_MODE_NONE) { + p->afp_v.peer = ptv->mpeer->peer; + } else { + p->afp_v.peer = NULL; + } + } + } else { + if (PacketCopyData(p, (unsigned char*)h.raw + h.h2->tp_mac, h.h2->tp_snaplen) == -1) { + TmqhOutputPacketpool(ptv->tv, p); + SCReturnInt(AFP_FAILURE); + } + } + /* Timestamp */ + p->ts.tv_sec = h.h2->tp_sec; + p->ts.tv_usec = h.h2->tp_nsec/1000; + SCLogDebug("pktlen: %" PRIu32 " (pkt %p, pkt data %p)", + GET_PKT_LEN(p), p, GET_PKT_DATA(p)); + + /* We only check for checksum disable */ + if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE) { + p->flags |= PKT_IGNORE_CHECKSUM; + } else if (ptv->checksum_mode == CHECKSUM_VALIDATION_AUTO) { + if (ptv->livedev->ignore_checksum) { + p->flags |= PKT_IGNORE_CHECKSUM; + } else if (ChecksumAutoModeCheck(ptv->pkts, + SC_ATOMIC_GET(ptv->livedev->pkts), + SC_ATOMIC_GET(ptv->livedev->invalid_checksums))) { + ptv->livedev->ignore_checksum = 1; + p->flags |= PKT_IGNORE_CHECKSUM; + } + } else { + if (h.h2->tp_status & TP_STATUS_CSUMNOTREADY) { + p->flags |= PKT_IGNORE_CHECKSUM; + } + } + if (h.h2->tp_status & TP_STATUS_LOSING) { + emergency_flush = 1; + AFPDumpCounters(ptv); + } + + /* release frame if not in zero copy mode */ + if (!(ptv->flags & AFP_ZERO_COPY)) { + h.h2->tp_status = TP_STATUS_KERNEL; + } + + if (TmThreadsSlotProcessPkt(ptv->tv, ptv->slot, p) != TM_ECODE_OK) { + h.h2->tp_status = TP_STATUS_KERNEL; + if (++ptv->frame_offset >= ptv->req.tp_frame_nr) { + ptv->frame_offset = 0; + } + TmqhOutputPacketpool(ptv->tv, p); + SCReturnInt(AFP_FAILURE); + } + +next_frame: + if (++ptv->frame_offset >= ptv->req.tp_frame_nr) { + ptv->frame_offset = 0; + /* Get out of loop to be sure we will reach maintenance tasks */ + SCReturnInt(AFP_READ_OK); + } + } + + SCReturnInt(AFP_READ_OK); +} + +/** + * \brief Reference socket + * + * \retval O in case of failure, 1 in case of success + */ +static int AFPRefSocket(AFPPeer* peer) +{ + if (unlikely(peer == NULL)) + return 0; + + (void)SC_ATOMIC_ADD(peer->sock_usage, 1); + return 1; +} + + +/** + * \brief Dereference socket + * + * \retval 1 if socket is still alive, 0 if not + */ +static int AFPDerefSocket(AFPPeer* peer) +{ + if (peer == NULL) + return 1; + + if (SC_ATOMIC_SUB(peer->sock_usage, 1) == 0) { + if (SC_ATOMIC_GET(peer->state) == AFP_STATE_DOWN) { + SCLogInfo("Cleaning socket connected to '%s'", peer->iface); + close(SC_ATOMIC_GET(peer->socket)); + return 0; + } + } + return 1; +} + +void AFPSwitchState(AFPThreadVars *ptv, int state) +{ + ptv->afp_state = state; + ptv->down_count = 0; + + AFPPeerUpdate(ptv); + + /* Do cleaning if switching to down state */ + if (state == AFP_STATE_DOWN) { + if (ptv->frame_buf) { + /* only used in reading phase, we can free it */ + SCFree(ptv->frame_buf); + ptv->frame_buf = NULL; + } + if (ptv->socket != -1) { + /* we need to wait for all packets to return data */ + if (SC_ATOMIC_SUB(ptv->mpeer->sock_usage, 1) == 0) { + SCLogInfo("Cleaning socket connected to '%s'", ptv->iface); + close(ptv->socket); + ptv->socket = -1; + } + } + } + if (state == AFP_STATE_UP) { + (void)SC_ATOMIC_SET(ptv->mpeer->sock_usage, 1); + } +} + +static int AFPReadAndDiscard(AFPThreadVars *ptv, struct timeval *synctv) +{ + struct sockaddr_ll from; + struct iovec iov; + struct msghdr msg; + struct timeval ts; + union { + struct cmsghdr cmsg; + char buf[CMSG_SPACE(sizeof(struct tpacket_auxdata))]; + } cmsg_buf; + + + if (unlikely(suricata_ctl_flags != 0)) { + return 1; + } + + msg.msg_name = &from; + msg.msg_namelen = sizeof(from); + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + msg.msg_control = &cmsg_buf; + msg.msg_controllen = sizeof(cmsg_buf); + msg.msg_flags = 0; + + iov.iov_len = ptv->datalen; + iov.iov_base = ptv->data; + + recvmsg(ptv->socket, &msg, MSG_TRUNC); + + if (ioctl(ptv->socket, SIOCGSTAMP, &ts) == -1) { + /* FIXME */ + return -1; + } + + if ((ts.tv_sec > synctv->tv_sec) || + (ts.tv_sec >= synctv->tv_sec && + ts.tv_usec > synctv->tv_usec)) { + return 1; + } + return 0; +} + +static int AFPReadAndDiscardFromRing(AFPThreadVars *ptv, struct timeval *synctv) +{ + union thdr h; + + if (unlikely(suricata_ctl_flags != 0)) { + return 1; + } + + /* Read packet from ring */ + h.raw = (((union thdr **)ptv->frame_buf)[ptv->frame_offset]); + if (h.raw == NULL) { + return -1; + } + + if (((time_t)h.h2->tp_sec > synctv->tv_sec) || + ((time_t)h.h2->tp_sec == synctv->tv_sec && + (suseconds_t) (h.h2->tp_nsec / 1000) > synctv->tv_usec)) { + return 1; + } + + h.h2->tp_status = TP_STATUS_KERNEL; + if (++ptv->frame_offset >= ptv->req.tp_frame_nr) { + ptv->frame_offset = 0; + } + + + return 0; +} + +/** \brief wait for all afpacket threads to fully init + * + * Discard packets before all threads are ready, as the cluster + * setup is not complete yet. + * + * if AFPPeersListStarted() returns true init is complete + * + * \retval r 1 = happy, otherwise unhappy + */ +static int AFPSynchronizeStart(AFPThreadVars *ptv) +{ + int r; + struct timeval synctv; + struct pollfd fds; + + fds.fd = ptv->socket; + fds.events = POLLIN; + + /* Set timeval to end of the world */ + synctv.tv_sec = 0xffffffff; + synctv.tv_usec = 0xffffffff; + + while (1) { + r = poll(&fds, 1, POLL_TIMEOUT); + if (r > 0 && + (fds.revents & (POLLHUP|POLLRDHUP|POLLERR|POLLNVAL))) { + SCLogWarning(SC_ERR_AFP_READ, "poll failed %02x", + fds.revents & (POLLHUP|POLLRDHUP|POLLERR|POLLNVAL)); + return 0; + } else if (r > 0) { + if (AFPPeersListStarted() && synctv.tv_sec == (time_t) 0xffffffff) { + gettimeofday(&synctv, NULL); + } + if (ptv->flags & AFP_RING_MODE) { + r = AFPReadAndDiscardFromRing(ptv, &synctv); + } else { + r = AFPReadAndDiscard(ptv, &synctv); + } + SCLogDebug("Discarding on %s", ptv->tv->name); + switch (r) { + case 1: + SCLogInfo("Starting to read on %s", ptv->tv->name); + return 1; + case -1: + return r; + } + /* no packets */ + } else if (r == 0 && AFPPeersListStarted()) { + SCLogInfo("Starting to read on %s", ptv->tv->name); + return 1; + } else if (r < 0) { /* only exit on error */ + SCLogWarning(SC_ERR_AFP_READ, "poll failed with retval %d", r); + return 0; + } + } + return 1; +} + +/** + * \brief Try to reopen socket + * + * \retval 0 in case of success, negative if error occurs or a condition + * is not met. + */ +static int AFPTryReopen(AFPThreadVars *ptv) +{ + int afp_activate_r; + + ptv->down_count++; + + + /* Don't reconnect till we have packet that did not release data */ + if (SC_ATOMIC_GET(ptv->mpeer->sock_usage) != 0) { + return -1; + } + + afp_activate_r = AFPCreateSocket(ptv, ptv->iface, 0); + if (afp_activate_r != 0) { + if (ptv->down_count % AFP_DOWN_COUNTER_INTERVAL == 0) { + SCLogWarning(SC_ERR_AFP_CREATE, "Can not open iface '%s'", + ptv->iface); + } + return afp_activate_r; + } + + SCLogInfo("Interface '%s' is back", ptv->iface); + return 0; +} + +/** + * \brief Main AF_PACKET reading Loop function + */ +TmEcode ReceiveAFPLoop(ThreadVars *tv, void *data, void *slot) +{ + SCEnter(); + + AFPThreadVars *ptv = (AFPThreadVars *)data; + struct pollfd fds; + int r; + TmSlot *s = (TmSlot *)slot; + time_t last_dump = 0; + struct timeval current_time; + + ptv->slot = s->slot_next; + + if (ptv->afp_state == AFP_STATE_DOWN) { + /* Wait for our turn, threads before us must have opened the socket */ + while (AFPPeersListWaitTurn(ptv->mpeer)) { + usleep(1000); + if (suricata_ctl_flags != 0) { + break; + } + } + r = AFPCreateSocket(ptv, ptv->iface, 1); + if (r < 0) { + switch (-r) { + case AFP_FATAL_ERROR: + SCLogError(SC_ERR_AFP_CREATE, "Couldn't init AF_PACKET socket, fatal error"); + /* fatal is fatal, we want suri to exit */ + EngineKill(); + //tv->aof = THV_ENGINE_EXIT; + SCReturnInt(TM_ECODE_FAILED); + case AFP_RECOVERABLE_ERROR: + SCLogWarning(SC_ERR_AFP_CREATE, "Couldn't init AF_PACKET socket, retrying soon"); + } + } + AFPPeersListReachedInc(); + } + if (ptv->afp_state == AFP_STATE_UP) { + SCLogInfo("Thread %s using socket %d", tv->name, ptv->socket); + AFPSynchronizeStart(ptv); + } + + fds.fd = ptv->socket; + fds.events = POLLIN; + + while (1) { + /* Start by checking the state of our interface */ + if (unlikely(ptv->afp_state == AFP_STATE_DOWN)) { + int dbreak = 0; + + do { + usleep(AFP_RECONNECT_TIMEOUT); + if (suricata_ctl_flags != 0) { + dbreak = 1; + break; + } + r = AFPTryReopen(ptv); + fds.fd = ptv->socket; + } while (r < 0); + if (dbreak == 1) + break; + } + + /* make sure we have at least one packet in the packet pool, to prevent + * us from alloc'ing packets at line rate */ + PacketPoolWait(); + + r = poll(&fds, 1, POLL_TIMEOUT); + + if (suricata_ctl_flags != 0) { + break; + } + + if (r > 0 && + (fds.revents & (POLLHUP|POLLRDHUP|POLLERR|POLLNVAL))) { + if (fds.revents & (POLLHUP | POLLRDHUP)) { + AFPSwitchState(ptv, AFP_STATE_DOWN); + continue; + } else if (fds.revents & POLLERR) { + char c; + /* Do a recv to get errno */ + if (recv(ptv->socket, &c, sizeof c, MSG_PEEK) != -1) + continue; /* what, no error? */ + SCLogError(SC_ERR_AFP_READ, + "Error reading data from iface '%s': (%d" PRIu32 ") %s", + ptv->iface, errno, strerror(errno)); + AFPSwitchState(ptv, AFP_STATE_DOWN); + continue; + } else if (fds.revents & POLLNVAL) { + SCLogError(SC_ERR_AFP_READ, "Invalid polling request"); + AFPSwitchState(ptv, AFP_STATE_DOWN); + continue; + } + } else if (r > 0) { + if (ptv->flags & AFP_RING_MODE) { + r = AFPReadFromRing(ptv); + } else { + /* AFPRead will call TmThreadsSlotProcessPkt on read packets */ + r = AFPRead(ptv); + } + switch (r) { + case AFP_READ_FAILURE: + /* AFPRead in error: best to reset the socket */ + SCLogError(SC_ERR_AFP_READ, + "AFPRead error reading data from iface '%s': (%d" PRIu32 ") %s", + ptv->iface, errno, strerror(errno)); + AFPSwitchState(ptv, AFP_STATE_DOWN); + continue; + case AFP_FAILURE: + AFPSwitchState(ptv, AFP_STATE_DOWN); + SCReturnInt(TM_ECODE_FAILED); + break; + case AFP_READ_OK: + /* Trigger one dump of stats every second */ + TimeGet(¤t_time); + if (current_time.tv_sec != last_dump) { + AFPDumpCounters(ptv); + last_dump = current_time.tv_sec; + } + break; + case AFP_KERNEL_DROP: + AFPDumpCounters(ptv); + break; + } + } else if ((r < 0) && (errno != EINTR)) { + SCLogError(SC_ERR_AFP_READ, "Error reading data from iface '%s': (%d" PRIu32 ") %s", + ptv->iface, + errno, strerror(errno)); + AFPSwitchState(ptv, AFP_STATE_DOWN); + continue; + } + StatsSyncCountersIfSignalled(tv); + } + + AFPDumpCounters(ptv); + StatsSyncCountersIfSignalled(tv); + SCReturnInt(TM_ECODE_OK); +} + +static int AFPGetDevFlags(int fd, const char *ifname) +{ + struct ifreq ifr; + + memset(&ifr, 0, sizeof(ifr)); + strlcpy(ifr.ifr_name, ifname, sizeof(ifr.ifr_name)); + + if (ioctl(fd, SIOCGIFFLAGS, &ifr) == -1) { + SCLogError(SC_ERR_AFP_CREATE, "Unable to find type for iface \"%s\": %s", + ifname, strerror(errno)); + return -1; + } + + return ifr.ifr_flags; +} + + +static int AFPGetIfnumByDev(int fd, const char *ifname, int verbose) +{ + struct ifreq ifr; + + memset(&ifr, 0, sizeof(ifr)); + strlcpy(ifr.ifr_name, ifname, sizeof(ifr.ifr_name)); + + if (ioctl(fd, SIOCGIFINDEX, &ifr) == -1) { + if (verbose) + SCLogError(SC_ERR_AFP_CREATE, "Unable to find iface %s: %s", + ifname, strerror(errno)); + return -1; + } + + return ifr.ifr_ifindex; +} + +static int AFPGetDevLinktype(int fd, const char *ifname) +{ + struct ifreq ifr; + + memset(&ifr, 0, sizeof(ifr)); + strlcpy(ifr.ifr_name, ifname, sizeof(ifr.ifr_name)); + + if (ioctl(fd, SIOCGIFHWADDR, &ifr) == -1) { + SCLogError(SC_ERR_AFP_CREATE, "Unable to find type for iface \"%s\": %s", + ifname, strerror(errno)); + return -1; + } + + switch (ifr.ifr_hwaddr.sa_family) { + case ARPHRD_LOOPBACK: + return LINKTYPE_ETHERNET; + case ARPHRD_PPP: + return LINKTYPE_RAW; + default: + return ifr.ifr_hwaddr.sa_family; + } +} + +static int AFPComputeRingParams(AFPThreadVars *ptv, int order) +{ + /* Compute structure: + Target is to store all pending packets + with a size equal to MTU + auxdata + And we keep a decent number of block + + To do so: + Compute frame_size (aligned to be able to fit in block + Check which block size we need. Blocksize is a 2^n * pagesize + We then need to get order, big enough to have + frame_size < block size + Find number of frame per block (divide) + Fill in packet_req + + Compute frame size: + described in packet_mmap.txt + dependant on snaplen (need to use a variable ?) +snaplen: MTU ? +tp_hdrlen determine_version in daq_afpacket +in V1: sizeof(struct tpacket_hdr); +in V2: val in getsockopt(instance->fd, SOL_PACKET, PACKET_HDRLEN, &val, &len) +frame size: TPACKET_ALIGN(snaplen + TPACKET_ALIGN(TPACKET_ALIGN(tp_hdrlen) + sizeof(struct sockaddr_ll) + ETH_HLEN) - ETH_HLEN); + + */ + int tp_hdrlen = sizeof(struct tpacket_hdr); + int snaplen = default_packet_size; + + if (snaplen == 0) { + snaplen = GetIfaceMaxPacketSize(ptv->iface); + if (snaplen <= 0) { + SCLogWarning(SC_ERR_INVALID_VALUE, + "Unable to get MTU, setting snaplen to sane default of 1514"); + snaplen = 1514; + } + } + + ptv->req.tp_frame_size = TPACKET_ALIGN(snaplen +TPACKET_ALIGN(TPACKET_ALIGN(tp_hdrlen) + sizeof(struct sockaddr_ll) + ETH_HLEN) - ETH_HLEN); + ptv->req.tp_block_size = getpagesize() << order; + int frames_per_block = ptv->req.tp_block_size / ptv->req.tp_frame_size; + if (frames_per_block == 0) { + SCLogInfo("frame size to big"); + return -1; + } + ptv->req.tp_frame_nr = ptv->ring_size; + ptv->req.tp_block_nr = ptv->req.tp_frame_nr / frames_per_block + 1; + /* exact division */ + ptv->req.tp_frame_nr = ptv->req.tp_block_nr * frames_per_block; + SCLogInfo("AF_PACKET RX Ring params: block_size=%d block_nr=%d frame_size=%d frame_nr=%d", + ptv->req.tp_block_size, ptv->req.tp_block_nr, + ptv->req.tp_frame_size, ptv->req.tp_frame_nr); + return 1; +} + +static int AFPCreateSocket(AFPThreadVars *ptv, char *devname, int verbose) +{ + int r; + int ret = AFP_FATAL_ERROR; + struct packet_mreq sock_params; + struct sockaddr_ll bind_address; + int order; + unsigned int i; + int if_idx; + + /* open socket */ + ptv->socket = socket(AF_PACKET, SOCK_RAW, htons(ETH_P_ALL)); + if (ptv->socket == -1) { + SCLogError(SC_ERR_AFP_CREATE, "Couldn't create a AF_PACKET socket, error %s", strerror(errno)); + goto error; + } + if_idx = AFPGetIfnumByDev(ptv->socket, devname, verbose); + /* bind socket */ + memset(&bind_address, 0, sizeof(bind_address)); + bind_address.sll_family = AF_PACKET; + bind_address.sll_protocol = htons(ETH_P_ALL); + bind_address.sll_ifindex = if_idx; + if (bind_address.sll_ifindex == -1) { + if (verbose) + SCLogError(SC_ERR_AFP_CREATE, "Couldn't find iface %s", devname); + ret = AFP_RECOVERABLE_ERROR; + goto socket_err; + } + + if (ptv->promisc != 0) { + /* Force promiscuous mode */ + memset(&sock_params, 0, sizeof(sock_params)); + sock_params.mr_type = PACKET_MR_PROMISC; + sock_params.mr_ifindex = bind_address.sll_ifindex; + r = setsockopt(ptv->socket, SOL_PACKET, PACKET_ADD_MEMBERSHIP,(void *)&sock_params, sizeof(sock_params)); + if (r < 0) { + SCLogError(SC_ERR_AFP_CREATE, + "Couldn't switch iface %s to promiscuous, error %s", + devname, strerror(errno)); + goto frame_err; + } + } + + if (ptv->checksum_mode == CHECKSUM_VALIDATION_KERNEL) { + int val = 1; + if (setsockopt(ptv->socket, SOL_PACKET, PACKET_AUXDATA, &val, + sizeof(val)) == -1 && errno != ENOPROTOOPT) { + SCLogWarning(SC_ERR_NO_AF_PACKET, + "'kernel' checksum mode not supported, failling back to full mode."); + ptv->checksum_mode = CHECKSUM_VALIDATION_ENABLE; + } + } + + /* set socket recv buffer size */ + if (ptv->buffer_size != 0) { + /* + * Set the socket buffer size to the specified value. + */ + SCLogInfo("Setting AF_PACKET socket buffer to %d", ptv->buffer_size); + if (setsockopt(ptv->socket, SOL_SOCKET, SO_RCVBUF, + &ptv->buffer_size, + sizeof(ptv->buffer_size)) == -1) { + SCLogError(SC_ERR_AFP_CREATE, + "Couldn't set buffer size to %d on iface %s, error %s", + ptv->buffer_size, devname, strerror(errno)); + goto frame_err; + } + } + + r = bind(ptv->socket, (struct sockaddr *)&bind_address, sizeof(bind_address)); + if (r < 0) { + if (verbose) { + if (errno == ENETDOWN) { + SCLogError(SC_ERR_AFP_CREATE, + "Couldn't bind AF_PACKET socket, iface %s is down", + devname); + } else { + SCLogError(SC_ERR_AFP_CREATE, + "Couldn't bind AF_PACKET socket to iface %s, error %s", + devname, strerror(errno)); + } + } + ret = AFP_RECOVERABLE_ERROR; + goto frame_err; + } + +#ifdef HAVE_PACKET_FANOUT + /* add binded socket to fanout group */ + if (ptv->threads > 1) { + uint32_t option = 0; + uint16_t mode = ptv->cluster_type; + uint16_t id = ptv->cluster_id; + option = (mode << 16) | (id & 0xffff); + r = setsockopt(ptv->socket, SOL_PACKET, PACKET_FANOUT,(void *)&option, sizeof(option)); + if (r < 0) { + SCLogError(SC_ERR_AFP_CREATE, + "Coudn't set fanout mode, error %s", + strerror(errno)); + goto frame_err; + } + } +#endif + + int if_flags = AFPGetDevFlags(ptv->socket, ptv->iface); + if (if_flags == -1) { + if (verbose) { + SCLogError(SC_ERR_AFP_READ, + "Can not acces to interface '%s'", + ptv->iface); + } + ret = AFP_RECOVERABLE_ERROR; + goto frame_err; + } + if ((if_flags & IFF_UP) == 0) { + if (verbose) { + SCLogError(SC_ERR_AFP_READ, + "Interface '%s' is down", + ptv->iface); + } + ret = AFP_RECOVERABLE_ERROR; + goto frame_err; + } + + if (ptv->flags & AFP_RING_MODE) { + int val = TPACKET_V2; + unsigned int len = sizeof(val); + if (getsockopt(ptv->socket, SOL_PACKET, PACKET_HDRLEN, &val, &len) < 0) { + if (errno == ENOPROTOOPT) { + SCLogError(SC_ERR_AFP_CREATE, + "Too old kernel giving up (need 2.6.27 at least)"); + } + SCLogError(SC_ERR_AFP_CREATE, "Error when retrieving packet header len"); + goto socket_err; + } + ptv->tp_hdrlen = val; + + val = TPACKET_V2; + if (setsockopt(ptv->socket, SOL_PACKET, PACKET_VERSION, &val, + sizeof(val)) < 0) { + SCLogError(SC_ERR_AFP_CREATE, + "Can't activate TPACKET_V2 on packet socket: %s", + strerror(errno)); + goto socket_err; + } + + /* Allocate RX ring */ +#define DEFAULT_ORDER 3 + for (order = DEFAULT_ORDER; order >= 0; order--) { + if (AFPComputeRingParams(ptv, order) != 1) { + SCLogInfo("Ring parameter are incorrect. Please correct the devel"); + } + + r = setsockopt(ptv->socket, SOL_PACKET, PACKET_RX_RING, (void *) &ptv->req, sizeof(ptv->req)); + if (r < 0) { + if (errno == ENOMEM) { + SCLogInfo("Memory issue with ring parameters. Retrying."); + continue; + } + SCLogError(SC_ERR_MEM_ALLOC, + "Unable to allocate RX Ring for iface %s: (%d) %s", + devname, + errno, + strerror(errno)); + goto socket_err; + } else { + break; + } + } + + if (order < 0) { + SCLogError(SC_ERR_MEM_ALLOC, + "Unable to allocate RX Ring for iface %s (order 0 failed)", + devname); + goto socket_err; + } + + /* Allocate the Ring */ + ptv->ring_buflen = ptv->req.tp_block_nr * ptv->req.tp_block_size; + ptv->ring_buf = mmap(0, ptv->ring_buflen, PROT_READ|PROT_WRITE, + MAP_SHARED, ptv->socket, 0); + if (ptv->ring_buf == MAP_FAILED) { + SCLogError(SC_ERR_MEM_ALLOC, "Unable to mmap"); + goto socket_err; + } + /* allocate a ring for each frame header pointer*/ + ptv->frame_buf = SCMalloc(ptv->req.tp_frame_nr * sizeof (union thdr *)); + if (ptv->frame_buf == NULL) { + SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate frame buf"); + goto mmap_err; + } + memset(ptv->frame_buf, 0, ptv->req.tp_frame_nr * sizeof (union thdr *)); + /* fill the header ring with proper frame ptr*/ + ptv->frame_offset = 0; + for (i = 0; i < ptv->req.tp_block_nr; ++i) { + void *base = &ptv->ring_buf[i * ptv->req.tp_block_size]; + unsigned int j; + for (j = 0; j < ptv->req.tp_block_size / ptv->req.tp_frame_size; ++j, ++ptv->frame_offset) { + (((union thdr **)ptv->frame_buf)[ptv->frame_offset]) = base; + base += ptv->req.tp_frame_size; + } + } + ptv->frame_offset = 0; + } + + SCLogInfo("Using interface '%s' via socket %d", (char *)devname, ptv->socket); + + + ptv->datalink = AFPGetDevLinktype(ptv->socket, ptv->iface); + switch (ptv->datalink) { + case ARPHRD_PPP: + case ARPHRD_ATM: + ptv->cooked = 1; + break; + } + + TmEcode rc; + rc = AFPSetBPFFilter(ptv); + if (rc == TM_ECODE_FAILED) { + SCLogError(SC_ERR_AFP_CREATE, "Set AF_PACKET bpf filter \"%s\" failed.", ptv->bpf_filter); + goto frame_err; + } + + /* Init is ok */ + AFPSwitchState(ptv, AFP_STATE_UP); + return 0; + +frame_err: + if (ptv->frame_buf) + SCFree(ptv->frame_buf); +mmap_err: + /* Packet mmap does the cleaning when socket is closed */ +socket_err: + close(ptv->socket); + ptv->socket = -1; +error: + return -ret; +} + +TmEcode AFPSetBPFFilter(AFPThreadVars *ptv) +{ + struct bpf_program filter; + struct sock_fprog fcode; + int rc; + + if (!ptv->bpf_filter) + return TM_ECODE_OK; + + SCMutexLock(&afpacket_bpf_set_filter_lock); + + SCLogInfo("Using BPF '%s' on iface '%s'", + ptv->bpf_filter, + ptv->iface); + if (pcap_compile_nopcap(default_packet_size, /* snaplen_arg */ + ptv->datalink, /* linktype_arg */ + &filter, /* program */ + ptv->bpf_filter, /* const char *buf */ + 0, /* optimize */ + 0 /* mask */ + ) == -1) { + SCLogError(SC_ERR_AFP_CREATE, "Filter compilation failed."); + SCMutexUnlock(&afpacket_bpf_set_filter_lock); + return TM_ECODE_FAILED; + } + SCMutexUnlock(&afpacket_bpf_set_filter_lock); + + if (filter.bf_insns == NULL) { + SCLogError(SC_ERR_AFP_CREATE, "Filter badly setup."); + return TM_ECODE_FAILED; + } + + fcode.len = filter.bf_len; + fcode.filter = (struct sock_filter*)filter.bf_insns; + + rc = setsockopt(ptv->socket, SOL_SOCKET, SO_ATTACH_FILTER, &fcode, sizeof(fcode)); + + if(rc == -1) { + SCLogError(SC_ERR_AFP_CREATE, "Failed to attach filter: %s", strerror(errno)); + return TM_ECODE_FAILED; + } + + return TM_ECODE_OK; +} + + +/** + * \brief Init function for ReceiveAFP. + * + * \param tv pointer to ThreadVars + * \param initdata pointer to the interface passed from the user + * \param data pointer gets populated with AFPThreadVars + * + * \todo Create a general AFP setup function. + */ +TmEcode ReceiveAFPThreadInit(ThreadVars *tv, void *initdata, void **data) +{ + SCEnter(); + AFPIfaceConfig *afpconfig = initdata; + + if (initdata == NULL) { + SCLogError(SC_ERR_INVALID_ARGUMENT, "initdata == NULL"); + SCReturnInt(TM_ECODE_FAILED); + } + + AFPThreadVars *ptv = SCMalloc(sizeof(AFPThreadVars)); + if (unlikely(ptv == NULL)) { + afpconfig->DerefFunc(afpconfig); + SCReturnInt(TM_ECODE_FAILED); + } + memset(ptv, 0, sizeof(AFPThreadVars)); + + ptv->tv = tv; + ptv->cooked = 0; + + strlcpy(ptv->iface, afpconfig->iface, AFP_IFACE_NAME_LENGTH); + ptv->iface[AFP_IFACE_NAME_LENGTH - 1]= '\0'; + + ptv->livedev = LiveGetDevice(ptv->iface); + if (ptv->livedev == NULL) { + SCLogError(SC_ERR_INVALID_VALUE, "Unable to find Live device"); + SCFree(ptv); + SCReturnInt(TM_ECODE_FAILED); + } + + ptv->buffer_size = afpconfig->buffer_size; + ptv->ring_size = afpconfig->ring_size; + + ptv->promisc = afpconfig->promisc; + ptv->checksum_mode = afpconfig->checksum_mode; + ptv->bpf_filter = NULL; + + ptv->threads = 1; +#ifdef HAVE_PACKET_FANOUT + ptv->cluster_type = PACKET_FANOUT_LB; + ptv->cluster_id = 1; + /* We only set cluster info if the number of reader threads is greater than 1 */ + if (afpconfig->threads > 1) { + ptv->cluster_id = afpconfig->cluster_id; + ptv->cluster_type = afpconfig->cluster_type; + ptv->threads = afpconfig->threads; + } +#endif + ptv->flags = afpconfig->flags; + + if (afpconfig->bpf_filter) { + ptv->bpf_filter = afpconfig->bpf_filter; + } + +#ifdef PACKET_STATISTICS + ptv->capture_kernel_packets = StatsRegisterCounter("capture.kernel_packets", + ptv->tv); + ptv->capture_kernel_drops = StatsRegisterCounter("capture.kernel_drops", + ptv->tv); +#endif + + char *active_runmode = RunmodeGetActive(); + + if (active_runmode && !strcmp("workers", active_runmode)) { + ptv->flags |= AFP_ZERO_COPY; + SCLogInfo("Enabling zero copy mode"); + } else { + /* If we are using copy mode we need a lock */ + ptv->flags |= AFP_SOCK_PROTECT; + } + + /* If we are in RING mode, then we can use ZERO copy + * by using the data release mechanism */ + if (ptv->flags & AFP_RING_MODE) { + ptv->flags |= AFP_ZERO_COPY; + SCLogInfo("Enabling zero copy mode by using data release call"); + } + + ptv->copy_mode = afpconfig->copy_mode; + if (ptv->copy_mode != AFP_COPY_MODE_NONE) { + strlcpy(ptv->out_iface, afpconfig->out_iface, AFP_IFACE_NAME_LENGTH); + ptv->out_iface[AFP_IFACE_NAME_LENGTH - 1]= '\0'; + /* Warn about BPF filter consequence */ + if (ptv->bpf_filter) { + SCLogWarning(SC_WARN_UNCOMMON, "Enabling a BPF filter in IPS mode result" + " in dropping all non matching packets."); + } + } + + + if (AFPPeersListAdd(ptv) == TM_ECODE_FAILED) { + SCFree(ptv); + afpconfig->DerefFunc(afpconfig); + SCReturnInt(TM_ECODE_FAILED); + } + +#define T_DATA_SIZE 70000 + ptv->data = SCMalloc(T_DATA_SIZE); + if (ptv->data == NULL) { + afpconfig->DerefFunc(afpconfig); + SCFree(ptv); + SCReturnInt(TM_ECODE_FAILED); + } + ptv->datalen = T_DATA_SIZE; +#undef T_DATA_SIZE + + *data = (void *)ptv; + + afpconfig->DerefFunc(afpconfig); + + /* A bit strange to have this here but we only have vlan information + * during reading so we need to know if we want to keep vlan during + * the capture phase */ + int vlanbool = 0; + if ((ConfGetBool("vlan.use-for-tracking", &vlanbool)) == 1 && vlanbool == 0) { + ptv->vlan_disabled = 1; + } + + /* If kernel is older than 3.0, VLAN is not stripped so we don't + * get the info from packet extended header but we will use a standard + * parsing of packet data (See Linux commit bcc6d47903612c3861201cc3a866fb604f26b8b2) */ + if (! SCKernelVersionIsAtLeast(3, 0)) { + ptv->vlan_disabled = 1; + } + + SCReturnInt(TM_ECODE_OK); +} + +/** + * \brief This function prints stats to the screen at exit. + * \param tv pointer to ThreadVars + * \param data pointer that gets cast into AFPThreadVars for ptv + */ +void ReceiveAFPThreadExitStats(ThreadVars *tv, void *data) +{ + SCEnter(); + AFPThreadVars *ptv = (AFPThreadVars *)data; + +#ifdef PACKET_STATISTICS + AFPDumpCounters(ptv); + SCLogInfo("(%s) Kernel: Packets %" PRIu64 ", dropped %" PRIu64 "", + tv->name, + StatsGetLocalCounterValue(tv, ptv->capture_kernel_packets), + StatsGetLocalCounterValue(tv, ptv->capture_kernel_drops)); +#endif + + SCLogInfo("(%s) Packets %" PRIu64 ", bytes %" PRIu64 "", tv->name, ptv->pkts, ptv->bytes); +} + +/** + * \brief DeInit function closes af packet socket at exit. + * \param tv pointer to ThreadVars + * \param data pointer that gets cast into AFPThreadVars for ptv + */ +TmEcode ReceiveAFPThreadDeinit(ThreadVars *tv, void *data) +{ + AFPThreadVars *ptv = (AFPThreadVars *)data; + + AFPSwitchState(ptv, AFP_STATE_DOWN); + + if (ptv->data != NULL) { + SCFree(ptv->data); + ptv->data = NULL; + } + ptv->datalen = 0; + + ptv->bpf_filter = NULL; + + SCReturnInt(TM_ECODE_OK); +} + +/** + * \brief This function passes off to link type decoders. + * + * DecodeAFP reads packets from the PacketQueue and passes + * them off to the proper link type decoder. + * + * \param t pointer to ThreadVars + * \param p pointer to the current packet + * \param data pointer that gets cast into AFPThreadVars for ptv + * \param pq pointer to the current PacketQueue + */ +TmEcode DecodeAFP(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) +{ + SCEnter(); + DecodeThreadVars *dtv = (DecodeThreadVars *)data; + + /* XXX HACK: flow timeout can call us for injected pseudo packets + * see bug: https://redmine.openinfosecfoundation.org/issues/1107 */ + if (p->flags & PKT_PSEUDO_STREAM_END) + return TM_ECODE_OK; + + /* update counters */ + DecodeUpdatePacketCounters(tv, dtv, p); + + /* If suri has set vlan during reading, we increase vlan counter */ + if (p->vlan_idx) { + StatsIncr(tv, dtv->counter_vlan); + } + + /* call the decoder */ + switch(p->datalink) { + case LINKTYPE_LINUX_SLL: + DecodeSll(tv, dtv, p, GET_PKT_DATA(p), GET_PKT_LEN(p), pq); + break; + case LINKTYPE_ETHERNET: + DecodeEthernet(tv, dtv, p,GET_PKT_DATA(p), GET_PKT_LEN(p), pq); + break; + case LINKTYPE_PPP: + DecodePPP(tv, dtv, p, GET_PKT_DATA(p), GET_PKT_LEN(p), pq); + break; + case LINKTYPE_RAW: + DecodeRaw(tv, dtv, p, GET_PKT_DATA(p), GET_PKT_LEN(p), pq); + break; + default: + SCLogError(SC_ERR_DATALINK_UNIMPLEMENTED, "Error: datalink type %" PRId32 " not yet supported in module DecodeAFP", p->datalink); + break; + } + + PacketDecodeFinalize(tv, dtv, p); + + SCReturnInt(TM_ECODE_OK); +} + +TmEcode DecodeAFPThreadInit(ThreadVars *tv, void *initdata, void **data) +{ + SCEnter(); + DecodeThreadVars *dtv = NULL; + + dtv = DecodeThreadVarsAlloc(tv); + + if (dtv == NULL) + SCReturnInt(TM_ECODE_FAILED); + + DecodeRegisterPerfCounters(dtv, tv); + + *data = (void *)dtv; + +#ifdef __SC_CUDA_SUPPORT__ + if (CudaThreadVarsInit(&dtv->cuda_vars) < 0) + SCReturnInt(TM_ECODE_FAILED); +#endif + + SCReturnInt(TM_ECODE_OK); +} + +TmEcode DecodeAFPThreadDeinit(ThreadVars *tv, void *data) +{ + if (data != NULL) + DecodeThreadVarsFree(tv, data); + SCReturnInt(TM_ECODE_OK); +} + +#endif /* HAVE_AF_PACKET */ +/* eof */ +/** + * @} + */ |