summaryrefslogtreecommitdiffstats
path: root/qemu/tests/qemu-iotests/059
blob: 0ded0c3da4fb0d81dc243e3339d693cd2ddf84a0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
#!/bin/bash
#
# Test case for vmdk
#
# Copyright (C) 2013 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

# creator
owner=famz@redhat.com

# Test name, derived from the script file name. "$0" is quoted so a path
# containing spaces is not word-split.
seq=$(basename "$0")
echo "QA output created by $seq"

here=$(pwd)
tmp=/tmp/$$
status=1	# failure is the default!

# Remove the test image; run from the trap installed right below.
_cleanup() { _cleanup_test_img; }
trap "_cleanup; exit \$status" 0 1 2 3 15

# get standard environment, filters and checks
. ./common.rc
. ./common.filter

# This tests vmdk-specific low-level functionality
_supported_fmt vmdk
_supported_proto file
_supported_os Linux
_unsupported_imgopts "subformat=monolithicFlat" \
                     "subformat=twoGbMaxExtentFlat" \
                     "subformat=twoGbMaxExtentSparse"

# Byte offsets into the image header that the corruption tests below
# overwrite with poke_file.
capacity_offset=16
granularity_offset=20
grain_table_size_offset=44

# Note: "$TEST_IMG" is quoted throughout so image paths containing
# spaces are passed as a single argument.

echo
echo "=== Testing invalid granularity ==="
_make_test_img 64M
poke_file "$TEST_IMG" "$granularity_offset" "\xff\xff\xff\xff\xff\xff\xff\xff"
{ $QEMU_IO -c "read 0 512" "$TEST_IMG"; } 2>&1 | _filter_qemu_io | _filter_testdir

echo
echo "=== Testing too big L2 table size ==="
_make_test_img 64M
poke_file "$TEST_IMG" "$grain_table_size_offset" "\xff\xff\xff\xff"
{ $QEMU_IO -c "read 0 512" "$TEST_IMG"; } 2>&1 | _filter_qemu_io | _filter_testdir

echo
echo "=== Testing too big L1 table size ==="
_make_test_img 64M
poke_file "$TEST_IMG" "$capacity_offset" "\xff\xff\xff\xff"
poke_file "$TEST_IMG" "$grain_table_size_offset" "\x01\x00\x00\x00"
{ $QEMU_IO -c "read 0 512" "$TEST_IMG"; } 2>&1 | _filter_qemu_io | _filter_testdir

echo
echo "=== Testing monolithicFlat creation and opening ==="
IMGOPTS="subformat=monolithicFlat" _make_test_img 2G
_img_info

echo
echo "=== Testing monolithicFlat with zeroed_grain ==="
IMGOPTS="subformat=monolithicFlat,zeroed_grain=on" _make_test_img 2G

echo
echo "=== Testing big twoGbMaxExtentFlat ==="
IMGOPTS="subformat=twoGbMaxExtentFlat" _make_test_img 1000G
$QEMU_IMG info "$TEST_IMG" | _filter_testdir | sed -e 's/cid: [0-9]*/cid: XXXXXXXX/'

echo
echo "=== Testing malformed VMFS extent description line ==="
cat >"$TEST_IMG" <<EOF
# Disk DescriptorFile
version=1
CID=58ab4847
parentCID=ffffffff
createType="vmfs"

# Extent description
RW 12582912 VMFS "dummy.vmdk" 1
EOF
_img_info

echo
echo "=== Testing truncated sparse ==="
IMGOPTS="subformat=monolithicSparse" _make_test_img 100G
truncate -s 10M "$TEST_IMG"
_img_info

echo
echo "=== Converting to streamOptimized from image with small cluster size==="
TEST_IMG="$TEST_IMG.qcow2" IMGFMT=qcow2 IMGOPTS="cluster_size=4096" _make_test_img 1G
$QEMU_IO -f qcow2 -c "write -P 0xa 0 512" "$TEST_IMG.qcow2" | _filter_qemu_io
$QEMU_IO -f qcow2 -c "write -P 0xb 10240 512" "$TEST_IMG.qcow2" | _filter_qemu_io
$QEMU_IMG convert -f qcow2 -O vmdk -o subformat=streamOptimized "$TEST_IMG.qcow2" "$TEST_IMG" 2>&1

echo
echo "=== Testing monolithicFlat with internally generated JSON file name ==="
IMGOPTS="subformat=monolithicFlat" _make_test_img 64M
$QEMU_IO -c "open -o driver=$IMGFMT,file.driver=blkdebug,file.image.filename=$TEST_IMG,file.inject-error.0.event=read_aio" 2>&1 \
    | _filter_testdir | _filter_imgfmt

echo
echo "=== Testing version 3 ==="
_use_sample_img iotest-version3.vmdk.bz2
_img_info
# Read back 100 sectors at varying offsets; sector i is expected to be
# filled with the ASCII digit ('0' + i % 10).
for i in {0..99}; do
    $QEMU_IO -r -c "read -P $(( i % 10 + 0x30 )) $(( i * 64 * 1024 * 10 + i * 512 )) 512" "$TEST_IMG" \
        | _filter_qemu_io
done

echo
echo "=== Testing 4TB monolithicFlat creation and IO ==="
IMGOPTS="subformat=monolithicFlat" _make_test_img 4T
_img_info
$QEMU_IO -c "write -P 0xa 900G 512" "$TEST_IMG" | _filter_qemu_io
$QEMU_IO -c "read -v 900G 1024" "$TEST_IMG" | _filter_qemu_io

echo
echo "=== Testing afl image with a very large capacity ==="
_use_sample_img afl9.vmdk.bz2
_img_info

# success, all done
echo "*** done"
rm -f "$seq.full"
status=0
a id='n817' href='#n817'>817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 
1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292
/*
// Copyright (c) 2010-2020 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
*/

#include <string.h>
#include <locale.h>
#include <unistd.h>
#include <signal.h>
#include <curses.h>

#include <rte_cycles.h>
#include <rte_atomic.h>
#include <rte_table_hash.h>
#include <rte_memzone.h>
#include <rte_errno.h>

#include "prox_malloc.h"
#include "run.h"
#include "main.h"
#include "log.h"
#include "quit.h"
#include "clock.h"
#include "defines.h"
#include "version.h"
#include "prox_args.h"
#include "prox_assert.h"
#include "prox_cfg.h"
#include "prox_shared.h"
#include "prox_port_cfg.h"
#include "toeplitz.h"
#include "hash_utils.h"
#include "handle_lb_net.h"
#include "prox_cksum.h"
#include "thread_nop.h"
#include "thread_generic.h"
#include "thread_pipeline.h"
#include "cqm.h"
#include "handle_master.h"

/* For DPDK versions older than 1.8, map RTE_CACHE_LINE_SIZE onto the
   older CACHE_LINE_SIZE name. */
#if RTE_VERSION < RTE_VERSION_NUM(1,8,0,0)
#define RTE_CACHE_LINE_SIZE CACHE_LINE_SIZE
#endif

/* Number of TX rings for load-balancer tasks. NOTE(review): 0xff looks like
   a "not configured" sentinel — confirm against its users. */
uint8_t lb_nb_txrings = 0xff;
/* Defined elsewhere in the project. */
extern const char *git_version;
/* Control rings indexed by (core * MAX_TASKS_PER_CORE + task); filled by
   init_ring_between_tasks() for control rings whose source is the master core. */
struct rte_ring *ctrl_rings[RTE_MAX_LCORE*MAX_TASKS_PER_CORE];

/*
 * Print the command-line help for program name prgname through plog_info()
 * and terminate the process with EXIT_FAILURE. Never returns.
 */
static void __attribute__((noreturn)) prox_usage(const char *prgname)
{
	plog_info("\nUsage: %s [-f CONFIG_FILE] [-a|-e] [-m|-s|-i] [-w DEF] [-u] [-t]\n"
		  "\t-f CONFIG_FILE : configuration file to load, ./prox.cfg by default\n"
		  "\t-l LOG_FILE : log file name, ./prox.log by default\n"
		  "\t-p : include PID in log file name if default log file is used\n"
		  "\t-o DISPLAY: Set display to use, can be 'curses' (default), 'cli' or 'none'\n"
		  "\t-v verbosity : initial logging verbosity\n"
		  "\t-a : autostart all cores (by default)\n"
		  "\t-e : don't autostart\n"
		  "\t-n : Create NULL devices instead of using PCI devices, useful together with -i\n"
		  "\t-m : list supported task modes and exit\n"
		  "\t-s : check configuration file syntax and exit\n"
		  "\t-i : check initialization sequence and exit\n"
		  "\t-u : Listen on UDS /tmp/prox.sock\n"
		  "\t-t : Listen on TCP port 8474\n"
		  "\t-q : Pass argument to Lua interpreter, useful to define variables\n"
		  "\t-w : define variable using syntax varname=value\n"
		  "\t     takes precedence over variables defined in CONFIG_FILE\n"
		  "\t-k : Log statistics to file \"stats_dump\" in current directory\n"
		  "\t-d : Run as daemon, the parent process will block until PROX is not initialized\n"
		  "\t-z : Ignore CPU topology, implies -i\n"
		  "\t-r : Change initial screen refresh rate. If set to a lower than 0.001 seconds,\n"
		  "\t	  screen refreshing will be disabled\n"
		  , prgname);
	exit(EXIT_FAILURE);
}

