diff options
Diffstat (limited to 'VNFs/DPPD-PROX/main.c')
-rw-r--r-- | VNFs/DPPD-PROX/main.c | 993 |
1 files changed, 993 insertions, 0 deletions
diff --git a/VNFs/DPPD-PROX/main.c b/VNFs/DPPD-PROX/main.c new file mode 100644 index 00000000..28533c78 --- /dev/null +++ b/VNFs/DPPD-PROX/main.c @@ -0,0 +1,993 @@ +/* +// Copyright (c) 2010-2017 Intel Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +*/ + +#include <string.h> +#include <locale.h> +#include <unistd.h> +#include <signal.h> + +#include <rte_cycles.h> +#include <rte_atomic.h> +#include <rte_table_hash.h> +#include <rte_memzone.h> +#include <rte_errno.h> + +#include "prox_malloc.h" +#include "run.h" +#include "main.h" +#include "log.h" +#include "quit.h" +#include "clock.h" +#include "defines.h" +#include "version.h" +#include "prox_args.h" +#include "prox_assert.h" +#include "prox_cfg.h" +#include "prox_shared.h" +#include "prox_port_cfg.h" +#include "toeplitz.h" +#include "hash_utils.h" +#include "handle_lb_net.h" +#include "prox_cksum.h" +#include "thread_nop.h" +#include "thread_generic.h" +#include "thread_pipeline.h" +#include "cqm.h" + +#if RTE_VERSION < RTE_VERSION_NUM(1,8,0,0) +#define RTE_CACHE_LINE_SIZE CACHE_LINE_SIZE +#endif + +uint8_t lb_nb_txrings = 0xff; +struct rte_ring *ctrl_rings[RTE_MAX_LCORE*MAX_TASKS_PER_CORE]; + +static void __attribute__((noreturn)) prox_usage(const char *prgname) +{ + plog_info("\nUsage: %s [-f CONFIG_FILE] [-a|-e] [-m|-s|-i] [-w DEF] [-u] [-t]\n" + "\t-f CONFIG_FILE : configuration file to load, ./prox.cfg by default\n" + "\t-l LOG_FILE : log file name, ./prox.log by default\n" + "\t-p : include PID in log file name if default log file is used\n" + "\t-o DISPLAY: Set display to use, can be 'curses' (default), 'cli' or 'none'\n" + "\t-v verbosity : initial logging verbosity\n" + "\t-a : autostart all cores (by default)\n" + "\t-e : don't autostart\n" + "\t-n : Create NULL devices instead of using PCI devices, useful together with -i\n" + "\t-m : list supported task modes and exit\n" + "\t-s : check configuration file syntax and exit\n" + "\t-i : check initialization sequence and exit\n" + "\t-u : Listen on UDS /tmp/prox.sock\n" + "\t-t : Listen on TCP port 8474\n" + "\t-q : Pass argument to Lua interpreter, useful to define variables\n" + "\t-w : define variable using syntax varname=value\n" + "\t takes precedence over variables defined in CONFIG_FILE\n" + "\t-k : Log statistics to file \"stats_dump\" in current directory\n" + "\t-d : Run as daemon, the parent process will block until PROX is not initialized\n" + "\t-z : Ignore CPU topology, implies -i\n" + "\t-r : Change initial screen refresh rate. If set to a lower than 0.001 seconds,\n" + "\t screen refreshing will be disabled\n" + , prgname); + exit(EXIT_FAILURE); +} + +static void check_mixed_normal_pipeline(void) +{ + struct lcore_cfg *lconf = NULL; + uint32_t lcore_id = -1; + + while (prox_core_next(&lcore_id, 0) == 0) { + lconf = &lcore_cfg[lcore_id]; + + int all_thread_nop = 1; + int generic = 0; + int pipeline = 0; + for (uint8_t task_id = 0; task_id < lconf->n_tasks_all; ++task_id) { + struct task_args *targ = &lconf->targs[task_id]; + all_thread_nop = all_thread_nop && + targ->task_init->thread_x == thread_nop; + + pipeline = pipeline || targ->task_init->thread_x == thread_pipeline; + generic = generic || targ->task_init->thread_x == thread_generic; + } + PROX_PANIC(generic && pipeline, "Can't run both pipeline and normal thread on same core\n"); + + if (all_thread_nop) + lconf->thread_x = thread_nop; + else { + lconf->thread_x = thread_generic; + } + } +} + +static void check_missing_rx(void) +{ + struct lcore_cfg *lconf = NULL; + struct task_args *targ; + + while (core_targ_next(&lconf, &targ, 0) == 0) { + PROX_PANIC((targ->flags & TASK_ARG_RX_RING) && targ->rx_rings[0] == 0 && !targ->tx_opt_ring_task, + "Configuration Error - Core %u task %u Receiving from ring, but nobody xmitting to this ring\n", lconf->id, targ->id); + if (targ->nb_rxports == 0 && targ->nb_rxrings == 0) { + PROX_PANIC(!task_init_flag_set(targ->task_init, TASK_FEATURE_NO_RX), + "\tCore %u task %u: no rx_ports and no rx_rings configured while required by mode %s\n", lconf->id, targ->id, targ->task_init->mode_str); + } + } +} + +static void check_cfg_consistent(void) +{ + check_missing_rx(); + check_mixed_normal_pipeline(); +} + +static void plog_all_rings(void) +{ + struct lcore_cfg *lconf = NULL; + struct task_args *targ; + + while (core_targ_next(&lconf, &targ, 0) == 0) { + for (uint8_t ring_idx = 0; ring_idx < targ->nb_rxrings; ++ring_idx) { + plog_info("\tCore %u, task %u, rx_ring[%u] %p\n", lconf->id, targ->id, ring_idx, targ->rx_rings[ring_idx]); + } + } +} + +static int chain_flag_state(struct task_args *targ, uint64_t flag, int is_set) +{ + if (task_init_flag_set(targ->task_init, flag) == is_set) + return 1; + + int ret = 0; + + for (uint32_t i = 0; i < targ->n_prev_tasks; ++i) { + ret = chain_flag_state(targ->prev_tasks[i], flag, is_set); + if (ret) + return 1; + } + return 0; +} + +static void configure_if_tx_queues(struct task_args *targ, uint8_t socket) +{ + uint8_t if_port; + + for (uint8_t i = 0; i < targ->nb_txports; ++i) { + if_port = targ->tx_port_queue[i].port; + + PROX_PANIC(if_port == OUT_DISCARD, "port misconfigured, exiting\n"); + + PROX_PANIC(!prox_port_cfg[if_port].active, "\tPort %u not used, skipping...\n", if_port); + + int dsocket = prox_port_cfg[if_port].socket; + if (dsocket != -1 && dsocket != socket) { + plog_warn("TX core on socket %d while device on socket %d\n", socket, dsocket); + } + + if (prox_port_cfg[if_port].tx_ring[0] == '\0') { // Rings-backed port can use single queue + targ->tx_port_queue[i].queue = prox_port_cfg[if_port].n_txq; + prox_port_cfg[if_port].n_txq++; + } else { + prox_port_cfg[if_port].n_txq = 1; + targ->tx_port_queue[i].queue = 0; + } + /* Set the ETH_TXQ_FLAGS_NOREFCOUNT flag if none of + the tasks up to the task transmitting to the port + does not use refcnt. */ + if (!chain_flag_state(targ, TASK_FEATURE_TXQ_FLAGS_REFCOUNT, 1)) { + prox_port_cfg[if_port].tx_conf.txq_flags |= ETH_TXQ_FLAGS_NOREFCOUNT; + plog_info("\t\tEnabling No refcnt on port %d\n", if_port); + } + else { + plog_info("\t\tRefcnt used on port %d\n", if_port); + } + + /* By default OFFLOAD is enabled, but if the whole + chain has NOOFFLOADS set all the way until the + first task that receives from a port, it will be + disabled for the destination port. */ + if (chain_flag_state(targ, TASK_FEATURE_TXQ_FLAGS_NOOFFLOADS, 1)) { + prox_port_cfg[if_port].tx_conf.txq_flags |= ETH_TXQ_FLAGS_NOOFFLOADS; + plog_info("\t\tDisabling TX offloads on port %d\n", if_port); + } else { + plog_info("\t\tEnabling TX offloads on port %d\n", if_port); + } + + /* By default NOMULTSEGS is disabled, as drivers/NIC might split packets on RX + It should only be enabled when we know for sure that the RX does not split packets. + Set the ETH_TXQ_FLAGS_NOMULTSEGS flag if none of the tasks up to the task + transmitting to the port does not use multsegs. */ + if (!chain_flag_state(targ, TASK_FEATURE_TXQ_FLAGS_NOMULTSEGS, 0)) { + prox_port_cfg[if_port].tx_conf.txq_flags |= ETH_TXQ_FLAGS_NOMULTSEGS; + plog_info("\t\tEnabling No MultiSegs on port %d\n", if_port); + } + else { + plog_info("\t\tMultiSegs used on port %d\n", if_port); + } + } +} + +static void configure_if_rx_queues(struct task_args *targ, uint8_t socket) +{ + for (int i = 0; i < targ->nb_rxports; i++) { + uint8_t if_port = targ->rx_port_queue[i].port; + + if (if_port == OUT_DISCARD) { + return; + } + + PROX_PANIC(!prox_port_cfg[if_port].active, "Port %u not used, aborting...\n", if_port); + + if(prox_port_cfg[if_port].rx_ring[0] != '\0') { + prox_port_cfg[if_port].n_rxq = 0; + } + + targ->rx_port_queue[i].queue = prox_port_cfg[if_port].n_rxq; + prox_port_cfg[if_port].pool[targ->rx_port_queue[i].queue] = targ->pool; + prox_port_cfg[if_port].pool_size[targ->rx_port_queue[i].queue] = targ->nb_mbuf - 1; + prox_port_cfg[if_port].n_rxq++; + + int dsocket = prox_port_cfg[if_port].socket; + if (dsocket != -1 && dsocket != socket) { + plog_warn("RX core on socket %d while device on socket %d\n", socket, dsocket); + } + } +} + +static void configure_if_queues(void) +{ + struct lcore_cfg *lconf = NULL; + struct task_args *targ; + uint8_t socket; + + while (core_targ_next(&lconf, &targ, 0) == 0) { + socket = rte_lcore_to_socket_id(lconf->id); + + configure_if_tx_queues(targ, socket); + configure_if_rx_queues(targ, socket); + } +} + +static const char *gen_ring_name(void) +{ + static char retval[] = "XX"; + static const char* ring_names = + "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz" + "[\\]^_`!\"#$%&'()*+,-./:;<=" + ">?@{|}0123456789"; + static int idx2 = 0; + + int idx = idx2; + + retval[0] = ring_names[idx % strlen(ring_names)]; + idx /= strlen(ring_names); + retval[1] = idx ? ring_names[(idx - 1) % strlen(ring_names)] : 0; + + idx2++; + + return retval; +} + +static int task_is_master(struct task_args *targ) +{ + return !targ->lconf; +} + +struct ring_init_stats { + uint32_t n_pkt_rings; + uint32_t n_ctrl_rings; + uint32_t n_opt_rings; +}; + +static uint32_t ring_init_stats_total(const struct ring_init_stats *ris) +{ + return ris->n_pkt_rings + ris->n_ctrl_rings + ris->n_opt_rings; +} + +static uint32_t count_incoming_tasks(uint32_t lcore_worker, uint32_t dest_task) +{ + struct lcore_cfg *lconf = NULL; + struct task_args *targ; + uint32_t ret = 0; + struct core_task ct; + + while (core_targ_next(&lconf, &targ, 0) == 0) { + for (uint8_t idxx = 0; idxx < MAX_PROTOCOLS; ++idxx) { + for (uint8_t ridx = 0; ridx < targ->core_task_set[idxx].n_elems; ++ridx) { + ct = targ->core_task_set[idxx].core_task[ridx]; + + if (dest_task == ct.task && lcore_worker == ct.core) + ret++; + } + } + } + return ret; +} + +static struct rte_ring *get_existing_ring(uint32_t lcore_id, uint32_t task_id) +{ + if (!prox_core_active(lcore_id, 0)) + return NULL; + + struct lcore_cfg *lconf = &lcore_cfg[lcore_id]; + + if (task_id >= lconf->n_tasks_all) + return NULL; + + if (lconf->targs[task_id].nb_rxrings == 0) + return NULL; + + return lconf->targs[task_id].rx_rings[0]; +} + +static void init_ring_between_tasks(struct lcore_cfg *lconf, struct task_args *starg, + const struct core_task ct, uint8_t ring_idx, int idx, + struct ring_init_stats *ris) +{ + uint8_t socket; + struct rte_ring *ring = NULL; + struct lcore_cfg *lworker; + struct task_args *dtarg; + + PROX_ASSERT(prox_core_active(ct.core, 0)); + lworker = &lcore_cfg[ct.core]; + + /* socket used is the one that the sending core resides on */ + socket = rte_lcore_to_socket_id(lconf->id); + + plog_info("\t\tCreating ring on socket %u with size %u\n" + "\t\t\tsource core, task and socket = %u, %u, %u\n" + "\t\t\tdestination core, task and socket = %u, %u, %u\n" + "\t\t\tdestination worker id = %u\n", + socket, starg->ring_size, + lconf->id, starg->id, socket, + ct.core, ct.task, rte_lcore_to_socket_id(ct.core), + ring_idx); + + if (ct.type) { + struct rte_ring **dring = NULL; + + if (ct.type == CTRL_TYPE_MSG) + dring = &lworker->ctrl_rings_m[ct.task]; + else if (ct.type == CTRL_TYPE_PKT) { + dring = &lworker->ctrl_rings_p[ct.task]; + starg->flags |= TASK_ARG_CTRL_RINGS_P; + } + + if (*dring == NULL) + ring = rte_ring_create(gen_ring_name(), starg->ring_size, socket, RING_F_SC_DEQ); + else + ring = *dring; + PROX_PANIC(ring == NULL, "Cannot create ring to connect I/O core %u with worker core %u\n", lconf->id, ct.core); + + starg->tx_rings[starg->tot_n_txrings_inited] = ring; + starg->tot_n_txrings_inited++; + *dring = ring; + if (lconf->id == prox_cfg.master) { + ctrl_rings[ct.core*MAX_TASKS_PER_CORE + ct.task] = ring; + } + + plog_info("\t\tCore %u task %u to -> core %u task %u ctrl_ring %s %p %s\n", + lconf->id, starg->id, ct.core, ct.task, ct.type == CTRL_TYPE_PKT? + "pkt" : "msg", ring, ring->name); + ris->n_ctrl_rings++; + return; + } + + dtarg = &lworker->targs[ct.task]; + lworker->targs[ct.task].worker_thread_id = ring_idx; + PROX_ASSERT(dtarg->flags & TASK_ARG_RX_RING); + PROX_ASSERT(ct.task < lworker->n_tasks_all); + + /* If all the following conditions are met, the ring can be + optimized away. */ + if (!task_is_master(starg) && starg->lconf->id == dtarg->lconf->id && + starg->nb_txrings == 1 && idx == 0 && dtarg->task && + dtarg->tot_rxrings == 1 && starg->task == dtarg->task - 1) { + plog_info("\t\tOptimizing away ring on core %u from task %u to task %u\n", + dtarg->lconf->id, starg->task, dtarg->task); + /* No need to set up ws_mbuf. */ + starg->tx_opt_ring = 1; + /* During init of destination task, the buffer in the + source task will be initialized. */ + dtarg->tx_opt_ring_task = starg; + ris->n_opt_rings++; + ++dtarg->nb_rxrings; + return; + } + + int ring_created = 1; + /* Only create multi-producer rings if configured to do so AND + there is only one task sending to the task */ + if ((prox_cfg.flags & DSF_MP_RINGS && count_incoming_tasks(ct.core, ct.task) > 1) + || (prox_cfg.flags & DSF_ENABLE_BYPASS)) { + ring = get_existing_ring(ct.core, ct.task); + + if (ring) { + plog_info("\t\tCore %u task %u creatign MP ring %p to core %u task %u\n", + lconf->id, starg->id, ring, ct.core, ct.task); + ring_created = 0; + } + else { + ring = rte_ring_create(gen_ring_name(), starg->ring_size, socket, RING_F_SC_DEQ); + plog_info("\t\tCore %u task %u using MP ring %p from core %u task %u\n", + lconf->id, starg->id, ring, ct.core, ct.task); + } + } + else + ring = rte_ring_create(gen_ring_name(), starg->ring_size, socket, RING_F_SP_ENQ | RING_F_SC_DEQ); + + PROX_PANIC(ring == NULL, "Cannot create ring to connect I/O core %u with worker core %u\n", lconf->id, ct.core); + + starg->tx_rings[starg->tot_n_txrings_inited] = ring; + starg->tot_n_txrings_inited++; + + if (ring_created) { + PROX_ASSERT(dtarg->nb_rxrings < MAX_RINGS_PER_TASK); + dtarg->rx_rings[dtarg->nb_rxrings] = ring; + ++dtarg->nb_rxrings; + } + dtarg->nb_slave_threads = starg->core_task_set[idx].n_elems; + dtarg->lb_friend_core = lconf->id; + dtarg->lb_friend_task = starg->id; + plog_info("\t\tWorker thread %d has core %d, task %d as a lb friend\n", ct.core, lconf->id, starg->id); + plog_info("\t\tCore %u task %u tx_ring[%u] -> core %u task %u rx_ring[%u] %p %s %u WT\n", + lconf->id, starg->id, ring_idx, ct.core, ct.task, dtarg->nb_rxrings, ring, ring->name, + dtarg->nb_slave_threads); + ++ris->n_pkt_rings; +} + +static void init_rings(void) +{ + struct lcore_cfg *lconf = NULL; + struct task_args *starg; + struct ring_init_stats ris = {0}; + + while (core_targ_next(&lconf, &starg, 1) == 0) { + plog_info("\t*** Initializing rings on core %u, task %u ***\n", lconf->id, starg->id); + for (uint8_t idx = 0; idx < MAX_PROTOCOLS; ++idx) { + for (uint8_t ring_idx = 0; ring_idx < starg->core_task_set[idx].n_elems; ++ring_idx) { + PROX_ASSERT(ring_idx < MAX_WT_PER_LB); + PROX_ASSERT(starg->tot_n_txrings_inited < MAX_RINGS_PER_TASK); + + struct core_task ct = starg->core_task_set[idx].core_task[ring_idx]; + init_ring_between_tasks(lconf, starg, ct, ring_idx, idx, &ris); + } + } + } + + plog_info("\tInitialized %d rings:\n" + "\t\tNumber of packet rings: %u\n" + "\t\tNumber of control rings: %u\n" + "\t\tNumber of optimized rings: %u\n", + ring_init_stats_total(&ris), + ris.n_pkt_rings, + ris.n_ctrl_rings, + ris.n_opt_rings); +} + +static void shuffle_mempool(struct rte_mempool* mempool, uint32_t nb_mbuf) +{ + struct rte_mbuf** pkts = prox_zmalloc(nb_mbuf * sizeof(*pkts), rte_socket_id()); + uint64_t got = 0; + + while (rte_mempool_get_bulk(mempool, (void**)(pkts + got), 1) == 0) + ++got; + + while (got) { + int idx; + do { + idx = rand() % nb_mbuf - 1; + } while (pkts[idx] == 0); + + rte_mempool_put_bulk(mempool, (void**)&pkts[idx], 1); + pkts[idx] = 0; + --got; + }; + prox_free(pkts); +} + +static void setup_mempools_unique_per_socket(void) +{ + uint32_t flags = 0; + char name[64]; + struct lcore_cfg *lconf = NULL; + struct task_args *targ; + + struct rte_mempool *pool[MAX_SOCKETS]; + uint32_t mbuf_count[MAX_SOCKETS] = {0}; + uint32_t nb_cache_mbuf[MAX_SOCKETS] = {0}; + uint32_t mbuf_size[MAX_SOCKETS] = {0}; + + while (core_targ_next_early(&lconf, &targ, 0) == 0) { + PROX_PANIC(targ->task_init == NULL, "task_init = NULL, is mode specified for core %d, task %d ?\n", lconf->id, targ->id); + uint8_t socket = rte_lcore_to_socket_id(lconf->id); + PROX_ASSERT(socket < MAX_SOCKETS); + + if (targ->mbuf_size_set_explicitely) + flags = MEMPOOL_F_NO_SPREAD; + if ((!targ->mbuf_size_set_explicitely) && (targ->task_init->mbuf_size != 0)) { + targ->mbuf_size = targ->task_init->mbuf_size; + } + if (targ->rx_port_queue[0].port != OUT_DISCARD) { + struct prox_port_cfg* port_cfg = &prox_port_cfg[targ->rx_port_queue[0].port]; + PROX_ASSERT(targ->nb_mbuf != 0); + mbuf_count[socket] += targ->nb_mbuf; + if (nb_cache_mbuf[socket] == 0) + nb_cache_mbuf[socket] = targ->nb_cache_mbuf; + else { + PROX_PANIC(nb_cache_mbuf[socket] != targ->nb_cache_mbuf, + "all mbuf_cache must have the same size if using a unique mempool per socket\n"); + } + if (mbuf_size[socket] == 0) + mbuf_size[socket] = targ->mbuf_size; + else { + PROX_PANIC(mbuf_size[socket] != targ->mbuf_size, + "all mbuf_size must have the same size if using a unique mempool per socket\n"); + } + if ((!targ->mbuf_size_set_explicitely) && (strcmp(port_cfg->short_name, "vmxnet3") == 0)) { + if (mbuf_size[socket] < MBUF_SIZE + RTE_PKTMBUF_HEADROOM) + mbuf_size[socket] = MBUF_SIZE + RTE_PKTMBUF_HEADROOM; + } + } + } + for (int i = 0 ; i < MAX_SOCKETS; i++) { + if (mbuf_count[i] != 0) { + sprintf(name, "socket_%u_pool", i); + pool[i] = rte_mempool_create(name, + mbuf_count[i] - 1, mbuf_size[i], + nb_cache_mbuf[i], + sizeof(struct rte_pktmbuf_pool_private), + rte_pktmbuf_pool_init, NULL, + prox_pktmbuf_init, NULL, + i, flags); + PROX_PANIC(pool[i] == NULL, "\t\tError: cannot create mempool for socket %u\n", i); + plog_info("\t\tMempool %p size = %u * %u cache %u, socket %d\n", pool[i], + mbuf_count[i], mbuf_size[i], nb_cache_mbuf[i], i); + + if (prox_cfg.flags & DSF_SHUFFLE) { + shuffle_mempool(pool[i], mbuf_count[i]); + } + } + } + + lconf = NULL; + while (core_targ_next_early(&lconf, &targ, 0) == 0) { + uint8_t socket = rte_lcore_to_socket_id(lconf->id); + + if (targ->rx_port_queue[0].port != OUT_DISCARD) { + /* use this pool for the interface that the core is receiving from */ + /* If one core receives from multiple ports, all the ports use the same mempool */ + targ->pool = pool[socket]; + /* Set the number of mbuf to the number of the unique mempool, so that the used and free work */ + targ->nb_mbuf = mbuf_count[socket]; + plog_info("\t\tMempool %p size = %u * %u cache %u, socket %d\n", targ->pool, + targ->nb_mbuf, mbuf_size[socket], targ->nb_cache_mbuf, socket); + } + } +} + +static void setup_mempool_for_rx_task(struct lcore_cfg *lconf, struct task_args *targ) +{ + const uint8_t socket = rte_lcore_to_socket_id(lconf->id); + struct prox_port_cfg *port_cfg = &prox_port_cfg[targ->rx_port_queue[0].port]; + const struct rte_memzone *mz; + struct rte_mempool *mp = NULL; + uint32_t flags = 0; + char memzone_name[64]; + char name[64]; + + /* mbuf size can be set + * - from config file (highest priority, overwriting any other config) - should only be used as workaround + * - through each 'mode', overwriting the default mbuf_size + * - defaulted to MBUF_SIZE i.e. 1518 Bytes + * Except is set expliciteky, ensure that size is big enough for vmxnet3 driver + */ + if (targ->mbuf_size_set_explicitely) { + flags = MEMPOOL_F_NO_SPREAD; + /* targ->mbuf_size already set */ + } + else if (targ->task_init->mbuf_size != 0) { + /* mbuf_size not set through config file but set through mode */ + targ->mbuf_size = targ->task_init->mbuf_size; + } + else if (strcmp(port_cfg->short_name, "vmxnet3") == 0) { + if (targ->mbuf_size < MBUF_SIZE + RTE_PKTMBUF_HEADROOM) + targ->mbuf_size = MBUF_SIZE + RTE_PKTMBUF_HEADROOM; + } + + /* allocate memory pool for packets */ + PROX_ASSERT(targ->nb_mbuf != 0); + + if (targ->pool_name[0] == '\0') { + sprintf(name, "core_%u_port_%u_pool", lconf->id, targ->id); + } + + snprintf(memzone_name, sizeof(memzone_name)-1, "MP_%s", targ->pool_name); + mz = rte_memzone_lookup(memzone_name); + + if (mz != NULL) { + mp = (struct rte_mempool*)mz->addr; + + targ->nb_mbuf = mp->size; + targ->pool = mp; + } + +#ifdef RTE_LIBRTE_IVSHMEM_FALSE + if (mz != NULL && mp != NULL && mp->phys_addr != mz->ioremap_addr) { + /* Init mbufs with ioremap_addr for dma */ + mp->phys_addr = mz->ioremap_addr; + mp->elt_pa[0] = mp->phys_addr + (mp->elt_va_start - (uintptr_t)mp); + + struct prox_pktmbuf_reinit_args init_args; + init_args.mp = mp; + init_args.lconf = lconf; + + uint32_t elt_sz = mp->elt_size + mp->header_size + mp->trailer_size; + rte_mempool_obj_iter((void*)mp->elt_va_start, mp->size, elt_sz, 1, + mp->elt_pa, mp->pg_num, mp->pg_shift, prox_pktmbuf_reinit, &init_args); + } +#endif + + /* Use this pool for the interface that the core is + receiving from if one core receives from multiple + ports, all the ports use the same mempool */ + if (targ->pool == NULL) { + plog_info("\t\tCreating mempool with name '%s'\n", name); + targ->pool = rte_mempool_create(name, + targ->nb_mbuf - 1, targ->mbuf_size, + targ->nb_cache_mbuf, + sizeof(struct rte_pktmbuf_pool_private), + rte_pktmbuf_pool_init, NULL, + prox_pktmbuf_init, lconf, + socket, flags); + } + + PROX_PANIC(targ->pool == NULL, + "\t\tError: cannot create mempool for core %u port %u: %s\n", lconf->id, targ->id, rte_strerror(rte_errno)); + + plog_info("\t\tMempool %p size = %u * %u cache %u, socket %d\n", targ->pool, + targ->nb_mbuf, targ->mbuf_size, targ->nb_cache_mbuf, socket); + if (prox_cfg.flags & DSF_SHUFFLE) { + shuffle_mempool(targ->pool, targ->nb_mbuf); + } +} + +static void setup_mempools_multiple_per_socket(void) +{ + struct lcore_cfg *lconf = NULL; + struct task_args *targ; + + while (core_targ_next_early(&lconf, &targ, 0) == 0) { + PROX_PANIC(targ->task_init == NULL, "task_init = NULL, is mode specified for core %d, task %d ?\n", lconf->id, targ->id); + if (targ->rx_port_queue[0].port == OUT_DISCARD) + continue; + setup_mempool_for_rx_task(lconf, targ); + } +} + +static void setup_mempools(void) +{ + if (prox_cfg.flags & UNIQUE_MEMPOOL_PER_SOCKET) + setup_mempools_unique_per_socket(); + else + setup_mempools_multiple_per_socket(); +} + +static void set_task_lconf(void) +{ + struct lcore_cfg *lconf; + uint32_t lcore_id = -1; + + while(prox_core_next(&lcore_id, 0) == 0) { + lconf = &lcore_cfg[lcore_id]; + for (uint8_t task_id = 0; task_id < lconf->n_tasks_all; ++task_id) { + lconf->targs[task_id].lconf = lconf; + } + } +} + +static void set_dest_threads(void) +{ + struct lcore_cfg *lconf = NULL; + struct task_args *targ; + + while (core_targ_next(&lconf, &targ, 0) == 0) { + for (uint8_t idx = 0; idx < MAX_PROTOCOLS; ++idx) { + for (uint8_t ring_idx = 0; ring_idx < targ->core_task_set[idx].n_elems; ++ring_idx) { + struct core_task ct = targ->core_task_set[idx].core_task[ring_idx]; + + struct task_args *dest_task = core_targ_get(ct.core, ct.task); + dest_task->prev_tasks[dest_task->n_prev_tasks++] = targ; + } + } + } +} + +static void setup_all_task_structs_early_init(void) +{ + struct lcore_cfg *lconf = NULL; + struct task_args *targ; + + plog_info("\t*** Calling early init on all tasks ***\n"); + while (core_targ_next(&lconf, &targ, 0) == 0) { + if (targ->task_init->early_init) { + targ->task_init->early_init(targ); + } + } +} + +static void setup_all_task_structs(void) +{ + struct lcore_cfg *lconf; + uint32_t lcore_id = -1; + + while(prox_core_next(&lcore_id, 0) == 0) { + lconf = &lcore_cfg[lcore_id]; + for (uint8_t task_id = 0; task_id < lconf->n_tasks_all; ++task_id) { + lconf->tasks_all[task_id] = init_task_struct(&lconf->targs[task_id]); + } + } +} + +static void init_port_activate(void) +{ + struct lcore_cfg *lconf = NULL; + struct task_args *targ; + uint8_t port_id = 0; + + while (core_targ_next_early(&lconf, &targ, 0) == 0) { + for (int i = 0; i < targ->nb_rxports; i++) { + port_id = targ->rx_port_queue[i].port; + prox_port_cfg[port_id].active = 1; + } + + for (int i = 0; i < targ->nb_txports; i++) { + port_id = targ->tx_port_queue[i].port; + prox_port_cfg[port_id].active = 1; + } + } +} + +/* Initialize cores and allocate mempools */ +static void init_lcores(void) +{ + struct lcore_cfg *lconf = 0; + uint32_t lcore_id = -1; + + while(prox_core_next(&lcore_id, 0) == 0) { + uint8_t socket = rte_lcore_to_socket_id(lcore_id); + PROX_PANIC(socket + 1 > MAX_SOCKETS, "Can't configure core %u (on socket %u). MAX_SOCKET is set to %d\n", lcore_id, socket, MAX_SOCKETS); + } + + /* need to allocate mempools as the first thing to use the lowest possible address range */ + plog_info("=== Initializing mempools ===\n"); + setup_mempools(); + + lcore_cfg_alloc_hp(); + + set_dest_threads(); + set_task_lconf(); + + plog_info("=== Initializing port addresses ===\n"); + init_port_addr(); + + plog_info("=== Initializing queue numbers on cores ===\n"); + configure_if_queues(); + + plog_info("=== Initializing rings on cores ===\n"); + init_rings(); + + plog_info("=== Checking configuration consistency ===\n"); + check_cfg_consistent(); + + plog_all_rings(); + + setup_all_task_structs_early_init(); + plog_info("=== Initializing tasks ===\n"); + setup_all_task_structs(); +} + +static int setup_prox(int argc, char **argv) +{ + if (prox_read_config_file() != 0 || + prox_setup_rte(argv[0]) != 0) { + return -1; + } + + if (prox_cfg.flags & DSF_CHECK_SYNTAX) { + plog_info("=== Configuration file syntax has been checked ===\n\n"); + exit(EXIT_SUCCESS); + } + + init_port_activate(); + plog_info("=== Initializing rte devices ===\n"); + if (!(prox_cfg.flags & DSF_USE_DUMMY_DEVICES)) + init_rte_ring_dev(); + init_rte_dev(prox_cfg.flags & DSF_USE_DUMMY_DEVICES); + plog_info("=== Calibrating TSC overhead ===\n"); + clock_init(); + plog_info("\tTSC running at %"PRIu64" Hz\n", rte_get_tsc_hz()); + + init_lcores(); + plog_info("=== Initializing ports ===\n"); + init_port_all(); + + if (prox_cfg.logbuf_size) { + prox_cfg.logbuf = prox_zmalloc(prox_cfg.logbuf_size, rte_socket_id()); + PROX_PANIC(prox_cfg.logbuf == NULL, "Failed to allocate memory for logbuf with size = %d\n", prox_cfg.logbuf_size); + } + + if (prox_cfg.flags & DSF_CHECK_INIT) { + plog_info("=== Initialization sequence completed ===\n\n"); + exit(EXIT_SUCCESS); + } + + /* Current way that works to disable DPDK logging */ + FILE *f = fopen("/dev/null", "r"); + rte_openlog_stream(f); + plog_info("=== PROX started ===\n"); + return 0; +} + +static int success = 0; +static void siguser_handler(int signal) +{ + if (signal == SIGUSR1) + success = 1; + else + success = 0; +} + +static void sigabrt_handler(__attribute__((unused)) int signum) +{ + /* restore default disposition for SIGABRT and SIGPIPE */ + signal(SIGABRT, SIG_DFL); + signal(SIGPIPE, SIG_DFL); + + /* ignore further Ctrl-C */ + signal(SIGINT, SIG_IGN); + + /* more drastic exit on tedious termination signal */ + plog_info("Aborting...\n"); + if (lcore_cfg != NULL) { + uint32_t lcore_id; + pthread_t thread_id, tid0, tid = pthread_self(); + memset(&tid0, 0, sizeof(tid0)); + + /* cancel all threads except current one */ + lcore_id = -1; + while (prox_core_next(&lcore_id, 1) == 0) { + thread_id = lcore_cfg[lcore_id].thread_id; + if (pthread_equal(thread_id, tid0)) + continue; + if (pthread_equal(thread_id, tid)) + continue; + pthread_cancel(thread_id); + } + + /* wait for cancelled threads to terminate */ + lcore_id = -1; + while (prox_core_next(&lcore_id, 1) == 0) { + thread_id = lcore_cfg[lcore_id].thread_id; + if (pthread_equal(thread_id, tid0)) + continue; + if (pthread_equal(thread_id, tid)) + continue; + pthread_join(thread_id, NULL); + } + } + + /* close ncurses */ + display_end(); + + /* close ports on termination signal */ + close_ports_atexit(); + + /* terminate now */ + abort(); +} + +static void sigterm_handler(int signum) +{ + /* abort on second Ctrl-C */ + if (signum == SIGINT) + signal(SIGINT, sigabrt_handler); + + /* gracefully quit on harmless termination signal */ + /* ports will subsequently get closed at resulting exit */ + quit(); +} + +int main(int argc, char **argv) +{ + /* set en_US locale to print big numbers with ',' */ + setlocale(LC_NUMERIC, "en_US.utf-8"); + + if (prox_parse_args(argc, argv) != 0){ + prox_usage(argv[0]); + } + + plog_init(prox_cfg.log_name, prox_cfg.log_name_pid); + plog_info("=== " PROGRAM_NAME " " VERSION_STR " ===\n"); + plog_info("\tUsing DPDK %s\n", rte_version() + sizeof(RTE_VER_PREFIX)); + read_rdt_info(); + + if (prox_cfg.flags & DSF_LIST_TASK_MODES) { + /* list supported task modes and exit */ + tasks_list(); + return EXIT_SUCCESS; + } + + /* close ports at normal exit */ + atexit(close_ports_atexit); + /* gracefully quit on harmless termination signals */ + signal(SIGHUP, sigterm_handler); + signal(SIGINT, sigterm_handler); + signal(SIGQUIT, sigterm_handler); + signal(SIGTERM, sigterm_handler); + signal(SIGUSR1, sigterm_handler); + signal(SIGUSR2, sigterm_handler); + /* more drastic exit on tedious termination signals */ + signal(SIGABRT, sigabrt_handler); + signal(SIGPIPE, sigabrt_handler); + + if (prox_cfg.flags & DSF_DAEMON) { + signal(SIGUSR1, siguser_handler); + signal(SIGUSR2, siguser_handler); + plog_info("=== Running in Daemon mode ===\n"); + plog_info("\tForking child and waiting for setup completion\n"); + + pid_t ppid = getpid(); + pid_t pid = fork(); + if (pid < 0) { + plog_err("Failed to fork process to run in daemon mode\n"); + return EXIT_FAILURE; + } + + if (pid == 0) { + fclose(stdin); + fclose(stdout); + fclose(stderr); + if (setsid() < 0) { + kill(ppid, SIGUSR2); + return EXIT_FAILURE; + } + if (setup_prox(argc, argv) != 0) { + kill(ppid, SIGUSR2); + return EXIT_FAILURE; + } + else { + kill(ppid, SIGUSR1); + run(prox_cfg.flags); + return EXIT_SUCCESS; + } + } + else { + /* Before exiting the parent, wait until the + child process has finished setting up */ + pause(); + if (prox_cfg.logbuf) { + file_print(prox_cfg.logbuf); + } + return success? EXIT_SUCCESS : EXIT_FAILURE; + } + } + + if (setup_prox(argc, argv) != 0) + return EXIT_FAILURE; + run(prox_cfg.flags); + return EXIT_SUCCESS; +} |