author    Xavier Simonart <simonartxavier@gmail.com>    2021-12-30 10:24:15 +0000
committer Xavier Simonart <simonartxavier@gmail.com>    2022-01-03 15:10:00 +0000
commit    971d163b345ab3f74d6a5252722371dc17a8fe7a (patch)
tree      f9103fac18a5ef0fc9e99638d5abc26f08bb9a82 /VNFs/DPPD-PROX/handle_gen.c
parent    ba5ffcd0031bdf1f4781d9dcf224b5cb75aee922 (diff)
Add support for detection of misordered packets per flow
Only flows from ranges are supported so far; flows generated through randoms are not supported yet.
Misordered packets can now be detected per flow. The flow definition is hard-coded as a 5-tuple (proto, ip/udp src/dst).
The generator writes a flow_id and a per-flow packet_id within the packets (4 + 5 bytes). This means that, if the latency, packet_id, signature, accuracy, flow_id and packet_id_in_flow parameters are all set, the minimum packet size is 72 bytes.

This feature is enabled through the following parameters.

On the gen side:
- packet id in flow pos: position (5 bytes) of the per-flow packet id
- flow id pos: position (4 bytes) of the flow id

On the lat side:
- flow_count: the number of flows received by the latency core. If not enough flows are configured, an error message is displayed (once) and the packet reordering count is only measured for the configured flows
- packet id in flow pos: position (5 bytes) of the per-flow packet id
- flow id pos: position (4 bytes) of the flow id

Misordered packets (and the extent of the misordering) are still reported per core, as before. Nothing changes for duplicate packets: they are not measured per flow, since a duplicate packet does not belong to a single flow.

Signed-off-by: Xavier Simonart <simonartxavier@gmail.com>
Change-Id: I07517df87dfebec81408baf1decb647d9a0edd94
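For illustration, a minimal pair of gen/lat core sections using these options could look like the sketch below. The [core] section layout and the existing keys (mode, ports, lat/packet id/signature/accuracy positions) follow the usual PROX generator/latency configuration style; the spellings of the new keys (flow id pos, packet id in flow pos, flow count) are taken from the parameter names above and may not match the final syntax exactly, and all byte offsets are placeholders.

    [core 1]
    name=gen0
    task=0
    mode=gen
    tx port=p0
    lat pos=42
    packet id pos=46
    signature pos=52
    accuracy pos=56
    flow id pos=60
    packet id in flow pos=64

    [core 2]
    name=lat0
    task=0
    mode=lat
    rx port=p1
    lat pos=42
    packet id pos=46
    signature pos=52
    accuracy pos=56
    flow id pos=60
    packet id in flow pos=64
    flow count=4096

Per the note above, when the latency, packet id, signature, accuracy, flow id and per-flow packet id fields are all enabled, frames must be at least 72 bytes.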
Diffstat (limited to 'VNFs/DPPD-PROX/handle_gen.c')
-rw-r--r--    VNFs/DPPD-PROX/handle_gen.c    216
1 file changed, 198 insertions(+), 18 deletions(-)
diff --git a/VNFs/DPPD-PROX/handle_gen.c b/VNFs/DPPD-PROX/handle_gen.c
index 363ee17d..9cacdb54 100644
--- a/VNFs/DPPD-PROX/handle_gen.c
+++ b/VNFs/DPPD-PROX/handle_gen.c
@@ -21,6 +21,7 @@
#include <rte_version.h>
#include <rte_byteorder.h>
#include <rte_ether.h>
+#include <rte_hash.h>
#include <rte_hash_crc.h>
#include <rte_malloc.h>
@@ -49,6 +50,7 @@
#include "handle_master.h"
#include "defines.h"
#include "prox_ipv6.h"
+#include "handle_lb_5tuple.h"
struct pkt_template {
uint16_t len;
@@ -99,6 +101,10 @@ struct task_gen_pcap {
uint32_t socket_id;
};
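+// Per-flow generator state: the next per-flow packet id to write, indexed by the
+// flow's index in flow_id_table.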
+struct flows {
+ uint32_t packet_id;
+};
+
struct task_gen {
struct task_base base;
uint64_t hz;
@@ -119,6 +125,8 @@ struct task_gen {
uint16_t packet_id_pos;
uint16_t accur_pos;
uint16_t sig_pos;
+ uint16_t flow_id_pos;
+ uint16_t packet_id_in_flow_pos;
uint32_t sig;
uint32_t socket_id;
uint8_t generator_id;
@@ -151,6 +159,8 @@ struct task_gen {
uint32_t store_msk;
struct packet *store_buf;
FILE *fp;
+ struct rte_hash *flow_id_table;
+ struct flows *flows;
} __rte_cache_aligned;
static void task_gen_set_pkt_templates_len(struct task_gen *task, uint32_t *pkt_sizes);
@@ -394,6 +404,23 @@ static void task_gen_apply_all_random_fields(struct task_gen *task, uint8_t **pk
task_gen_apply_random_fields(task, pkt_hdr[i]);
}
+static void task_gen_apply_ranges(struct task_gen *task, uint8_t *pkt_hdr)
+{
+ uint32_t ret;
+ if (!task->n_ranges)
+ return;
+
+ for (uint16_t j = 0; j < task->n_ranges; ++j) {
+ if (unlikely(task->ranges[j].value == task->ranges[j].max))
+ task->ranges[j].value = task->ranges[j].min;
+ else
+ task->ranges[j].value++;
+ ret = rte_bswap32(task->ranges[j].value);
+ uint8_t *pret = (uint8_t*)&ret;
+ rte_memcpy(pkt_hdr + task->ranges[j].offset, pret + 4 - task->ranges[j].range_len, task->ranges[j].range_len);
+ }
+}
+
static void task_gen_apply_all_ranges(struct task_gen *task, uint8_t **pkt_hdr, uint32_t count)
{
uint32_t ret;
@@ -401,14 +428,118 @@ static void task_gen_apply_all_ranges(struct task_gen *task, uint8_t **pkt_hdr,
return;
for (uint16_t i = 0; i < count; ++i) {
- for (uint16_t j = 0; j < task->n_ranges; ++j) {
- if (unlikely(task->ranges[j].value == task->ranges[j].max))
- task->ranges[j].value = task->ranges[j].min;
- else
- task->ranges[j].value++;
- ret = rte_bswap32(task->ranges[j].value);
- uint8_t *pret = (uint8_t*)&ret;
- rte_memcpy(pkt_hdr[i] + task->ranges[j].offset, pret + 4 - task->ranges[j].range_len, task->ranges[j].range_len);
+ task_gen_apply_ranges(task, pkt_hdr[i]);
+ }
+}
+
+static inline uint32_t gcd(uint32_t a, uint32_t b)
+{
+ // Euclidean algorithm
+ uint32_t t;
+ while (b != 0) {
+ t = b;
+ b = a % b;
+ a = t;
+ }
+ return a;
+}
+
+static inline uint32_t lcm(uint32_t a, uint32_t b)
+{
+ return ((a / gcd(a, b)) * b);
+}
+
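+// The ranges advance in lockstep, one step per generated packet, so the combined
+// pattern of range values repeats after lcm(range sizes) packets; that period is
+// the number of distinct flows produced by the ranges.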
+static uint32_t get_n_range_flows(struct task_gen *task)
+{
+ uint32_t t = 1;
+ for (int i = 0; i < task->n_ranges; i++) {
+ t = lcm((task->ranges[i].max - task->ranges[i].min) + 1, t);
+ }
+ return t;
+}
+
+static uint32_t get_n_rand_flows(struct task_gen *task)
+{
+ uint32_t t = 0;
+ for (int i = 0; i < task->n_rands; i++) {
+ t += __builtin_popcount(task->rand[i].rand_mask);
+ }
+ PROX_PANIC(t > 31, "Too many random bits - maximum 31 supported\n");
+ return 1 << t;
+}
+
+//void add_to_hash_table(struct task_gen *task, uint32_t *buffer, uint32_t *idx, uint32_t mask, uint32_t bit_pos, uint32_t val, uint32_t fixed_bits, uint32_t rand_offset) {
+// uint32_t ret_tmp = val | fixed_bits;
+// ret_tmp = rte_bswap32(ret_tmp);
+// uint8_t *pret_tmp = (uint8_t*)&ret_tmp;
+// rte_memcpy(buf + rand_offset, pret_tmp + 4 - rand_len, rand_len);
+//
+// init idx
+// alloc buffer
+// init/alloc hash_table
+//void build_buffer(struct task_gen *task, uint32_t *buffer, uint32_t *idx, uint32_t mask, uint32_t bit_pos, uint32_t val)
+//{
+// if (mask == 0) {
+// buffer[*idx] = val;
+// *idx = (*idx) + 1;
+// return;
+// }
+// build_buffer(task, but, mask >> 1, bit_pos + 1, val);
+// if (mask & 1) {
+// build_buffer(task, but, mask >> 1, bit_pos + 1, val | (1 << bit_pos));
+//}
+
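+// Pre-populate the flow hash table: replay the full range sequence over a copy of
+// each packet template and insert every resulting 5-tuple, so each flow the
+// generator can emit gets a stable index in flow_id_table.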
+static void build_flow_table(struct task_gen *task)
+{
+ uint8_t buf[2048], *key_fields;
+ union ipv4_5tuple_host key;
+ struct pkt_template *pkt_template;
+ uint32_t n_range_flows = get_n_range_flows(task);
+ // uint32_t n_rand_flows = get_n_rand_flows(task);
+ // uint32_t n_flows= n_range_flows * n_rand_flows * task->orig_n_pkts;
+ // for (int i = 0; i < task->n_rands; i++) {
+ // build_buffer(task, task->values_buf[i], &task->values_idx[i], task->rand[i].rand_mask, 0, 0);
+ // }
+
+ uint32_t n_flows = n_range_flows * task->orig_n_pkts;
+
+ for (uint32_t k = 0; k < task->orig_n_pkts; k++) {
+ memcpy(buf, task->pkt_template[k].buf, task->pkt_template[k].len);
+ for (uint32_t j = 0; j < n_range_flows; j++) {
+ task_gen_apply_ranges(task, buf);
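+ // The 16-byte load starting at the TTL field covers proto, src/dst IP and
+ // src/dst UDP ports; the TTL and checksum bytes are cleared via pad0/pad1.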
+ key_fields = buf + sizeof(prox_rte_ether_hdr) + offsetof(prox_rte_ipv4_hdr, time_to_live);
+ key.xmm = _mm_loadu_si128((__m128i*)(key_fields));
+ key.pad0 = key.pad1 = 0;
+ int idx = rte_hash_add_key(task->flow_id_table, (const void *)&key);
+ PROX_PANIC(idx < 0, "Unable to add key in table\n");
+ if (idx >= 0)
+ plog_dbg("Added key %d, %x, %x, %x, %x\n", key.proto, key.ip_src, key.ip_dst, key.port_src, key.port_dst);
+ }
+ }
+}
+
+static int32_t task_gen_get_flow_id(struct task_gen *task, uint8_t *pkt_hdr)
+{
+ int ret = 0;
+ union ipv4_5tuple_host key;
+ uint8_t *hdr = pkt_hdr + sizeof(prox_rte_ether_hdr) + offsetof(prox_rte_ipv4_hdr, time_to_live);
+ // __m128i data = _mm_loadu_si128((__m128i*)(hdr));
+ // key.xmm = _mm_and_si128(data, mask0);
+ key.xmm = _mm_loadu_si128((__m128i*)(hdr));
+ key.pad0 = key.pad1 = 0;
+ ret = rte_hash_lookup(task->flow_id_table, (const void *)&key);
+ if (ret < 0) {
+ plog_err("Flow not found: %d, %x, %x, %x, %x\n", key.proto, key.ip_src, key.ip_dst, key.port_src, key.port_dst);
+ }
+ return ret;
+}
+
+static void task_gen_apply_all_flow_id(struct task_gen *task, uint8_t **pkt_hdr, uint32_t count, int32_t *flow_id)
+{
+ if (task->flow_id_pos) {
+ for (uint16_t j = 0; j < count; ++j) {
+ flow_id[j] = task_gen_get_flow_id(task, pkt_hdr[j]);
+ *(int32_t *)(pkt_hdr[j] + task->flow_id_pos) = flow_id[j];
}
}
}
@@ -424,7 +555,7 @@ static void task_gen_apply_sig(struct task_gen *task, struct pkt_template *dst)
*(uint32_t *)(dst->buf + task->sig_pos) = task->sig;
}
-static void task_gen_apply_all_accur_pos(struct task_gen *task, struct rte_mbuf **mbufs, uint8_t **pkt_hdr, uint32_t count)
+static void task_gen_apply_all_accur_pos(struct task_gen *task, uint8_t **pkt_hdr, uint32_t count)
{
if (!task->accur_pos)
return;
@@ -445,7 +576,7 @@ static void task_gen_apply_unique_id(struct task_gen *task, uint8_t *pkt_hdr, co
*dst = *id;
}
-static void task_gen_apply_all_unique_id(struct task_gen *task, struct rte_mbuf **mbufs, uint8_t **pkt_hdr, uint32_t count)
+static void task_gen_apply_all_unique_id(struct task_gen *task, uint8_t **pkt_hdr, uint32_t count)
{
if (!task->packet_id_pos)
return;
@@ -457,6 +588,26 @@ static void task_gen_apply_all_unique_id(struct task_gen *task, struct rte_mbuf
}
}
+static void task_gen_apply_id_in_flows(struct task_gen *task, uint8_t *pkt_hdr, const struct unique_id *id)
+{
+ struct unique_id *dst = (struct unique_id *)(pkt_hdr + task->packet_id_in_flow_pos);
+ *dst = *id;
+}
+
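+// flow_id[] holds the hash indices resolved earlier in the burst; packets whose
+// 5-tuple was not found (index < 0) get no per-flow packet id.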
+static void task_gen_apply_all_id_in_flows(struct task_gen *task, uint8_t **pkt_hdr, uint32_t count, int32_t *idx)
+{
+ if (!task->packet_id_in_flow_pos)
+ return;
+
+ for (uint16_t i = 0; i < count; ++i) {
+ struct unique_id id;
+ if (idx[i] >= 0 ) {
+ unique_id_init(&id, task->generator_id, task->flows[idx[i]].packet_id++);
+ task_gen_apply_id_in_flows(task, pkt_hdr[i], &id);
+ }
+ }
+}
+
static void task_gen_checksum_packets(struct task_gen *task, struct rte_mbuf **mbufs, uint8_t **pkt_hdr, uint32_t count)
{
if (!(task->runtime_flags & TASK_TX_CRC))
@@ -746,7 +897,7 @@ static int check_fields_in_bounds(struct task_gen *task, uint32_t pkt_size, int
return 0;
}
-static int task_gen_set_eth_ip_udp_sizes(struct task_gen *task, uint32_t n_orig_pkts, uint32_t nb_pkt_sizes, uint32_t *pkt_sizes)
+static int task_gen_set_eth_ip_udp_sizes(struct task_gen *task, uint32_t orig_n_pkts, uint32_t nb_pkt_sizes, uint32_t *pkt_sizes)
{
size_t k;
uint32_t l4_len;
@@ -754,8 +905,8 @@ static int task_gen_set_eth_ip_udp_sizes(struct task_gen *task, uint32_t n_orig_
struct pkt_template *template;
for (size_t j = 0; j < nb_pkt_sizes; ++j) {
- for (size_t i = 0; i < n_orig_pkts; ++i) {
- k = j * n_orig_pkts + i;
+ for (size_t i = 0; i < orig_n_pkts; ++i) {
+ k = j * orig_n_pkts + i;
template = &task->pkt_template[k];
if (template->l2_len == 0)
continue;
@@ -983,13 +1134,15 @@ static int handle_gen_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uin
if (new_pkts == NULL)
return 0;
uint8_t *pkt_hdr[MAX_RING_BURST];
-
+ int32_t flow_id[MAX_RING_BURST];
task_gen_load_and_prefetch(new_pkts, pkt_hdr, send_bulk);
task_gen_build_packets(task, new_pkts, pkt_hdr, send_bulk);
task_gen_apply_all_random_fields(task, pkt_hdr, send_bulk);
task_gen_apply_all_ranges(task, pkt_hdr, send_bulk);
- task_gen_apply_all_accur_pos(task, new_pkts, pkt_hdr, send_bulk);
- task_gen_apply_all_unique_id(task, new_pkts, pkt_hdr, send_bulk);
+ task_gen_apply_all_accur_pos(task, pkt_hdr, send_bulk);
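+ // Flow ids are resolved only after randoms and ranges have been applied, so the
+ // 5-tuple lookup sees the final header contents.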
+ task_gen_apply_all_flow_id(task, pkt_hdr, send_bulk, flow_id);
+ task_gen_apply_all_unique_id(task, pkt_hdr, send_bulk);
+ task_gen_apply_all_id_in_flows(task, pkt_hdr, send_bulk, flow_id);
uint64_t tsc_before_tx;
@@ -1311,8 +1464,8 @@ static struct rte_mempool *task_gen_create_mempool(struct task_args *targ, uint1
PROX_PANIC(ret == NULL, "Failed to allocate dummy memory pool on socket %u with %u elements\n",
sock_id, targ->nb_mbuf - 1);
- plog_info("\t\tMempool %p size = %u * %u cache %u, socket %d\n", ret,
- targ->nb_mbuf - 1, mbuf_size, targ->nb_cache_mbuf, sock_id);
+ plog_info("\t\tMempool %p size = %u * %u cache %u, socket %d\n", ret,
+ targ->nb_mbuf - 1, mbuf_size, targ->nb_cache_mbuf, sock_id);
return ret;
}
@@ -1621,6 +1774,8 @@ static void init_task_gen(struct task_base *tbase, struct task_args *targ)
task->lat_pos = targ->lat_pos;
task->accur_pos = targ->accur_pos;
task->sig_pos = targ->sig_pos;
+ task->flow_id_pos = targ->flow_id_pos;
+ task->packet_id_in_flow_pos = targ->packet_id_in_flow_pos;
task->sig = targ->sig;
task->new_rate_bps = targ->rate_bps;
@@ -1714,6 +1869,31 @@ static void init_task_gen(struct task_base *tbase, struct task_args *targ)
} else {
task->store_msk = 0;
}
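+ // Size the flow table to the number of range flows per template, with 4x head-room.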
+ uint32_t n_entries = get_n_range_flows(task) * task->orig_n_pkts * 4;
+#ifndef RTE_HASH_BUCKET_ENTRIES
+#define RTE_HASH_BUCKET_ENTRIES 8
+#endif
+ // cuckoo hash requires at least RTE_HASH_BUCKET_ENTRIES (8) entries
+ if (n_entries < RTE_HASH_BUCKET_ENTRIES)
+ n_entries = RTE_HASH_BUCKET_ENTRIES;
+
+ static char hash_name[30];
+ sprintf(hash_name, "A%03d_hash_gen_table", targ->lconf->id);
+ struct rte_hash_parameters hash_params = {
+ .name = hash_name,
+ .entries = n_entries,
+ .key_len = sizeof(union ipv4_5tuple_host),
+ .hash_func = rte_hash_crc,
+ .hash_func_init_val = 0,
+ };
+ plog_info("\t\thash table name = %s\n", hash_params.name);
+ task->flow_id_table = rte_hash_create(&hash_params);
+ PROX_PANIC(task->flow_id_table == NULL, "Failed to set up flow_id hash table for gen\n");
+ plog_info("\t\tflow_id hash table allocated, with %d entries of size %d\n", hash_params.entries, hash_params.key_len);
+ build_flow_table(task);
+ task->flows = (struct flows *)prox_zmalloc(n_entries * sizeof(struct flows), task->socket_id);
+ PROX_PANIC(task->flows == NULL, "Failed to allocate flows\n");
+ plog_info("\t\t%d flows allocated\n", n_entries);
}
static struct task_init task_init_gen = {