static void check_mixed_normal_pipeline(void)
{
	struct lcore_cfg *lconf = NULL;
	uint32_t lcore_id = -1;

	while (prox_core_next(&lcore_id, 0) == 0) {
		lconf = &lcore_cfg[lcore_id];

		int all_thread_nop = 1;
		int generic = 0;
		int pipeline = 0;
		int l3 = 0;
		for (uint8_t task_id = 0; task_id < lconf->n_tasks_all; ++task_id) {
			struct task_args *targ = &lconf->targs[task_id];
			l3 = !strcmp("l3", targ->sub_mode_str);
			all_thread_nop = all_thread_nop && !l3 &&
				targ->task_init->thread_x == thread_nop;

			pipeline = pipeline || targ->task_init->thread_x == thread_pipeline;
			generic = generic || targ->task_init->thread_x == thread_generic || l3;
		}
		PROX_PANIC(generic && pipeline, "Can't run both pipeline and normal thread on same core\n");

		if (all_thread_nop)
			lconf->thread_x = thread_nop;
		else {
			lconf->thread_x = thread_generic;
		}
	}
}

/*
 * Panic if a task has RX ports configured although its mode declares
 * TASK_FEATURE_NO_RX.
 */
static void check_zero_rx(void)
{
	struct lcore_cfg *core = NULL;
	struct task_args *task;

	while (core_targ_next(&core, &task, 0) == 0) {
		if (task->nb_rxports == 0)
			continue;

		PROX_PANIC(task_init_flag_set(task->task_init, TASK_FEATURE_NO_RX),
			   "\tCore %u task %u: rx_ports configured while mode %s does not use it\n",
			   core->id, task->id, task->task_init->mode_str);
	}
}

/*
 * Warn about tasks whose mbuf pool may be too small.
 *
 * For every task, add up the TX descriptor counts of its TX ports and the
 * RX descriptor counts of its RX ports. Warn when nb_mbuf does not exceed
 * n_rxd + n_txd + nb_cache_mbuf + MAX_PKT_BURST.
 */
static void check_nb_mbuf(void)
{
	struct lcore_cfg *lconf = NULL;
	struct task_args *targ = NULL;

	while (core_targ_next(&lconf, &targ, 0) == 0) {
		/* Reset for every task. Previously these lived outside the loop and
		   were overwritten per port, so a task without TX (or RX) ports
		   reported the previous task's count, and multi-port tasks only
		   counted their last port. Sum over ports: every queue can hold its
		   own descriptors' worth of mbufs.
		   NOTE(review): port ids are assumed valid here (not OUT_DISCARD). */
		int n_txd = 0, n_rxd = 0;

		for (uint8_t i = 0; i < targ->nb_txports; ++i) {
			uint8_t port_id = targ->tx_port_queue[i].port;
			n_txd += prox_port_cfg[port_id].n_txd;
		}
		for (uint8_t i = 0; i < targ->nb_rxports; ++i) {
			uint8_t port_id = targ->rx_port_queue[i].port;
			n_rxd += prox_port_cfg[port_id].n_rxd;
		}
		if (targ->nb_mbuf <= n_rxd + n_txd + targ->nb_cache_mbuf + MAX_PKT_BURST) {
			plog_warn("Core %d, task %d might not have enough mbufs (%d) to support %d txd, %d rxd and %d cache_mbuf\n",
				lconf->id, targ->id, targ->nb_mbuf, n_txd, n_rxd, targ->nb_cache_mbuf);
		}
	}
}

/*
 * Validate how tasks are wired to their RX sources; panics on configuration
 * errors.
 *
 * Pass 1, for every task:
 *  - a task flagged TASK_ARG_RX_RING must have rx_rings[0] set or a
 *    tx_opt_ring_task, i.e. some task must transmit to its ring;
 *  - a task with neither RX ports nor RX rings must use a mode that has
 *    TASK_FEATURE_NO_RX.
 *
 * Pass 2, for every task in "l3" or "ndp" sub mode:
 *  - it must have at least one RX or TX port;
 *  - for each of its RX ports, some task carrying the matching TASK_ARG_L3 /
 *    TASK_ARG_NDP flag must have that port as its first TX port;
 *  - if its own first TX port is not OUT_DISCARD, some task carrying the
 *    matching flag must receive from that port.
 *
 * NOTE(review): tx_port_queue[0] is read even when nb_txports == 0, which
 * relies on unused entries holding OUT_DISCARD — confirm where tx_port_queue
 * is initialized. Only the first TX port of each task is considered.
 */
static void check_missing_rx(void)
{
	struct lcore_cfg *lconf = NULL, *rx_lconf = NULL, *tx_lconf = NULL;
	struct task_args *targ, *rx_targ = NULL, *tx_targ = NULL;
	uint8_t port_id, rx_port_id, ok, l3, ndp;

	/* Pass 1: every receiving task must have something to receive from. */
	while (core_targ_next(&lconf, &targ, 0) == 0) {
		PROX_PANIC((targ->flags & TASK_ARG_RX_RING) && targ->rx_rings[0] == 0 && !targ->tx_opt_ring_task,
			   "Configuration Error - Core %u task %u Receiving from ring, but nobody xmitting to this ring\n", lconf->id, targ->id);
		if (targ->nb_rxports == 0 && targ->nb_rxrings == 0) {
			PROX_PANIC(!task_init_flag_set(targ->task_init, TASK_FEATURE_NO_RX),
				   "\tCore %u task %u: no rx_ports and no rx_rings configured while required by mode %s\n", lconf->id, targ->id, targ->task_init->mode_str);
		}
	}

	/* Pass 2: l3/ndp sub mode tasks must have a peer in the same sub mode
	   on each port they use. */
	lconf = NULL;
	while (core_targ_next(&lconf, &targ, 0) == 0) {
		l3 = ndp = 0;
		if (strcmp(targ->sub_mode_str, "l3") == 0)
			l3 = 1;
		else if (strcmp(targ->sub_mode_str, "ndp") == 0)
			ndp = 1;
		else
			continue;

		PROX_PANIC((targ->nb_rxports == 0) && (targ->nb_txports == 0), "L3/NDP task must have a RX or a TX port\n");
		// If the L3/NDP sub_mode receives from a port, check that there is at least one core/task
		// transmitting to this port in L3/NDP sub_mode
		for (uint8_t i = 0; i < targ->nb_rxports; ++i) {
			rx_port_id = targ->rx_port_queue[i].port;
			ok = 0;
			tx_lconf = NULL;
			while (core_targ_next(&tx_lconf, &tx_targ, 0) == 0) {
				if ((port_id = tx_targ->tx_port_queue[0].port) == OUT_DISCARD)
					continue;
				if ((rx_port_id == port_id) &&
					( ((tx_targ->flags & TASK_ARG_L3) && l3) ||
					  ((tx_targ->flags & TASK_ARG_NDP) && ndp) ) ) {
					ok = 1;
					break;
				}
			}
			PROX_PANIC(ok == 0, "RX %s sub mode for port %d on core %d task %d, but no core/task transmitting on that port\n", l3 ? "l3":"ndp", rx_port_id, lconf->id, targ->id);
		}

		// If the L3/NDP sub_mode transmits to a port, check that there is at least one core/task
		// receiving from that port in L3/NDP sub_mode.
		if ((port_id = targ->tx_port_queue[0].port) == OUT_DISCARD)
			continue;
		rx_lconf = NULL;
		ok = 0;
		plog_info("\tCore %d task %d transmitting to port %d in %s submode\n", lconf->id, targ->id, port_id, l3 ? "l3":"ndp");
		while (core_targ_next(&rx_lconf, &rx_targ, 0) == 0) {
			for (uint8_t i = 0; i < rx_targ->nb_rxports; ++i) {
				rx_port_id = rx_targ->rx_port_queue[i].port;
				if ((rx_port_id == port_id) &&
					( ((rx_targ->flags & TASK_ARG_L3) && l3) ||
					((rx_targ->flags & TASK_ARG_NDP) && ndp) ) ){
					ok = 1;
					break;
				}
			}
			/* The break above only leaves the port loop; stop scanning tasks too. */
			if (ok == 1) {
				plog_info("\tCore %d task %d has found core %d task %d receiving from port %d in %s submode\n", lconf->id, targ->id, rx_lconf->id, rx_targ->id, port_id,
					((rx_targ->flags & TASK_ARG_L3) && l3) ? "l3":"ndp");
				break;
			}
		}
		PROX_PANIC(ok == 0, "%s sub mode for port %d on core %d task %d, but no core/task receiving on that port\n", l3 ? "l3":"ndp", port_id, lconf->id, targ->id);
	}
}

