summary | refs | log | tree | commit | diff | stats
path: root/VNFs/DPPD-PROX/handle_lat.c
diff options
context:
space:
mode:
Diffstat (limited to 'VNFs/DPPD-PROX/handle_lat.c')
-rw-r--r-- VNFs/DPPD-PROX/handle_lat.c 573
1 files changed, 433 insertions, 140 deletions
diff --git a/VNFs/DPPD-PROX/handle_lat.c b/VNFs/DPPD-PROX/handle_lat.c
index 0b7ad561..04a4848b 100644
--- a/VNFs/DPPD-PROX/handle_lat.c
+++ b/VNFs/DPPD-PROX/handle_lat.c
@@ -1,5 +1,5 @@
/*
-// Copyright (c) 2010-2017 Intel Corporation
+// Copyright (c) 2010-2019 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -32,12 +32,14 @@
#include "quit.h"
#include "eld.h"
#include "prox_shared.h"
+#include "prox_port_cfg.h"
-#define DEFAULT_BUCKET_SIZE 10
+#define DEFAULT_BUCKET_SIZE 11
+#define ACCURACY_BUFFER_SIZE (2 * ACCURACY_WINDOW)
struct lat_info {
uint32_t rx_packet_index;
- uint32_t tx_packet_index;
+ uint64_t tx_packet_index;
uint32_t tx_err;
uint32_t rx_err;
uint64_t rx_time;
@@ -53,28 +55,29 @@ struct lat_info {
};
struct delayed_latency_entry {
- uint32_t rx_packet_idx;
+ uint32_t rx_packet_id; // Value of task->rx_packet_index when this entry was filled in
+ uint32_t tx_packet_id; // Packet id handed to task_lat_store_lat() once the latency is finally reported
+ uint32_t packet_id; // Key of the slot: written by delayed_latency_create(), compared by delayed_latency_get()
+ uint8_t generator_id; // Generator the packet was attributed to (0 when no unique id is used)
uint64_t pkt_rx_time;
- uint64_t pkt_tx_time;
+ uint64_t pkt_tx_time; // Time written into packets by gen. Unit is TSC >> LATENCY_ACCURACY
uint64_t rx_time_err;
};
-struct delayed_latency {
- struct delayed_latency_entry entries[64];
-};
-
-static struct delayed_latency_entry *delayed_latency_get(struct delayed_latency *delayed_latency, uint32_t rx_packet_idx)
+static struct delayed_latency_entry *delayed_latency_get(struct delayed_latency_entry **delayed_latency_entries, uint8_t generator_id, uint32_t packet_id)
{
- if (delayed_latency->entries[rx_packet_idx % 64].rx_packet_idx == rx_packet_idx)
- return &delayed_latency->entries[rx_packet_idx % 64];
+ struct delayed_latency_entry *delayed_latency_entry = &delayed_latency_entries[generator_id][packet_id % ACCURACY_BUFFER_SIZE]; // one ring per generator, slot chosen by packet_id modulo the ring size
+ if (delayed_latency_entry->packet_id == packet_id) // hit only if the slot still holds exactly this packet_id; NOTE(review): if the rings are zero-filled at allocation, an untouched slot 0 also matches packet_id 0 - confirm this is harmless
+ return delayed_latency_entry;
else
return NULL;
}
-static struct delayed_latency_entry *delayed_latency_create(struct delayed_latency *delayed_latency, uint32_t rx_packet_idx)
+static struct delayed_latency_entry *delayed_latency_create(struct delayed_latency_entry **delayed_latency_entries, uint8_t generator_id, uint32_t packet_id)
{
- delayed_latency->entries[rx_packet_idx % 64].rx_packet_idx = rx_packet_idx;
- return &delayed_latency->entries[rx_packet_idx % 64];
+ struct delayed_latency_entry *delayed_latency_entry = &delayed_latency_entries[generator_id][packet_id % ACCURACY_BUFFER_SIZE]; // same slot selection as delayed_latency_get()
+ delayed_latency_entry->packet_id = packet_id; // claim the slot (overwrites whatever id was there); the other fields are left for the caller to fill in
+ return delayed_latency_entry;
}
struct rx_pkt_meta_data {
@@ -83,12 +86,21 @@ struct rx_pkt_meta_data {
uint32_t bytes_after_in_bulk;
};
+struct loss_buffer {
+ uint32_t packet_id; // Id of the received packet for which a loss was reported
+ uint32_t n; // Number of packets reported lost at that point (value returned by task_lat_early_loss_detect())
+};
+
+struct flows {
+ uint32_t packet_id; // Last packet id seen in this flow; updated on every call of lat_test_check_flow_ordering()
+};
+
struct task_lat {
struct task_base base;
uint64_t limit;
uint64_t rx_packet_index;
uint64_t last_pkts_tsc;
- struct delayed_latency delayed_latency;
+ struct delayed_latency_entry **delayed_latency_entries;
struct lat_info *latency_buffer;
uint32_t latency_buffer_idx;
uint32_t latency_buffer_size;
@@ -103,15 +115,40 @@ struct task_lat {
struct lat_test lt[2];
struct lat_test *lat_test;
uint32_t generator_count;
+ uint16_t min_pkt_len;
struct early_loss_detect *eld;
struct rx_pkt_meta_data *rx_pkt_meta;
+ // Following fields are only used when starting or stopping, not in general runtime
+ uint64_t *prev_tx_packet_index;
+ FILE *fp_loss;
FILE *fp_rx;
FILE *fp_tx;
+ struct prox_port_cfg *port;
+ uint64_t *bytes_to_tsc;
+ uint64_t *previous_packet;
+ uint32_t loss_buffer_size;
+ struct loss_buffer *loss_buffer;
+ uint32_t loss_id;
+ uint32_t packet_id_in_flow_pos;
+ int32_t flow_id_pos;
+ uint32_t flow_count;
+ struct flows *flows;
};
+/* This function calculates the difference between rx_time and tx_time.
+ * Both values are uint32_t (see handle_lat_bulk).
+ * rx_time should be higher than tx_time... except once every UINT32_MAX
+ * cycles, when rx_time overflows.
+ * As the return value is also uint32_t, returning (rx_time - tx_time)
+ * is also fine when it overflows: unsigned subtraction wraps modulo 2^32,
+ * e.g. rx_time = 1 and tx_time = UINT32_MAX gives 2.
+ */
+static uint32_t diff_time(uint32_t rx_time, uint32_t tx_time)
+{
+ return rx_time - tx_time;
+}
-static uint32_t abs_diff(uint32_t a, uint32_t b)
+uint32_t task_lat_get_latency_bucket_size(struct task_lat *task)
{
- return a < b? UINT32_MAX - (b - a - 1) : a - b;
+ return task->lat_test->bucket_size; // bucket_size of the lat_test that task->lat_test currently points to
}
struct lat_test *task_lat_get_latency_meassurement(struct task_lat *task)
@@ -140,34 +177,81 @@ static int compare_tx_time(const void *val1, const void *val2)
const struct lat_info *ptr1 = val1;
const struct lat_info *ptr2 = val2;
- return ptr1->tx_time - ptr2->tx_time;
+ // Three-way compare: qsort() needs a consistent comparator, so equal keys must
+ // yield 0 (returning -1 for both (a,b) and (b,a) is not a valid ordering).
+ // Comparing instead of subtracting also avoids truncating a wide difference to int.
+ return (ptr1->tx_time > ptr2->tx_time) - (ptr1->tx_time < ptr2->tx_time);
}
-static int compare_queue_id(const void *val1, const void *val2)
+/* qsort() comparator ordering struct lat_info entries by ascending tx_packet_index.
+ * Returns a negative value, 0 or a positive value when val1 sorts before,
+ * equal to, or after val2.
+ */
+static int compare_tx_packet_index(const void *val1, const void *val2)
{
- return compare_tx_time(val1, val2);
+ const struct lat_info *ptr1 = val1;
+ const struct lat_info *ptr2 = val2;
+
+ // Equal keys must compare as 0, otherwise the comparator is inconsistent
+ // (both (a,b) and (b,a) would claim "less than").
+ return (ptr1->tx_packet_index > ptr2->tx_packet_index) - (ptr1->tx_packet_index < ptr2->tx_packet_index);
}
-static void fix_latency_buffer_tx_time(struct lat_info *lat, uint32_t count)
+static void fix_latency_buffer_tx_packet_index(struct lat_info *lat, uint32_t count)
{
- uint32_t id, time, old_id = 0, old_time = 0, n_overflow = 0;
+ uint32_t tx_packet_index, old_tx_packet_index = lat->tx_packet_index, n_overflow = 0;
+ uint32_t small = UINT32_MAX >> 1;
- for (uint32_t i = 0; i < count; i++) {
- id = lat->port_queue_id;
- time = lat->tx_time;
- if (id == old_id) {
- // Same queue id as previous entry; time should always increase
- if (time < old_time) {
+ lat++;
+
+ /* Buffer is sorted so far by RX time.
+ * We might have packets being reordered by SUT.
+ * => consider small differences as re-order and big ones as overflow of tx_packet_index.
+ * Note that:
+ * - overflow only happens when receiving and storing 4 billion packets...
+ * - an absolute difference of less than 2 billion packets is not considered as an overflow
+ * The first entry is taken as the reference and is left untouched; every
+ * following entry is widened in place by the running overflow count.
+ */
+ for (uint32_t i = 1; i < count; i++) {
+ tx_packet_index = lat->tx_packet_index;
+ if (tx_packet_index > old_tx_packet_index) {
+ if (tx_packet_index - old_tx_packet_index < small) {
+ // The diff is small => increasing index count
+ } else {
+ // The diff is big => it is more likely that the previous packet was overflow
+ n_overflow--;
+ }
+ } else {
+ if (old_tx_packet_index - tx_packet_index < small) {
+ // The diff is small => packet reorder
+ } else {
+ // The diff is big => it is more likely that this is an overflow
n_overflow++;
}
- lat->tx_time += UINT32_MAX * n_overflow;
- old_time = time;
+ }
+ lat->tx_packet_index += ((uint64_t)UINT32_MAX + 1) * n_overflow; // n_overflow is unsigned: once decremented below 0 the 64-bit product wraps, which effectively subtracts multiples of 2^32
+ old_tx_packet_index = tx_packet_index; // compare against the raw 32-bit value, not the widened one
+ lat++;
+ }
+}
+
+static void fix_latency_buffer_tx_time(struct lat_info *lat, uint32_t count)
+{
+ uint32_t tx_time, old_tx_time = lat->tx_time, n_overflow = 0;
+ uint32_t small = UINT32_MAX >> 1;
+ lat++;
+
+ /*
+ * Same algorithm as above, but with time.
+ * Note that:
+ * - overflow happens after 4 billion "cycles" (shifted by LATENCY_ACCURACY) = ~4sec
+ * - an absolute difference of up to 2 billion (shifted) cycles (~=2sec) is not considered as an overflow
+ * => algorithm does not work if receiving less than 1 packet every 2 seconds
+ */
+ for (uint32_t i = 1; i < count; i++) {
+ tx_time = lat->tx_time;
+ if (tx_time > old_tx_time) {
+ if (tx_time - old_tx_time > small) {
+ n_overflow--;
+ }
} else {
- // Different queue_id, time starts again at 0
- old_id = id;
- old_time = 0;
- n_overflow = 0;
+ if (old_tx_time - tx_time > small) {
+ n_overflow++;
+ }
}
+ lat->tx_time += ((uint64_t)UINT32_MAX + 1) * n_overflow; // widen in place by the running overflow count (unsigned arithmetic, wraps if n_overflow went below 0)
+ old_tx_time = tx_time; // compare against the raw 32-bit value, not the widened one
+ lat++;
}
}
@@ -203,7 +287,7 @@ static uint64_t lat_latency_buffer_get_min_tsc(struct task_lat *task)
static uint64_t lat_info_get_lat_tsc(struct lat_info *lat_info)
{
- uint64_t lat = abs_diff(lat_info->rx_time, lat_info->tx_time);
+ uint64_t lat = diff_time(lat_info->rx_time, lat_info->tx_time); // 32-bit wrap-around difference, widened to 64 bits before the shift below
return lat << LATENCY_ACCURACY;
}
@@ -220,18 +304,18 @@ static uint64_t lat_info_get_rx_err_tsc(const struct lat_info *lat_info)
static uint64_t lat_info_get_rx_tsc(const struct lat_info *lat_info)
{
- return ((uint64_t)lat_info) << LATENCY_ACCURACY;
+ return ((uint64_t)lat_info->rx_time) << LATENCY_ACCURACY; // shift the stored rx_time field (not the struct pointer, as the removed line did) back up by LATENCY_ACCURACY
}
static uint64_t lat_info_get_tx_tsc(const struct lat_info *lat_info)
{
- return ((uint64_t)lat_info) << LATENCY_ACCURACY;
+ return ((uint64_t)lat_info->tx_time) << LATENCY_ACCURACY; // shift the stored tx_time field (not the struct pointer, as the removed line did) back up by LATENCY_ACCURACY
}
static void lat_write_latency_to_file(struct task_lat *task)
{
uint64_t min_tsc;
- uint32_t n_loss;
+ uint64_t n_loss;
min_tsc = lat_latency_buffer_get_min_tsc(task);
@@ -244,7 +328,7 @@ static void lat_write_latency_to_file(struct task_lat *task)
uint64_t rx_tsc = lat_info_get_rx_tsc(lat_info);
uint64_t tx_tsc = lat_info_get_tx_tsc(lat_info);
- fprintf(task->fp_rx, "%u%d;%d;%ld;%lu;%lu\n",
+ fprintf(task->fp_rx, "%u;%u;%lu;%lu;%lu;%lu\n",
lat_info->rx_packet_index,
lat_info->port_queue_id,
lat_info->tx_packet_index,
@@ -254,19 +338,27 @@ static void lat_write_latency_to_file(struct task_lat *task)
}
// To detect dropped packets, we need to sort them based on TX
- plogx_info("Sorting packets based on queue_id\n");
- qsort (task->latency_buffer, task->latency_buffer_idx, sizeof(struct lat_info), compare_queue_id);
- plogx_info("Adapting tx_time\n");
- fix_latency_buffer_tx_time(task->latency_buffer, task->latency_buffer_idx);
- plogx_info("Sorting packets based on tx_time\n");
- qsort (task->latency_buffer, task->latency_buffer_idx, sizeof(struct lat_info), compare_tx_time);
- plogx_info("Sorted packets based on tx_time\n");
+ if (task->unique_id_pos) {
+ plogx_info("Adapting tx_packet_index\n");
+ fix_latency_buffer_tx_packet_index(task->latency_buffer, task->latency_buffer_idx);
+ plogx_info("Sorting packets based on tx_packet_index\n");
+ qsort (task->latency_buffer, task->latency_buffer_idx, sizeof(struct lat_info), compare_tx_packet_index);
+ plogx_info("Sorted packets based on packet_index\n");
+ } else {
+ plogx_info("Adapting tx_time\n");
+ fix_latency_buffer_tx_time(task->latency_buffer, task->latency_buffer_idx);
+ plogx_info("Sorting packets based on tx_time\n");
+ qsort (task->latency_buffer, task->latency_buffer_idx, sizeof(struct lat_info), compare_tx_time);
+ plogx_info("Sorted packets based on packet_time\n");
+ }
// A packet is marked as dropped if 2 packets received from the same queue are not consecutive
fprintf(task->fp_tx, "Latency stats for %u packets, sorted by tx time\n", task->latency_buffer_idx);
fprintf(task->fp_tx, "queue;tx index; rx index; lat (nsec);tx time; rx time; tx_err;rx_err\n");
- uint32_t prev_tx_packet_index = -1;
+ for (uint32_t i = 0; i < task->generator_count;i++)
+ task->prev_tx_packet_index[i] = -1;
+
for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
struct lat_info *lat_info = &task->latency_buffer[i];
uint64_t lat_tsc = lat_info_get_lat_tsc(lat_info);
@@ -275,19 +367,25 @@ static void lat_write_latency_to_file(struct task_lat *task)
uint64_t rx_tsc = lat_info_get_rx_tsc(lat_info);
uint64_t tx_tsc = lat_info_get_tx_tsc(lat_info);
- /* Packet n + 64 delivers the TX error for packet n,
- hence the last 64 packets do no have TX error. */
- if (i + 64 >= task->latency_buffer_idx) {
+ /* Packet n + ACCURACY_WINDOW delivers the TX error for packet n,
+ hence the last ACCURACY_WINDOW packets do no have TX error. */
+ if (i + ACCURACY_WINDOW >= task->latency_buffer_idx) {
tx_err_tsc = 0;
}
+
+ if (lat_info->port_queue_id >= task->generator_count) {
+ plog_err("Unexpected generator id %u for packet %lu - skipping packet\n",
+ lat_info->port_queue_id, lat_info->tx_packet_index);
+ continue;
+ }
// Log dropped packet
- n_loss = lat_info->tx_packet_index - prev_tx_packet_index - 1;
+ n_loss = lat_info->tx_packet_index - task->prev_tx_packet_index[lat_info->port_queue_id] - 1;
if (n_loss)
- fprintf(task->fp_tx, "===> %d;%d;0;0;0;0; lost %d packets <===\n",
+ fprintf(task->fp_tx, "===> %u;%lu;0;0;0;0;0;0 lost %lu packets <===\n",
lat_info->port_queue_id,
lat_info->tx_packet_index - n_loss, n_loss);
// Log next packet
- fprintf(task->fp_tx, "%d;%d;%u;%lu;%lu;%lu;%lu;%lu\n",
+ fprintf(task->fp_tx, "%u;%lu;%u;%lu;%lu;%lu;%lu;%lu",
lat_info->port_queue_id,
lat_info->tx_packet_index,
lat_info->rx_packet_index,
@@ -297,7 +395,7 @@ static void lat_write_latency_to_file(struct task_lat *task)
tsc_to_nsec(tx_err_tsc),
tsc_to_nsec(rx_err_tsc));
#ifdef LAT_DEBUG
- fprintf(task->fp_tx, ";%d from %d;%lu;%lu;%lu",
+ fprintf(task->fp_tx, ";%u from %u;%lu;%lu;%lu",
lat_info->id_in_bulk,
lat_info->bulk_size,
tsc_to_nsec(lat_info->begin - min_tsc),
@@ -305,7 +403,7 @@ static void lat_write_latency_to_file(struct task_lat *task)
tsc_to_nsec(lat_info->after - min_tsc));
#endif
fprintf(task->fp_tx, "\n");
- prev_tx_packet_index = lat_info->tx_packet_index;
+ task->prev_tx_packet_index[lat_info->port_queue_id] = lat_info->tx_packet_index;
}
fflush(task->fp_rx);
fflush(task->fp_tx);
@@ -319,7 +417,14 @@ static void lat_stop(struct task_base *tbase)
if (task->unique_id_pos) {
task_lat_count_remaining_lost_packets(task);
task_lat_reset_eld(task);
+ // Clear the per-generator "last packet id" array; size is per element, not per pointer
+ memset(task->previous_packet, 0, sizeof(task->previous_packet[0]) * task->generator_count);
+ }
+ // Dump the recorded loss events, one line per event, if a loss file is open
+ if (task->loss_id && task->fp_loss) {
+ for (uint32_t i = 0; i < task->loss_id; i++) {
+ // Both fields are uint32_t, hence %u
+ fprintf(task->fp_loss, "packet %u: %u\n", task->loss_buffer[i].packet_id, task->loss_buffer[i].n);
+ }
}
+ task->lat_test->lost_packets = 0;
if (task->latency_buffer)
lat_write_latency_to_file(task);
}
@@ -337,21 +442,16 @@ static void task_lat_store_lat_debug(struct task_lat *task, uint32_t rx_packet_i
}
#endif
-static void task_lat_store_lat_buf(struct task_lat *task, uint64_t rx_packet_index, struct unique_id *unique_id, uint64_t rx_time, uint64_t tx_time, uint64_t rx_err, uint64_t tx_err)
+static void task_lat_store_lat_buf(struct task_lat *task, uint64_t rx_packet_index, uint64_t rx_time, uint64_t tx_time, uint64_t rx_err, uint64_t tx_err, uint32_t packet_id, uint8_t generator_id)
{
struct lat_info *lat_info;
- uint8_t generator_id = 0;
- uint32_t packet_index = 0;
-
- if (unique_id)
- unique_id_get(unique_id, &generator_id, &packet_index);
/* If unique_id_pos is specified then latency is stored per
packet being sent. Lost packets are detected runtime, and
latency stored for those packets will be 0 */
lat_info = &task->latency_buffer[task->latency_buffer_idx++];
- lat_info->rx_packet_index = task->latency_buffer_idx - 1;
- lat_info->tx_packet_index = packet_index;
+ lat_info->rx_packet_index = rx_packet_index;
+ lat_info->tx_packet_index = packet_id;
lat_info->port_queue_id = generator_id;
lat_info->rx_time = rx_time;
lat_info->tx_time = tx_time;
@@ -359,25 +459,30 @@ static void task_lat_store_lat_buf(struct task_lat *task, uint64_t rx_packet_ind
lat_info->tx_err = tx_err;
}
-static uint32_t task_lat_early_loss_detect(struct task_lat *task, struct unique_id *unique_id)
+static uint32_t task_lat_early_loss_detect(struct task_lat *task, uint32_t packet_id, uint8_t generator_id)
{
- struct early_loss_detect *eld;
- uint8_t generator_id;
- uint32_t packet_index;
-
- unique_id_get(unique_id, &generator_id, &packet_index);
-
- if (generator_id >= task->generator_count)
- return 0;
+ struct early_loss_detect *eld = &task->eld[generator_id]; // no range check here any more: handle_lat_bulk skips packets with generator_id >= generator_count before calling
+ return early_loss_detect_add(eld, packet_id); // result is treated by the caller as the number of packets lost
+}
- eld = &task->eld[generator_id];
+static void lat_test_check_duplicate(struct task_lat *task, struct lat_test *lat_test, uint32_t packet_id, uint8_t generator_id)
+{
+ struct early_loss_detect *eld = &task->eld[generator_id];
+ uint32_t old_queue_id, queue_pos;
- return early_loss_detect_add(eld, packet_index);
+ queue_pos = packet_id & PACKET_QUEUE_MASK; // low bits of the id select the slot
+ old_queue_id = eld->entries[queue_pos]; // value currently recorded in that slot
+ if ((packet_id >> PACKET_QUEUE_BITS) == old_queue_id) // same upper bits already recorded => count as duplicate; NOTE(review): presumably this has to run before early_loss_detect_add() updates the slot (handle_lat_bulk calls it first) - confirm against eld.h
+ lat_test->duplicate++;
}
-static uint64_t tsc_extrapolate_backward(uint64_t tsc_from, uint64_t bytes, uint64_t tsc_minimum)
+static uint64_t tsc_extrapolate_backward(struct task_lat *task, uint64_t tsc_from, uint64_t bytes, uint64_t tsc_minimum)
{
- uint64_t tsc = tsc_from - rte_get_tsc_hz()*bytes/1250000000;
+#ifdef NO_LAT_EXTRAPOLATION
+ uint64_t tsc = tsc_from;
+#else
+ uint64_t tsc = tsc_from - task->bytes_to_tsc[bytes];
+#endif
if (likely(tsc > tsc_minimum))
return tsc;
else
@@ -389,10 +494,28 @@ static void lat_test_histogram_add(struct lat_test *lat_test, uint64_t lat_tsc)
uint64_t bucket_id = (lat_tsc >> lat_test->bucket_size);
size_t bucket_count = sizeof(lat_test->buckets)/sizeof(lat_test->buckets[0]);
- bucket_id = bucket_id < bucket_count? bucket_id : bucket_count;
+ bucket_id = bucket_id < bucket_count? bucket_id : (bucket_count - 1);
lat_test->buckets[bucket_id]++;
}
+/* Per-flow ordering check: if packet_id is lower than the id last recorded for
+ * flow_id, count one mis-ordered packet and add the distance to 'extent'.
+ * The flow's recorded id is replaced by packet_id in every case.
+ */
+static void lat_test_check_flow_ordering(struct task_lat *task, struct lat_test *lat_test, int32_t flow_id, uint32_t packet_id)
+{
+ struct flows *flow = &task->flows[flow_id];
+ const uint32_t last_id = flow->packet_id;
+
+ flow->packet_id = packet_id;
+ if (last_id <= packet_id)
+ return;
+ lat_test->extent += last_id - packet_id;
+ lat_test->mis_ordered++;
+}
+
+/* Per-generator ordering check: if packet_id is lower than the id last recorded
+ * for generator_id, count one mis-ordered packet and add the distance to 'extent'.
+ * The generator's recorded id is replaced by packet_id in every case.
+ */
+static void lat_test_check_ordering(struct task_lat *task, struct lat_test *lat_test, uint32_t packet_id, uint8_t generator_id)
+{
+ uint64_t *last_seen = &task->previous_packet[generator_id];
+ const uint64_t last_id = *last_seen;
+
+ *last_seen = packet_id;
+ if (last_id <= packet_id)
+ return;
+ lat_test->extent += last_id - packet_id;
+ lat_test->mis_ordered++;
+}
+
static void lat_test_add_lost(struct lat_test *lat_test, uint64_t lost_packets)
{
lat_test->lost_packets += lost_packets;
@@ -400,8 +523,6 @@ static void lat_test_add_lost(struct lat_test *lat_test, uint64_t lost_packets)
static void lat_test_add_latency(struct lat_test *lat_test, uint64_t lat_tsc, uint64_t error)
{
- lat_test->tot_all_pkts++;
-
if (error > lat_test->accuracy_limit_tsc)
return;
lat_test->tot_pkts++;
@@ -433,25 +554,22 @@ static int task_lat_can_store_latency(struct task_lat *task)
return task->latency_buffer_idx < task->latency_buffer_size;
}
-static void task_lat_store_lat(struct task_lat *task, uint64_t rx_packet_index, uint64_t rx_time, uint64_t tx_time, uint64_t rx_error, uint64_t tx_error, struct unique_id *unique_id)
+/* Account one latency sample in the running statistics and, while there is
+ * room, also append it to the latency buffer.
+ * rx_time / tx_time are in units of TSC >> LATENCY_ACCURACY; only their low
+ * 32 bits are used for the difference (see diff_time).
+ */
+static void task_lat_store_lat(struct task_lat *task, uint64_t rx_packet_index, uint64_t rx_time, uint64_t tx_time, uint64_t rx_error, uint64_t tx_error, uint32_t packet_id, uint8_t generator_id)
{
- if (tx_time == 0)
- return;
- uint32_t lat_tsc = abs_diff(rx_time, tx_time) << LATENCY_ACCURACY;
+ // Widen before shifting back to TSC units: a 32-bit shift would drop the
+ // top LATENCY_ACCURACY bits (lat_info_get_lat_tsc already widens this way).
+ uint64_t lat_tsc = (uint64_t)diff_time(rx_time, tx_time) << LATENCY_ACCURACY;
lat_test_add_latency(task->lat_test, lat_tsc, rx_error + tx_error);
if (task_lat_can_store_latency(task)) {
- task_lat_store_lat_buf(task, rx_packet_index, unique_id, rx_time, tx_time, rx_error, tx_error);
+ task_lat_store_lat_buf(task, rx_packet_index, rx_time, tx_time, rx_error, tx_error, packet_id, generator_id);
}
}
static int handle_lat_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
struct task_lat *task = (struct task_lat *)tbase;
- uint64_t rx_time_err;
-
- uint32_t pkt_rx_time, pkt_tx_time;
+ static int max_flows_printed = 0;
+ int rc;
if (n_pkts == 0) {
task->begin = tbase->aux->tsc_rx.before;
@@ -460,8 +578,12 @@ static int handle_lat_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uin
task_lat_update_lat_test(task);
- const uint64_t rx_tsc = tbase->aux->tsc_rx.after;
- uint32_t tx_time_err = 0;
+ // Remember those packets with bad length or bad signature
+ uint32_t non_dp_count = 0;
+ uint64_t pkt_bad_len_sig = 0;
+#define BIT64_SET(a64, bit) a64 |= (((uint64_t)1) << (bit & 63))
+#define BIT64_CLR(a64, bit) a64 &= ~(((uint64_t)1) << (bit & 63))
+#define BIT64_TEST(a64, bit) a64 & (((uint64_t)1) << (bit & 63))
/* Go once through all received packets and read them. If
packet has just been modified by another core, the cost of
@@ -469,19 +591,31 @@ static int handle_lat_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uin
for (uint16_t j = 0; j < n_pkts; ++j) {
struct rte_mbuf *mbuf = mbufs[j];
task->rx_pkt_meta[j].hdr = rte_pktmbuf_mtod(mbuf, uint8_t *);
- }
- for (uint16_t j = 0; j < n_pkts; ++j) {
+
+ // Remember those packets which are too short to hold the values that we expect
+ if (unlikely(rte_pktmbuf_pkt_len(mbuf) < task->min_pkt_len)) {
+ BIT64_SET(pkt_bad_len_sig, j);
+ non_dp_count++;
+ } else
+ BIT64_CLR(pkt_bad_len_sig, j);
}
- if (task->sig) {
+ if (task->sig_pos) {
for (uint16_t j = 0; j < n_pkts; ++j) {
- if (*(uint32_t *)(task->rx_pkt_meta[j].hdr + task->sig_pos) == task->sig)
+ if (unlikely(BIT64_TEST(pkt_bad_len_sig, j)))
+ continue;
+ // Remember those packets with bad signature
+ if (likely(*(uint32_t *)(task->rx_pkt_meta[j].hdr + task->sig_pos) == task->sig))
task->rx_pkt_meta[j].pkt_tx_time = *(uint32_t *)(task->rx_pkt_meta[j].hdr + task->lat_pos);
- else
- task->rx_pkt_meta[j].pkt_tx_time = 0;
+ else {
+ BIT64_SET(pkt_bad_len_sig, j);
+ non_dp_count++;
+ }
}
} else {
for (uint16_t j = 0; j < n_pkts; ++j) {
+ if (unlikely(BIT64_TEST(pkt_bad_len_sig, j)))
+ continue;
task->rx_pkt_meta[j].pkt_tx_time = *(uint32_t *)(task->rx_pkt_meta[j].hdr + task->lat_pos);
}
}
@@ -495,71 +629,125 @@ static int handle_lat_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uin
bytes_total_in_bulk += mbuf_wire_size(mbufs[flipped]);
}
- pkt_rx_time = tsc_extrapolate_backward(rx_tsc, task->rx_pkt_meta[0].bytes_after_in_bulk, task->last_pkts_tsc) >> LATENCY_ACCURACY;
- if ((uint32_t)((task->begin >> LATENCY_ACCURACY)) > pkt_rx_time) {
+ const uint64_t rx_tsc = tbase->aux->tsc_rx.after;
+
+ uint64_t rx_time_err;
+ uint64_t pkt_rx_time64 = tsc_extrapolate_backward(task, rx_tsc, task->rx_pkt_meta[0].bytes_after_in_bulk, task->last_pkts_tsc) >> LATENCY_ACCURACY;
+ if (unlikely((task->begin >> LATENCY_ACCURACY) > pkt_rx_time64)) {
// Extrapolation went up to BEFORE begin => packets were stuck in the NIC but we were not seeing them
- rx_time_err = pkt_rx_time - (uint32_t)(task->last_pkts_tsc >> LATENCY_ACCURACY);
+ rx_time_err = pkt_rx_time64 - (task->last_pkts_tsc >> LATENCY_ACCURACY);
} else {
- rx_time_err = pkt_rx_time - (uint32_t)(task->begin >> LATENCY_ACCURACY);
+ rx_time_err = pkt_rx_time64 - (task->begin >> LATENCY_ACCURACY);
}
- struct unique_id *unique_id = NULL;
- struct delayed_latency_entry *delayed_latency_entry;
-
+ TASK_STATS_ADD_RX_NON_DP(&tbase->aux->stats, non_dp_count);
for (uint16_t j = 0; j < n_pkts; ++j) {
+ // Used to display % of packets within accuracy limit vs. total number of packets (used_col)
+ task->lat_test->tot_all_pkts++;
+
+ // Skip those packets with bad length or bad signature
+ if (unlikely(BIT64_TEST(pkt_bad_len_sig, j)))
+ continue;
+
struct rx_pkt_meta_data *rx_pkt_meta = &task->rx_pkt_meta[j];
uint8_t *hdr = rx_pkt_meta->hdr;
- pkt_rx_time = tsc_extrapolate_backward(rx_tsc, rx_pkt_meta->bytes_after_in_bulk, task->last_pkts_tsc) >> LATENCY_ACCURACY;
- pkt_tx_time = rx_pkt_meta->pkt_tx_time;
+ uint32_t pkt_rx_time = tsc_extrapolate_backward(task, rx_tsc, rx_pkt_meta->bytes_after_in_bulk, task->last_pkts_tsc) >> LATENCY_ACCURACY;
+ uint32_t pkt_tx_time = rx_pkt_meta->pkt_tx_time;
+
+ uint8_t generator_id;
+ uint32_t packet_id;
+ int32_t flow_id = -1;
+ if (task->flow_id_pos) {
+ flow_id = *(int32_t *)(hdr + task->flow_id_pos);
+ if (unlikely(flow_id >= (int32_t)(task->flow_count))) {
+ flow_id = -1;
+ if (!max_flows_printed) {
+ plog_info("Too many flows - increase flow count (only printed once)\n");
+ max_flows_printed = 1;
+ }
+ }
+ }
+ if (task->packet_id_in_flow_pos && (flow_id != -1)) {
+ uint32_t packet_id_in_flow;
+ struct unique_id *unique_id = (struct unique_id *)(hdr + task->packet_id_in_flow_pos);
+ unique_id_get(unique_id, &generator_id, &packet_id_in_flow);
+ lat_test_check_flow_ordering(task, task->lat_test, flow_id + generator_id * task->generator_count, packet_id_in_flow);
+ }
if (task->unique_id_pos) {
- unique_id = (struct unique_id *)(hdr + task->unique_id_pos);
-
- uint32_t n_loss = task_lat_early_loss_detect(task, unique_id);
- lat_test_add_lost(task->lat_test, n_loss);
+ struct unique_id *unique_id = (struct unique_id *)(hdr + task->unique_id_pos);
+ unique_id_get(unique_id, &generator_id, &packet_id);
+
+ if (unlikely(generator_id >= task->generator_count)) {
+ /* No need to remember unexpected packet at this stage
+ BIT64_SET(pkt_bad_len_sig, j);
+ */
+ // Skip unexpected packet
+ continue;
+ }
+ if (flow_id == -1) {
+ lat_test_check_ordering(task, task->lat_test, packet_id, generator_id);
+ }
+ lat_test_check_duplicate(task, task->lat_test, packet_id, generator_id);
+ uint32_t loss = task_lat_early_loss_detect(task, packet_id, generator_id);
+ if (loss) {
+ lat_test_add_lost(task->lat_test, loss);
+ if (task->loss_id < task->loss_buffer_size) {
+ task->loss_buffer[task->loss_id].packet_id = packet_id;
+ task->loss_buffer[task->loss_id++].n = loss;
+ }
+ }
+ } else {
+ generator_id = 0;
+ packet_id = task->rx_packet_index;
}
/* If accuracy is enabled, latency is reported with a
- delay of 64 packets since the generator puts the
- accuracy for packet N into packet N + 64. The delay
+ delay of ACCURACY_WINDOW packets since the generator puts the
+ accuracy for packet N into packet N + ACCURACY_WINDOW. The delay
ensures that all reported latencies have both rx
and tx error. */
if (task->accur_pos) {
- tx_time_err = *(uint32_t *)(hdr + task->accur_pos);
+ uint32_t tx_time_err = *(uint32_t *)(hdr + task->accur_pos);
- delayed_latency_entry = delayed_latency_get(&task->delayed_latency, task->rx_packet_index - 64);
+ struct delayed_latency_entry *delayed_latency_entry = delayed_latency_get(task->delayed_latency_entries, generator_id, packet_id - ACCURACY_WINDOW);
if (delayed_latency_entry) {
task_lat_store_lat(task,
- task->rx_packet_index,
+ delayed_latency_entry->rx_packet_id,
delayed_latency_entry->pkt_rx_time,
delayed_latency_entry->pkt_tx_time,
delayed_latency_entry->rx_time_err,
tx_time_err,
- unique_id);
+ delayed_latency_entry->tx_packet_id,
+ delayed_latency_entry->generator_id);
}
- delayed_latency_entry = delayed_latency_create(&task->delayed_latency, task->rx_packet_index);
+ delayed_latency_entry = delayed_latency_create(task->delayed_latency_entries, generator_id, packet_id);
delayed_latency_entry->pkt_rx_time = pkt_rx_time;
delayed_latency_entry->pkt_tx_time = pkt_tx_time;
delayed_latency_entry->rx_time_err = rx_time_err;
+ delayed_latency_entry->rx_packet_id = task->rx_packet_index;
+ delayed_latency_entry->tx_packet_id = packet_id;
+ delayed_latency_entry->generator_id = generator_id;
} else {
- task_lat_store_lat(task,
- task->rx_packet_index,
- pkt_rx_time,
- pkt_tx_time,
- 0,
- 0,
- unique_id);
+ task_lat_store_lat(task, task->rx_packet_index, pkt_rx_time, pkt_tx_time, 0, 0, packet_id, generator_id);
}
+
+ // Bad/unexpected packets do not need to be indexed
task->rx_packet_index++;
}
- int ret;
- ret = task->base.tx_pkt(&task->base, mbufs, n_pkts, NULL);
- task->begin = tbase->aux->tsc_rx.before;
+
+ if (n_pkts < MAX_PKT_BURST)
+ task->begin = tbase->aux->tsc_rx.before;
task->last_pkts_tsc = tbase->aux->tsc_rx.after;
- return ret;
+
+ rc = task->base.tx_pkt(&task->base, mbufs, n_pkts, NULL);
+ // non_dp_count should not be drop-handled, as there are all by definition considered as not handled
+ // RX = DISCARDED + HANDLED + NON_DP + (TX - TX_NON_DP) + TX_FAIL
+ TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, -non_dp_count);
+ return rc;
}
static void init_task_lat_latency_buffer(struct task_lat *task, uint32_t core_id)
@@ -574,29 +762,39 @@ static void init_task_lat_latency_buffer(struct task_lat *task, uint32_t core_id
latency_buffer_mem_size = sizeof(struct lat_info) * task->latency_buffer_size;
task->latency_buffer = prox_zmalloc(latency_buffer_mem_size, socket_id);
- PROX_PANIC(task->latency_buffer == NULL, "Failed to allocate %ld kbytes for %s\n", latency_buffer_mem_size / 1024, name);
+ PROX_PANIC(task->latency_buffer == NULL, "Failed to allocate %zu kbytes for latency_buffer\n", latency_buffer_mem_size / 1024);
- sprintf(name, "latency.rx_%d.txt", core_id);
+ sprintf(name, "latency.rx_%u.txt", core_id);
task->fp_rx = fopen(name, "w+");
PROX_PANIC(task->fp_rx == NULL, "Failed to open %s\n", name);
- sprintf(name, "latency.tx_%d.txt", core_id);
+ sprintf(name, "latency.tx_%u.txt", core_id);
task->fp_tx = fopen(name, "w+");
PROX_PANIC(task->fp_tx == NULL, "Failed to open %s\n", name);
+
+ task->prev_tx_packet_index = prox_zmalloc(sizeof(task->prev_tx_packet_index[0]) * task->generator_count, socket_id);
+ PROX_PANIC(task->prev_tx_packet_index == NULL, "Failed to allocated prev_tx_packet_index\n");
}
-static void task_lat_init_eld(struct task_lat *task, uint8_t socket_id)
+static void task_init_generator_count(struct task_lat *task)
{
uint8_t *generator_count = prox_sh_find_system("generator_count");
- size_t eld_mem_size;
- if (generator_count == NULL)
- task->generator_count = 0;
- else
+ if (generator_count == NULL) {
+ task->generator_count = 1; // no "generator_count" entry found: fall back to one generator (presumably so the arrays sized from generator_count are never empty)
+ plog_info("\tNo generators found, hard-coding to %u generators\n", task->generator_count);
+ } else
task->generator_count = *generator_count;
+ plog_info("\t\tLatency using %u generators\n", task->generator_count);
+}
+
+static void task_lat_init_eld(struct task_lat *task, uint8_t socket_id)
+{
+ size_t eld_mem_size; // one early_loss_detect element per generator; relies on task->generator_count having been set beforehand
eld_mem_size = sizeof(task->eld[0]) * task->generator_count;
task->eld = prox_zmalloc(eld_mem_size, socket_id);
+ PROX_PANIC(task->eld == NULL, "Failed to allocate eld\n");
}
void task_lat_set_accuracy_limit(struct task_lat *task, uint32_t accuracy_limit_nsec)
@@ -604,6 +802,12 @@ void task_lat_set_accuracy_limit(struct task_lat *task, uint32_t accuracy_limit_
task->limit = nsec_to_tsc(accuracy_limit_nsec);
}
+/* Placeholder that currently does nothing.
+ * NOTE(review): judging by its name this is the task's start callback;
+ * its registration is not visible here - confirm.
+ */
+static void lat_start(struct task_base *tbase)
+{
+ (void)tbase; // nothing to do yet; replaces an unused local that only triggered -Wunused-variable
+}
+
static void init_task_lat(struct task_base *tbase, struct task_args *targ)
{
struct task_lat *task = (struct task_lat *)tbase;
@@ -611,36 +815,125 @@ static void init_task_lat(struct task_base *tbase, struct task_args *targ)
task->lat_pos = targ->lat_pos;
task->accur_pos = targ->accur_pos;
+ task->sig_pos = targ->sig_pos;
+ task->sig = targ->sig;
+ task->packet_id_in_flow_pos = targ->packet_id_in_flow_pos;
+ task->flow_id_pos = targ->flow_id_pos;
+
task->unique_id_pos = targ->packet_id_pos;
task->latency_buffer_size = targ->latency_buffer_size;
+ PROX_PANIC(task->lat_pos == 0, "Missing 'lat pos' parameter in config file\n");
+ uint16_t min_pkt_len = task->lat_pos + sizeof(uint32_t); /* min_pkt_len ends up as the largest (offset + field size) over all configured fields */
+ if (task->unique_id_pos && (
+ min_pkt_len < task->unique_id_pos + sizeof(struct unique_id)))
+ min_pkt_len = task->unique_id_pos + sizeof(struct unique_id);
+ if (task->accur_pos && (
+ min_pkt_len < task->accur_pos + sizeof(uint32_t)))
+ min_pkt_len = task->accur_pos + sizeof(uint32_t);
+ if (task->sig_pos && (
+ min_pkt_len < task->sig_pos + sizeof(uint32_t)))
+ min_pkt_len = task->sig_pos + sizeof(uint32_t);
+ task->min_pkt_len = min_pkt_len;
+
+ task_init_generator_count(task); /* must run before any allocation sized by task->generator_count below */
+
if (task->latency_buffer_size) {
init_task_lat_latency_buffer(task, targ->lconf->id);
}
- if (targ->bucket_size < LATENCY_ACCURACY) {
+ if (targ->bucket_size < DEFAULT_BUCKET_SIZE) { /* raise too-small (or unset) bucket sizes to the default */
targ->bucket_size = DEFAULT_BUCKET_SIZE;
}
- task->lt[0].bucket_size = targ->bucket_size - LATENCY_ACCURACY;
- task->lt[1].bucket_size = targ->bucket_size - LATENCY_ACCURACY;
+ if (task->accur_pos) {
+ task->delayed_latency_entries = prox_zmalloc(sizeof(*task->delayed_latency_entries) * task->generator_count , socket_id);
+ PROX_PANIC(task->delayed_latency_entries == NULL, "Failed to allocate array for storing delayed latency entries\n");
+ for (unsigned int i = 0; i < task->generator_count; i++) { /* one ACCURACY_BUFFER_SIZE-entry buffer per generator */
+ task->delayed_latency_entries[i] = prox_zmalloc(sizeof(**task->delayed_latency_entries) * ACCURACY_BUFFER_SIZE, socket_id);
+ PROX_PANIC(task->delayed_latency_entries[i] == NULL, "Failed to allocate array for storing delayed latency entries\n");
+ }
+ if (task->unique_id_pos == 0) {
+ /* When using accuracy feature, the accuracy from TX is written ACCURACY_WINDOW packets later
+ * We can only retrieve the good packet if a packet id is written to it.
+ * Otherwise we will use the packet RECEIVED ACCURACY_WINDOW packets ago which is OK if
+ * packets are not re-ordered. If packets are re-ordered, then the matching between
+ * the TX accuracy and the latency is wrong.
+ */
+ plog_warn("\tWhen accuracy feature is used, a unique id should ideally also be used\n");
+ }
+ }
+
+ task->lt[0].min_lat = -1;
+ task->lt[1].min_lat = -1;
+ task->lt[0].bucket_size = targ->bucket_size;
+ task->lt[1].bucket_size = targ->bucket_size;
if (task->unique_id_pos) {
task_lat_init_eld(task, socket_id);
task_lat_reset_eld(task);
+ task->previous_packet = prox_zmalloc(sizeof(task->previous_packet[0]) * task->generator_count , socket_id); /* one element per generator: size by the element, not by the pointer */
+ PROX_PANIC(task->previous_packet == NULL, "Failed to allocate array for storing previous packet\n");
}
task->lat_test = &task->lt[task->using_lt];
task_lat_set_accuracy_limit(task, targ->accuracy_limit_nsec);
- task->rx_pkt_meta = prox_zmalloc(MAX_RX_PKT_ALL * sizeof(*task->rx_pkt_meta), socket_id);
+ task->rx_pkt_meta = prox_zmalloc(MAX_PKT_BURST * sizeof(*task->rx_pkt_meta), socket_id);
PROX_PANIC(task->rx_pkt_meta == NULL, "unable to allocate memory to store RX packet meta data");
+
+ uint32_t max_frame_size = MAX_PKT_SIZE;
+ uint64_t bytes_per_hz = UINT64_MAX; /* UINT64_MAX means "link speed unknown": the bytes_to_tsc table is then filled with 0 */
+ if (targ->nb_rxports) {
+ struct prox_port_cfg *port = &prox_port_cfg[targ->rx_port_queue[0].port];
+ max_frame_size = port->mtu + PROX_RTE_ETHER_HDR_LEN + PROX_RTE_ETHER_CRC_LEN + 2 * PROX_VLAN_TAG_SIZE;
+
+ // port->max_link_speed reports the maximum, non negotiated link speed in Mbps e.g. 40k for a 40 Gbps NIC.
+ // It can be UINT32_MAX (virtual devices or not supported by DPDK < 16.04)
+ if (port->max_link_speed != UINT32_MAX) {
+ bytes_per_hz = port->max_link_speed * 125000L; /* Mbps -> bytes per second (1 Mbps = 125000 bytes/s) */
+ plog_info("\t\tPort %u: max link speed is %llu Mbps\n",
+ (uint8_t)(port - prox_port_cfg), (unsigned long long)(8 * bytes_per_hz / 1000000));
+ }
+ }
+ task->loss_buffer_size = targ->loss_buffer_size;
+ if (task->loss_buffer_size) {
+ char name[256];
+ sprintf(name, "loss_%u.txt", targ->lconf->id);
+ task->fp_loss = fopen(name, "w+");
+ PROX_PANIC(task->fp_loss == NULL, "Failed to open %s\n", name);
+
+ task->loss_buffer = prox_zmalloc(task->loss_buffer_size * sizeof(struct loss_buffer), rte_lcore_to_socket_id(targ->lconf->id));
+ PROX_PANIC(task->loss_buffer == NULL,
+ "Failed to allocate %zu bytes (in huge pages) for loss_buffer\n", task->loss_buffer_size * sizeof(struct loss_buffer));
+ }
+ task->bytes_to_tsc = prox_zmalloc(max_frame_size * sizeof(task->bytes_to_tsc[0]) * MAX_PKT_BURST, rte_lcore_to_socket_id(targ->lconf->id)); /* lookup table indexed by byte count, up to max_frame_size * MAX_PKT_BURST entries */
+ PROX_PANIC(task->bytes_to_tsc == NULL,
+ "Failed to allocate %zu bytes (in huge pages) for bytes_to_tsc\n", max_frame_size * sizeof(task->bytes_to_tsc[0]) * MAX_PKT_BURST);
+
+ // There are cases where hz estimate might be slightly over-estimated
+ // This results in too much extrapolation
+ // Only account for 99% of extrapolation to handle cases with up to 1% error clocks
+ for (unsigned int i = 0; i < max_frame_size * MAX_PKT_BURST ; i++) {
+ if (bytes_per_hz == UINT64_MAX)
+ task->bytes_to_tsc[i] = 0;
+ else
+ task->bytes_to_tsc[i] = (rte_get_tsc_hz() * i * 0.99) / bytes_per_hz; /* 99% of the TSC ticks that i bytes take at max link speed */
+ }
+ task->flow_count = targ->flow_count;
+ PROX_PANIC(task->flow_id_pos && (task->flow_count == 0), "flow_count must be configured when flow_id_pos is set\n");
+ if (task->flow_count) {
+ task->flows = prox_zmalloc(task->flow_count * sizeof(struct flows) * task->generator_count, rte_lcore_to_socket_id(targ->lconf->id));
+ PROX_PANIC(task->flows == NULL,
+ "Failed to allocate %zu bytes (in huge pages) for flows\n", task->flow_count * sizeof(struct flows) * task->generator_count);
+ }
}
static struct task_init task_init_lat = {
.mode_str = "lat",
.init = init_task_lat,
.handle = handle_lat_bulk,
+ .start = lat_start, /* newly registered start hook; lat_start currently does nothing */
.stop = lat_stop,
- .flag_features = TASK_FEATURE_TSC_RX | TASK_FEATURE_RX_ALL | TASK_FEATURE_ZERO_RX | TASK_FEATURE_NEVER_DISCARDS,
+ .flag_features = TASK_FEATURE_TSC_RX | TASK_FEATURE_ZERO_RX | TASK_FEATURE_NEVER_DISCARDS, /* TASK_FEATURE_RX_ALL is no longer requested */
.size = sizeof(struct task_lat)
};