diff options
-rw-r--r-- | VNFs/DPPD-PROX/handle_gen.c | 216 | ||||
-rw-r--r-- | VNFs/DPPD-PROX/handle_lat.c | 58 | ||||
-rw-r--r-- | VNFs/DPPD-PROX/prox_args.c | 9 | ||||
-rw-r--r-- | VNFs/DPPD-PROX/task_init.h | 3 |
4 files changed, 263 insertions, 23 deletions
diff --git a/VNFs/DPPD-PROX/handle_gen.c b/VNFs/DPPD-PROX/handle_gen.c index 363ee17d..9cacdb54 100644 --- a/VNFs/DPPD-PROX/handle_gen.c +++ b/VNFs/DPPD-PROX/handle_gen.c @@ -21,6 +21,7 @@ #include <rte_version.h> #include <rte_byteorder.h> #include <rte_ether.h> +#include <rte_hash.h> #include <rte_hash_crc.h> #include <rte_malloc.h> @@ -49,6 +50,7 @@ #include "handle_master.h" #include "defines.h" #include "prox_ipv6.h" +#include "handle_lb_5tuple.h" struct pkt_template { uint16_t len; @@ -99,6 +101,10 @@ struct task_gen_pcap { uint32_t socket_id; }; +struct flows { + uint32_t packet_id; +}; + struct task_gen { struct task_base base; uint64_t hz; @@ -119,6 +125,8 @@ struct task_gen { uint16_t packet_id_pos; uint16_t accur_pos; uint16_t sig_pos; + uint16_t flow_id_pos; + uint16_t packet_id_in_flow_pos; uint32_t sig; uint32_t socket_id; uint8_t generator_id; @@ -151,6 +159,8 @@ struct task_gen { uint32_t store_msk; struct packet *store_buf; FILE *fp; + struct rte_hash *flow_id_table; + struct flows*flows; } __rte_cache_aligned; static void task_gen_set_pkt_templates_len(struct task_gen *task, uint32_t *pkt_sizes); @@ -394,6 +404,23 @@ static void task_gen_apply_all_random_fields(struct task_gen *task, uint8_t **pk task_gen_apply_random_fields(task, pkt_hdr[i]); } +static void task_gen_apply_ranges(struct task_gen *task, uint8_t *pkt_hdr) +{ + uint32_t ret; + if (!task->n_ranges) + return; + + for (uint16_t j = 0; j < task->n_ranges; ++j) { + if (unlikely(task->ranges[j].value == task->ranges[j].max)) + task->ranges[j].value = task->ranges[j].min; + else + task->ranges[j].value++; + ret = rte_bswap32(task->ranges[j].value); + uint8_t *pret = (uint8_t*)&ret; + rte_memcpy(pkt_hdr + task->ranges[j].offset, pret + 4 - task->ranges[j].range_len, task->ranges[j].range_len); + } +} + static void task_gen_apply_all_ranges(struct task_gen *task, uint8_t **pkt_hdr, uint32_t count) { uint32_t ret; @@ -401,14 +428,118 @@ static void task_gen_apply_all_ranges(struct task_gen *task, uint8_t **pkt_hdr, return; for (uint16_t i = 0; i < count; ++i) { - for (uint16_t j = 0; j < task->n_ranges; ++j) { - if (unlikely(task->ranges[j].value == task->ranges[j].max)) - task->ranges[j].value = task->ranges[j].min; - else - task->ranges[j].value++; - ret = rte_bswap32(task->ranges[j].value); - uint8_t *pret = (uint8_t*)&ret; - rte_memcpy(pkt_hdr[i] + task->ranges[j].offset, pret + 4 - task->ranges[j].range_len, task->ranges[j].range_len); + task_gen_apply_ranges(task, pkt_hdr[i]); + } +} + +static inline uint32_t gcd(uint32_t a, uint32_t b) +{ + // Euclidean algorithm + uint32_t t; + while (b != 0) { + t = b; + b = a % b; + a = t; + } + return a; +} + +static inline uint32_t lcm(uint32_t a, uint32_t b) +{ + return ((a / gcd(a, b)) * b); +} + +static uint32_t get_n_range_flows(struct task_gen *task) +{ + uint32_t t = 1; + for (int i = 0; i < task->n_ranges; i++) { + t = lcm((task->ranges[i].max - task->ranges[i].min) + 1, t); + } + return t; +} + +static uint32_t get_n_rand_flows(struct task_gen *task) +{ + uint32_t t = 0; + for (int i = 0; i < task->n_rands; i++) { + t += __builtin_popcount(task->rand[i].rand_mask); + } + PROX_PANIC(t > 31, "Too many random bits - maximum 31 supported\n"); + return 1 << t; +} + +//void add_to_hash_table(struct task_gen *task, uint32_t *buffer, uint32_t *idx, uint32_t mask, uint32_t bit_pos, uint32_t val, uint32_t fixed_bits, uint32_t rand_offset) { +// uint32_t ret_tmp = val | fixed_bits; +// ret_tmp = rte_bswap32(ret_tmp); +// uint8_t *pret_tmp = (uint8_t*)&ret_tmp; +// rte_memcpy(buf + rand_offset, pret_tmp + 4 - rand_len, rand_len); +// +// init idx +// alloc buffer +// init/alloc hash_table +//void build_buffer(struct task_gen *task, uint32_t *buffer, uint32_t *idx, uint32_t mask, uint32_t bit_pos, uint32_t val) +//{ +// if (mask == 0) { +// buffer[*idx] = val; +// *idx = (*idx) + 1; +// return; +// } +// build_buffer(task, but, mask >> 1, bit_pos + 1, val); +// if (mask & 1) { +// build_buffer(task, but, mask >> 1, bit_pos + 1, val | (1 << bit_pos)); +//} + +static void build_flow_table(struct task_gen *task) +{ + uint8_t buf[2048], *key_fields; + union ipv4_5tuple_host key; + struct pkt_template *pkt_template; + uint32_t n_range_flows = get_n_range_flows(task); + // uint32_t n_rand_flows = get_n_rand_flows(task); + // uint32_t n_flows= n_range_flows * n_rand_flows * task->orig_n_pkts; + // for (int i = 0; i < task->n_rands; i++) { + // build_buffer(task, task->values_buf[i], &task->values_idx[i], task->rand[i].rand_mask, 0, 0); + // } + + uint32_t n_flows = n_range_flows * task->orig_n_pkts; + + for (uint32_t k = 0; k < task->orig_n_pkts; k++) { + memcpy(buf, task->pkt_template[k].buf, task->pkt_template[k].len); + for (uint32_t j = 0; j < n_range_flows; j++) { + task_gen_apply_ranges(task, buf); + key_fields = buf + sizeof(prox_rte_ether_hdr) + offsetof(prox_rte_ipv4_hdr, time_to_live); + key.xmm = _mm_loadu_si128((__m128i*)(key_fields)); + key.pad0 = key.pad1 = 0; + int idx = rte_hash_add_key(task->flow_id_table, (const void *)&key); + PROX_PANIC(idx < 0, "Unable to add key in table\n"); + if (idx >= 0) + plog_dbg("Added key %d, %x, %x, %x, %x\n", key.proto, key.ip_src, key.ip_dst, key.port_src, key.port_dst); + } + } +} + +static int32_t task_gen_get_flow_id(struct task_gen *task, uint8_t *pkt_hdr) +{ + int ret = 0; + union ipv4_5tuple_host key; + uint8_t *hdr = pkt_hdr + sizeof(prox_rte_ether_hdr) + offsetof(prox_rte_ipv4_hdr, time_to_live); + // __m128i data = _mm_loadu_si128((__m128i*)(hdr)); + // key.xmm = _mm_and_si128(data, mask0); + key.xmm = _mm_loadu_si128((__m128i*)(hdr)); + key.pad0 = key.pad1 = 0; + ret = rte_hash_lookup(task->flow_id_table, (const void *)&key); + if (ret < 0) { + plog_err("Flow not found: %d, %x, %x, %x, %x\n", key.proto, key.ip_src, key.ip_dst, key.port_src, key.port_dst); + } + return ret; +} + +static void task_gen_apply_all_flow_id(struct task_gen *task, uint8_t **pkt_hdr, uint32_t count, int32_t *flow_id) +{ + if (task->flow_id_pos) { + for (uint16_t j = 0; j < count; ++j) { + flow_id[j] = task_gen_get_flow_id(task, pkt_hdr[j]); + *(int32_t *)(pkt_hdr[j] + task->flow_id_pos) = flow_id[j]; } } } @@ -424,7 +555,7 @@ static void task_gen_apply_sig(struct task_gen *task, struct pkt_template *dst) *(uint32_t *)(dst->buf + task->sig_pos) = task->sig; } -static void task_gen_apply_all_accur_pos(struct task_gen *task, struct rte_mbuf **mbufs, uint8_t **pkt_hdr, uint32_t count) +static void task_gen_apply_all_accur_pos(struct task_gen *task, uint8_t **pkt_hdr, uint32_t count) { if (!task->accur_pos) return; @@ -445,7 +576,7 @@ static void task_gen_apply_unique_id(struct task_gen *task, uint8_t *pkt_hdr, co *dst = *id; } -static void task_gen_apply_all_unique_id(struct task_gen *task, struct rte_mbuf **mbufs, uint8_t **pkt_hdr, uint32_t count) +static void task_gen_apply_all_unique_id(struct task_gen *task, uint8_t **pkt_hdr, uint32_t count) { if (!task->packet_id_pos) return; @@ -457,6 +588,26 @@ static void task_gen_apply_all_unique_id(struct task_gen *task, struct rte_mbuf } } +static void task_gen_apply_id_in_flows(struct task_gen *task, uint8_t *pkt_hdr, const struct unique_id *id) +{ + struct unique_id *dst = (struct unique_id *)(pkt_hdr + task->packet_id_in_flow_pos); + *dst = *id; +} + +static void task_gen_apply_all_id_in_flows(struct task_gen *task, uint8_t **pkt_hdr, uint32_t count, int32_t *idx) +{ + if (!task->packet_id_in_flow_pos) + return; + + for (uint16_t i = 0; i < count; ++i) { + struct unique_id id; + if (idx[i] >= 0 ) { + unique_id_init(&id, task->generator_id, task->flows[idx[i]].packet_id++); + task_gen_apply_id_in_flows(task, pkt_hdr[i], &id); + } + } +} + static void task_gen_checksum_packets(struct task_gen *task, struct rte_mbuf **mbufs, uint8_t **pkt_hdr, uint32_t count) { if (!(task->runtime_flags & TASK_TX_CRC)) @@ -746,7 +897,7 @@ static int check_fields_in_bounds(struct task_gen *task, uint32_t pkt_size, int return 0; } -static int task_gen_set_eth_ip_udp_sizes(struct task_gen *task, uint32_t n_orig_pkts, uint32_t nb_pkt_sizes, uint32_t *pkt_sizes) +static int task_gen_set_eth_ip_udp_sizes(struct task_gen *task, uint32_t orig_n_pkts, uint32_t nb_pkt_sizes, uint32_t *pkt_sizes) { size_t k; uint32_t l4_len; @@ -754,8 +905,8 @@ static int task_gen_set_eth_ip_udp_sizes(struct task_gen *task, uint32_t n_orig_ struct pkt_template *template; for (size_t j = 0; j < nb_pkt_sizes; ++j) { - for (size_t i = 0; i < n_orig_pkts; ++i) { - k = j * n_orig_pkts + i; + for (size_t i = 0; i < orig_n_pkts; ++i) { + k = j * orig_n_pkts + i; template = &task->pkt_template[k]; if (template->l2_len == 0) continue; @@ -983,13 +1134,15 @@ static int handle_gen_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uin if (new_pkts == NULL) return 0; uint8_t *pkt_hdr[MAX_RING_BURST]; - + int32_t flow_id[MAX_RING_BURST]; task_gen_load_and_prefetch(new_pkts, pkt_hdr, send_bulk); task_gen_build_packets(task, new_pkts, pkt_hdr, send_bulk); task_gen_apply_all_random_fields(task, pkt_hdr, send_bulk); task_gen_apply_all_ranges(task, pkt_hdr, send_bulk); - task_gen_apply_all_accur_pos(task, new_pkts, pkt_hdr, send_bulk); - task_gen_apply_all_unique_id(task, new_pkts, pkt_hdr, send_bulk); + task_gen_apply_all_accur_pos(task, pkt_hdr, send_bulk); + task_gen_apply_all_flow_id(task, pkt_hdr, send_bulk, flow_id); + task_gen_apply_all_unique_id(task, pkt_hdr, send_bulk); + task_gen_apply_all_id_in_flows(task, pkt_hdr, send_bulk, flow_id); uint64_t tsc_before_tx; @@ -1311,8 +1464,8 @@ static struct rte_mempool *task_gen_create_mempool(struct task_args *targ, uint1 PROX_PANIC(ret == NULL, "Failed to allocate dummy memory pool on socket %u with %u elements\n", sock_id, targ->nb_mbuf - 1); - plog_info("\t\tMempool %p size = %u * %u cache %u, socket %d\n", ret, - targ->nb_mbuf - 1, mbuf_size, targ->nb_cache_mbuf, sock_id); + plog_info("\t\tMempool %p size = %u * %u cache %u, socket %d\n", ret, + targ->nb_mbuf - 1, mbuf_size, targ->nb_cache_mbuf, sock_id); return ret; } @@ -1621,6 +1774,8 @@ static void init_task_gen(struct task_base *tbase, struct task_args *targ) task->lat_pos = targ->lat_pos; task->accur_pos = targ->accur_pos; task->sig_pos = targ->sig_pos; + task->flow_id_pos = targ->flow_id_pos; + task->packet_id_in_flow_pos = targ->packet_id_in_flow_pos; task->sig = targ->sig; task->new_rate_bps = targ->rate_bps; @@ -1714,6 +1869,31 @@ static void init_task_gen(struct task_base *tbase, struct task_args *targ) } else { task->store_msk = 0; } + uint32_t n_entries = get_n_range_flows(task) * task->orig_n_pkts * 4; +#ifndef RTE_HASH_BUCKET_ENTRIES +#define RTE_HASH_BUCKET_ENTRIES 8 +#endif + // cuckoo hash requires at least RTE_HASH_BUCKET_ENTRIES (8) entries + if (n_entries < RTE_HASH_BUCKET_ENTRIES) + n_entries = RTE_HASH_BUCKET_ENTRIES; + + static char hash_name[30]; + sprintf(hash_name, "A%03d_hash_gen_table", targ->lconf->id); + struct rte_hash_parameters hash_params = { + .name = hash_name, + .entries = n_entries, + .key_len = sizeof(union ipv4_5tuple_host), + .hash_func = rte_hash_crc, + .hash_func_init_val = 0, + }; + plog_info("\t\thash table name = %s\n", hash_params.name); + task->flow_id_table = rte_hash_create(&hash_params); + PROX_PANIC(task->flow_id_table == NULL, "Failed to set up flow_id hash table for gen\n"); + plog_info("\t\tflow_id hash table allocated, with %d entries of size %d\n", hash_params.entries, hash_params.key_len); + build_flow_table(task); + task->flows = (struct flows *)prox_zmalloc(n_entries * sizeof(struct flows), task->socket_id); + PROX_PANIC(task->flows == NULL, "Failed to allocate flows\n"); + plog_info("\t\t%d flows allocated\n", n_entries); } static struct task_init task_init_gen = { diff --git a/VNFs/DPPD-PROX/handle_lat.c b/VNFs/DPPD-PROX/handle_lat.c index 550f3f55..d6943070 100644 --- a/VNFs/DPPD-PROX/handle_lat.c +++ b/VNFs/DPPD-PROX/handle_lat.c @@ -91,6 +91,10 @@ struct loss_buffer { uint32_t n; }; +struct flows { + uint32_t packet_id; +}; + struct task_lat { struct task_base base; uint64_t limit; @@ -125,6 +129,10 @@ struct task_lat { uint32_t loss_buffer_size; struct loss_buffer *loss_buffer; uint32_t loss_id; + uint32_t packet_id_in_flow_pos; + int32_t flow_id_pos; + uint32_t flow_count; + struct flows *flows; }; /* This function calculate the difference between rx and tx_time * Both values are uint32_t (see handle_lat_bulk) @@ -490,6 +498,15 @@ static void lat_test_histogram_add(struct lat_test *lat_test, uint64_t lat_tsc) lat_test->buckets[bucket_id]++; } +static void lat_test_check_flow_ordering(struct task_lat *task, struct lat_test *lat_test, int32_t flow_id, uint32_t packet_id) +{ + if (packet_id < task->flows[flow_id].packet_id) { + lat_test->mis_ordered++; + lat_test->extent += task->flows[flow_id].packet_id - packet_id; + } + task->flows[flow_id].packet_id = packet_id; +} + static void lat_test_check_ordering(struct task_lat *task, struct lat_test *lat_test, uint32_t packet_id, uint8_t generator_id) { if (packet_id < task->previous_packet[generator_id]) { @@ -551,6 +568,7 @@ static void task_lat_store_lat(struct task_lat *task, uint64_t rx_packet_index, static int handle_lat_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts) { struct task_lat *task = (struct task_lat *)tbase; + static int max_flows_printed = 0; int rc; if (n_pkts == 0) { @@ -639,6 +657,24 @@ static int handle_lat_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uin uint8_t generator_id; uint32_t packet_id; + int32_t flow_id = -1; + if (task->flow_id_pos) { + flow_id = *(int32_t *)(hdr + task->flow_id_pos); + if (unlikely(flow_id >= (int32_t)(task->flow_count))) { + flow_id = -1; + if (!max_flows_printed) { + plog_info("Too many flows - increase flow count (only printed once)\n"); + max_flows_printed = 1; + } + } + + } + if (task->packet_id_in_flow_pos && (flow_id != -1)) { + uint32_t packet_id_in_flow; + struct unique_id *unique_id = (struct unique_id *)(hdr + task->packet_id_in_flow_pos); + unique_id_get(unique_id, &generator_id, &packet_id_in_flow); + lat_test_check_flow_ordering(task, task->lat_test, flow_id + generator_id * task->generator_count, packet_id_in_flow); + } if (task->unique_id_pos) { struct unique_id *unique_id = (struct unique_id *)(hdr + task->unique_id_pos); unique_id_get(unique_id, &generator_id, &packet_id); @@ -650,7 +686,9 @@ static int handle_lat_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uin // Skip unexpected packet continue; } - lat_test_check_ordering(task, task->lat_test, packet_id, generator_id); + if (flow_id == -1) { + lat_test_check_ordering(task, task->lat_test, packet_id, generator_id); + } lat_test_check_duplicate(task, task->lat_test, packet_id, generator_id); uint32_t loss = task_lat_early_loss_detect(task, packet_id, generator_id); if (loss) { @@ -779,6 +817,8 @@ static void init_task_lat(struct task_base *tbase, struct task_args *targ) task->accur_pos = targ->accur_pos; task->sig_pos = targ->sig_pos; task->sig = targ->sig; + task->packet_id_in_flow_pos = targ->packet_id_in_flow_pos; + task->flow_id_pos = targ->flow_id_pos; task->unique_id_pos = targ->packet_id_pos; task->latency_buffer_size = targ->latency_buffer_size; @@ -860,10 +900,11 @@ static void init_task_lat(struct task_base *tbase, struct task_args *targ) } } task->loss_buffer_size = targ->loss_buffer_size; - task->loss_buffer = prox_zmalloc(task->loss_buffer_size * sizeof(struct loss_buffer), rte_lcore_to_socket_id(targ->lconf->id)); - PROX_PANIC(task->loss_buffer == NULL, - "Failed to allocate %lu bytes (in huge pages) for loss_buffer\n", task->loss_buffer_size * sizeof(struct loss_buffer)); - + if (task->loss_buffer_size) { + task->loss_buffer = prox_zmalloc(task->loss_buffer_size * sizeof(struct loss_buffer), rte_lcore_to_socket_id(targ->lconf->id)); + PROX_PANIC(task->loss_buffer == NULL, + "Failed to allocate %lu bytes (in huge pages) for loss_buffer\n", task->loss_buffer_size * sizeof(struct loss_buffer)); + } task->bytes_to_tsc = prox_zmalloc(max_frame_size * sizeof(task->bytes_to_tsc[0]) * MAX_PKT_BURST, rte_lcore_to_socket_id(targ->lconf->id)); PROX_PANIC(task->bytes_to_tsc == NULL, "Failed to allocate %lu bytes (in huge pages) for bytes_to_tsc\n", max_frame_size * sizeof(task->bytes_to_tsc[0]) * MAX_PKT_BURST); @@ -877,6 +918,13 @@ static void init_task_lat(struct task_base *tbase, struct task_args *targ) else task->bytes_to_tsc[i] = (rte_get_tsc_hz() * i * 0.99) / bytes_per_hz; } + task->flow_count = targ->flow_count; + PROX_PANIC(task->flow_id_pos && (task->flow_count == 0), "flow_count must be configured when flow_id_pos is set\n"); + if (task->flow_count) { + task->flows = prox_zmalloc(task->flow_count * sizeof(struct flows) * task->generator_count, rte_lcore_to_socket_id(targ->lconf->id)); + PROX_PANIC(task->flows == NULL, + "Failed to allocate %lu bytes (in huge pages) for flows\n", task->flow_count * sizeof(struct flows) * task->generator_count); + } } static struct task_init task_init_lat = { diff --git a/VNFs/DPPD-PROX/prox_args.c b/VNFs/DPPD-PROX/prox_args.c index 274e6c96..e8c275bb 100644 --- a/VNFs/DPPD-PROX/prox_args.c +++ b/VNFs/DPPD-PROX/prox_args.c @@ -1163,6 +1163,15 @@ static int get_core_cfg(unsigned sindex, char *str, void *data) if (STR_EQ(str, "packet id pos")) { return parse_int(&targ->packet_id_pos, pkey); } + if (STR_EQ(str, "flow id pos")) { + return parse_int(&targ->flow_id_pos, pkey); + } + if (STR_EQ(str, "packet id in flow pos")) { + return parse_int(&targ->packet_id_in_flow_pos, pkey); + } + if (STR_EQ(str, "flow count")) { + return parse_int(&targ->flow_count, pkey); + } if (STR_EQ(str, "probability")) { // old - use "probability no drop" instead float probability; int rc = parse_float(&probability, pkey); diff --git a/VNFs/DPPD-PROX/task_init.h b/VNFs/DPPD-PROX/task_init.h index 82a58259..53bfaf35 100644 --- a/VNFs/DPPD-PROX/task_init.h +++ b/VNFs/DPPD-PROX/task_init.h @@ -205,6 +205,9 @@ struct task_args { struct range range[64]; char pcap_file[256]; uint32_t accur_pos; + uint32_t flow_id_pos; + uint32_t packet_id_in_flow_pos; + uint32_t flow_count; uint32_t sig_pos; uint32_t sig; uint32_t lat_pos; |