static void check_cfg_consistent(void)
{
	check_nb_mbuf();
	check_missing_rx();
	check_zero_rx();
	check_mixed_normal_pipeline();
}

/* Log every RX ring of every task, one line per ring. */
static void plog_all_rings(void)
{
	struct lcore_cfg *core = NULL;
	struct task_args *task;

	while (core_targ_next(&core, &task, 0) == 0) {
		uint8_t r = 0;

		while (r < task->nb_rxrings) {
			plog_info("\tCore %u, task %u, rx_ring[%u] %p\n", core->id, task->id, r, task->rx_rings[r]);
			++r;
		}
	}
}

/*
 * Return 1 if targ, or any task reachable backwards through prev_tasks,
 * has flag in the requested state (set when is_set is 1, clear when is_set
 * is 0); return 0 otherwise. The search stops at the first match.
 */
static int chain_flag_state(struct task_args *targ, uint64_t flag, int is_set)
{
	int found = task_init_flag_set(targ->task_init, flag) == is_set;

	for (uint32_t i = 0; !found && i < targ->n_prev_tasks; ++i)
		found = chain_flag_state(targ->prev_tasks[i], flag, is_set);

	return found ? 1 : 0;
}

/* Nonzero when no task in the chain ending at targ has flag cleared. */
static int chain_flag_always_set(struct task_args *targ, uint64_t flag)
{
	const int some_task_lacks_flag = chain_flag_state(targ, flag, 0);
	return some_task_lacks_flag == 0;
}

/* Nonzero when no task in the chain ending at targ has flag set. */
static int chain_flag_never_set(struct task_args *targ, uint64_t flag)
{
	const int some_task_has_flag = chain_flag_state(targ, flag, 1);
	return some_task_has_flag == 0;
}

/* Nonzero when at least one task in the chain ending at targ has flag set. */
static int chain_flag_sometimes_set(struct task_args *targ, uint64_t flag)
{
	const int some_task_has_flag = chain_flag_state(targ, flag, 1);
	return some_task_has_flag;
}

/*
 * Give targ one TX queue on each of its TX ports.
 *
 * Panics when a TX port is OUT_DISCARD or not active. Warns when the port's
 * socket (if known) differs from the task core's socket. A ring-backed port
 * (non-empty tx_ring) always uses a single queue 0; other ports hand out the
 * next free queue index. Checksum offloads are dropped for the port (or
 * NOOFFLOADS is set on old DPDK) when every task in the chain feeding this
 * task asks for TASK_FEATURE_TXQ_FLAGS_NOOFFLOADS.
 */
static void configure_if_tx_queues(struct task_args *targ, uint8_t socket)
{
	for (uint8_t q = 0; q < targ->nb_txports; ++q) {
		const uint8_t pid = targ->tx_port_queue[q].port;

		PROX_PANIC(pid == OUT_DISCARD, "port misconfigured, exiting\n");
		PROX_PANIC(!prox_port_cfg[pid].active, "\tPort %u not used, skipping...\n", pid);

		const int dev_socket = prox_port_cfg[pid].socket;
		if (dev_socket != -1 && dev_socket != socket)
			plog_warn("TX core on socket %d while device on socket %d\n", socket, dev_socket);

		if (prox_port_cfg[pid].tx_ring[0] != '\0') {
			/* Rings-backed port can use single queue */
			prox_port_cfg[pid].n_txq = 1;
			targ->tx_port_queue[q].queue = 0;
		} else {
			targ->tx_port_queue[q].queue = prox_port_cfg[pid].n_txq++;
		}

		/* Offloads stay enabled unless the whole chain, up to the first
		   task receiving from a port, requests NOOFFLOADS. */
		if (chain_flag_always_set(targ, TASK_FEATURE_TXQ_FLAGS_NOOFFLOADS)) {
#if RTE_VERSION < RTE_VERSION_NUM(18,8,0,1)
			prox_port_cfg[pid].tx_conf.txq_flags |= ETH_TXQ_FLAGS_NOOFFLOADS;
#else
			prox_port_cfg[pid].requested_tx_offload &= ~(RTE_ETH_TX_OFFLOAD_IPV4_CKSUM | RTE_ETH_TX_OFFLOAD_UDP_CKSUM);
#endif
		}
	}
}

static void configure_if_rx_queues(struct task_args *targ, uint8_t socket)
{
	struct prox_port_cfg *port;
	uint8_t port_used_counter[PROX_MAX_PORTS] = {0};
	bool multiple_port_reference = false;
	uint8_t total_number_of_queues = 0;
	// Check how many times a port is referenced for this task
	for (uint8_t i = 0; i < targ->nb_rxports; i++) {
		uint8_t if_port = targ->rx_port_queue[i].port;
		port_used_counter[if_port]++;
		if (port_used_counter[if_port] > 1) {
			multiple_port_reference = true;
			port = &prox_port_cfg[if_port];
			PROX_PANIC((port->all_rx_queues), "Multiple queues defined in rx port, but all_rx_queues also set for port %s\n", port->names[0]);
		}
	}
	// If only referenced once, it is possible that we want to use all queues
	// Therefore we will check all_rx_queues for that port
	if (!multiple_port_reference) {
		for (uint8_t i = 0; i < PROX_MAX_PORTS; i++) {
			uint8_t if_port = targ->rx_port_queue[i].port;
			if (port_used_counter[if_port]) {
				port = &prox_port_cfg[if_port];
				if (port->all_rx_queues) {
					port_used_counter[if_port] = port->max_rxq;
					total_number_of_queues += port->max_rxq;
					plog_info("\tall_rx_queues for Port %s: %u rx_queues will be applied\n", port->names[0], port_used_counter[if_port]);
				}
			}
		}
	}
	if (total_number_of_queues) {
		PROX_PANIC((total_number_of_queues > PROX_MAX_PORTS), "%u queues using the all_rx_queues. PROX_MAX_PORTS is set to %u\n", total_number_of_queues, PROX_MAX_PORTS);
		uint8_t index = 0;
		for (uint8_t i = 0; i < PROX_MAX_PORTS; i++) {
			if (port_used_counter[i]) {
				for (uint8_t j = 0; j < port_used_counter[i]; j++) {
					targ->rx_port_queue[index].port = i;
					index ++;
				}
				port = &prox_port_cfg[i];
				plog_info("\t\tConfiguring task to use port %s with %u rx_queues\n", port->names[0], port_used_counter[i]);
			}
		}
		targ->nb_rxports = index;
	}
	for (int i = 0; i < targ->nb_rxports; i++) {
		uint8_t if_port = targ->rx_port_queue[i].port;

		if (if_port == OUT_DISCARD) {
			return;
		}

		port = &prox_port_cfg[if_port];
		PROX_PANIC(!port->active, "Port %u not used, aborting...\n", if_port);

		if(port->rx_ring[0] != '\0') {
			port->n_rxq = 0;
		}

		// If the mbuf size (of the rx task) is not big enough, we might receive multiple segments
		// This is usually the case when setting a big mtu size i.e. enabling jumbo frames.
		// If the packets get transmitted, then multi segments will have to be enabled on the TX port
		uint16_t max_frame_size = port->mtu + PROX_RTE_ETHER_HDR_LEN + PROX_RTE_ETHER_CRC_LEN + 2 * PROX_VLAN_TAG_SIZE;
		if (max_frame_size + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM > targ->mbuf_size) {
			targ->task_init->flag_features |= TASK_FEATURE_TXQ_FLAGS_MULTSEGS;
		}
		targ->rx_port_queue[i].queue = port->n_rxq;
		port->pool[targ->rx_port_queue[i].queue] = targ->pool;
		port->pool_size[targ->rx_port_queue[i].queue] = targ->nb_mbuf - 1;
		port->n_rxq++;

		int dsocket = port->socket;
		if (dsocket != -1 && dsocket != socket) {
			plog_warn("RX core on socket %d while device on socket %d\n", socket, dsocket);
		}
	}
}

/* For every task, configure its RX queues and then its TX queues, passing
   the socket id of the core the task runs on. */
static void configure_if_queues(void)
{
	struct lcore_cfg *core = NULL;
	struct task_args *task;

	while (core_targ_next(&core, &task, 0) == 0) {
		const uint8_t core_socket = rte_lcore_to_socket_id(core->id);

		configure_if_rx_queues(task, core_socket);
		configure_if_tx_queues(task, core_socket);
	}
}

