summaryrefslogtreecommitdiffstats
path: root/VNFs/DPPD-PROX/handle_lat.c
blob: d6943070a345ab7f49e97ef1cc8d79a81f232828 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116

@media only all and (prefers-color-scheme: dark) {
.highlight .hll { background-color: #49483e }
.highlight .c { color: #75715e } /* Comment */
.highlight .err { color: #960050; background-color: #1e0010 } /* Error */
.highlight .k { color: #66d9ef } /* Keyword */
.highlight .l { color: #ae81ff } /* Literal */
.highlight .n { color: #f8f8f2 } /* Name */
.highlight .o { color: #f92672 } /* Operator */
.highlight .p { color: #f8f8f2 } /* Punctuation */
.highlight .ch { color: #75715e } /* Comment.Hashbang */
.highlight .cm { color: #75715e } /* Comment.Multiline */
.highlight .cp { color: #75715e } /* Comment.Preproc */
.highlight .cpf { color: #75715e } /* Comment.PreprocFile */
.highlight .c1 { color: #75715e } /* Comment.Single */
.highlight .cs { color: #75715e } /* Comment.Special */
.highlight .gd { color: #f92672 } /* Generic.Deleted */
.highlight .ge { font-style: italic } /* Generic.Emph */
.highlight .gi { color: #a6e22e } /* Generic.Inserted */
.highlight .gs { font-weight: bold } /* Generic.Strong */
.highlight .gu { color: #75715e } /* Generic.Subheading */
.highlight .kc { color: #66d9ef } /* Keyword.Constant */
.highlight .kd { color: #66d9ef } /* Keyword.Declaration */
.highlight .kn { color: #f92672 } /* Keyword.Namespace */
.highlight .kp { color: #66d9ef } /* Keyword.Pseudo */
.highlight .kr { color: #66d9ef } /* Keyword.Reserved */
.highlight .kt { color: #66d9ef } /* Keyword.Type */
.highlight .ld { color: #e6db74 } /* Literal.Date */
.highlight .m { color: #ae81ff } /* Literal.Number */
.highlight .s { color: #e6db74 } /* Literal.String */
.highlight .na { color: #a6e22e } /* Name.Attribute */
.highlight .nb { color: #f8f8f2 } /* Name.Builtin */
.highlight .nc { color: #a6e22e } /* Name.Class */
.highlight .no { color: #66d9ef } /* Name.Constant */
.highlight .nd { color: #a6e22e } /* Name.Decorator */
.highlight .ni { color: #f8f8f2 } /* Name.Entity */
.highlight .ne { color: #a6e22e } /* Name.Exception */
.highlight .nf { color: #a6e22e } /* Name.Function */
.highlight .nl { color: #f8f8f2 } /* Name.Label */
.highlight .nn { color: #f8f8f2 } /* Name.Namespace */
.highlight .nx { color: #a6e22e } /* Name.Other */
.highlight .py { color: #f8f8f2 } /* Name.Property */
.highlight .nt { color: #f92672 } /* Name.Tag */
.highlight .nv { color: #f8f8f2 } /* Name.Variable */
.highlight .ow { color: #f92672 } /* Operator.Word */
.highlight .w { color: #f8f8f2 } /* Text.Whitespace */
.highlight .mb { color: #ae81ff } /* Literal.Number.Bin */
.highlight .mf { color: #ae81ff } /* Literal.Number.Float */
.highlight .mh { color: #ae81ff } /* Literal.Number.Hex */
.highlight .mi { color: #ae81ff } /* Literal.Number.Integer */
.highlight .mo { color: #ae81ff } /* Literal.Number.Oct */
.highlight .sa { color: #e6db74 } /* Literal.String.Affix */
.highlight .sb { color: #e6db74 } /* Literal.String.Backtick */
.highlight .sc { color: #e6db74 } /* Literal.String.Char */
.highlight .dl { color: #e6db74 } /* Literal.String.Delimiter */
.highlight .sd { color: #e6db74 } /* Literal.String.Doc */
.highlight .s2 { color: #e6db74 } /* Literal.String.Double */
.highlight .se { color: #ae81ff } /* Literal.String.Escape */
.highlight .sh { color: #e6db74 } /* Literal.String.Heredoc */
.highlight .si { color: #e6db74 } /* Literal.String.Interpol */
.highlight .sx { color: #e6db74 } /* Literal.String.Other */
.highlight .sr { color: #e6db74 } /* Literal.String.Regex */
.highlight .s1 { color: #e6db74 } /* Literal.String.Single */
.highlight .ss { color: #e6db74 } /* Literal.String.Symbol */
.highlight .bp { color: #f8f8f2 } /* Name.Builtin.Pseudo */
.highlight .fm { color: #a6e22e } /* Name.Function.Magic */
.highlight .vc { color: #f8f8f2 } /* Name.Variable.Class */
.highlight .vg { color: #f8f8f2 } /* Name.Variable.Global */
.highlight .vi { color: #f8f8f2 } /* Name.Variable.Instance */
.highlight .vm { color: #f8f8f2 } /* Name.Variable.Magic */
.highlight .il { color: #ae81ff } /* Literal.Number.Integer.Long */
}
@media (prefers-color-scheme: light) {
.highlight .hll { background-color: #ffffcc }
.highlight .c { color: #888888 } /* Comment */
.highlight .err { color: #a61717; background-color: #e3d2d2 } /* Error */
.highlight .k { color: #008800; font-weight: bold } /* Keyword */
.highlight .ch { color: #888888 } /* Comment.Hashbang */
.highlight .cm { color: #888888 } /* Comment.Multiline */
.highlight .cp { color: #cc0000; font-weight: bold } /* Comment.Preproc */
.highlight .cpf { color: #888888 } /* Comment.PreprocFile */
.highlight .c1 { color: #888888 } /* Comment.Single */
.highlight .cs { color: #cc0000; font-weight: bold; background-color: #fff0f0 } /* Comment.Special */
.highlight .gd { color: #000000; background-color: #ffdddd } /* Generic.Deleted */
.highlight .ge { font-style: italic } /* Generic.Emph */
.highlight .gr { color: #aa0000 } /* Generic.Error */
.highlight .gh { color: #333333 } /* Generic.Heading */
.highlight .gi { color: #000000; background-color: #ddffdd } /* Generic.Inserted */
.highlight .go { color: #888888 } /* Generic.Output */
.highlight .gp { color: #555555 } /* Generic.Prompt */
.highlight .gs { font-weight: bold } /* Generic.Strong */
.highlight .gu { color: #666666 } /* Generic.Subheading */
.highlight .gt { color: #aa0000 } /* Generic.Traceback */
.highlight .kc { color: #008800; font-weight: bold } /* Keyword.Constant */
.highlight .kd { color: #008800; font-weight: bold } /* Keyword.Declaration */
.highlight .kn { color: #008800; font-weight: bold } /* Keyword.Namespace */
.highlight .kp { color: #008800 } /* Keyword.Pseudo */
.highlight .kr { color: #008800; font-weight: bold } /* Keyword.Reserved */
.highlight .kt { color: #888888; font-weight: bold } /* Keyword.Type */
.highlight .m { color: #0000DD; font-weight: bold } /* Literal.Number */
.highlight .s { color: #dd2200; background-color: #fff0f0 } /* Literal.String */
.highlight .na { color: #336699 } /* Name.Attribute */
.highlight .nb { color: #003388 } /* Name.Builtin */
.highlight .nc { color: #bb0066; font-weight: bold } /* Name.Class */
.highlight .no { color: #003366; font-weight: bold } /* Name.Constant */
.highlight .nd { color: #555555 } /* Name.Decorator */
.highlight .ne { color: #bb0066; font-weight: bold } /* Name.Exception */
.highlight .nf { color: #0066bb; font-weight: bold } /* Name.Function */
.highlight .nl { color: #336699; font-style: italic } /* Name.Label */
.highlight .nn { color: #bb0066; font-weight: bold } /* Name.Namespace */
.highlight .py { color: #336699; font-weight: bold } /* Name.Property */
.highlight .nt { color: #bb0066; font-weight: bold } /* Name.Tag */
.highlight .nv { color: #336699 } /* Name.Variable */
.highlight .ow { color: #008800 } /* Operator.Word */
.highlight .w { color: #bbbbbb } /* Text.Whitespace */
.highlight .mb { color: #0000DD; font-weight: bold } /* Literal.Number.Bin */
.highlight .mf { color: #0000DD; font-weight: bold } /* Literal.Number.Float */
.highlight .mh { color: #0000DD; font-weight: bold } /* Literal.Number.Hex */
.highlight .mi { color: #0000DD; font-weight: bold } /* Literal.Number.Integer */
.highlight .mo { color: #0000DD; font-weight: bold } /* Literal.Number.Oct */
.highlight .sa { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Affix */
.highlight .sb { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Backtick */
.highlight .sc { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Char */
.highlight .dl { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Delimiter */
.highlight .sd { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Doc */
.highlight .s2 { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Double */
.highlight .se { color: #0044dd; background-color: #fff0f0 } /* Literal.String.Escape */
.highlight .sh { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Heredoc */
.highlight .si { color: #3333bb; background-color: #fff0f0 } /* Literal.String.Interpol */
.highlight .sx { color: #22bb22; background-color: #f0fff0 } /* Literal.String.Other */
.highlight .sr { color: #008800; background-color: #fff0ff } /* Literal.String.Regex */
.highlight .s1 { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Single */
.highlight .ss { color: #aa6600; background-color: #fff0f0 } /* Literal.String.Symbol */
.highlight .bp { color: #003388 } /* Name.Builtin.Pseudo */
.highlight .fm { color: #0066bb; font-weight: bold } /* Name.Function.Magic */
.highlight .vc { color: #336699 } /* Name.Variable.Class */
.highlight .vg { color: #dd7700 } /* Name.Variable.Global */
.highlight .vi { color: #3333bb } /* Name.Variable.Instance */
.highlight .vm { color: #336699 } /* Name.Variable.Magic */
.highlight .il { color: #0000DD; font-weight: bold } /* Literal.Number.Integer.Long */
}
#include <linux/module.h>

#include "notifier-error-inject.h"

static int debugfs_errno_set(void *data, u64 val)
{
	*(int *)data = clamp_t(int, val, -MAX_ERRNO, 0);
	return 0;
}

static int debugfs_errno_get(void *data, u64 *val)
{
	*val = *(int *)data;
	return 0;
}

DEFINE_SIMPLE_ATTRIBUTE(fops_errno, debugfs_errno_get, debugfs_errno_set,
			"%lld\n");

static struct dentry *debugfs_create_errno(const char *name, umode_t mode,
				struct dentry *parent, int *value)
{
	return debugfs_create_file(name, mode, parent, value, &fops_errno);
}

static int notifier_err_inject_callback(struct notifier_block *nb,
				unsigned long val, void *p)
{
	int err = 0;
	struct notifier_err_inject *err_inject =
		container_of(nb, struct notifier_err_inject, nb);
	struct notifier_err_inject_action *action;

	for (action = err_inject->actions; action->name; action++) {
		if (action->val == val) {
			err = action->error;
			break;
		}
	}
	if (err)
		pr_info("Injecting error (%d) to %s\n", err, action->name);

	return notifier_from_errno(err);
}

struct dentry *notifier_err_inject_dir;
EXPORT_SYMBOL_GPL(notifier_err_inject_dir);

struct dentry *notifier_err_inject_init(const char *name, struct dentry *parent,
			struct notifier_err_inject *err_inject, int priority)
{
	struct notifier_err_inject_action *action;
	umode_t mode = S_IFREG | S_IRUSR | S_IWUSR;
	struct dentry *dir;
	struct dentry *actions_dir;

	err_inject->nb.notifier_call = notifier_err_inject_callback;
	err_inject->nb.priority = priority;

	dir = debugfs_create_dir(name, parent);
	if (!dir)
		return ERR_PTR(-ENOMEM);

	actions_dir = debugfs_create_dir("actions", dir);
	if (!actions_dir)
		goto fail;

	for (action = err_inject->actions; action->name; action++) {
		struct dentry *action_dir;

		action_dir = debugfs_create_dir(action->name, actions_dir);
		if (!action_dir)
			goto fail;

		/*
		 * Create debugfs r/w file containing action->error. If
		 * notifier call chain is called with action->val, it will
		 * fail with the error code
		 */
		if (!debugfs_create_errno("error", mode, action_dir,
					&action->error))
			goto fail;
	}
	return dir;
fail:
	debugfs_remove_recursive(dir);
	return ERR_PTR(-ENOMEM);
}
EXPORT_SYMBOL_GPL(notifier_err_inject_init);

static int __init err_inject_init(void)
{
	notifier_err_inject_dir =
		debugfs_create_dir("notifier-error-inject", NULL);

	if (!notifier_err_inject_dir)
		return -ENOMEM;

	return 0;
}

static void __exit err_inject_exit(void)
{
	debugfs_remove_recursive(notifier_err_inject_dir);
}

module_init(err_inject_init);
module_exit(err_inject_exit);

MODULE_DESCRIPTION("Notifier error injection module");
MODULE_LICENSE("GPL");
MODULE_AUTHOR("Akinobu Mita <akinobu.mita@gmail.com>");
18'>818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943
/*
// Copyright (c) 2010-2019 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
*/

//#define LAT_DEBUG

#include <rte_cycles.h>
#include <stdio.h>
#include <math.h>

#include "handle_gen.h"
#include "prox_malloc.h"
#include "mbuf_utils.h"
#include "handle_lat.h"
#include "log.h"
#include "task_init.h"
#include "task_base.h"
#include "stats.h"
#include "lconf.h"
#include "quit.h"
#include "eld.h"
#include "prox_shared.h"
#include "prox_port_cfg.h"

#define DEFAULT_BUCKET_SIZE	11
#define ACCURACY_BUFFER_SIZE	(2 * ACCURACY_WINDOW)

struct lat_info {
	uint32_t rx_packet_index;
	uint64_t tx_packet_index;
	uint32_t tx_err;
	uint32_t rx_err;
	uint64_t rx_time;
	uint64_t tx_time;
	uint16_t port_queue_id;
#ifdef LAT_DEBUG
	uint16_t id_in_bulk;
	uint16_t bulk_size;
	uint64_t begin;
	uint64_t after;
	uint64_t before;
#endif
};

struct delayed_latency_entry {
	uint32_t rx_packet_id;
	uint32_t tx_packet_id;
	uint32_t packet_id;
	uint8_t generator_id;
	uint64_t pkt_rx_time;
	uint64_t pkt_tx_time;	// Time written into packets by gen. Unit is TSC >> LATENCY_ACCURACY
	uint64_t rx_time_err;
};

static struct delayed_latency_entry *delayed_latency_get(struct delayed_latency_entry **delayed_latency_entries, uint8_t generator_id, uint32_t packet_id)
{
	struct delayed_latency_entry *delayed_latency_entry = &delayed_latency_entries[generator_id][packet_id % ACCURACY_BUFFER_SIZE];
	if (delayed_latency_entry->packet_id == packet_id)
		return delayed_latency_entry;
	else
		return NULL;
}

static struct delayed_latency_entry *delayed_latency_create(struct delayed_latency_entry **delayed_latency_entries, uint8_t generator_id, uint32_t packet_id)
{
	struct delayed_latency_entry *delayed_latency_entry = &delayed_latency_entries[generator_id][packet_id % ACCURACY_BUFFER_SIZE];
	delayed_latency_entry->packet_id = packet_id;
	return delayed_latency_entry;
}

struct rx_pkt_meta_data {
	uint8_t  *hdr;
	uint32_t pkt_tx_time;
	uint32_t bytes_after_in_bulk;
};

struct loss_buffer {
	uint32_t packet_id;
	uint32_t n;
};

struct flows {
	uint32_t packet_id;
};

struct task_lat {
	struct task_base base;
	uint64_t limit;
	uint64_t rx_packet_index;
	uint64_t last_pkts_tsc;
	struct delayed_latency_entry **delayed_latency_entries;
	struct lat_info *latency_buffer;
	uint32_t latency_buffer_idx;
	uint32_t latency_buffer_size;
	uint64_t begin;
	uint16_t lat_pos;
	uint16_t unique_id_pos;
	uint16_t accur_pos;
	uint16_t sig_pos;
	uint32_t sig;
	volatile uint16_t use_lt; /* which lt to use, */
	volatile uint16_t using_lt; /* 0 or 1 depending on which of the 2 measurements are used */
	struct lat_test lt[2];
	struct lat_test *lat_test;
	uint32_t generator_count;
	uint16_t min_pkt_len;
	struct early_loss_detect *eld;
	struct rx_pkt_meta_data *rx_pkt_meta;
	// Following fields are only used when starting or stopping, not in general runtime
	uint64_t *prev_tx_packet_index;
	FILE *fp_loss;
	FILE *fp_rx;
	FILE *fp_tx;
	struct prox_port_cfg *port;
	uint64_t *bytes_to_tsc;
	uint64_t *previous_packet;
	uint32_t loss_buffer_size;
	struct loss_buffer *loss_buffer;
	uint32_t loss_id;
	uint32_t packet_id_in_flow_pos;
	int32_t flow_id_pos;
	uint32_t flow_count;
	struct flows *flows;
};
/* This function calculate the difference between rx and tx_time
 * Both values are uint32_t (see handle_lat_bulk)
 * rx time should be higher than tx_time...except every UINT32_MAX
 * cycles, when rx_time overflows.
 * As the return value is also uint32_t, returning (rx_time - tx_time)
 * is also fine when it overflows.
 */
static uint32_t diff_time(uint32_t rx_time, uint32_t tx_time)
{
	return rx_time - tx_time;
}

uint32_t task_lat_get_latency_bucket_size(struct task_lat *task)
{
	return task->lat_test->bucket_size;
}

struct lat_test *task_lat_get_latency_meassurement(struct task_lat *task)
{
	if (task->use_lt == task->using_lt)
		return &task->lt[!task->using_lt];
	return NULL;
}

void task_lat_use_other_latency_meassurement(struct task_lat *task)
{
	task->use_lt = !task->using_lt;
}

static void task_lat_update_lat_test(struct task_lat *task)
{
	if (task->use_lt != task->using_lt) {
		task->using_lt = task->use_lt;
		task->lat_test = &task->lt[task->using_lt];
		task->lat_test->accuracy_limit_tsc = task->limit;
	}
}

static int compare_tx_time(const void *val1, const void *val2)
{
	const struct lat_info *ptr1 = val1;
	const struct lat_info *ptr2 = val2;

	return ptr1->tx_time > ptr2->tx_time ? 1 : -1;
}

static int compare_tx_packet_index(const void *val1, const void *val2)
{
	const struct lat_info *ptr1 = val1;
	const struct lat_info *ptr2 = val2;

	return ptr1->tx_packet_index > ptr2->tx_packet_index ? 1 : -1;
}

static void fix_latency_buffer_tx_packet_index(struct lat_info *lat, uint32_t count)
{
	uint32_t tx_packet_index, old_tx_packet_index = lat->tx_packet_index, n_overflow = 0;
	uint32_t small = UINT32_MAX >> 1;

	lat++;

	/* Buffer is sorted so far by RX time.
	 * We might have packets being reordered by SUT.
	 *     => consider small differences as re-order and big ones as overflow of tx_packet_index.
	 * Note that:
	 *	- overflow only happens if receiving and storing 4 billions packets...
	 *	- a absolute difference of less than 2 billion packets is not considered as an overflow
	 */
	for (uint32_t i = 1; i < count; i++) {
		tx_packet_index = lat->tx_packet_index;
		if (tx_packet_index > old_tx_packet_index) {
			if (tx_packet_index - old_tx_packet_index < small) {
				// The diff is small => increasing index count
			} else {
				// The diff is big => it is more likely that the previous packet was overflow
				n_overflow--;
			}
		} else {
			if (old_tx_packet_index - tx_packet_index < small) {
				// The diff is small => packet reorder
			} else {
				// The diff is big => it is more likely that this is an overflow
				n_overflow++;
			}
		}
		lat->tx_packet_index += ((uint64_t)UINT32_MAX + 1) * n_overflow;
		old_tx_packet_index = tx_packet_index;
		lat++;
	}
}

static void fix_latency_buffer_tx_time(struct lat_info *lat, uint32_t count)
{
	uint32_t tx_time, old_tx_time = lat->tx_time, n_overflow = 0;
	uint32_t small = UINT32_MAX >> 1;
	lat++;

	/*
	 * Same algorithm as above, but with time.
	 * Note that:
	 *	- overflow happens after 4 billions "cycles" (shifted by LATENCY_ACCURACY) = ~4sec
	 *	- a absolute difference up to 2 billion (shifted) cycles (~=2sec) is not considered as an overflow
	 *		=> algorithm does not work if receiving less than 1 packet every 2 seconds
	 */
	for (uint32_t i = 1; i < count; i++) {
		tx_time = lat->tx_time;
		if (tx_time > old_tx_time) {
			if (tx_time - old_tx_time > small) {
				n_overflow--;
			}
		} else {
			if (old_tx_time - tx_time > small) {
				n_overflow++;
			}
		}
		lat->tx_time += ((uint64_t)UINT32_MAX + 1) * n_overflow;
		old_tx_time = tx_time;
		lat++;
	}
}

static void task_lat_count_remaining_lost_packets(struct task_lat *task)
{
	struct lat_test *lat_test = task->lat_test;

	for (uint32_t j = 0; j < task->generator_count; j++) {
		struct early_loss_detect *eld = &task->eld[j];

		lat_test->lost_packets += early_loss_detect_count_remaining_loss(eld);
	}
}

static void task_lat_reset_eld(struct task_lat *task)
{
	for (uint32_t j = 0; j < task->generator_count; j++) {
		early_loss_detect_reset(&task->eld[j]);
	}
}

static uint64_t lat_latency_buffer_get_min_tsc(struct task_lat *task)
{
	uint64_t min_tsc = UINT64_MAX;

	for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
		if (min_tsc > task->latency_buffer[i].tx_time)
			min_tsc = task->latency_buffer[i].tx_time;
	}

	return min_tsc << LATENCY_ACCURACY;
}

static uint64_t lat_info_get_lat_tsc(struct lat_info *lat_info)
{
	uint64_t lat = diff_time(lat_info->rx_time, lat_info->tx_time);

	return lat << LATENCY_ACCURACY;
}

static uint64_t lat_info_get_tx_err_tsc(const struct lat_info *lat_info)
{
	return ((uint64_t)lat_info->tx_err) << LATENCY_ACCURACY;
}

static uint64_t lat_info_get_rx_err_tsc(const struct lat_info *lat_info)
{
	return ((uint64_t)lat_info->rx_err) << LATENCY_ACCURACY;
}

static uint64_t lat_info_get_rx_tsc(const struct lat_info *lat_info)
{
	return ((uint64_t)lat_info->rx_time) << LATENCY_ACCURACY;
}

static uint64_t lat_info_get_tx_tsc(const struct lat_info *lat_info)
{
	return ((uint64_t)lat_info->tx_time) << LATENCY_ACCURACY;
}

static void lat_write_latency_to_file(struct task_lat *task)
{
	uint64_t min_tsc;
	uint64_t n_loss;

	min_tsc = lat_latency_buffer_get_min_tsc(task);

	// Dumping all packet statistics
	fprintf(task->fp_rx, "Latency stats for %u packets, ordered by rx time\n", task->latency_buffer_idx);
	fprintf(task->fp_rx, "rx index; queue; tx index; lat (nsec);tx time;\n");
	for (uint32_t i = 0; i < task->latency_buffer_idx ; i++) {
		struct lat_info *lat_info = &task->latency_buffer[i];
		uint64_t lat_tsc = lat_info_get_lat_tsc(lat_info);
		uint64_t rx_tsc = lat_info_get_rx_tsc(lat_info);
		uint64_t tx_tsc = lat_info_get_tx_tsc(lat_info);

		fprintf(task->fp_rx, "%u;%u;%lu;%lu;%lu;%lu\n",
			lat_info->rx_packet_index,
			lat_info->port_queue_id,
			lat_info->tx_packet_index,
			tsc_to_nsec(lat_tsc),
			tsc_to_nsec(rx_tsc - min_tsc),
			tsc_to_nsec(tx_tsc - min_tsc));
	}

	// To detect dropped packets, we need to sort them based on TX
	if (task->unique_id_pos) {
		plogx_info("Adapting tx_packet_index\n");
		fix_latency_buffer_tx_packet_index(task->latency_buffer, task->latency_buffer_idx);
		plogx_info("Sorting packets based on tx_packet_index\n");
		qsort (task->latency_buffer, task->latency_buffer_idx, sizeof(struct lat_info), compare_tx_packet_index);
		plogx_info("Sorted packets based on packet_index\n");
	} else {
		plogx_info("Adapting tx_time\n");
		fix_latency_buffer_tx_time(task->latency_buffer, task->latency_buffer_idx);
		plogx_info("Sorting packets based on tx_time\n");
		qsort (task->latency_buffer, task->latency_buffer_idx, sizeof(struct lat_info), compare_tx_time);
		plogx_info("Sorted packets based on packet_time\n");
	}

	// A packet is marked as dropped if 2 packets received from the same queue are not consecutive
	fprintf(task->fp_tx, "Latency stats for %u packets, sorted by tx time\n", task->latency_buffer_idx);
	fprintf(task->fp_tx, "queue;tx index; rx index; lat (nsec);tx time; rx time; tx_err;rx_err\n");

	for (uint32_t i = 0; i < task->generator_count;i++)
		task->prev_tx_packet_index[i] = -1;

	for (uint32_t i = 0; i < task->latency_buffer_idx; i++) {
		struct lat_info *lat_info = &task->latency_buffer[i];
		uint64_t lat_tsc = lat_info_get_lat_tsc(lat_info);
		uint64_t tx_err_tsc = lat_info_get_tx_err_tsc(lat_info);
		uint64_t rx_err_tsc = lat_info_get_rx_err_tsc(lat_info);
		uint64_t rx_tsc = lat_info_get_rx_tsc(lat_info);
		uint64_t tx_tsc = lat_info_get_tx_tsc(lat_info);

		/* Packet n + ACCURACY_WINDOW delivers the TX error for packet n,
		   hence the last ACCURACY_WINDOW packets do no have TX error. */
		if (i + ACCURACY_WINDOW >= task->latency_buffer_idx) {
			tx_err_tsc = 0;
		}

		if (lat_info->port_queue_id >= task->generator_count) {
			plog_err("Unexpected generator id %u for packet %lu - skipping packet\n",
				lat_info->port_queue_id, lat_info->tx_packet_index);
			continue;
		}
		// Log dropped packet
		n_loss = lat_info->tx_packet_index - task->prev_tx_packet_index[lat_info->port_queue_id] - 1;
		if (n_loss)
			fprintf(task->fp_tx, "===> %u;%lu;0;0;0;0;0;0 lost %lu packets <===\n",
				lat_info->port_queue_id,
				lat_info->tx_packet_index - n_loss, n_loss);
		// Log next packet
		fprintf(task->fp_tx, "%u;%lu;%u;%lu;%lu;%lu;%lu;%lu",
			lat_info->port_queue_id,
			lat_info->tx_packet_index,
			lat_info->rx_packet_index,
			tsc_to_nsec(lat_tsc),
			tsc_to_nsec(tx_tsc - min_tsc),
			tsc_to_nsec(rx_tsc - min_tsc),
			tsc_to_nsec(tx_err_tsc),
			tsc_to_nsec(rx_err_tsc));
#ifdef LAT_DEBUG
		fprintf(task->fp_tx, ";%u from %u;%lu;%lu;%lu",
			lat_info->id_in_bulk,
			lat_info->bulk_size,
			tsc_to_nsec(lat_info->begin - min_tsc),
			tsc_to_nsec(lat_info->before - min_tsc),
			tsc_to_nsec(lat_info->after - min_tsc));
#endif
		fprintf(task->fp_tx, "\n");
		task->prev_tx_packet_index[lat_info->port_queue_id] = lat_info->tx_packet_index;
	}
	fflush(task->fp_rx);
	fflush(task->fp_tx);
	task->latency_buffer_idx = 0;
}

static void lat_stop(struct task_base *tbase)
{
	struct task_lat *task = (struct task_lat *)tbase;

	if (task->unique_id_pos) {
		task_lat_count_remaining_lost_packets(task);
		task_lat_reset_eld(task);
		memset(task->previous_packet, 0, sizeof(task->previous_packet) * task->generator_count);
	}
	if (task->loss_id) {
		for (uint i = 0; i < task->loss_id; i++) {
			fprintf(task->fp_loss, "packet %d: %d\n", task->loss_buffer[i].packet_id, task->loss_buffer[i].n);
		}
	}
	task->lat_test->lost_packets = 0;
	if (task->latency_buffer)
		lat_write_latency_to_file(task);
}

#ifdef LAT_DEBUG
static void task_lat_store_lat_debug(struct task_lat *task, uint32_t rx_packet_index, uint32_t id_in_bulk, uint32_t bulk_size)
{
	struct lat_info *lat_info = &task->latency_buffer[rx_packet_index];

	lat_info->bulk_size = bulk_size;
	lat_info->id_in_bulk = id_in_bulk;
	lat_info->begin = task->begin;
	lat_info->before = task->base.aux->tsc_rx.before;
	lat_info->after = task->base.aux->tsc_rx.after;
}
#endif

static void task_lat_store_lat_buf(struct task_lat *task, uint64_t rx_packet_index, uint64_t rx_time, uint64_t tx_time, uint64_t rx_err, uint64_t tx_err, uint32_t packet_id, uint8_t generator_id)
{
	struct lat_info *lat_info;

	/* If unique_id_pos is specified then latency is stored per
	   packet being sent. Lost packets are detected runtime, and
	   latency stored for those packets will be 0 */
	lat_info = &task->latency_buffer[task->latency_buffer_idx++];
	lat_info->rx_packet_index = rx_packet_index;
	lat_info->tx_packet_index = packet_id;
	lat_info->port_queue_id = generator_id;
	lat_info->rx_time = rx_time;
	lat_info->tx_time = tx_time;
	lat_info->rx_err = rx_err;
	lat_info->tx_err = tx_err;
}

static uint32_t task_lat_early_loss_detect(struct task_lat *task, uint32_t packet_id, uint8_t generator_id)
{
	struct early_loss_detect *eld = &task->eld[generator_id];
	return early_loss_detect_add(eld, packet_id);
}

static void lat_test_check_duplicate(struct task_lat *task, struct lat_test *lat_test, uint32_t packet_id, uint8_t generator_id)
{
	struct early_loss_detect *eld = &task->eld[generator_id];
	uint32_t old_queue_id, queue_pos;

	queue_pos = packet_id & PACKET_QUEUE_MASK;
	old_queue_id = eld->entries[queue_pos];
	if ((packet_id >> PACKET_QUEUE_BITS) == old_queue_id)
		lat_test->duplicate++;
}

static uint64_t tsc_extrapolate_backward(struct task_lat *task, uint64_t tsc_from, uint64_t bytes, uint64_t tsc_minimum)
{
#ifdef NO_LAT_EXTRAPOLATION
	uint64_t tsc = tsc_from;
#else
	uint64_t tsc = tsc_from - task->bytes_to_tsc[bytes];
#endif
	if (likely(tsc > tsc_minimum))
		return tsc;
	else
		return tsc_minimum;
}

static void lat_test_histogram_add(struct lat_test *lat_test, uint64_t lat_tsc)
{
	uint64_t bucket_id = (lat_tsc >> lat_test->bucket_size);
	size_t bucket_count = sizeof(lat_test->buckets)/sizeof(lat_test->buckets[0]);

	bucket_id = bucket_id < bucket_count? bucket_id : (bucket_count - 1);
	lat_test->buckets[bucket_id]++;
}

static void lat_test_check_flow_ordering(struct task_lat *task, struct lat_test *lat_test, int32_t flow_id, uint32_t packet_id)
{
	if (packet_id < task->flows[flow_id].packet_id) {
		lat_test->mis_ordered++;
		lat_test->extent += task->flows[flow_id].packet_id - packet_id;
	}
	task->flows[flow_id].packet_id = packet_id;
}

static void lat_test_check_ordering(struct task_lat *task, struct lat_test *lat_test, uint32_t packet_id, uint8_t generator_id)
{
	if (packet_id < task->previous_packet[generator_id]) {
		lat_test->mis_ordered++;
		lat_test->extent += task->previous_packet[generator_id] - packet_id;
	}
	task->previous_packet[generator_id] = packet_id;
}

static void lat_test_add_lost(struct lat_test *lat_test, uint64_t lost_packets)
{
	lat_test->lost_packets += lost_packets;
}

static void lat_test_add_latency(struct lat_test *lat_test, uint64_t lat_tsc, uint64_t error)
{
	if (error > lat_test->accuracy_limit_tsc)
		return;
	lat_test->tot_pkts++;

	lat_test->tot_lat += lat_tsc;
	lat_test->tot_lat_error += error;

	/* (a +- b)^2 = a^2 +- (2ab + b^2) */
	lat_test->var_lat += lat_tsc * lat_tsc;
	lat_test->var_lat_error += 2 * lat_tsc * error;
	lat_test->var_lat_error += error * error;

	if (lat_tsc > lat_test->max_lat) {
		lat_test->max_lat = lat_tsc;
		lat_test->max_lat_error = error;
	}
	if (lat_tsc < lat_test->min_lat) {
		lat_test->min_lat = lat_tsc;
		lat_test->min_lat_error = error;
	}

#ifdef LATENCY_HISTOGRAM
	lat_test_histogram_add(lat_test, lat_tsc);
#endif
}

static int task_lat_can_store_latency(struct task_lat *task)
{
	return task->latency_buffer_idx < task->latency_buffer_size;
}

static void task_lat_store_lat(struct task_lat *task, uint64_t rx_packet_index, uint64_t rx_time, uint64_t tx_time, uint64_t rx_error, uint64_t tx_error, uint32_t packet_id, uint8_t generator_id)
{
	uint32_t lat_tsc = diff_time(rx_time, tx_time) << LATENCY_ACCURACY;

	lat_test_add_latency(task->lat_test, lat_tsc, rx_error + tx_error);

	if (task_lat_can_store_latency(task)) {
		task_lat_store_lat_buf(task, rx_packet_index, rx_time, tx_time, rx_error, tx_error, packet_id, generator_id);
	}
}

static int handle_lat_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
	struct task_lat *task = (struct task_lat *)tbase;
	static int max_flows_printed = 0;
	int rc;

	if (n_pkts == 0) {
		task->begin = tbase->aux->tsc_rx.before;
		return 0;
	}

	task_lat_update_lat_test(task);

	// Remember those packets with bad length or bad signature
	uint32_t non_dp_count = 0;
	uint64_t pkt_bad_len_sig = 0;
#define BIT64_SET(a64, bit)	a64 |=  (((uint64_t)1) << (bit & 63))
#define BIT64_CLR(a64, bit)	a64 &= ~(((uint64_t)1) << (bit & 63))
#define BIT64_TEST(a64, bit)	a64  &  (((uint64_t)1) << (bit & 63))

	/* Go once through all received packets and read them.  If
	   packet has just been modified by another core, the cost of
	   latency will be partialy amortized though the bulk size */
	for (uint16_t j = 0; j < n_pkts; ++j) {
		struct rte_mbuf *mbuf = mbufs[j];
		task->rx_pkt_meta[j].hdr = rte_pktmbuf_mtod(mbuf, uint8_t *);

		// Remember those packets which are too short to hold the values that we expect
		if (unlikely(rte_pktmbuf_pkt_len(mbuf) < task->min_pkt_len)) {
			BIT64_SET(pkt_bad_len_sig, j);
			non_dp_count++;
		} else
			BIT64_CLR(pkt_bad_len_sig, j);
	}

	if (task->sig_pos) {
		for (uint16_t j = 0; j < n_pkts; ++j) {
			if (unlikely(BIT64_TEST(pkt_bad_len_sig, j)))
				continue;
			// Remember those packets with bad signature
			if (likely(*(uint32_t *)(task->rx_pkt_meta[j].hdr + task->sig_pos) == task->sig))
				task->rx_pkt_meta[j].pkt_tx_time = *(uint32_t *)(task->rx_pkt_meta[j].hdr + task->lat_pos);
			else {
				BIT64_SET(pkt_bad_len_sig, j);
				non_dp_count++;
			}
		}
	} else {
		for (uint16_t j = 0; j < n_pkts; ++j) {
			if (unlikely(BIT64_TEST(pkt_bad_len_sig, j)))
				continue;
			task->rx_pkt_meta[j].pkt_tx_time = *(uint32_t *)(task->rx_pkt_meta[j].hdr + task->lat_pos);
		}
	}

	uint32_t bytes_total_in_bulk = 0;
	// Find RX time of first packet, for RX accuracy
	for (uint16_t j = 0; j < n_pkts; ++j) {
		uint16_t flipped = n_pkts - 1 - j;

		task->rx_pkt_meta[flipped].bytes_after_in_bulk = bytes_total_in_bulk;
		bytes_total_in_bulk += mbuf_wire_size(mbufs[flipped]);
	}

	const uint64_t rx_tsc = tbase->aux->tsc_rx.after;

	uint64_t rx_time_err;
	uint64_t pkt_rx_time64 = tsc_extrapolate_backward(task, rx_tsc, task->rx_pkt_meta[0].bytes_after_in_bulk, task->last_pkts_tsc) >> LATENCY_ACCURACY;
	if (unlikely((task->begin >> LATENCY_ACCURACY) > pkt_rx_time64)) {
		// Extrapolation went up to BEFORE begin => packets were stuck in the NIC but we were not seeing them
		rx_time_err = pkt_rx_time64 - (task->last_pkts_tsc >> LATENCY_ACCURACY);
	} else {
		rx_time_err = pkt_rx_time64 - (task->begin >> LATENCY_ACCURACY);
	}

	TASK_STATS_ADD_RX_NON_DP(&tbase->aux->stats, non_dp_count);
	for (uint16_t j = 0; j < n_pkts; ++j) {
		// Used to display % of packets within accuracy limit vs. total number of packets (used_col)
		task->lat_test->tot_all_pkts++;

		// Skip those packets with bad length or bad signature
		if (unlikely(BIT64_TEST(pkt_bad_len_sig, j)))
			continue;

		struct rx_pkt_meta_data *rx_pkt_meta = &task->rx_pkt_meta[j];
		uint8_t *hdr = rx_pkt_meta->hdr;

		uint32_t pkt_rx_time = tsc_extrapolate_backward(task, rx_tsc, rx_pkt_meta->bytes_after_in_bulk, task->last_pkts_tsc) >> LATENCY_ACCURACY;
		uint32_t pkt_tx_time = rx_pkt_meta->pkt_tx_time;

		uint8_t generator_id;
		uint32_t packet_id;
		int32_t flow_id = -1;
		if (task->flow_id_pos) {
			flow_id = *(int32_t *)(hdr + task->flow_id_pos);
			if (unlikely(flow_id >= (int32_t)(task->flow_count))) {
				flow_id = -1;
				if (!max_flows_printed) {
					plog_info("Too many flows - increase flow count (only printed once)\n");
					max_flows_printed = 1;
				}
			}

		}
		if (task->packet_id_in_flow_pos && (flow_id != -1)) {
			uint32_t packet_id_in_flow;
			struct unique_id *unique_id = (struct unique_id *)(hdr + task->packet_id_in_flow_pos);
			unique_id_get(unique_id, &generator_id, &packet_id_in_flow);
			lat_test_check_flow_ordering(task, task->lat_test, flow_id + generator_id * task->generator_count, packet_id_in_flow);
		}
		if (task->unique_id_pos) {
			struct unique_id *unique_id = (struct unique_id *)(hdr + task->unique_id_pos);
			unique_id_get(unique_id, &generator_id, &packet_id);

			if (unlikely(generator_id >= task->generator_count)) {
				/* No need to remember unexpected packet at this stage
				BIT64_SET(pkt_bad_len_sig, j);
				*/
				// Skip unexpected packet
				continue;
			}
			if (flow_id == -1) {
				lat_test_check_ordering(task, task->lat_test, packet_id, generator_id);
			}
			lat_test_check_duplicate(task, task->lat_test, packet_id, generator_id);
			uint32_t loss =  task_lat_early_loss_detect(task, packet_id, generator_id);
			if (loss) {
				lat_test_add_lost(task->lat_test, loss);
				if (task->loss_id < task->loss_buffer_size) {
					task->loss_buffer[task->loss_id].packet_id = packet_id;
					task->loss_buffer[task->loss_id++].n = loss;
				}
			}
		} else {
			generator_id = 0;
			packet_id = task->rx_packet_index;
		}

		/* If accuracy is enabled, latency is reported with a
		   delay of ACCURACY_WINDOW packets since the generator puts the
		   accuracy for packet N into packet N + ACCURACY_WINDOW. The delay
		   ensures that all reported latencies have both rx
		   and tx error. */
		if (task->accur_pos) {
			uint32_t tx_time_err = *(uint32_t *)(hdr + task->accur_pos);

			struct delayed_latency_entry *delayed_latency_entry = delayed_latency_get(task->delayed_latency_entries, generator_id, packet_id - ACCURACY_WINDOW);

			if (delayed_latency_entry) {
				task_lat_store_lat(task,
						   delayed_latency_entry->rx_packet_id,
						   delayed_latency_entry->pkt_rx_time,
						   delayed_latency_entry->pkt_tx_time,
						   delayed_latency_entry->rx_time_err,
						   tx_time_err,
						   delayed_latency_entry->tx_packet_id,
						   delayed_latency_entry->generator_id);
			}

			delayed_latency_entry = delayed_latency_create(task->delayed_latency_entries, generator_id, packet_id);
			delayed_latency_entry->pkt_rx_time = pkt_rx_time;
			delayed_latency_entry->pkt_tx_time = pkt_tx_time;
			delayed_latency_entry->rx_time_err = rx_time_err;
			delayed_latency_entry->rx_packet_id = task->rx_packet_index;
			delayed_latency_entry->tx_packet_id = packet_id;
			delayed_latency_entry->generator_id = generator_id;
		} else {
			task_lat_store_lat(task, task->rx_packet_index, pkt_rx_time, pkt_tx_time, 0, 0, packet_id, generator_id);
		}

		// Bad/unexpected packets do not need to be indexed
		task->rx_packet_index++;
	}

	if (n_pkts < MAX_PKT_BURST)
		task->begin = tbase->aux->tsc_rx.before;
	task->last_pkts_tsc = tbase->aux->tsc_rx.after;

	rc = task->base.tx_pkt(&task->base, mbufs, n_pkts, NULL);
	// non_dp_count should not be drop-handled, as there are all by definition considered as not handled
	// RX = DISCARDED + HANDLED + NON_DP + (TX - TX_NON_DP) + TX_FAIL
	TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, -non_dp_count);
	return rc;
}

static void init_task_lat_latency_buffer(struct task_lat *task, uint32_t core_id)
{
	const int socket_id = rte_lcore_to_socket_id(core_id);
	char name[256];
	size_t latency_buffer_mem_size = 0;

	if (task->latency_buffer_size > UINT32_MAX - MAX_RING_BURST)
		task->latency_buffer_size = UINT32_MAX - MAX_RING_BURST;

	latency_buffer_mem_size = sizeof(struct lat_info) * task->latency_buffer_size;

	task->latency_buffer = prox_zmalloc(latency_buffer_mem_size, socket_id);
	PROX_PANIC(task->latency_buffer == NULL, "Failed to allocate %zu kbytes for latency_buffer\n", latency_buffer_mem_size / 1024);

	sprintf(name, "latency.rx_%u.txt", core_id);
	task->fp_rx = fopen(name, "w+");
	PROX_PANIC(task->fp_rx == NULL, "Failed to open %s\n", name);

	sprintf(name, "latency.tx_%u.txt", core_id);
	task->fp_tx = fopen(name, "w+");
	PROX_PANIC(task->fp_tx == NULL, "Failed to open %s\n", name);

	task->prev_tx_packet_index = prox_zmalloc(sizeof(task->prev_tx_packet_index[0]) * task->generator_count, socket_id);
	PROX_PANIC(task->prev_tx_packet_index == NULL, "Failed to allocated prev_tx_packet_index\n");
}

static void task_init_generator_count(struct task_lat *task)
{
	uint8_t *generator_count = prox_sh_find_system("generator_count");

	if (generator_count == NULL) {
		task->generator_count = 1;
		plog_info("\tNo generators found, hard-coding to %u generators\n", task->generator_count);
	} else
		task->generator_count = *generator_count;
	plog_info("\t\tLatency using %u generators\n", task->generator_count);
}

static void task_lat_init_eld(struct task_lat *task, uint8_t socket_id)
{
	size_t eld_mem_size;

	eld_mem_size = sizeof(task->eld[0]) * task->generator_count;
	task->eld = prox_zmalloc(eld_mem_size, socket_id);
	PROX_PANIC(task->eld == NULL, "Failed to allocate eld\n");
}

void task_lat_set_accuracy_limit(struct task_lat *task, uint32_t accuracy_limit_nsec)
{
	task->limit = nsec_to_tsc(accuracy_limit_nsec);
}

static void lat_start(struct task_base *tbase)
{
	struct task_lat *task = (struct task_lat *)tbase;

}

static void init_task_lat(struct task_base *tbase, struct task_args *targ)
{
	struct task_lat *task = (struct task_lat *)tbase;
	const int socket_id = rte_lcore_to_socket_id(targ->lconf->id);

	task->lat_pos = targ->lat_pos;
	task->accur_pos = targ->accur_pos;
	task->sig_pos = targ->sig_pos;
	task->sig = targ->sig;
	task->packet_id_in_flow_pos = targ->packet_id_in_flow_pos;
	task->flow_id_pos = targ->flow_id_pos;

	task->unique_id_pos = targ->packet_id_pos;
	task->latency_buffer_size = targ->latency_buffer_size;

	PROX_PANIC(task->lat_pos == 0, "Missing 'lat pos' parameter in config file\n");
	uint16_t min_pkt_len = task->lat_pos + sizeof(uint32_t);
	if (task->unique_id_pos && (
		min_pkt_len < task->unique_id_pos + sizeof(struct unique_id)))
		min_pkt_len = task->unique_id_pos + sizeof(struct unique_id);
	if (task->accur_pos && (
		min_pkt_len < task->accur_pos + sizeof(uint32_t)))
		min_pkt_len = task->accur_pos + sizeof(uint32_t);
	if (task->sig_pos && (
		min_pkt_len < task->sig_pos + sizeof(uint32_t)))
		min_pkt_len = task->sig_pos + sizeof(uint32_t);
	task->min_pkt_len = min_pkt_len;

	task_init_generator_count(task);

	if (task->latency_buffer_size) {
		init_task_lat_latency_buffer(task, targ->lconf->id);
	}

	char name[256];
	sprintf(name, "loss_%u.txt", targ->lconf->id);
	task->fp_loss = fopen(name, "w+");
	PROX_PANIC(task->fp_loss == NULL, "Failed to open %s\n", name);

	if (targ->bucket_size < DEFAULT_BUCKET_SIZE) {
		targ->bucket_size = DEFAULT_BUCKET_SIZE;
	}

	if (task->accur_pos) {
		task->delayed_latency_entries = prox_zmalloc(sizeof(*task->delayed_latency_entries) * task->generator_count , socket_id);
		PROX_PANIC(task->delayed_latency_entries == NULL, "Failed to allocate array for storing delayed latency entries\n");
		for (uint i = 0; i < task->generator_count; i++) {
			task->delayed_latency_entries[i] = prox_zmalloc(sizeof(**task->delayed_latency_entries) * ACCURACY_BUFFER_SIZE, socket_id);
			PROX_PANIC(task->delayed_latency_entries[i] == NULL, "Failed to allocate array for storing delayed latency entries\n");
		}
		if (task->unique_id_pos == 0) {
			/* When using accuracy feature, the accuracy from TX is written ACCURACY_WINDOW packets later
			* We can only retrieve the good packet if a packet id is written to it.
			* Otherwise we will use the packet RECEIVED ACCURACY_WINDOW packets ago which is OK if
			* packets are not re-ordered. If packets are re-ordered, then the matching between
			* the TX accuracy and the latency is wrong.
			*/
			plog_warn("\tWhen accuracy feature is used, a unique id should ideally also be used\n");
		}
	}

	task->lt[0].min_lat = -1;
	task->lt[1].min_lat = -1;
	task->lt[0].bucket_size = targ->bucket_size;
	task->lt[1].bucket_size = targ->bucket_size;
        if (task->unique_id_pos) {
		task_lat_init_eld(task, socket_id);
		task_lat_reset_eld(task);
		task->previous_packet = prox_zmalloc(sizeof(task->previous_packet) * task->generator_count , socket_id);
		PROX_PANIC(task->previous_packet == NULL, "Failed to allocate array for storing previous packet\n");
        }
	task->lat_test = &task->lt[task->using_lt];

	task_lat_set_accuracy_limit(task, targ->accuracy_limit_nsec);
	task->rx_pkt_meta = prox_zmalloc(MAX_PKT_BURST * sizeof(*task->rx_pkt_meta), socket_id);
	PROX_PANIC(task->rx_pkt_meta == NULL, "unable to allocate memory to store RX packet meta data");

	uint32_t max_frame_size = MAX_PKT_SIZE;
	uint64_t bytes_per_hz = UINT64_MAX;
	if (targ->nb_rxports) {
		struct prox_port_cfg *port = &prox_port_cfg[targ->rx_port_queue[0].port];
		max_frame_size = port->mtu + PROX_RTE_ETHER_HDR_LEN + PROX_RTE_ETHER_CRC_LEN + 2 * PROX_VLAN_TAG_SIZE;

		// port->max_link_speed reports the maximum, non negotiated ink speed in Mbps e.g. 40k for a 40 Gbps NIC.
		// It can be UINT32_MAX (virtual devices or not supported by DPDK < 16.04)
		if (port->max_link_speed != UINT32_MAX) {
			bytes_per_hz = port->max_link_speed * 125000L;
			plog_info("\t\tPort %u: max link speed is %ld Mbps\n",
				(uint8_t)(port - prox_port_cfg), 8 * bytes_per_hz / 1000000);
		}
	}
	task->loss_buffer_size = targ->loss_buffer_size;
	if (task->loss_buffer_size) {
		task->loss_buffer = prox_zmalloc(task->loss_buffer_size * sizeof(struct loss_buffer), rte_lcore_to_socket_id(targ->lconf->id));
		PROX_PANIC(task->loss_buffer == NULL,
			"Failed to allocate %lu bytes (in huge pages) for loss_buffer\n", task->loss_buffer_size * sizeof(struct loss_buffer));
	}
	task->bytes_to_tsc = prox_zmalloc(max_frame_size * sizeof(task->bytes_to_tsc[0]) * MAX_PKT_BURST, rte_lcore_to_socket_id(targ->lconf->id));
	PROX_PANIC(task->bytes_to_tsc == NULL,
		"Failed to allocate %lu bytes (in huge pages) for bytes_to_tsc\n", max_frame_size * sizeof(task->bytes_to_tsc[0]) * MAX_PKT_BURST);

        // There are cases where hz estimate might be slighly over-estimated
        // This results in too much extrapolation
        // Only account for 99% of extrapolation to handle cases with up to 1% error clocks
	for (unsigned int i = 0; i < max_frame_size * MAX_PKT_BURST ; i++) {
		if (bytes_per_hz == UINT64_MAX)
			task->bytes_to_tsc[i] = 0;
		else
			task->bytes_to_tsc[i] = (rte_get_tsc_hz() * i * 0.99) / bytes_per_hz;
	}
	task->flow_count = targ->flow_count;
	PROX_PANIC(task->flow_id_pos && (task->flow_count == 0), "flow_count must be configured when flow_id_pos is set\n");
	if (task->flow_count) {
		task->flows = prox_zmalloc(task->flow_count * sizeof(struct flows) * task->generator_count, rte_lcore_to_socket_id(targ->lconf->id));
		PROX_PANIC(task->flows == NULL,
			"Failed to allocate %lu bytes (in huge pages) for flows\n", task->flow_count * sizeof(struct flows) * task->generator_count);
	}
}

static struct task_init task_init_lat = {
	.mode_str = "lat",
	.init = init_task_lat,
	.handle = handle_lat_bulk,
	.start = lat_start,
	.stop = lat_stop,
	.flag_features = TASK_FEATURE_TSC_RX | TASK_FEATURE_ZERO_RX | TASK_FEATURE_NEVER_DISCARDS,
	.size = sizeof(struct task_lat)
};

__attribute__((constructor)) static void reg_task_lat(void)
{
	reg_task(&task_init_lat);
}