/* // Copyright (c) 2010-2020 Intel Corporation // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. */ #include #include #include #include #include #include #include #include #include #include #include "prox_malloc.h" #include "run.h" #include "main.h" #include "log.h" #include "quit.h" #include "clock.h" #include "defines.h" #include "version.h" #include "prox_args.h" #include "prox_assert.h" #include "prox_cfg.h" #include "prox_shared.h" #include "prox_port_cfg.h" #include "toeplitz.h" #include "hash_utils.h" #include "handle_lb_net.h" #include "prox_cksum.h" #include "thread_nop.h" #include "thread_generic.h" #include "thread_pipeline.h" #include "cqm.h" #include "handle_master.h" #if RTE_VERSION < RTE_VERSION_NUM(1,8,0,0) #define RTE_CACHE_LINE_SIZE CACHE_LINE_SIZE #endif uint8_t lb_nb_txrings = 0xff; struct rte_ring *ctrl_rings[RTE_MAX_LCORE*MAX_TASKS_PER_CORE]; static void __attribute__((noreturn)) prox_usage(const char *prgname) { plog_info("\nUsage: %s [-f CONFIG_FILE] [-a|-e] [-m|-s|-i] [-w DEF] [-u] [-t]\n" "\t-f CONFIG_FILE : configuration file to load, ./prox.cfg by default\n" "\t-l LOG_FILE : log file name, ./prox.log by default\n" "\t-p : include PID in log file name if default log file is used\n" "\t-o DISPLAY: Set display to use, can be 'curses' (default), 'cli' or 'none'\n" "\t-v verbosity : initial logging verbosity\n" "\t-a : autostart all cores (by default)\n" "\t-e : don't autostart\n" "\t-n : Create NULL devices instead of using PCI devices, useful together with -i\n" "\t-m : list supported task modes and exit\n" "\t-s : check configuration file syntax and exit\n" "\t-i : check initialization sequence and exit\n" "\t-u : Listen on UDS /tmp/prox.sock\n" "\t-t : Listen on TCP port 8474\n" "\t-q : Pass argument to Lua interpreter, useful to define variables\n" "\t-w : define variable using syntax varname=value\n" "\t takes precedence over variables defined in CONFIG_FILE\n" "\t-k : Log statistics to file \"stats_dump\" in current directory\n" "\t-d : Run as daemon, the parent process will block until PROX is not initialized\n" "\t-z : Ignore CPU topology, implies -i\n" "\t-r : Change initial screen refresh rate. If set to a lower than 0.001 seconds,\n" "\t screen refreshing will be disabled\n" , prgname); exit(EXIT_FAILURE); } static void check_mixed_normal_pipeline(void) { struct lcore_cfg *lconf = NULL; uint32_t lcore_id = -1; while (prox_core_next(&lcore_id, 0) == 0) { lconf = &lcore_cfg[lcore_id]; int all_thread_nop = 1; int generic = 0; int pipeline = 0; int l3 = 0; for (uint8_t task_id = 0; task_id < lconf->n_tasks_all; ++task_id) { struct task_args *targ = &lconf->targs[task_id]; l3 = !strcmp("l3", targ->sub_mode_str); all_thread_nop = all_thread_nop && !l3 && targ->task_init->thread_x == thread_nop; pipeline = pipeline || targ->task_init->thread_x == thread_pipeline; generic = generic || targ->task_init->thread_x == thread_generic || l3; } PROX_PANIC(generic && pipeline, "Can't run both pipeline and normal thread on same core\n"); if (all_thread_nop) lconf->thread_x = thread_nop; else { lconf->thread_x = thread_generic; } } } static void check_zero_rx(void) { struct lcore_cfg *lconf = NULL; struct task_args *targ; while (core_targ_next(&lconf, &targ, 0) == 0) { if (targ->nb_rxports != 0) { PROX_PANIC(task_init_flag_set(targ->task_init, TASK_FEATURE_NO_RX), "\tCore %u task %u: rx_ports configured while mode %s does not use it\n", lconf->id, targ->id, targ->task_init->mode_str); } } } static void check_nb_mbuf(void) { struct lcore_cfg *lconf = NULL; struct task_args *targ = NULL; uint8_t port_id; int n_txd = 0, n_rxd = 0; while (core_targ_next(&lconf, &targ, 0) == 0) { for (uint8_t i = 0; i < targ->nb_txports; ++i) { port_id = targ->tx_port_queue[i].port; n_txd = prox_port_cfg[port_id].n_txd; } for (uint8_t i = 0; i < targ->nb_rxports; ++i) { port_id = targ->rx_port_queue[i].port; n_rxd = prox_port_cfg[port_id].n_rxd; } if (targ->nb_mbuf <= n_rxd + n_txd + targ->nb_cache_mbuf + MAX_PKT_BURST) { plog_warn("Core %d, task %d might not have enough mbufs (%d) to support %d txd, %d rxd and %d cache_mbuf\n", lconf->id, targ->id, targ->nb_mbuf, n_txd, n_rxd, targ->nb_cache_mbuf); } } } static void check_missing_rx(void) { struct lcore_cfg *lconf = NULL, *rx_lconf = NULL, *tx_lconf = NULL; struct task_args *targ, *rx_targ = NULL, *tx_targ = NULL; uint8_t port_id, rx_port_id, ok, l3, ndp; while (core_targ_next(&lconf, &targ, 0) == 0) { PROX_PANIC((targ->flags & TASK_ARG_RX_RING) && targ->rx_rings[0] == 0 && !targ->tx_opt_ring_task, "Configuration Error - Core %u task %u Receiving from ring, but nobody xmitting to this ring\n", lconf->id, targ->id); if (targ->nb_rxports == 0 && targ->nb_rxrings == 0) { PROX_PANIC(!task_init_flag_set(targ->task_init, TASK_FEATURE_NO_RX), "\tCore %u task %u: no rx_ports and no rx_rings configured while required by mode %s\n", lconf->id, targ->id, targ->task_init->mode_str); } } lconf = NULL; while (core_targ_next(&lconf, &targ, 0) == 0) { l3 = ndp = 0; if (strcmp(targ->sub_mode_str, "l3") == 0) l3 = 1; else if (strcmp(targ->sub_mode_str, "ndp") == 0) ndp = 1; else continue; PROX_PANIC((targ->nb_rxports == 0) && (targ->nb_txports == 0), "L3/NDP task must have a RX or a TX port\n"); // If the L3/NDP sub_mode receives from a port, check that there is at least one core/task // transmitting to this port in L3/NDP sub_mode for (uint8_t i = 0; i < targ->nb_rxports; ++i) { rx_port_id = targ->rx_port_queue[i].port; ok = 0; tx_lconf = NULL; while (core_targ_next(&tx_lconf, &tx_targ, 0) == 0) { if ((port_id = tx_targ->tx_port_queue[0].port) == OUT_DISCARD) continue; if ((rx_port_id == port_id) && ( ((tx_targ->flags & TASK_ARG_L3) && l3) || ((tx_targ->flags & TASK_ARG_NDP) && ndp) ) ) { ok = 1; break; } } PROX_PANIC(ok == 0, "RX %s sub mode for port %d on core %d task %d, but no core/task transmitting on that port\n", l3 ? "l3":"ndp", rx_port_id, lconf->id, targ->id); } // If the L3/NDP sub_mode transmits to a port, check that there is at least one core/task // receiving from that port in L3/NDP sub_mode. if ((port_id = targ->tx_port_queue[0].port) == OUT_DISCARD) continue; rx_lconf = NULL; ok = 0; plog_info("\tCore %d task %d transmitting to port %d in %s submode\n", lconf->id, targ->id, port_id, l3 ? "l3":"ndp"); while (core_targ_next(&rx_lconf, &rx_targ, 0) == 0) { for (uint8_t i = 0; i < rx_targ->nb_rxports; ++i) { rx_port_id = rx_targ->rx_port_queue[i].port; if ((rx_port_id == port_id) && ( ((rx_targ->flags & TASK_ARG_L3) && l3) || ((rx_targ->flags & TASK_ARG_NDP) && ndp) ) ){ ok = 1; break; } } if (ok == 1) { plog_info("\tCore %d task %d has found core %d task %d receiving from port %d in %s submode\n", lconf->id, targ->id, rx_lconf->id, rx_targ->id, port_id, ((rx_targ->flags & TASK_ARG_L3) && l3) ? "l3":"ndp"); break; } } PROX_PANIC(ok == 0, "%s sub mode for port %d on core %d task %d, but no core/task receiving on that port\n", l3 ? "l3":"ndp", port_id, lconf->id, targ->id); } } static void check_cfg_consistent(void) { check_nb_mbuf(); check_missing_rx(); check_zero_rx(); check_mixed_normal_pipeline(); } static void plog_all_rings(void) { struct lcore_cfg *lconf = NULL; struct task_args *targ; while (core_targ_next(&lconf, &targ, 0) == 0) { for (uint8_t ring_idx = 0; ring_idx < targ->nb_rxrings; ++ring_idx) { plog_info("\tCore %u, task %u, rx_ring[%u] %p\n", lconf->id, targ->id, ring_idx, targ->rx_rings[ring_idx]); } } } static int chain_flag_state(struct task_args *targ, uint64_t flag, int is_set) { if (task_init_flag_set(targ->task_init, flag) == is_set) return 1; int ret = 0; for (uint32_t i = 0; i < targ->n_prev_tasks; ++i) { ret = chain_flag_state(targ->prev_tasks[i], flag, is_set); if (ret) return 1; } return 0; } static int chain_flag_always_set(struct task_args *targ, uint64_t flag) { return (!chain_flag_state(targ, flag, 0)); } static int chain_flag_never_set(struct task_args *targ, uint64_t flag) { return (!chain_flag_state(targ, flag, 1)); } static int chain_flag_sometimes_set(struct task_args *targ, uint64_t flag) { return (chain_flag_state(targ, flag, 1)); } static void configure_if_tx_queues(struct task_args *targ, uint8_t socket) { uint8_t if_port; for (uint8_t i = 0; i < targ->nb_txports; ++i) { if_port = targ->tx_port_queue[i].port; PROX_PANIC(if_port == OUT_DISCARD, "port misconfigured, exiting\n"); PROX_PANIC(!prox_port_cfg[if_port].active, "\tPort %u not used, skipping...\n", if_port); int dsocket = prox_port_cfg[if_port].socket; if (dsocket != -1 && dsocket != socket) { plog_warn("TX core on socket %d while device on socket %d\n", socket, dsocket); } if (prox_port_cfg[if_port].tx_ring[0] == '\0') { // Rings-backed port can use single queue targ->tx_port_queue[i].queue = prox_port_cfg[if_port].n_txq; prox_port_cfg[if_port].n_txq++; } else { prox_port_cfg[if_port].n_txq = 1; targ->tx_port_queue[i].queue = 0; } /* By default OFFLOAD is enabled, but if the whole chain has NOOFFLOADS set all the way until the first task that receives from a port, it will be disabled for the destination port. */ #if RTE_VERSION < RTE_VERSION_NUM(18,8,0,1) if (chain_flag_always_set(targ, TASK_FEATURE_TXQ_FLAGS_NOOFFLOADS)) { prox_port_cfg[if_port].tx_conf.txq_flags |= ETH_TXQ_FLAGS_NOOFFLOADS; } #else if (chain_flag_always_set(targ, TASK_FEATURE_TXQ_FLAGS_NOOFFLOADS)) { prox_port_cfg[if_port].requested_tx_offload &= ~(DEV_TX_OFFLOAD_IPV4_CKSUM | DEV_TX_OFFLOAD_UDP_CKSUM); } #endif } } static void configure_if_rx_queues(struct task_args *targ, uint8_t socket) { struct prox_port_cfg *port; for (int i = 0; i < targ->nb_rxports; i++) { uint8_t if_port = targ->rx_port_queue[i].port; if (if_port == OUT_DISCARD) { return; } port = &prox_port_cfg[if_port]; PROX_PANIC(!port->active, "Port %u not used, aborting...\n", if_port); if(port->rx_ring[0] != '\0') { port->n_rxq = 0; } // If the mbuf size (of the rx task) is not big enough, we might receive multiple segments // This is usually the case when setting a big mtu size i.e. enabling jumbo frames. // If the packets get transmitted, then multi segments will have to be enabled on the TX port uint16_t max_frame_size = port->mtu + PROX_RTE_ETHER_HDR_LEN + PROX_RTE_ETHER_CRC_LEN + 2 * PROX_VLAN_TAG_SIZE; if (max_frame_size + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM > targ->mbuf_size) { targ->task_init->flag_features |= TASK_FEATURE_TXQ_FLAGS_MULTSEGS; } targ->rx_port_queue[i].queue = port->n_rxq; port->pool[targ->rx_port_queue[i].queue] = targ->pool; port->pool_size[targ->rx_port_queue[i].queue] = targ->nb_mbuf - 1; port->n_rxq++; int dsocket = port->socket; if (dsocket != -1 && dsocket != socket) { plog_warn("RX core on socket %d while device on socket %d\n", socket, dsocket); } } } static void configure_if_queues(void) { struct lcore_cfg *lconf = NULL; struct task_args *targ; uint8_t socket; while (core_targ_next(&lconf, &targ, 0) == 0) { socket = rte_lcore_to_socket_id(lconf->id); configure_if_rx_queues(targ, socket); configure_if_tx_queues(targ, socket); } } static void configure_tx_queue_flags(void) { struct lcore_cfg *lconf = NULL; struct task_args *targ; uint8_t socket; uint8_t if_port; while (core_targ_next(&lconf, &targ, 0) == 0) { socket = rte_lcore_to_socket_id(lconf->id); for (uint8_t i = 0; i < targ->nb_txports; ++i) { if_port = targ->tx_port_queue[i].port; #if RTE_VERSION < RTE_VERSION_NUM(18,8,0,1) /* Set the ETH_TXQ_FLAGS_NOREFCOUNT flag if none of the tasks up to the task transmitting to the port use refcnt. */ if (chain_flag_never_set(targ, TASK_FEATURE_TXQ_FLAGS_REFCOUNT)) { prox_port_cfg[if_port].tx_conf.txq_flags |= ETH_TXQ_FLAGS_NOREFCOUNT; } #else /* Set the DEV_TX_OFFLOAD_MBUF_FAST_FREE flag if none of the tasks up to the task transmitting to the port use refcnt and per-queue all mbufs comes from the same mempool. */ if (chain_flag_never_set(targ, TASK_FEATURE_TXQ_FLAGS_REFCOUNT)) { if (chain_flag_never_set(targ, TASK_FEATURE_TXQ_FLAGS_MULTIPLE_MEMPOOL)) prox_port_cfg[if_port].requested_tx_offload |= DEV_TX_OFFLOAD_MBUF_FAST_FREE; } #endif } } } static void configure_multi_segments(void) { struct lcore_cfg *lconf = NULL; struct task_args *targ; uint8_t if_port; while (core_targ_next(&lconf, &targ, 0) == 0) { for (uint8_t i = 0; i < targ->nb_txports; ++i) { if_port = targ->tx_port_queue[i].port; // Multi segment is disabled for most tasks. It is only enabled for tasks requiring big packets. #if RTE_VERSION < RTE_VERSION_NUM(18,8,0,1) // We can only enable "no multi segment" if no such task exists in the chain of tasks. if (chain_flag_never_set(targ, TASK_FEATURE_TXQ_FLAGS_MULTSEGS)) { prox_port_cfg[if_port].tx_conf.txq_flags |= ETH_TXQ_FLAGS_NOMULTSEGS; } #else // We enable "multi segment" if at least one task requires it in the chain of tasks. if (chain_flag_sometimes_set(targ, TASK_FEATURE_TXQ_FLAGS_MULTSEGS)) { prox_port_cfg[if_port].requested_tx_offload |= DEV_TX_OFFLOAD_MULTI_SEGS; } #endif } } } static const char *gen_ring_name(void) { static char retval[] = "XX"; static const char* ring_names = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" "abcdefghijklmnopqrstuvwxyz" "[\\]^_`!\"#$%&'()*+,-./:;<=" ">?@{|}0123456789"; static int idx2 = 0; int idx = idx2; retval[0] = ring_names[idx % strlen(ring_names)]; idx /= strlen(ring_names); retval[1] = idx ? ring_names[(idx - 1) % strlen(ring_names)] : 0; idx2++; return retval; } struct ring_init_stats { uint32_t n_pkt_rings; uint32_t n_ctrl_rings; uint32_t n_opt_rings; }; static uint32_t ring_init_stats_total(const struct ring_init_stats *ris) { return ris->n_pkt_rings + ris->n_ctrl_rings + ris->n_opt_rings; } static uint32_t count_incoming_tasks(uint32_t lcore_worker, uint32_t dest_task) { struct lcore_cfg *lconf = NULL; struct task_args *targ; uint32_t ret = 0; struct core_task ct; while (core_targ_next(&lconf, &targ, 0) == 0) { for (uint8_t idxx = 0; idxx < MAX_PROTOCOLS; ++idxx) { for (uint8_t ridx = 0; ridx < targ->core_task_set[idxx].n_elems; ++ridx) { ct = targ->core_task_set[idxx].core_task[ridx]; if (dest_task == ct.task && lcore_worker == ct.core) ret++; } } } return ret; } static struct rte_ring *get_existing_ring(uint32_t lcore_id, uint32_t task_id) { if (!prox_core_active(lcore_id, 0)) return NULL; struct lcore_cfg *lconf = &lcore_cfg[lcore_id]; if (task_id >= lconf->n_tasks_all) return NULL; if (lconf->targs[task_id].nb_rxrings == 0) return NULL; return lconf->targs[task_id].rx_rings[0]; } static struct rte_ring *init_ring_between_tasks(struct lcore_cfg *lconf, struct task_args *starg, const struct core_task ct, uint8_t ring_idx, int idx, struct ring_init_stats *ris) { uint8_t socket; struct rte_ring *ring = NULL; struct lcore_cfg *lworker; struct task_args *dtarg; PROX_ASSERT(prox_core_active(ct.core, 0)); lworker = &lcore_cfg[ct.core]; /* socket used is the one that the sending core resides on */ socket = rte_lcore_to_socket_id(lconf->id); plog_info("\t\tCreating ring on socket %u with size %u\n" "\t\t\tsource core, task and socket = %u, %u, %u\n" "\t\t\tdestination core, task and socket = %u, %u, %u\n" "\t\t\tdestination worker id = %u\n", socket, starg->ring_size, lconf->id, starg->id, socket, ct.core, ct.task, rte_lcore_to_socket_id(ct.core), ring_idx); if (ct.type) { struct rte_ring **dring = NULL; if (ct.type == CTRL_TYPE_MSG) dring = &lworker->ctrl_rings_m[ct.task]; else if (ct.type == CTRL_TYPE_PKT) { dring = &lworker->ctrl_rings_p[ct.task]; starg->flags |= TASK_ARG_CTRL_RINGS_P; } if (*dring == NULL) ring = rte_ring_create(gen_ring_name(), starg->ring_size, socket, RING_F_SC_DEQ); else ring = *dring; PROX_PANIC(ring == NULL, "Cannot create ring to connect I/O core %u with worker core %u\n", lconf->id, ct.core); starg->tx_rings[starg->tot_n_txrings_inited] = ring; starg->tot_n_txrings_inited++; *dring = ring; if (lconf->id == prox_cfg.master) { ctrl_rings[ct.core*MAX_TASKS_PER_CORE + ct.task] = ring; } else if (ct.core == prox_cfg.master) { starg->ctrl_plane_ring = ring; } plog_info("\t\t\tCore %u task %u to -> core %u task %u ctrl_ring %s %p %s\n", lconf->id, starg->id, ct.core, ct.task, ct.type == CTRL_TYPE_PKT? "pkt" : "msg", ring, ring->name); ris->n_ctrl_rings++; return ring; } dtarg = &lworker->targs[ct.task]; lworker->targs[ct.task].worker_thread_id = ring_idx; PROX_ASSERT(dtarg->flags & TASK_ARG_RX_RING); PROX_ASSERT(ct.task < lworker->n_tasks_all); /* If all the following conditions are met, the ring can be optimized away. */ if (!task_is_master(starg) && !task_is_master(dtarg) && starg->lconf->id == dtarg->lconf->id && starg->nb_txrings == 1 && idx == 0 && dtarg->task && dtarg->tot_rxrings == 1 && starg->task == dtarg->task - 1) { plog_info("\t\tOptimizing away ring on core %u from task %u to task %u\n", dtarg->lconf->id, starg->task, dtarg->task); /* No need to set up ws_mbuf. */ starg->tx_opt_ring = 1; /* During init of destination task, the buffer in the source task will be initialized. */ dtarg->tx_opt_ring_task = starg; ris->n_opt_rings++; ++dtarg->nb_rxrings; return NULL; } int ring_created = 1; /* Only create multi-producer rings if configured to do so AND there is only one task sending to the task */ if ((prox_cfg.flags & DSF_MP_RINGS && count_incoming_tasks(ct.core, ct.task) > 1) || (prox_cfg.flags & DSF_ENABLE_BYPASS)) { ring = get_existing_ring(ct.core, ct.task); if (ring) { plog_info("\t\tCore %u task %u creatign MP ring %p to core %u task %u\n", lconf->id, starg->id, ring, ct.core, ct.task); ring_created = 0; } else { ring = rte_ring_create(gen_ring_name(), starg->ring_size, socket, RING_F_SC_DEQ); plog_info("\t\tCore %u task %u using MP ring %p from core %u task %u\n", lconf->id, starg->id, ring, ct.core, ct.task); } } else ring = rte_ring_create(gen_ring_name(), starg->ring_size, socket, RING_F_SP_ENQ | RING_F_SC_DEQ); PROX_PANIC(ring == NULL, "Cannot create ring to connect I/O core %u with worker core %u\n", lconf->id, ct.core); starg->tx_rings[starg->tot_n_txrings_inited] = ring; starg->tot_n_txrings_inited++; if (ring_created) { PROX_ASSERT(dtarg->nb_rxrings < MAX_RINGS_PER_TASK); dtarg->rx_rings[dtarg->nb_rxrings] = ring; ++dtarg->nb_rxrings; if (dtarg->nb_rxrings > 1) dtarg->task_init->flag_features |= TASK_FEATURE_TXQ_FLAGS_MULTIPLE_MEMPOOL; } dtarg->nb_slave_threads = starg->core_task_set[idx].n_elems; dtarg->lb_friend_core = lconf->id; dtarg->lb_friend_task = starg->id; plog_info("\t\tWorker thread %d has core %d, task %d as a lb friend\n", ct.core, lconf->id, starg->id); plog_info("\t\tCore %u task %u tx_ring[%u] -> core %u task %u rx_ring[%u] %p %s %u WT\n", lconf->id, starg->id, ring_idx, ct.core, ct.task, dtarg->nb_rxrings, ring, ring->name, dtarg->nb_slave_threads); ++ris->n_pkt_rings; return ring; } static void init_rings(void) { struct lcore_cfg *lconf = NULL; struct task_args *starg; struct ring_init_stats ris = {0}; while (core_targ_next(&lconf, &starg, 1) == 0) { plog_info("\t*** Initializing rings on core %u, task %u ***\n", lconf->id, starg->id); for (uint8_t idx = 0; idx < MAX_PROTOCOLS; ++idx) { for (uint8_t ring_idx = 0; ring_idx < starg->core_task_set[idx].n_elems; ++ring_idx) { PROX_ASSERT(ring_idx < MAX_WT_PER_LB); PROX_ASSERT(starg->tot_n_txrings_inited < MAX_RINGS_PER_TASK); struct core_task ct = starg->core_task_set[idx].core_task[ring_idx]; init_ring_between_tasks(lconf, starg, ct, ring_idx, idx, &ris); } } } plog_info("\tInitialized %d rings:\n" "\t\tNumber of packet rings: %u\n" "\t\tNumber of control rings: %u\n" "\t\tNumber of optimized rings: %u\n", ring_init_stats_total(&ris), ris.n_pkt_rings, ris.n_ctrl_rings, ris.n_opt_rings); lconf = NULL; struct prox_port_cfg *port; while (core_targ_next(&lconf, &starg, 1) == 0) { if ((starg->task_init) && (starg->flags & (TASK_ARG_L3|TASK_ARG_NDP))) { struct core_task ct; ct.core = prox_cfg.master; ct.task = 0; ct.type = CTRL_TYPE_PKT; struct rte_ring *rx_ring = init_ring_between_tasks(lconf, starg, ct, 0, 0, &ris); ct.core = lconf->id; ct.task = starg->id;; struct rte_ring *tx_ring = init_ring_between_tasks(&lcore_cfg[prox_cfg.master], lcore_cfg[prox_cfg.master].targs, ct, 0, 0, &ris); } } } static void shuffle_mempool(struct rte_mempool* mempool, uint32_t nb_mbuf) { struct rte_mbuf** pkts = prox_zmalloc(nb_mbuf * sizeof(*pkts), rte_socket_id()); uint64_t got = 0; while ((got < nb_mbuf) && (rte_mempool_get_bulk(mempool, (void**)(pkts + got), 1) == 0)) ++got; nb_mbuf = got; while (got) { int idx; do { idx = rand() % nb_mbuf; } while (pkts[idx] == 0); rte_mempool_put_bulk(mempool, (void**)&pkts[idx], 1); pkts[idx] = 0; --got; }; prox_free(pkts); } static void set_mbuf_size(struct task_args *targ) { /* mbuf size can be set * - from config file (highest priority, overwriting any other config) - should only be used as workaround * - defaulted to MBUF_SIZE. * Except if set explicitely, ensure that size is big enough for vmxnet3 driver */ if (targ->mbuf_size) return; targ->mbuf_size = MBUF_SIZE; struct prox_port_cfg *port; uint16_t max_frame_size = 0, min_buffer_size = 0; int i40e = 0; for (int i = 0; i < targ->nb_rxports; i++) { uint8_t if_port = targ->rx_port_queue[i].port; if (if_port == OUT_DISCARD) { continue; } port = &prox_port_cfg[if_port]; if (max_frame_size < port->mtu + PROX_RTE_ETHER_HDR_LEN + PROX_RTE_ETHER_CRC_LEN + 2 * PROX_VLAN_TAG_SIZE) max_frame_size = port->mtu + PROX_RTE_ETHER_HDR_LEN + PROX_RTE_ETHER_CRC_LEN + 2 * PROX_VLAN_TAG_SIZE; if (min_buffer_size < port->min_rx_bufsize) min_buffer_size = port->min_rx_bufsize; // Check whether we receive from i40e. This driver have extra mbuf size requirements if (strcmp(port->short_name, "i40e") == 0) i40e = 1; } if (i40e) { // i40e supports a maximum of 5 descriptors chained uint16_t required_mbuf_size = RTE_ALIGN(max_frame_size / 5, 128) + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM; if (required_mbuf_size > targ->mbuf_size) { targ->mbuf_size = required_mbuf_size; plog_info("\t\tSetting mbuf_size to %u to support frame_size %u\n", targ->mbuf_size, max_frame_size); } } if (min_buffer_size > targ->mbuf_size) { plog_warn("Mbuf size might be too small. This might result in packet segmentation and memory leak\n"); } } static void setup_mempools_unique_per_socket(void) { uint32_t flags = 0; char name[64]; struct lcore_cfg *lconf = NULL; struct task_args *targ; struct rte_mempool *pool[MAX_SOCKETS]; uint32_t mbuf_count[MAX_SOCKETS] = {0}; uint32_t nb_cache_mbuf[MAX_SOCKETS] = {0}; uint32_t mbuf_size[MAX_SOCKETS] = {0}; while (core_targ_next_early(&lconf, &targ, 0) == 0) { PROX_PANIC(targ->task_init == NULL, "task_init = NULL, is mode specified for core %d, task %d ?\n", lconf->id, targ->id); uint8_t socket = rte_lcore_to_socket_id(lconf->id); PROX_ASSERT(socket < MAX_SOCKETS); set_mbuf_size(targ); if (targ->rx_port_queue[0].port != OUT_DISCARD) { struct prox_port_cfg* port_cfg = &prox_port_cfg[targ->rx_port_queue[0].port]; PROX_ASSERT(targ->nb_mbuf != 0); mbuf_count[socket] += targ->nb_mbuf; if (nb_cache_mbuf[socket] == 0) nb_cache_mbuf[socket] = targ->nb_cache_mbuf; else { PROX_PANIC(nb_cache_mbuf[socket] != targ->nb_cache_mbuf, "all mbuf_cache must have the same size if using a unique mempool per socket\n"); } if (mbuf_size[socket] == 0) mbuf_size[socket] = targ->mbuf_size; else { PROX_PANIC(mbuf_size[socket] != targ->mbuf_size, "all mbuf_size must have the same size if using a unique mempool per socket\n"); } } } for (int i = 0 ; i < MAX_SOCKETS; i++) { if (mbuf_count[i] != 0) { sprintf(name, "socket_%u_pool", i); if ((pool[i] = rte_mempool_lookup(name)) == NULL) { pool[i] = rte_mempool_create(name, mbuf_count[i] - 1, mbuf_size[i], nb_cache_mbuf[i], sizeof(struct rte_pktmbuf_pool_private), rte_pktmbuf_pool_init, NULL, prox_pktmbuf_init, NULL, i, flags); PROX_PANIC(pool[i] == NULL, "\t\tError: cannot create mempool for socket %u\n", i); plog_info("\tMempool %p size = %u * %u cache %u, socket %d\n", pool[i], mbuf_count[i], mbuf_size[i], nb_cache_mbuf[i], i); if (prox_cfg.flags & DSF_SHUFFLE) { shuffle_mempool(pool[i], mbuf_count[i]); } } } } lconf = NULL; while (core_targ_next_early(&lconf, &targ, 0) == 0) { uint8_t socket = rte_lcore_to_socket_id(lconf->id); if (targ->rx_port_queue[0].port != OUT_DISCARD) { /* use this pool for the interface that the core is receiving from */ /* If one core receives from multiple ports, all the ports use the same mempool */ targ->pool = pool[socket]; /* Set the number of mbuf to the number of the unique mempool, so that the used and free work */ targ->nb_mbuf = mbuf_count[socket]; plog_info("\tMempool %p size = %u * %u cache %u, socket %d\n", targ->pool, targ->nb_mbuf, mbuf_size[socket], targ->nb_cache_mbuf, socket); } } } static void setup_mempool_for_rx_task(struct lcore_cfg *lconf, struct task_args *targ) { const uint8_t socket = rte_lcore_to_socket_id(lconf->id); struct prox_port_cfg *port_cfg = &prox_port_cfg[targ->rx_port_queue[0].port]; const struct rte_memzone *mz; struct rte_mempool *mp = NULL; uint32_t flags = 0; char memzone_name[64]; char name[64]; set_mbuf_size(targ); /* allocate memory pool for packets */ PROX_ASSERT(targ->nb_mbuf != 0); if (targ->pool_name[0] == '\0') { sprintf(name, "core_%u_task_%u_pool", lconf->id, targ->id); } snprintf(memzone_name, sizeof(memzone_name)-1, "MP_%s", targ->pool_name); mz = rte_memzone_lookup(memzone_name); if (mz != NULL) { mp = (struct rte_mempool*)mz->addr; targ->nb_mbuf = mp->size; targ->pool = mp; } #ifdef RTE_LIBRTE_IVSHMEM_FALSE if (mz != NULL && mp != NULL && mp->phys_addr != mz->ioremap_addr) { /* Init mbufs with ioremap_addr for dma */ mp->phys_addr = mz->ioremap_addr; mp->elt_pa[0] = mp->phys_addr + (mp->elt_va_start - (uintptr_t)mp); struct prox_pktmbuf_reinit_args init_args; init_args.mp = mp; init_args.lconf = lconf; uint32_t elt_sz = mp->elt_size + mp->header_size + mp->trailer_size; rte_mempool_obj_iter((void*)mp->elt_va_start, mp->size, elt_sz, 1, mp->elt_pa, mp->pg_num, mp->pg_shift, prox_pktmbuf_reinit, &init_args); } #endif /* Use this pool for the interface that the core is receiving from if one core receives from multiple ports, all the ports use the same mempool */ if (targ->pool == NULL) { plog_info("\tCreating mempool with name '%s' on socket %d\n", name, socket); targ->pool = rte_mempool_create(name, targ->nb_mbuf - 1, targ->mbuf_size, targ->nb_cache_mbuf, sizeof(struct rte_pktmbuf_pool_private), rte_pktmbuf_pool_init, NULL, prox_pktmbuf_init, lconf, socket, flags); } PROX_PANIC(targ->pool == NULL, "\tError: cannot create mempool for core %u port %u: %s\n", lconf->id, targ->id, rte_strerror(rte_errno)); plog_info("\tMempool %p size = %u * %u cache %u, socket %d\n", targ->pool, targ->nb_mbuf, targ->mbuf_size, targ->nb_cache_mbuf, socket); if (prox_cfg.flags & DSF_SHUFFLE) { shuffle_mempool(targ->pool, targ->nb_mbuf); } } static void setup_mempools_multiple_per_socket(void) { struct lcore_cfg *lconf = NULL; struct task_args *targ; while (core_targ_next_early(&lconf, &targ, 0) == 0) { PROX_PANIC(targ->task_init == NULL, "task_init = NULL, is mode specified for core %d, task %d ?\n", lconf->id, targ->id); if (targ->rx_port_queue[0].port == OUT_DISCARD) continue; setup_mempool_for_rx_task(lconf, targ); } } static void setup_mempools(void) { if (prox_cfg.flags & UNIQUE_MEMPOOL_PER_SOCKET) setup_mempools_unique_per_socket(); else setup_mempools_multiple_per_socket(); } static void set_task_lconf(void) { struct lcore_cfg *lconf; uint32_t lcore_id = -1; while(prox_core_next(&lcore_id, 1) == 0) { lconf = &lcore_cfg[lcore_id]; for (uint8_t task_id = 0; task_id < lconf->n_tasks_all; ++task_id) { lconf->targs[task_id].lconf = lconf; } } } static void set_dest_threads(void) { struct lcore_cfg *lconf = NULL; struct task_args *targ; while (core_targ_next(&lconf, &targ, 0) == 0) { for (uint8_t idx = 0; idx < MAX_PROTOCOLS; ++idx) { for (uint8_t ring_idx = 0; ring_idx < targ->core_task_set[idx].n_elems; ++ring_idx) { struct core_task ct = targ->core_task_set[idx].core_task[ring_idx]; struct task_args *dest_task = core_targ_get(ct.core, ct.task); dest_task->prev_tasks[dest_task->n_prev_tasks++] = targ; } } } } static void setup_all_task_structs_early_init(void) { struct lcore_cfg *lconf = NULL; struct task_args *targ; plog_info("\t*** Calling early init on all tasks ***\n"); while (core_targ_next(&lconf, &targ, 0) == 0) { if (targ->task_init->early_init) { targ->task_init->early_init(targ); } } } static void setup_all_task_structs(void) { struct lcore_cfg *lconf; uint32_t lcore_id = -1; struct task_base *tmaster = NULL; while(prox_core_next(&lcore_id, 1) == 0) { lconf = &lcore_cfg[lcore_id]; for (uint8_t task_id = 0; task_id < lconf->n_tasks_all; ++task_id) { if (task_is_master(&lconf->targs[task_id])) { plog_info("\tInitializing MASTER struct for core %d task %d\n", lcore_id, task_id); lconf->tasks_all[task_id] = init_task_struct(&lconf->targs[task_id]); tmaster = lconf->tasks_all[task_id]; } } } PROX_PANIC(tmaster == NULL, "Can't initialize master task\n"); lcore_id = -1; while(prox_core_next(&lcore_id, 1) == 0) { lconf = &lcore_cfg[lcore_id]; plog_info("\tInitializing struct for core %d with %d task\n", lcore_id, lconf->n_tasks_all); for (uint8_t task_id = 0; task_id < lconf->n_tasks_all; ++task_id) { if (!task_is_master(&lconf->targs[task_id])) { plog_info("\tInitializing struct for core %d task %d\n", lcore_id, task_id); lconf->targs[task_id].tmaster = tmaster; lconf->tasks_all[task_id] = init_task_struct(&lconf->targs[task_id]); } } } } static void init_port_activate(void) { struct lcore_cfg *lconf = NULL; struct task_args *targ; uint8_t port_id = 0; while (core_targ_next_early(&lconf, &targ, 0) == 0) { for (int i = 0; i < targ->nb_rxports; i++) { port_id = targ->rx_port_queue[i].port; prox_port_cfg[port_id].active = 1; } for (int i = 0; i < targ->nb_txports; i++) { port_id = targ->tx_port_queue[i].port; prox_port_cfg[port_id].active = 1; } } } /* Initialize cores and allocate mempools */ static void init_lcores(void) { struct lcore_cfg *lconf = 0; uint32_t lcore_id = -1; while(prox_core_next(&lcore_id, 0) == 0) { uint8_t socket = rte_lcore_to_socket_id(lcore_id); PROX_PANIC(socket + 1 > MAX_SOCKETS, "Can't configure core %u (on socket %u). MAX_SOCKET is set to %d\n", lcore_id, socket, MAX_SOCKETS); } /* need to allocate mempools as the first thing to use the lowest possible address range */ plog_info("=== Initializing mempools ===\n"); setup_mempools(); lcore_cfg_alloc_hp(); set_dest_threads(); set_task_lconf(); plog_info("=== Initializing port addresses ===\n"); init_port_addr(); plog_info("=== Initializing queue numbers on cores ===\n"); configure_if_queues(); plog_info("=== Initializing rings on cores ===\n"); init_rings(); configure_multi_segments(); configure_tx_queue_flags(); plog_info("=== Checking configuration consistency ===\n"); check_cfg_consistent(); plog_all_rings(); } static int setup_prox(int argc, char **argv) { if (prox_read_config_file() != 0 || prox_setup_rte(argv[0]) != 0) { return -1; } if (prox_cfg.flags & DSF_CHECK_SYNTAX) { plog_info("=== Configuration file syntax has been checked ===\n\n"); exit(EXIT_SUCCESS); } init_port_activate(); plog_info("=== Initializing rte devices ===\n"); if (!(prox_cfg.flags & DSF_USE_DUMMY_DEVICES)) init_rte_ring_dev(); init_rte_dev(prox_cfg.flags & DSF_USE_DUMMY_DEVICES); plog_info("=== Calibrating TSC overhead ===\n"); clock_init(); plog_info("\tTSC running at %"PRIu64" Hz\n", rte_get_tsc_hz()); init_lcores(); plog_info("=== Initializing ports ===\n"); init_port_all(); setup_all_task_structs_early_init(); plog_info("=== Initializing tasks ===\n"); setup_all_task_structs(); if (prox_cfg.logbuf_size) { prox_cfg.logbuf = prox_zmalloc(prox_cfg.logbuf_size, rte_socket_id()); PROX_PANIC(prox_cfg.logbuf == NULL, "Failed to allocate memory for logbuf with size = %d\n", prox_cfg.logbuf_size); } if (prox_cfg.flags & DSF_CHECK_INIT) { plog_info("=== Initialization sequence completed ===\n\n"); exit(EXIT_SUCCESS); } /* Current way that works to disable DPDK logging */ FILE *f = fopen("/dev/null", "r"); rte_openlog_stream(f); plog_info("=== PROX started ===\n"); return 0; } static int success = 0; static void siguser_handler(int signal) { if (signal == SIGUSR1) success = 1; else success = 0; } static void sigabrt_handler(__attribute__((unused)) int signum) { /* restore default disposition for SIGABRT and SIGPIPE */ signal(SIGABRT, SIG_DFL); signal(SIGPIPE, SIG_DFL); /* ignore further Ctrl-C */ signal(SIGINT, SIG_IGN); /* more drastic exit on tedious termination signal */ plog_info("Aborting...\n"); if (lcore_cfg != NULL) { uint32_t lcore_id; pthread_t thread_id, tid0, tid = pthread_self(); memset(&tid0, 0, sizeof(tid0)); /* cancel all threads except current one */ lcore_id = -1; while (prox_core_next(&lcore_id, 1) == 0) { thread_id = lcore_cfg[lcore_id].thread_id; if (pthread_equal(thread_id, tid0)) continue; if (pthread_equal(thread_id, tid)) continue; pthread_cancel(thread_id); } /* wait for cancelled threads to terminate */ lcore_id = -1; while (prox_core_next(&lcore_id, 1) == 0) { thread_id = lcore_cfg[lcore_id].thread_id; if (pthread_equal(thread_id, tid0)) continue; if (pthread_equal(thread_id, tid)) continue; pthread_join(thread_id, NULL); } } /* close ncurses */ display_end(); /* close ports on termination signal */ close_ports_atexit(); /* terminate now */ abort(); } static void sigterm_handler(int signum) { /* abort on second Ctrl-C */ if (signum == SIGINT) signal(SIGINT, sigabrt_handler); /* gracefully quit on harmless termination signal */ /* ports will subsequently get closed at resulting exit */ quit(); } static void set_term_env(void) { static const char var[] = "TERM"; static char str[] = "TERM=putty"; char *old_value, *new_value; int max_ver = 0, min_ver = 0, n; old_value = getenv(var); const char *ncurses_version = curses_version(); n = sscanf(ncurses_version, "ncurses %d.%d", &max_ver, &min_ver); if (n != 2) { plog_info("\tUnable to extract ncurses version from %s. TERM left unchanged to %s\n", ncurses_version, old_value); return; } else { plog_info("\tncurses version = %d.%d (%s)\n", max_ver, min_ver, ncurses_version); } if (((max_ver > 6) || ((max_ver == 6) && (min_ver >= 1))) && (strcmp(old_value, "xterm") == 0)) { // On recent OSes such as RHEL 8.0, ncurses(6.1) introduced support // for ECMA-48 repeat character control. // Some terminal emulators use TERM=xterm but do not support this feature. // In this case, printing repeating character such as "22000000 Hz" might // display as 220 Hz. // Other emulattors, such as tmux, use TERM=screen, and do not exhibit the issue. plog_info("\tChanged TERM from %s ", old_value); putenv(str); new_value = getenv(var); plog_info("to %s\n", new_value); } else { plog_info("\tTERM left unchanged to %s\n", old_value); } } int main(int argc, char **argv) { /* set en_US locale to print big numbers with ',' */ setlocale(LC_NUMERIC, "en_US.utf-8"); if (prox_parse_args(argc, argv) != 0){ prox_usage(argv[0]); } plog_init(prox_cfg.log_name, prox_cfg.log_name_pid); plog_info("=== " PROGRAM_NAME " %s ===\n", VERSION_STR()); plog_info("\tUsing DPDK %s\n", rte_version() + sizeof(RTE_VER_PREFIX)); set_term_env(); read_rdt_info(); if (prox_cfg.flags & DSF_LIST_TASK_MODES) { /* list supported task modes and exit */ tasks_list(); return EXIT_SUCCESS; } /* close ports at normal exit */ atexit(close_ports_atexit); /* gracefully quit on harmless termination signals */ signal(SIGHUP, sigterm_handler); signal(SIGINT, sigterm_handler); signal(SIGQUIT, sigterm_handler); signal(SIGTERM, sigterm_handler); signal(SIGUSR1, sigterm_handler); signal(SIGUSR2, sigterm_handler); /* more drastic exit on tedious termination signals */ signal(SIGABRT, sigabrt_handler); signal(SIGPIPE, sigabrt_handler); if (prox_cfg.flags & DSF_DAEMON) { signal(SIGUSR1, siguser_handler); signal(SIGUSR2, siguser_handler); plog_info("=== Running in Daemon mode ===\n"); plog_info("\tForking child and waiting for setup completion\n"); pid_t ppid = getpid(); pid_t pid = fork(); if (pid < 0) { plog_err("Failed to fork process to run in daemon mode\n"); return EXIT_FAILURE; } if (pid == 0) { fclose(stdin); fclose(stdout); fclose(stderr); if (setsid() < 0) { kill(ppid, SIGUSR2); return EXIT_FAILURE; } if (setup_prox(argc, argv) != 0) { kill(ppid, SIGUSR2); return EXIT_FAILURE; } else { kill(ppid, SIGUSR1); run(prox_cfg.flags); return EXIT_SUCCESS; } } else { /* Before exiting the parent, wait until the child process has finished setting up */ pause(); if (prox_cfg.logbuf) { file_print(prox_cfg.logbuf); } return success? EXIT_SUCCESS : EXIT_FAILURE; } } if (setup_prox(argc, argv) != 0) return EXIT_FAILURE; run(prox_cfg.flags); return EXIT_SUCCESS; }