/*
 * Set per-port TX flags that depend on the whole task chain feeding each
 * TX port:
 *  - RTE_VERSION < 18.8.0.1: ETH_TXQ_FLAGS_NOREFCOUNT when no task in the
 *    chain has TASK_FEATURE_TXQ_FLAGS_REFCOUNT;
 *  - otherwise: RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE when no task in the chain
 *    has TASK_FEATURE_TXQ_FLAGS_REFCOUNT or
 *    TASK_FEATURE_TXQ_FLAGS_MULTIPLE_MEMPOOL.
 *
 * NOTE(review): the flags are only ever OR-ed in, so one qualifying chain
 * enables them for the port even if another chain sending to the same port
 * would not qualify — confirm this is intended.
 */
static void configure_tx_queue_flags(void)
{
	struct lcore_cfg *lconf = NULL;
	struct task_args *targ;
	uint8_t if_port;

	/* The former per-core 'socket' lookup was never used and is removed. */
	while (core_targ_next(&lconf, &targ, 0) == 0) {
		for (uint8_t i = 0; i < targ->nb_txports; ++i) {
			if_port = targ->tx_port_queue[i].port;
#if RTE_VERSION < RTE_VERSION_NUM(18,8,0,1)
			/* Set the ETH_TXQ_FLAGS_NOREFCOUNT flag if none of
			   the tasks up to the task transmitting to the port
			   use refcnt. */
			if (chain_flag_never_set(targ, TASK_FEATURE_TXQ_FLAGS_REFCOUNT)) {
				prox_port_cfg[if_port].tx_conf.txq_flags |= ETH_TXQ_FLAGS_NOREFCOUNT;
			}
#else
			/* Set the RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE flag if none of
			   the tasks up to the task transmitting to the port
			   use refcnt and per-queue all mbufs come from the same mempool. */
			if (chain_flag_never_set(targ, TASK_FEATURE_TXQ_FLAGS_REFCOUNT) &&
			    chain_flag_never_set(targ, TASK_FEATURE_TXQ_FLAGS_MULTIPLE_MEMPOOL)) {
				prox_port_cfg[if_port].requested_tx_offload |= RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE;
			}
#endif
		}
	}
}

/*
 * Decide multi-segment TX support per port from the task chains sending to it.
 * Multi-segment is normally off and only needed by tasks handling big packets:
 *  - RTE_VERSION < 18.8.0.1: set ETH_TXQ_FLAGS_NOMULTSEGS when no task in
 *    the chain has TASK_FEATURE_TXQ_FLAGS_MULTSEGS;
 *  - otherwise: request RTE_ETH_TX_OFFLOAD_MULTI_SEGS when at least one
 *    task in the chain has it.
 */
static void configure_multi_segments(void)
{
	struct lcore_cfg *core = NULL;
	struct task_args *task;

	while (core_targ_next(&core, &task, 0) == 0) {
		for (uint8_t q = 0; q < task->nb_txports; ++q) {
			const uint8_t pid = task->tx_port_queue[q].port;
#if RTE_VERSION < RTE_VERSION_NUM(18,8,0,1)
			if (!chain_flag_never_set(task, TASK_FEATURE_TXQ_FLAGS_MULTSEGS))
				continue;
			prox_port_cfg[pid].tx_conf.txq_flags |= ETH_TXQ_FLAGS_NOMULTSEGS;
#else
			if (!chain_flag_sometimes_set(task, TASK_FEATURE_TXQ_FLAGS_MULTSEGS))
				continue;
			prox_port_cfg[pid].requested_tx_offload |= RTE_ETH_TX_OFFLOAD_MULTI_SEGS;
#endif
		}
	}
}

/*
 * Return a new short ring name on every call: one symbol from the alphabet
 * for the first 93 calls, then two symbols. The result lives in a static
 * buffer that the next call overwrites, and the function is not reentrant.
 */
static const char *gen_ring_name(void)
{
	static char name[] = "XX";
	static const char alphabet[] =
		"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
		"abcdefghijklmnopqrstuvwxyz"
		"[\\]^_`!\"#$%&'()*+,-./:;<="
		">?@{|}0123456789";
	static int counter = 0;
	const size_t n_symbols = sizeof(alphabet) - 1;

	size_t value = (size_t)counter++;

	name[0] = alphabet[value % n_symbols];
	value /= n_symbols;
	if (value != 0)
		name[1] = alphabet[(value - 1) % n_symbols];
	else
		name[1] = '\0';

	return name;
}

/* Per-kind counters of the rings set up between tasks. */
struct ring_init_stats {
	uint32_t n_pkt_rings;	/* rings actually created/used for packets */
	uint32_t n_ctrl_rings;	/* control rings (msg or pkt type) */
	uint32_t n_opt_rings;	/* rings optimized away between consecutive tasks */
};

/* Total number of rings of all kinds counted in ris. */
static uint32_t ring_init_stats_total(const struct ring_init_stats *ris)
{
	uint32_t sum = ris->n_pkt_rings;

	sum += ris->n_ctrl_rings;
	sum += ris->n_opt_rings;
	return sum;
}

static uint32_t count_incoming_tasks(uint32_t lcore_worker, uint32_t dest_task)
{
	struct lcore_cfg *lconf = NULL;
	struct task_args *targ;
	uint32_t ret = 0;
	struct core_task ct;

	while (core_targ_next(&lconf, &targ, 0) == 0) {
		for (uint8_t idxx = 0; idxx < MAX_PROTOCOLS; ++idxx) {
			for (uint8_t ridx = 0; ridx < targ->core_task_set[idxx].n_elems; ++ridx) {
				ct = targ->core_task_set[idxx].core_task[ridx];

				if (dest_task == ct.task && lcore_worker == ct.core)
					ret++;
			}
		}
	}
	return ret;
}

/*
 * Return the first RX ring of task task_id on core lcore_id, or NULL if the
 * core is inactive, task_id is out of range, or the task has no RX ring.
 */
static struct rte_ring *get_existing_ring(uint32_t lcore_id, uint32_t task_id)
{
	struct rte_ring *found = NULL;

	if (prox_core_active(lcore_id, 0)) {
		struct lcore_cfg *core = &lcore_cfg[lcore_id];

		if (task_id < core->n_tasks_all && core->targs[task_id].nb_rxrings != 0)
			found = core->targs[task_id].rx_rings[0];
	}
	return found;
}

/*
 * Connect source task starg (running on core lconf) to destination
 * ct.core / ct.task.
 *
 * - Control destinations (ct.type != 0): the destination's ctrl_rings_m or
 *   ctrl_rings_p slot is created on first use (single consumer) and then
 *   shared. It is appended to starg's tx_rings. If lconf is the master core it
 *   is also stored in ctrl_rings[]; if the destination is the master core it
 *   becomes starg->ctrl_plane_ring.
 * - Packet destinations: the ring is optimized away for consecutive tasks
 *   on the same core (conditions below). Otherwise an existing MP ring is
 *   reused or a new ring is created, then appended to starg->tx_rings (and,
 *   if newly created, to the destination's rx_rings).
 *
 * ring_idx becomes the destination's worker_thread_id; idx selects
 * starg->core_task_set[idx]. The counters in ris are updated.
 * Returns the ring used, or NULL when the ring was optimized away.
 * Panics if a ring cannot be created or the control type is unknown.
 */
static struct rte_ring *init_ring_between_tasks(struct lcore_cfg *lconf, struct task_args *starg,
				    const struct core_task ct, uint8_t ring_idx, int idx,
				    struct ring_init_stats *ris)
{
	uint8_t socket;
	struct rte_ring *ring = NULL;
	struct lcore_cfg *lworker;
	struct task_args *dtarg;

	PROX_ASSERT(prox_core_active(ct.core, 0));
	lworker = &lcore_cfg[ct.core];

	/* socket used is the one that the sending core resides on */
	socket = rte_lcore_to_socket_id(lconf->id);

	plog_info("\t\tCreating ring on socket %u with size %u\n"
		  "\t\t\tsource core, task and socket = %u, %u, %u\n"
		  "\t\t\tdestination core, task and socket = %u, %u, %u\n"
		  "\t\t\tdestination worker id = %u\n",
		  socket, starg->ring_size,
		  lconf->id, starg->id, socket,
		  ct.core, ct.task, rte_lcore_to_socket_id(ct.core),
		  ring_idx);

