diff options
author | Xavier Simonart <simonartxavier@gmail.com> | 2021-12-30 10:24:15 +0000 |
---|---|---|
committer | Xavier Simonart <simonartxavier@gmail.com> | 2022-01-03 15:10:00 +0000 |
commit | 971d163b345ab3f74d6a5252722371dc17a8fe7a (patch) | |
tree | f9103fac18a5ef0fc9e99638d5abc26f08bb9a82 | |
parent | ba5ffcd0031bdf1f4781d9dcf224b5cb75aee922 (diff) |
Add support for detection of misordered packets per flow
Only flows from ranges are supported so far. This means that
flows generated through randoms are not supported yet.
Misordered packets can now be detected per flow.
flow definition is hard-coded as 5 tuple (proto, ip/udp src/dst).
The generator writes flow_id and packet_id within the packets (4 + 5 bytes).
This means that, if all latency, packet_id, signature, accuracy, flow_id and packet_id_in_flow
parameters are set, the minimum packet size is 72 bytes.
This feature is enabled through the following parameters:
On the gen side:
- packet id in flow pos: position (5 bytes) for packet id
- flow id pos: flow id pos (4 bytes)
On the lat side
- flow_count: the number of flows received by the latency core. If not enough
flows are configured, then an error message is displayed (once) and packet_reordering
count is only measured for those flows
- packet id in flow pos: position (5 bytes) for packet id
- flow id pos: flow id pos (4 bytes)
Misordered packets (and extent) are reported per core as before.
No change related to duplicate packets (not measured per flow as a duplicate packet is not per flow...).
Signed-off-by: Xavier Simonart <simonartxavier@gmail.com>
Change-Id: I07517df87dfebec81408baf1decb647d9a0edd94
-rw-r--r-- | VNFs/DPPD-PROX/handle_gen.c | 216 | ||||
-rw-r--r-- | VNFs/DPPD-PROX/handle_lat.c | 58 | ||||
-rw-r--r-- | VNFs/DPPD-PROX/prox_args.c | 9 | ||||
-rw-r--r-- | VNFs/DPPD-PROX/task_init.h | 3 |
4 files changed, 263 insertions, 23 deletions
diff --git a/VNFs/DPPD-PROX/handle_gen.c b/VNFs/DPPD-PROX/handle_gen.c index 363ee17d..9cacdb54 100644 --- a/VNFs/DPPD-PROX/handle_gen.c +++ b/VNFs/DPPD-PROX/handle_gen.c @@ -21,6 +21,7 @@ #include <rte_version.h> #include <rte_byteorder.h> #include <rte_ether.h> +#include <rte_hash.h> #include <rte_hash_crc.h> #include <rte_malloc.h> @@ -49,6 +50,7 @@ #include "handle_master.h" #include "defines.h" #include "prox_ipv6.h" +#include "handle_lb_5tuple.h" struct pkt_template { uint16_t len; @@ -99,6 +101,10 @@ struct task_gen_pcap { uint32_t socket_id; }; +struct flows { + uint32_t packet_id; +}; + struct task_gen { struct task_base base; uint64_t hz; @@ -119,6 +125,8 @@ struct task_gen { uint16_t packet_id_pos; uint16_t accur_pos; uint16_t sig_pos; + uint16_t flow_id_pos; + uint16_t packet_id_in_flow_pos; uint32_t sig; uint32_t socket_id; uint8_t generator_id; @@ -151,6 +159,8 @@ struct task_gen { uint32_t store_msk; struct packet *store_buf; FILE *fp; + struct rte_hash *flow_id_table; + struct flows*flows; } __rte_cache_aligned; static void task_gen_set_pkt_templates_len(struct task_gen *task, uint32_t *pkt_sizes); @@ -394,6 +404,23 @@ static void task_gen_apply_all_random_fields(struct task_gen *task, uint8_t **pk task_gen_apply_random_fields(task, pkt_hdr[i]); } +static void task_gen_apply_ranges(struct task_gen *task, uint8_t *pkt_hdr) +{ + uint32_t ret; + if (!task->n_ranges) + return; + + for (uint16_t j = 0; j < task->n_ranges; ++j) { + if (unlikely(task->ranges[j].value == task->ranges[j].max)) + task->ranges[j].value = task->ranges[j].min; + else + task->ranges[j].value++; + ret = rte_bswap32(task->ranges[j].value); + uint8_t *pret = (uint8_t*)&ret; + rte_memcpy(pkt_hdr + task->ranges[j].offset, pret + 4 - task->ranges[j].range_len, task->ranges[j].range_len); + } +} + static void task_gen_apply_all_ranges(struct task_gen *task, uint8_t **pkt_hdr, uint32_t count) { uint32_t ret; @@ -401,14 +428,118 @@ static void task_gen_apply_all_ranges(struct task_gen *task, uint8_t **pkt_hdr, return; for (uint16_t i = 0; i < count; ++i) { - for (uint16_t j = 0; j < task->n_ranges; ++j) { - if (unlikely(task->ranges[j].value == task->ranges[j].max)) - task->ranges[j].value = task->ranges[j].min; - else - task->ranges[j].value++; - ret = rte_bswap32(task->ranges[j].value); - uint8_t *pret = (uint8_t*)&ret; - rte_memcpy(pkt_hdr[i] + task->ranges[j].offset, pret + 4 - task->ranges[j].range_len, task->ranges[j].range_len); + task_gen_apply_ranges(task, pkt_hdr[i]); + } +} + +static inline uint32_t gcd(uint32_t a, uint32_t b) +{ + // Euclidean algorithm + uint32_t t; + while (b != 0) { + t = b; + b = a % b; + a = t; + } + return a; +} + +static inline uint32_t lcm(uint32_t a, uint32_t b) +{ + return ((a / gcd(a, b)) * b); +} + +static uint32_t get_n_range_flows(struct task_gen *task) +{ + uint32_t t = 1; + for (int i = 0; i < task->n_ranges; i++) { + t = lcm((task->ranges[i].max - task->ranges[i].min) + 1, t); + } + return t; +} + +static uint32_t get_n_rand_flows(struct task_gen *task) +{ + uint32_t t = 0; + for (int i = 0; i < task->n_rands; i++) { + t += __builtin_popcount(task->rand[i].rand_mask); + } + PROX_PANIC(t > 31, "Too many random bits - maximum 31 supported\n"); + return 1 << t; +} + +//void add_to_hash_table(struct task_gen *task, uint32_t *buffer, uint32_t *idx, uint32_t mask, uint32_t bit_pos, uint32_t val, uint32_t fixed_bits, uint32_t rand_offset) { +// uint32_t ret_tmp = val | fixed_bits; +// ret_tmp = rte_bswap32(ret_tmp); +// uint8_t *pret_tmp = (uint8_t*)&ret_tmp; +// rte_memcpy(buf + rand_offset, pret_tmp + 4 - rand_len, rand_len); +// +// init idx +// alloc buffer +// init/alloc hash_table +//void build_buffer(struct task_gen *task, uint32_t *buffer, uint32_t *idx, uint32_t mask, uint32_t bit_pos, uint32_t val) +//{ +// if (mask == 0) { +// buffer[*idx] = val; +// *idx = (*idx) + 1; +// return; +// } +// build_buffer(task, but, mask >> 1, bit_pos + 1, val); +// if (mask & 1) { +// build_buffer(task, but, mask >> 1, bit_pos + 1, val | (1 << bit_pos)); +//} + +static void build_flow_table(struct task_gen *task) +{ + uint8_t buf[2048], *key_fields; + union ipv4_5tuple_host key; + struct pkt_template *pkt_template; + uint32_t n_range_flows = get_n_range_flows(task); + // uint32_t n_rand_flows = get_n_rand_flows(task); + // uint32_t n_flows= n_range_flows * n_rand_flows * task->orig_n_pkts; + // for (int i = 0; i < task->n_rands; i++) { + // build_buffer(task, task->values_buf[i], &task->values_idx[i], task->rand[i].rand_mask, 0, 0); + // } + + uint32_t n_flows = n_range_flows * task->orig_n_pkts; + + for (uint32_t k = 0; k < task->orig_n_pkts; k++) { + memcpy(buf, task->pkt_template[k].buf, task->pkt_template[k].len); + for (uint32_t j = 0; j < n_range_flows; j++) { + task_gen_apply_ranges(task, buf); + key_fields = buf + sizeof(prox_rte_ether_hdr) + offsetof(prox_rte_ipv4_hdr, time_to_live); + key.xmm = _mm_loadu_si128((__m128i*)(key_fields)); + key.pad0 = key.pad1 = 0; + int idx = rte_hash_add_key(task->flow_id_table, (const void *)&key); + PROX_PANIC(idx < 0, "Unable to add key in table\n"); + if (idx >= 0) + plog_dbg("Added key %d, %x, %x, %x, %x\n", key.proto, key.ip_src, key.ip_dst, key.port_src, key.port_dst); + } + } +} + +static int32_t task_gen_get_flow_id(struct task_gen *task, uint8_t *pkt_hdr) +{ + int ret = 0; + union ipv4_5tuple_host key; + uint8_t *hdr = pkt_hdr + sizeof(prox_rte_ether_hdr) + offsetof(prox_rte_ipv4_hdr, time_to_live); + // __m128i data = _mm_loadu_si128((__m128i*)(hdr)); + // key.xmm = _mm_and_si128(data, mask0); + key.xmm = _mm_loadu_si128((__m128i*)(hdr)); + key.pad0 = key.pad1 = 0; + ret = rte_hash_lookup(task->flow_id_table, (const void *)&key); + if (ret < 0) { + plog_err("Flow not found: %d, %x, %x, %x, %x\n", key.proto, key.ip_src, key.ip_dst, key.port_src, key.port_dst); + } + return ret; +} + +static void task_gen_apply_all_flow_id(struct task_gen *task, uint8_t **pkt_hdr, uint32_t count, int32_t *flow_id) +{ + if (task->flow_id_pos) { + for (uint16_t j = 0; j < count; ++j) { + flow_id[j] = task_gen_get_flow_id(task, pkt_hdr[j]); + *(int32_t *)(pkt_hdr[j] + task->flow_id_pos) = flow_id[j]; } } } @@ -424,7 +555,7 @@ static void task_gen_apply_sig(struct task_gen *task, struct pkt_template *dst) *(uint32_t *)(dst->buf + task->sig_pos) = task->sig; } -static void task_gen_apply_all_accur_pos(struct task_gen *task, struct rte_mbuf **mbufs, uint8_t **pkt_hdr, uint32_t count) +static void task_gen_apply_all_accur_pos(struct task_gen *task, uint8_t **pkt_hdr, uint32_t count) { if (!task->accur_pos) return; @@ -445,7 +576,7 @@ static void task_gen_apply_unique_id(struct task_gen *task, uint8_t *pkt_hdr, co *dst = *id; } -static void task_gen_apply_all_unique_id(struct task_gen *task, struct rte_mbuf **mbufs, uint8_t **pkt_hdr, uint32_t count) +static void task_gen_apply_all_unique_id(struct task_gen *task, uint8_t **pkt_hdr, uint32_t count) { if (!task->packet_id_pos) return; @@ -457,6 +588,26 @@ static void task_gen_apply_all_unique_id(struct task_gen *task, struct rte_mbuf } } +static void task_gen_apply_id_in_flows(struct task_gen *task, uint8_t *pkt_hdr, const struct unique_id *id) +{ + struct unique_id *dst = (struct unique_id *)(pkt_hdr + task->packet_id_in_flow_pos); + *dst = *id; +} + +static void task_gen_apply_all_id_in_flows(struct task_gen *task, uint8_t **pkt_hdr, uint32_t count, int32_t *idx) +{ + if (!task->packet_id_in_flow_pos) + return; + + for (uint16_t i = 0; i < count; ++i) { + struct unique_id id; + if (idx[i] >= 0 ) { + unique_id_init(&id, task->generator_id, task->flows[idx[i]].packet_id++); + task_gen_apply_id_in_flows(task, pkt_hdr[i], &id); + } + } +} + static void task_gen_checksum_packets(struct task_gen *task, struct rte_mbuf **mbufs, uint8_t **pkt_hdr, uint32_t count) { if (!(task->runtime_flags & TASK_TX_CRC)) @@ -746,7 +897,7 @@ static int check_fields_in_bounds(struct task_gen *task, uint32_t pkt_size, int return 0; } -static int task_gen_set_eth_ip_udp_sizes(struct task_gen *task, uint32_t n_orig_pkts, uint32_t nb_pkt_sizes, uint32_t *pkt_sizes) +static int task_gen_set_eth_ip_udp_sizes(struct task_gen *task, uint32_t orig_n_pkts, uint32_t nb_pkt_sizes, uint32_t *pkt_sizes) { size_t k; uint32_t l4_len; @@ -754,8 +905,8 @@ static int task_gen_set_eth_ip_udp_sizes(struct task_gen *task, uint32_t n_orig_ struct pkt_template *template; for (size_t j = 0; j < nb_pkt_sizes; ++j) { - for (size_t i = 0; i < n_orig_pkts; ++i) { - k = j * n_orig_pkts + i; + for (size_t i = 0; i < orig_n_pkts; ++i) { + k = j * orig_n_pkts + i; template = &task->pkt_template[k]; if (template->l2_len == 0) continue; @@ -983,13 +1134,15 @@ static int handle_gen_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uin if (new_pkts == NULL) return 0; uint8_t *pkt_hdr[MAX_RING_BURST]; - + int32_t flow_id[MAX_RING_BURST]; task_gen_load_and_prefetch(new_pkts, pkt_hdr, send_bulk); task_gen_build_packets(task, new_pkts, pkt_hdr, send_bulk); task_gen_apply_all_random_fields(task, pkt_hdr, send_bulk); task_gen_apply_all_ranges(task, pkt_hdr, send_bulk); - task_gen_apply_all_accur_pos(task, new_pkts, pkt_hdr, send_bulk); - task_gen_apply_all_unique_id(task, new_pkts, pkt_hdr, send_bulk); + task_gen_apply_all_accur_pos(task, pkt_hdr, send_bulk); + task_gen_apply_all_flow_id(task, pkt_hdr, send_bulk, flow_id); + task_gen_apply_all_unique_id(task, pkt_hdr, send_bulk); + task_gen_apply_all_id_in_flows(task, pkt_hdr, send_bulk, flow_id); uint64_t tsc_before_tx; @@ -1311,8 +1464,8 @@ static struct rte_mempool *task_gen_create_mempool(struct task_args *targ, uint1 PROX_PANIC(ret == NULL, "Failed to allocate dummy memory pool on socket %u with %u elements\n", sock_id, targ->nb_mbuf - 1); - plog_info("\t\tMempool %p size = %u * %u cache %u, socket %d\n", ret, - targ->nb_mbuf - 1, mbuf_size, targ->nb_cache_mbuf, sock_id); + plog_info("\t\tMempool %p size = %u * %u cache %u, socket %d\n", ret, + targ->nb_mbuf - 1, mbuf_size, targ->nb_cache_mbuf, sock_id); return ret; } @@ -1621,6 +1774,8 @@ static void init_task_gen(struct task_base *tbase, struct task_args *targ) task->lat_pos = targ->lat_pos; task->accur_pos = targ->accur_pos; task->sig_pos = targ->sig_pos; + task->flow_id_pos = targ->flow_id_pos; + task->packet_id_in_flow_pos = targ->packet_id_in_flow_pos; task->sig = targ->sig; task->new_rate_bps = targ->rate_bps; @@ -1714,6 +1869,31 @@ static void init_task_gen(struct task_base *tbase, struct task_args *targ) } else { task->store_msk = 0; } + uint32_t n_entries = get_n_range_flows(task) * task->orig_n_pkts * 4; +#ifndef RTE_HASH_BUCKET_ENTRIES +#define RTE_HASH_BUCKET_ENTRIES 8 +#endif + // cuckoo hash requires at least RTE_HASH_BUCKET_ENTRIES (8) entries + if (n_entries < RTE_HASH_BUCKET_ENTRIES) + n_entries = RTE_HASH_BUCKET_ENTRIES; + + static char hash_name[30]; + sprintf(hash_name, "A%03d_hash_gen_table", targ->lconf->id); + struct rte_hash_parameters hash_params = { + .name = hash_name, + .entries = n_entries, + .key_len = sizeof(union ipv4_5tuple_host), + .hash_func = rte_hash_crc, + .hash_func_init_val = 0, + }; + plog_info("\t\thash table name = %s\n", hash_params.name); + task->flow_id_table = rte_hash_create(&hash_params); + PROX_PANIC(task->flow_id_table == NULL, "Failed to set up flow_id hash table for gen\n"); + plog_info("\t\tflow_id hash table allocated, with %d entries of size %d\n", hash_params.entries, hash_params.key_len); + build_flow_table(task); + task->flows = (struct flows *)prox_zmalloc(n_entries * sizeof(struct flows), task->socket_id); + PROX_PANIC(task->flows == NULL, "Failed to allocate flows\n"); + plog_info("\t\t%d flows allocated\n", n_entries); } static struct task_init task_init_gen = { diff --git a/VNFs/DPPD-PROX/handle_lat.c b/VNFs/DPPD-PROX/handle_lat.c index 550f3f55..d6943070 100644 --- a/VNFs/DPPD-PROX/handle_lat.c +++ b/VNFs/DPPD-PROX/handle_lat.c @@ -91,6 +91,10 @@ struct loss_buffer { uint32_t n; }; +struct flows { + uint32_t packet_id; +}; + struct task_lat { struct task_base base; uint64_t limit; @@ -125,6 +129,10 @@ struct task_lat { uint32_t loss_buffer_size; struct loss_buffer *loss_buffer; uint32_t loss_id; + uint32_t packet_id_in_flow_pos; + int32_t flow_id_pos; + uint32_t flow_count; + struct flows *flows; }; /* This function calculate the difference between rx and tx_time * Both values are uint32_t (see handle_lat_bulk) @@ -490,6 +498,15 @@ static void lat_test_histogram_add(struct lat_test *lat_test, uint64_t lat_tsc) lat_test->buckets[bucket_id]++; } +static void lat_test_check_flow_ordering(struct task_lat *task, struct lat_test *lat_test, int32_t flow_id, uint32_t packet_id) +{ + if (packet_id < task->flows[flow_id].packet_id) { + lat_test->mis_ordered++; + lat_test->extent += task->flows[flow_id].packet_id - packet_id; + } + task->flows[flow_id].packet_id = packet_id; +} + static void lat_test_check_ordering(struct task_lat *task, struct lat_test *lat_test, uint32_t packet_id, uint8_t generator_id) { if (packet_id < task->previous_packet[generator_id]) { @@ -551,6 +568,7 @@ static void task_lat_store_lat(struct task_lat *task, uint64_t rx_packet_index, static int handle_lat_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts) { struct task_lat *task = (struct task_lat *)tbase; + static int max_flows_printed = 0; int rc; if (n_pkts == 0) { @@ -639,6 +657,24 @@ static int handle_lat_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uin uint8_t generator_id; uint32_t packet_id; + int32_t flow_id = -1; + if (task->flow_id_pos) { + flow_id = *(int32_t *)(hdr + task->flow_id_pos); + if (unlikely(flow_id >= (int32_t)(task->flow_count))) { + flow_id = -1; + if (!max_flows_printed) { + plog_info("Too many flows - increase flow count (only printed once)\n"); + max_flows_printed = 1; + } + } + + } + if (task->packet_id_in_flow_pos && (flow_id != -1)) { + uint32_t packet_id_in_flow; + struct unique_id *unique_id = (struct unique_id *)(hdr + task->packet_id_in_flow_pos); + unique_id_get(unique_id, &generator_id, &packet_id_in_flow); + lat_test_check_flow_ordering(task, task->lat_test, flow_id + generator_id * task->generator_count, packet_id_in_flow); + } if (task->unique_id_pos) { struct unique_id *unique_id = (struct unique_id *)(hdr + task->unique_id_pos); unique_id_get(unique_id, &generator_id, &packet_id); @@ -650,7 +686,9 @@ static int handle_lat_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uin // Skip unexpected packet continue; } - lat_test_check_ordering(task, task->lat_test, packet_id, generator_id); + if (flow_id == -1) { + lat_test_check_ordering(task, task->lat_test, packet_id, generator_id); + } lat_test_check_duplicate(task, task->lat_test, packet_id, generator_id); uint32_t loss = task_lat_early_loss_detect(task, packet_id, generator_id); if (loss) { @@ -779,6 +817,8 @@ static void init_task_lat(struct task_base *tbase, struct task_args *targ) task->accur_pos = targ->accur_pos; task->sig_pos = targ->sig_pos; task->sig = targ->sig; + task->packet_id_in_flow_pos = targ->packet_id_in_flow_pos; + task->flow_id_pos = targ->flow_id_pos; task->unique_id_pos = targ->packet_id_pos; task->latency_buffer_size = targ->latency_buffer_size; @@ -860,10 +900,11 @@ static void init_task_lat(struct task_base *tbase, struct task_args *targ) } } task->loss_buffer_size = targ->loss_buffer_size; - task->loss_buffer = prox_zmalloc(task->loss_buffer_size * sizeof(struct loss_buffer), rte_lcore_to_socket_id(targ->lconf->id)); - PROX_PANIC(task->loss_buffer == NULL, - "Failed to allocate %lu bytes (in huge pages) for loss_buffer\n", task->loss_buffer_size * sizeof(struct loss_buffer)); - + if (task->loss_buffer_size) { + task->loss_buffer = prox_zmalloc(task->loss_buffer_size * sizeof(struct loss_buffer), rte_lcore_to_socket_id(targ->lconf->id)); + PROX_PANIC(task->loss_buffer == NULL, + "Failed to allocate %lu bytes (in huge pages) for loss_buffer\n", task->loss_buffer_size * sizeof(struct loss_buffer)); + } task->bytes_to_tsc = prox_zmalloc(max_frame_size * sizeof(task->bytes_to_tsc[0]) * MAX_PKT_BURST, rte_lcore_to_socket_id(targ->lconf->id)); PROX_PANIC(task->bytes_to_tsc == NULL, "Failed to allocate %lu bytes (in huge pages) for bytes_to_tsc\n", max_frame_size * sizeof(task->bytes_to_tsc[0]) * MAX_PKT_BURST); @@ -877,6 +918,13 @@ static void init_task_lat(struct task_base *tbase, struct task_args *targ) else task->bytes_to_tsc[i] = (rte_get_tsc_hz() * i * 0.99) / bytes_per_hz; } + task->flow_count = targ->flow_count; + PROX_PANIC(task->flow_id_pos && (task->flow_count == 0), "flow_count must be configured when flow_id_pos is set\n"); + if (task->flow_count) { + task->flows = prox_zmalloc(task->flow_count * sizeof(struct flows) * task->generator_count, rte_lcore_to_socket_id(targ->lconf->id)); + PROX_PANIC(task->flows == NULL, + "Failed to allocate %lu bytes (in huge pages) for flows\n", task->flow_count * sizeof(struct flows) * task->generator_count); + } } static struct task_init task_init_lat = { diff --git a/VNFs/DPPD-PROX/prox_args.c b/VNFs/DPPD-PROX/prox_args.c index 274e6c96..e8c275bb 100644 --- a/VNFs/DPPD-PROX/prox_args.c +++ b/VNFs/DPPD-PROX/prox_args.c @@ -1163,6 +1163,15 @@ static int get_core_cfg(unsigned sindex, char *str, void *data) if (STR_EQ(str, "packet id pos")) { return parse_int(&targ->packet_id_pos, pkey); } + if (STR_EQ(str, "flow id pos")) { + return parse_int(&targ->flow_id_pos, pkey); + } + if (STR_EQ(str, "packet id in flow pos")) { + return parse_int(&targ->packet_id_in_flow_pos, pkey); + } + if (STR_EQ(str, "flow count")) { + return parse_int(&targ->flow_count, pkey); + } if (STR_EQ(str, "probability")) { // old - use "probability no drop" instead float probability; int rc = parse_float(&probability, pkey); diff --git a/VNFs/DPPD-PROX/task_init.h b/VNFs/DPPD-PROX/task_init.h index 82a58259..53bfaf35 100644 --- a/VNFs/DPPD-PROX/task_init.h +++ b/VNFs/DPPD-PROX/task_init.h @@ -205,6 +205,9 @@ struct task_args { struct range range[64]; char pcap_file[256]; uint32_t accur_pos; + uint32_t flow_id_pos; + uint32_t packet_id_in_flow_pos; + uint32_t flow_count; uint32_t sig_pos; uint32_t sig; uint32_t lat_pos; |