	if (ct.type) {
		struct rte_ring **dring = NULL;

		if (ct.type == CTRL_TYPE_MSG)
			dring = &lworker->ctrl_rings_m[ct.task];
		else if (ct.type == CTRL_TYPE_PKT) {
			dring = &lworker->ctrl_rings_p[ct.task];
			starg->flags |= TASK_ARG_CTRL_RINGS_P;
		}
		/* Any other type would leave dring NULL and crash on *dring. */
		PROX_PANIC(dring == NULL, "Unsupported control ring type %d from core %u task %u to core %u task %u\n",
			   (int)ct.type, lconf->id, starg->id, ct.core, ct.task);

		if (*dring == NULL)
			ring = rte_ring_create(gen_ring_name(), starg->ring_size, socket, RING_F_SC_DEQ);
		else
			ring = *dring;
		PROX_PANIC(ring == NULL, "Cannot create ring to connect I/O core %u with worker core %u\n", lconf->id, ct.core);

		starg->tx_rings[starg->tot_n_txrings_inited] = ring;
		starg->tot_n_txrings_inited++;
		*dring = ring;
		if (lconf->id == prox_cfg.master) {
			ctrl_rings[ct.core*MAX_TASKS_PER_CORE + ct.task] = ring;
		} else if (ct.core == prox_cfg.master) {
			starg->ctrl_plane_ring = ring;
		}

		plog_info("\t\t\tCore %u task %u to -> core %u task %u ctrl_ring %s %p %s\n",
			  lconf->id, starg->id, ct.core, ct.task, ct.type == CTRL_TYPE_PKT?
			  "pkt" : "msg", ring, ring->name);
		ris->n_ctrl_rings++;
		return ring;
	}

	/* Validate the task index before taking the address of targs[ct.task]. */
	PROX_ASSERT(ct.task < lworker->n_tasks_all);
	dtarg = &lworker->targs[ct.task];
	lworker->targs[ct.task].worker_thread_id = ring_idx;
	PROX_ASSERT(dtarg->flags & TASK_ARG_RX_RING);

	/* If all the following conditions are met, the ring can be
	   optimized away. */
	if (!task_is_master(starg) && !task_is_master(dtarg) && starg->lconf->id == dtarg->lconf->id &&
	    starg->nb_txrings == 1 && idx == 0 && dtarg->task &&
	    dtarg->tot_rxrings == 1 && starg->task == dtarg->task - 1) {
		plog_info("\t\tOptimizing away ring on core %u from task %u to task %u\n",
			  dtarg->lconf->id, starg->task, dtarg->task);
		/* No need to set up ws_mbuf. */
		starg->tx_opt_ring = 1;
		/* During init of destination task, the buffer in the
		   source task will be initialized. */
		dtarg->tx_opt_ring_task = starg;
		ris->n_opt_rings++;
		++dtarg->nb_rxrings;
		return NULL;
	}

	int ring_created = 1;
	/* Use a multi-producer ring (no RING_F_SP_ENQ), shared by all senders,
	   when MP rings are enabled AND more than one task sends to the
	   destination, or when bypass is enabled. Otherwise create a
	   single-producer/single-consumer ring. */
	if ((prox_cfg.flags & DSF_MP_RINGS && count_incoming_tasks(ct.core, ct.task) > 1)
		|| (prox_cfg.flags & DSF_ENABLE_BYPASS)) {
		ring = get_existing_ring(ct.core, ct.task);

		if (ring) {
			plog_info("\t\tCore %u task %u using MP ring %p to core %u task %u\n",
				  lconf->id, starg->id, ring, ct.core, ct.task);
			ring_created = 0;
		}
		else {
			ring = rte_ring_create(gen_ring_name(), starg->ring_size, socket, RING_F_SC_DEQ);
			plog_info("\t\tCore %u task %u creating MP ring %p to core %u task %u\n",
				  lconf->id, starg->id, ring, ct.core, ct.task);
		}
	}
	else
		ring = rte_ring_create(gen_ring_name(), starg->ring_size, socket, RING_F_SP_ENQ | RING_F_SC_DEQ);

	PROX_PANIC(ring == NULL, "Cannot create ring to connect I/O core %u with worker core %u\n", lconf->id, ct.core);

	starg->tx_rings[starg->tot_n_txrings_inited] = ring;
	starg->tot_n_txrings_inited++;

	if (ring_created) {
		PROX_ASSERT(dtarg->nb_rxrings < MAX_RINGS_PER_TASK);
		dtarg->rx_rings[dtarg->nb_rxrings] = ring;
		++dtarg->nb_rxrings;
		/* Several RX rings may carry mbufs from different pools. */
		if (dtarg->nb_rxrings > 1)
			dtarg->task_init->flag_features |= TASK_FEATURE_TXQ_FLAGS_MULTIPLE_MEMPOOL;
	}
	dtarg->nb_slave_threads = starg->core_task_set[idx].n_elems;
	dtarg->lb_friend_core = lconf->id;
	dtarg->lb_friend_task = starg->id;
	plog_info("\t\tWorker thread %d has core %d, task %d as a lb friend\n", ct.core, lconf->id, starg->id);
	plog_info("\t\tCore %u task %u tx_ring[%u] -> core %u task %u rx_ring[%u] %p %s %u WT\n",
		  lconf->id, starg->id, ring_idx, ct.core, ct.task, dtarg->nb_rxrings, ring, ring->name,
		  dtarg->nb_slave_threads);
	++ris->n_pkt_rings;
	return ring;
}

static void init_rings(void)
{
	struct lcore_cfg *lconf = NULL;
	struct task_args *starg;
	struct ring_init_stats ris = {0};

	while (core_targ_next(&lconf, &starg, 1) == 0) {
		plog_info("\t*** Initializing rings on core %u, task %u ***\n", lconf->id, starg->id);
		for (uint8_t idx = 0; idx < MAX_PROTOCOLS; ++idx) {
			for (uint8_t ring_idx = 0; ring_idx < starg->core_task_set[idx].n_elems; ++ring_idx) {
				PROX_ASSERT(ring_idx < MAX_WT_PER_LB);
				PROX_ASSERT(starg->tot_n_txrings_inited < MAX_RINGS_PER_TASK);

				struct core_task ct = starg->core_task_set[idx].core_task[ring_idx];
				init_ring_between_tasks(lconf, starg, ct, ring_idx, idx, &ris);
			}
		}
	}

	plog_info("\tInitialized %d rings:\n"
		  "\t\tNumber of packet rings: %u\n"
		  "\t\tNumber of control rings: %u\n"
		  "\t\tNumber of optimized rings: %u\n",
		  ring_init_stats_total(&ris),
		  ris.n_pkt_rings,
		  ris.n_ctrl_rings,
		  ris.n_opt_rings);

	lconf = NULL;
	struct prox_port_cfg *port;
	while (core_targ_next(&lconf, &starg, 1) == 0) {
		if ((starg->task_init) && (starg->flags & (TASK_ARG_L3|TASK_ARG_NDP))) {
			struct core_task ct;
			ct.core = prox_cfg.master;
			ct.task = 0;
			ct.type = CTRL_TYPE_PKT;
			struct rte_ring *rx_ring = init_ring_between_tasks(lconf, starg, ct, 0, 0, &ris);

			ct.core = lconf->id;
			ct.task = starg->id;;
			struct rte_ring *tx_ring = init_ring_between_tasks(&lcore_cfg[prox_cfg.master], lcore_cfg[prox_cfg.master].targs, ct, 0, 0, &ris);
		}
	}
}

/*
 * Take up to nb_mbuf mbufs out of mempool (fewer if the pool runs dry) and
 * put them back one at a time in a uniformly random order. NOTE(review):
 * presumably meant to randomize the order in which mbufs are later handed
 * out — confirm.
 */
static void shuffle_mempool(struct rte_mempool* mempool, uint32_t nb_mbuf)
{
	if (nb_mbuf == 0)
		return;

	struct rte_mbuf **pkts = prox_zmalloc(nb_mbuf * sizeof(*pkts), rte_socket_id());
	/* Nothing has been taken from the pool yet, so bailing out on
	   allocation failure leaves it untouched (previously: NULL deref). */
	if (pkts == NULL)
		return;

	uint32_t got = 0;
	while (got < nb_mbuf && rte_mempool_get_bulk(mempool, (void **)&pkts[got], 1) == 0)
		++got;

	/* Fisher-Yates shuffle: O(got) instead of rejection sampling over
	   an increasingly empty array. */
	for (uint32_t i = got; i > 1; --i) {
		uint32_t j = (uint32_t)rand() % i;
		struct rte_mbuf *tmp = pkts[i - 1];

		pkts[i - 1] = pkts[j];
		pkts[j] = tmp;
	}

	for (uint32_t i = 0; i < got; ++i)
		rte_mempool_put_bulk(mempool, (void **)&pkts[i], 1);

	prox_free(pkts);
}

static void set_mbuf_size(struct task_args *targ)
{
	/* mbuf size can be set
	 *  - from config file (highest priority, overwriting any other config) - should only be used as workaround
	 *  - defaulted to MBUF_SIZE.
	 * Except if set explicitly, the size is increased when receiving from an i40e port so that the
	 * largest frame (MTU + Ethernet header + CRC + two VLAN tags) fits in at most 5 chained descriptors.
	 * A warning is logged if the result is smaller than the largest min_rx_bufsize of the rx ports.
	 */
	if (targ->mbuf_size)
		return;

	targ->mbuf_size = MBUF_SIZE;

	/* 32-bit on purpose: mtu + L2 overhead could wrap around in a 16-bit variable */
	uint32_t max_frame_size = 0, min_buffer_size = 0;
	int i40e = 0;

	for (int i = 0; i < targ->nb_rxports; i++) {
		uint8_t if_port = targ->rx_port_queue[i].port;

		if (if_port == OUT_DISCARD)
			continue;

		struct prox_port_cfg *port = &prox_port_cfg[if_port];
		uint32_t frame_size = port->mtu + PROX_RTE_ETHER_HDR_LEN + PROX_RTE_ETHER_CRC_LEN + 2 * PROX_VLAN_TAG_SIZE;

		if (max_frame_size < frame_size)
			max_frame_size = frame_size;
		if (min_buffer_size < port->min_rx_bufsize)
			min_buffer_size = port->min_rx_bufsize;

		// Check whether we receive from i40e. This driver has extra mbuf size requirements
		if (strcmp(port->short_name, "i40e") == 0)
			i40e = 1;
	}

	if (i40e) {
		// i40e supports a maximum of 5 descriptors chained
		uint32_t required_mbuf_size = RTE_ALIGN(max_frame_size / 5, 128) + sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
		if (required_mbuf_size > targ->mbuf_size) {
			targ->mbuf_size = required_mbuf_size;
			plog_info("\t\tSetting mbuf_size to %u to support frame_size %u\n", targ->mbuf_size, max_frame_size);
		}
	}

	if (min_buffer_size > targ->mbuf_size) {
		plog_warn("Mbuf size might be too small. This might result in packet segmentation and memory leak\n");
	}
}

/*
 * UNIQUE_MEMPOOL_PER_SOCKET mode: one packet mempool per socket, shared by every
 * receiving task on that socket.
 *
 * Pass 1 computes the mbuf size of every task. For tasks whose first rx queue is not
 * OUT_DISCARD, it sums nb_mbuf per socket and requires that all such tasks on a socket
 * agree on nb_cache_mbuf and mbuf_size, since a single pool has only one of each.
 * Pass 2 creates "socket_<n>_pool" for every socket with a non-zero count, unless a
 * mempool with that name already exists (then it is reused and not shuffled).
 * Pass 3 assigns the socket's pool and total mbuf count to each receiving task.
 */
static void setup_mempools_unique_per_socket(void)
{
	uint32_t flags = 0;
	char name[64];
	struct lcore_cfg *lconf = NULL;
	struct task_args *targ;

	/* NULL-initialized so pass 3 never reads an indeterminate pointer for a socket without a pool */
	struct rte_mempool *pool[MAX_SOCKETS] = {NULL};
	uint32_t mbuf_count[MAX_SOCKETS] = {0};
	uint32_t nb_cache_mbuf[MAX_SOCKETS] = {0};
	uint32_t mbuf_size[MAX_SOCKETS] = {0};

	while (core_targ_next_early(&lconf, &targ, 0) == 0) {
		PROX_PANIC(targ->task_init == NULL, "task_init = NULL, is mode specified for core %d, task %d ?\n", lconf->id, targ->id);
		uint8_t socket = rte_lcore_to_socket_id(lconf->id);
		PROX_ASSERT(socket < MAX_SOCKETS);

		set_mbuf_size(targ);
		if (targ->rx_port_queue[0].port == OUT_DISCARD)
			continue;

		PROX_ASSERT(targ->nb_mbuf != 0);
		mbuf_count[socket] += targ->nb_mbuf;
		if (nb_cache_mbuf[socket] == 0)
			nb_cache_mbuf[socket] = targ->nb_cache_mbuf;
		else {
			PROX_PANIC(nb_cache_mbuf[socket] != targ->nb_cache_mbuf,
				   "all mbuf_cache must have the same size if using a unique mempool per socket\n");
		}
		if (mbuf_size[socket] == 0)
			mbuf_size[socket] = targ->mbuf_size;
		else {
			PROX_PANIC(mbuf_size[socket] != targ->mbuf_size,
				   "all mbuf_size must have the same size if using a unique mempool per socket\n");
		}
	}

	for (int i = 0 ; i < MAX_SOCKETS; i++) {
		if (mbuf_count[i] == 0)
			continue;

		snprintf(name, sizeof(name), "socket_%u_pool", i);
		if ((pool[i] = rte_mempool_lookup(name)) != NULL)
			continue;

		pool[i] = rte_mempool_create(name,
			mbuf_count[i] - 1, mbuf_size[i],
			nb_cache_mbuf[i],
			sizeof(struct rte_pktmbuf_pool_private),
			rte_pktmbuf_pool_init, NULL,
			prox_pktmbuf_init, NULL,
			i, flags);
		PROX_PANIC(pool[i] == NULL, "\t\tError: cannot create mempool for socket %u\n", i);
		plog_info("\tMempool %p size = %u * %u cache %u, socket %d\n", pool[i],
			  mbuf_count[i], mbuf_size[i], nb_cache_mbuf[i], i);

		if (prox_cfg.flags & DSF_SHUFFLE) {
			shuffle_mempool(pool[i], mbuf_count[i]);
		}
	}

	lconf = NULL;
	while (core_targ_next_early(&lconf, &targ, 0) == 0) {
		uint8_t socket = rte_lcore_to_socket_id(lconf->id);

		if (targ->rx_port_queue[0].port != OUT_DISCARD) {
			/* use this pool for the interface that the core is receiving from */
			/* If one core receives from multiple ports, all the ports use the same mempool */
			targ->pool = pool[socket];
			/* Set the number of mbuf to the number of the unique mempool, so that the used and free work */
			targ->nb_mbuf = mbuf_count[socket];
			plog_info("\tMempool %p size = %u * %u cache %u, socket %d\n", targ->pool,
				  targ->nb_mbuf, mbuf_size[socket], targ->nb_cache_mbuf, socket);
		}
	}
}

/*
 * Provide targ->pool for a receiving task (one mempool per task).
 *
 * If the task has a pool_name and a memzone "MP_<pool_name>" already exists, the
 * mempool stored at that memzone is reused and targ->nb_mbuf is updated to its size.
 * Otherwise a new mempool is created on the task's socket. It is named pool_name when
 * one is configured, or "core_<id>_task_<id>_pool" otherwise.
 * NOTE(review): the "MP_" lookup presumably relies on DPDK naming a mempool's memzone
 * "MP_<name>", so later tasks with the same pool_name find this pool — confirm for the
 * DPDK version in use.
 * Panics if no pool can be obtained.
 */
static void setup_mempool_for_rx_task(struct lcore_cfg *lconf, struct task_args *targ)
{
	const uint8_t socket = rte_lcore_to_socket_id(lconf->id);
	const struct rte_memzone *mz;
	struct rte_mempool *mp = NULL;
	uint32_t flags = 0;
	char memzone_name[64];
	char name[64];

	set_mbuf_size(targ);

	/* allocate memory pool for packets */
	PROX_ASSERT(targ->nb_mbuf != 0);

	/* name must always be initialized: it is passed to rte_mempool_create below */
	if (targ->pool_name[0] == '\0')
		snprintf(name, sizeof(name), "core_%u_task_%u_pool", lconf->id, targ->id);
	else
		snprintf(name, sizeof(name), "%s", targ->pool_name);

	snprintf(memzone_name, sizeof(memzone_name), "MP_%.*s", (int)(sizeof(memzone_name)-4), targ->pool_name);
	mz = rte_memzone_lookup(memzone_name);

	if (mz != NULL) {
		mp = (struct rte_mempool*)mz->addr;

		targ->nb_mbuf = mp->size;
		targ->pool = mp;
	}

#ifdef RTE_LIBRTE_IVSHMEM_FALSE
	if (mz != NULL && mp != NULL && mp->phys_addr != mz->ioremap_addr) {
		/* Init mbufs with ioremap_addr for dma */
		mp->phys_addr = mz->ioremap_addr;
		mp->elt_pa[0] = mp->phys_addr + (mp->elt_va_start - (uintptr_t)mp);

		struct prox_pktmbuf_reinit_args init_args;
		init_args.mp = mp;
		init_args.lconf = lconf;

		uint32_t elt_sz = mp->elt_size + mp->header_size + mp->trailer_size;
		rte_mempool_obj_iter((void*)mp->elt_va_start, mp->size, elt_sz, 1,
				     mp->elt_pa, mp->pg_num, mp->pg_shift, prox_pktmbuf_reinit, &init_args);
	}
#endif

	/* Use this pool for the interface that the core is
	   receiving from if one core receives from multiple
	   ports, all the ports use the same mempool */
	if (targ->pool == NULL) {
		plog_info("\tCreating mempool with name '%s' on socket %d\n", name, socket);
		targ->pool = rte_mempool_create(name,
						targ->nb_mbuf - 1, targ->mbuf_size,
						targ->nb_cache_mbuf,
						sizeof(struct rte_pktmbuf_pool_private),
						rte_pktmbuf_pool_init, NULL,
						prox_pktmbuf_init, lconf,
						socket, flags);
	}

	PROX_PANIC(targ->pool == NULL,
		   "\tError: cannot create mempool for core %u task %u: %s\n", lconf->id, targ->id, rte_strerror(rte_errno));

	plog_info("\tMempool %p size = %u * %u cache %u, socket %d\n", targ->pool,
		  targ->nb_mbuf, targ->mbuf_size, targ->nb_cache_mbuf, socket);
	if (prox_cfg.flags & DSF_SHUFFLE) {
		shuffle_mempool(targ->pool, targ->nb_mbuf);
	}
}

/*
 * Default mempool mode: every task gets its own mempool, except tasks whose first rx
 * queue is OUT_DISCARD, which get none. Panics if a task has no task_init (no mode).
 */
static void setup_mempools_multiple_per_socket(void)
{
	struct task_args *task;
	struct lcore_cfg *core;

	for (core = NULL; core_targ_next_early(&core, &task, 0) == 0; ) {
		PROX_PANIC(task->task_init == NULL, "task_init = NULL, is mode specified for core %d, task %d ?\n", core->id, task->id);
		if (task->rx_port_queue[0].port != OUT_DISCARD)
			setup_mempool_for_rx_task(core, task);
	}
}

/*
 * Allocate packet mempools. The UNIQUE_MEMPOOL_PER_SOCKET flag selects one shared
 * pool per socket; otherwise each receiving task gets its own pool.
 */
static void setup_mempools(void)
{
	const int one_pool_per_socket = (prox_cfg.flags & UNIQUE_MEMPOOL_PER_SOCKET) != 0;

	if (!one_pool_per_socket) {
		setup_mempools_multiple_per_socket();
		return;
	}
	setup_mempools_unique_per_socket();
}

static void set_task_lconf(void)
{
	struct lcore_cfg *lconf;
	uint32_t lcore_id = -1;

	while(prox_core_next(&lcore_id, 1) == 0) {
		lconf = &lcore_cfg[lcore_id];
		for (uint8_t task_id = 0; task_id < lconf->n_tasks_all; ++task_id) {
			lconf->targs[task_id].lconf = lconf;
		}
	}
}

static void set_dest_threads(void)
{
	struct lcore_cfg *lconf = NULL;
	struct task_args *targ;

	while (core_targ_next(&lconf, &targ, 0) == 0) {
		for (uint8_t idx = 0; idx < MAX_PROTOCOLS; ++idx) {
			for (uint8_t ring_idx = 0; ring_idx < targ->core_task_set[idx].n_elems; ++ring_idx) {
				struct core_task ct = targ->core_task_set[idx].core_task[ring_idx];

				struct task_args *dest_task = core_targ_get(ct.core, ct.task);
				dest_task->prev_tasks[dest_task->n_prev_tasks++] = targ;
			}
		}
	}
}

/* Run the optional early_init hook of each task's mode, for every task on every core. */
static void setup_all_task_structs_early_init(void)
{
	struct task_args *task;
	struct lcore_cfg *core = NULL;

	plog_info("\t*** Calling early init on all tasks ***\n");
	while (core_targ_next(&core, &task, 0) == 0) {
		if (!task->task_init->early_init)
			continue;
		task->task_init->early_init(task);
	}
}

static void setup_all_task_structs(void)
{
	struct lcore_cfg *lconf;
	uint32_t lcore_id = -1;
	struct task_base *tmaster = NULL;

	while(prox_core_next(&lcore_id, 1) == 0) {
		lconf = &lcore_cfg[lcore_id];
		for (uint8_t task_id = 0; task_id < lconf->n_tasks_all; ++task_id) {
			if (task_is_master(&lconf->targs[task_id])) {
				plog_info("\tInitializing MASTER struct for core %d task %d\n", lcore_id, task_id);
				lconf->tasks_all[task_id] = init_task_struct(&lconf->targs[task_id]);
				tmaster = lconf->tasks_all[task_id];
			}
		}
	}
	PROX_PANIC(tmaster == NULL, "Can't initialize master task\n");
	lcore_id = -1;

	while(prox_core_next(&lcore_id, 1) == 0) {
		lconf = &lcore_cfg[lcore_id];
		plog_info("\t*** Initializing core %d (%d task) ***\n", lcore_id, lconf->n_tasks_all);
		for (uint8_t task_id = 0; task_id < lconf->n_tasks_all; ++task_id) {
			if (!task_is_master(&lconf->targs[task_id])) {
				plog_info("\t\tInitializing struct for core %d task %d\n", lcore_id, task_id);
				lconf->targs[task_id].tmaster = tmaster;
				lconf->tasks_all[task_id] = init_task_struct(&lconf->targs[task_id]);
			}
		}
	}
}

static void init_port_activate(void)
{
	struct lcore_cfg *lconf = NULL;
	struct task_args *targ;
	uint8_t port_id = 0;

	while (core_targ_next_early(&lconf, &targ, 0) == 0) {
		for (int i = 0; i < targ->nb_rxports; i++) {
			port_id = targ->rx_port_queue[i].port;
			prox_port_cfg[port_id].active = 1;
		}

		for (int i = 0; i < targ->nb_txports; i++) {
			port_id = targ->tx_port_queue[i].port;
			prox_port_cfg[port_id].active = 1;
		}
	}
}

/*
 * Initialize cores and allocate mempools.
 *
 * Checks that every core's socket fits in MAX_SOCKETS, then in order: allocates the
 * mempools (first, to get the lowest address range), allocates lcore hugepage data,
 * links tasks to their predecessors and owning lcore, sets up port addresses and
 * queues, creates rings, configures multi-segment and tx-queue flags, checks
 * configuration consistency, and logs all rings.
 */
static void init_lcores(void)
{
	uint32_t lcore_id = -1;

	while(prox_core_next(&lcore_id, 0) == 0) {
		/* Keep the full value: narrowing to 8 bits could let a large socket id pass the check */
		unsigned socket = rte_lcore_to_socket_id(lcore_id);
		PROX_PANIC(socket >= MAX_SOCKETS, "Can't configure core %u (on socket %u). MAX_SOCKET is set to %d\n", lcore_id, socket, MAX_SOCKETS);
	}

	/* need to allocate mempools as the first thing to use the lowest possible address range */
	plog_info("=== Initializing mempools ===\n");
	setup_mempools();

	lcore_cfg_alloc_hp();

	set_dest_threads();
	set_task_lconf();

	plog_info("=== Initializing port addresses ===\n");
	init_port_addr();

	plog_info("=== Initializing queue numbers on cores ===\n");
	configure_if_queues();

	plog_info("=== Initializing rings on cores ===\n");
	init_rings();

	configure_multi_segments();
	configure_tx_queue_flags();

	plog_info("=== Checking configuration consistency ===\n");
	check_cfg_consistent();

	plog_all_rings();
}

/*
 * Take PROX from parsed arguments to a ready-to-run state.
 *
 * In order: reads the config file and sets up DPDK, activates used ports, initializes
 * devices, calibrates the TSC, initializes cores/mempools/rings, ports and tasks,
 * optionally allocates the log buffer, and finally silences DPDK logging.
 *
 * Returns -1 if reading the config file or DPDK setup fails, 0 otherwise. Exits with
 * EXIT_SUCCESS when only a syntax check (DSF_CHECK_SYNTAX) or an init check
 * (DSF_CHECK_INIT) was requested. argc is currently unused.
 */
static int setup_prox(int argc, char **argv)
{
	if (prox_read_config_file() != 0 ||
	    prox_setup_rte(argv[0]) != 0) {
		return -1;
	}

	if (prox_cfg.flags & DSF_CHECK_SYNTAX) {
		plog_info("=== Configuration file syntax has been checked ===\n\n");
		exit(EXIT_SUCCESS);
	}

	init_port_activate();
	plog_info("=== Initializing rte devices ===\n");
	if (!(prox_cfg.flags & DSF_USE_DUMMY_DEVICES))
		init_rte_ring_dev();
	init_rte_dev(prox_cfg.flags & DSF_USE_DUMMY_DEVICES);
	plog_info("=== Calibrating TSC overhead ===\n");
	clock_init();
	plog_info("\tTSC running at %"PRIu64" Hz\n", rte_get_tsc_hz());

	init_lcores();
	plog_info("=== Initializing ports ===\n");
	init_port_all();

	setup_all_task_structs_early_init();
	plog_info("=== Initializing tasks ===\n");
	setup_all_task_structs();

	if (prox_cfg.logbuf_size) {
		prox_cfg.logbuf = prox_zmalloc(prox_cfg.logbuf_size, rte_socket_id());
		PROX_PANIC(prox_cfg.logbuf == NULL, "Failed to allocate memory for logbuf with size = %d\n", prox_cfg.logbuf_size);
	}

	if (prox_cfg.flags & DSF_CHECK_INIT) {
		plog_info("=== Initialization sequence completed ===\n\n");
		exit(EXIT_SUCCESS);
	}

	/* Disable DPDK logging by redirecting its log stream to /dev/null. The stream is
	   written to, so open it for writing. If fopen fails, f is NULL and DPDK keeps
	   using its default log stream. */
	FILE *f = fopen("/dev/null", "w");
	rte_openlog_stream(f);
	plog_info("=== PROX started ===\n");
	return 0;
}

/*
 * Daemon-mode setup result, written by siguser_handler() in the parent process:
 * 1 once SIGUSR1 (child setup succeeded) has been received, 0 otherwise.
 * It must be volatile sig_atomic_t: that is the only static object type a signal
 * handler may portably assign to (C11 7.14.1.1).
 */
static volatile sig_atomic_t success = 0;

/* Record the child's setup outcome: SIGUSR1 means success, any other signal means failure. */
static void siguser_handler(int signal)
{
	success = (signal == SIGUSR1);
}

/*
 * Forceful shutdown. main() installs this for SIGABRT and SIGPIPE, and sigterm_handler()
 * installs it for SIGINT after a first Ctrl-C.
 *
 * Restores default dispositions and ignores further SIGINT. It then cancels and joins
 * every lcore thread other than the calling one, ends the ncurses display, closes the
 * ports and calls abort().
 * NOTE(review): plog_info and the pthread cancel/join calls are likely not
 * async-signal-safe; presumably tolerated because the process is terminating — confirm.
 */
static void sigabrt_handler(__attribute__((unused)) int signum)
{
	/* restore default disposition for SIGABRT and SIGPIPE */
	/* (so the abort() at the end terminates the process instead of re-entering this handler) */
	signal(SIGABRT, SIG_DFL);
	signal(SIGPIPE, SIG_DFL);

	/* ignore further Ctrl-C */
	signal(SIGINT, SIG_IGN);

	/* more drastic exit on tedious termination signal */
	plog_info("Aborting...\n");
	/* NOTE(review): presumably lcore_cfg is NULL until the lcore configuration is allocated — confirm */
	if (lcore_cfg != NULL) {
		uint32_t lcore_id;
		/* tid0 is an all-zero pthread_t, used as the "no thread recorded" marker;
		   presumably thread_id stays zeroed for lcores whose thread never started — confirm */
		pthread_t thread_id, tid0, tid = pthread_self();
		memset(&tid0, 0, sizeof(tid0));

		/* cancel all threads except current one */
		lcore_id = -1;
		while (prox_core_next(&lcore_id, 1) == 0) {
			thread_id = lcore_cfg[lcore_id].thread_id;
			if (pthread_equal(thread_id, tid0))
				continue;
			if (pthread_equal(thread_id, tid))
				continue;
			pthread_cancel(thread_id);
		}

		/* wait for cancelled threads to terminate */
		/* (a separate pass, so all cancellations are requested before blocking on any join) */
		lcore_id = -1;
		while (prox_core_next(&lcore_id, 1) == 0) {
			thread_id = lcore_cfg[lcore_id].thread_id;
			if (pthread_equal(thread_id, tid0))
				continue;
			if (pthread_equal(thread_id, tid))
				continue;
			pthread_join(thread_id, NULL);
		}
	}

	/* close ncurses */
	display_end();

	/* close ports on termination signal */
	close_ports_atexit();

	/* terminate now */
	abort();
}

/*
 * Graceful termination for "harmless" signals: asks PROX to quit, and the ports are
 * closed by the atexit handler on the resulting exit. After a first SIGINT, a second
 * one is routed to sigabrt_handler for a forceful abort.
 */
static void sigterm_handler(int signum)
{
	const int is_ctrl_c = (signum == SIGINT);

	if (is_ctrl_c)
		signal(SIGINT, sigabrt_handler);

	quit();
}

/*
 * Work around ncurses >= 6.1 under TERM=xterm: if both hold, set TERM=putty;
 * otherwise leave TERM unchanged. Each decision is logged. If the ncurses version
 * string cannot be parsed, TERM is left unchanged.
 */
static void set_term_env(void)
{
	static const char var[] = "TERM";
	/* putenv() keeps a pointer to this buffer, so it must outlive this call */
	static char str[] = "TERM=putty";
	int max_ver = 0, min_ver = 0;

	const char *old_value = getenv(var);
	/* getenv returns NULL when TERM is unset; passing NULL to %s is undefined behavior */
	const char *shown_value = old_value ? old_value : "(unset)";

	const char *ncurses_version = curses_version();
	if (sscanf(ncurses_version, "ncurses %d.%d", &max_ver, &min_ver) != 2) {
		plog_info("\tUnable to extract ncurses version from %s. TERM left unchanged to %s\n", ncurses_version, shown_value);
		return;
	}
	plog_info("\tncurses version = %d.%d (%s)\n", max_ver, min_ver, ncurses_version);

	const int ncurses_at_least_6_1 = (max_ver > 6) || ((max_ver == 6) && (min_ver >= 1));
	if (old_value && ncurses_at_least_6_1 && (strcmp(old_value, "xterm") == 0)) {
		// On recent OSes such as RHEL 8.0, ncurses(6.1) introduced support
		// for ECMA-48 repeat character control.
		// Some terminal emulators use TERM=xterm but do not support this feature.
		// In this case, printing repeating character such as "22000000 Hz" might
		// display as 220 Hz.
		// Other emulators, such as tmux, use TERM=screen, and do not exhibit the issue.
		// old_value is logged before putenv(), which may invalidate it.
		plog_info("\tChanged TERM from %s ", old_value);
		putenv(str);
		plog_info("to %s\n", getenv(var));
	} else {
		plog_info("\tTERM left unchanged to %s\n", shown_value);
	}
}

int main(int argc, char **argv)
{
	/* set en_US locale to print big numbers with ',' */
	setlocale(LC_NUMERIC, "en_US.utf-8");

	if (prox_parse_args(argc, argv) != 0){
		prox_usage(argv[0]);
	}
	plog_init(prox_cfg.log_name, prox_cfg.log_name_pid);
	plog_info("=== " PROGRAM_NAME " %s ===\n", VERSION_STR());
	plog_info("\tUsing DPDK %s\n", rte_version() + sizeof(RTE_VER_PREFIX));
	plog_info("\tgit version %s\n", git_version);
	set_term_env();
	read_rdt_info();

	if (prox_cfg.flags & DSF_LIST_TASK_MODES) {
		/* list supported task modes and exit */
		tasks_list();
		return EXIT_SUCCESS;
	}

	/* close ports at normal exit */
	atexit(close_ports_atexit);
	/* gracefully quit on harmless termination signals */
	signal(SIGHUP, sigterm_handler);
	signal(SIGINT, sigterm_handler);
	signal(SIGQUIT, sigterm_handler);
	signal(SIGTERM, sigterm_handler);
	signal(SIGUSR1, sigterm_handler);
	signal(SIGUSR2, sigterm_handler);
	/* more drastic exit on tedious termination signals */
	signal(SIGABRT, sigabrt_handler);
	signal(SIGPIPE, sigabrt_handler);

	if (prox_cfg.flags & DSF_DAEMON) {
		signal(SIGUSR1, siguser_handler);
		signal(SIGUSR2, siguser_handler);
		plog_info("=== Running in Daemon mode ===\n");
		plog_info("\tForking child and waiting for setup completion\n");

		pid_t ppid = getpid();
		pid_t pid = fork();
		if (pid < 0) {
			plog_err("Failed to fork process to run in daemon mode\n");
			return EXIT_FAILURE;
		}

		if (pid == 0) {
			fclose(stdin);
			fclose(stdout);
			fclose(stderr);
			if (setsid() < 0) {
				kill(ppid, SIGUSR2);
				return EXIT_FAILURE;
			}
			if (setup_prox(argc, argv) != 0) {
				kill(ppid, SIGUSR2);
				return EXIT_FAILURE;
			}
			else {
				kill(ppid, SIGUSR1);
				run(prox_cfg.flags);
				return EXIT_SUCCESS;
			}
		}
		else {
			/* Before exiting the parent, wait until the
			   child process has finished setting up */
			pause();
			if (prox_cfg.logbuf) {
				file_print(prox_cfg.logbuf);
			}
			return success? EXIT_SUCCESS : EXIT_FAILURE;
		}
	}

	if (setup_prox(argc, argv) != 0)
		return EXIT_FAILURE;
	run(prox_cfg.flags);

	return EXIT_SUCCESS;
}