summaryrefslogtreecommitdiffstats
path: root/VNFs/DPPD-PROX/handle_fm.c
blob: c4a10e67f28bca623c42fd7079283af8e85923d3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
/*
// Copyright (c) 2010-2017 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
*/

#include <dlfcn.h>

#include <rte_ip.h>
#include <rte_udp.h>
#include <rte_tcp.h>
#include <rte_cycles.h>
#include <rte_ether.h>
#include <rte_eth_ctrl.h>

#include "log.h"
#include "quit.h"
#include "lconf.h"
#include "task_init.h"
#include "task_base.h"
#include "kv_store_expire.h"
#include "stats.h"
#include "prox_shared.h"
#include "etypes.h"
#include "prox_cfg.h"
#include "dpi/dpi.h"

struct task_dpi_per_core {
	void     *dpi_opaque;
};

struct task_fm {
	struct task_base          base;
	/* FM related fields */
	struct kv_store_expire   *kv_store_expire;
	void                     *dpi_opaque;

	struct dpi_engine        dpi_engine;
	struct task_dpi_per_core *dpi_shared; /* Used only during init */
};

struct eth_ip4_udp {
	struct ether_hdr l2;
	struct ipv4_hdr  l3;
	union {
		struct udp_hdr   udp;
		struct tcp_hdr   tcp;
	} l4;
} __attribute__((packed));

union pkt_type {
	struct {
		uint16_t etype;
		uint8_t  ip_byte;
		uint8_t  next_proto;
	} __attribute__((packed));
	uint32_t val;
};

static union pkt_type pkt_type_udp = {
	.next_proto = IPPROTO_UDP,
	.ip_byte    = 0x45,
	.etype      = ETYPE_IPv4,
};

static union pkt_type pkt_type_tcp = {
	.next_proto = IPPROTO_TCP,
	.ip_byte    = 0x45,
	.etype      = ETYPE_IPv4,
};

static int extract_flow_info(struct eth_ip4_udp *p, struct flow_info *fi, struct flow_info *fi_flipped, uint32_t *len, uint8_t **payload)
{
	union pkt_type pkt_type = {
		.next_proto = p->l3.next_proto_id,
		.ip_byte    = p->l3.version_ihl,
		.etype      = p->l2.ether_type,
	};

	memset(fi->reservered, 0, sizeof(fi->reservered));
	memset(fi_flipped->reservered, 0, sizeof(fi_flipped->reservered));

	if (pkt_type.val == pkt_type_udp.val) {
		fi->ip_src = p->l3.src_addr;
		fi->ip_dst = p->l3.dst_addr;
		fi->ip_proto = p->l3.next_proto_id;
		fi->port_src = p->l4.udp.src_port;
		fi->port_dst = p->l4.udp.dst_port;

		fi_flipped->ip_src = p->l3.dst_addr;
		fi_flipped->ip_dst = p->l3.src_addr;
		fi_flipped->ip_proto = p->l3.next_proto_id;
		fi_flipped->port_src = p->l4.udp.dst_port;
		fi_flipped->port_dst = p->l4.udp.src_port;

		*len = rte_be_to_cpu_16(p->l4.udp.dgram_len) - sizeof(struct udp_hdr);
		*payload = (uint8_t*)(&p->l4.udp) + sizeof(struct udp_hdr);
		return 0;
	}
	else if (pkt_type.val == pkt_type_tcp.val) {
		fi->ip_src = p->l3.src_addr;
		fi->ip_dst = p->l3.dst_addr;
		fi->ip_proto = p->l3.next_proto_id;
		fi->port_src = p->l4.tcp.src_port;
		fi->port_dst = p->l4.tcp.dst_port;

		fi_flipped->ip_src = p->l3.dst_addr;
		fi_flipped->ip_dst = p->l3.src_addr;
		fi_flipped->ip_proto = p->l3.next_proto_id;
		fi_flipped->port_src = p->l4.tcp.dst_port;
		fi_flipped->port_dst = p->l4.tcp.src_port;

		*len = rte_be_to_cpu_16(p->l3.total_length) - sizeof(struct ipv4_hdr) - ((p->l4.tcp.data_off >> 4)*4);
		*payload = ((uint8_t*)&p->l4.tcp) + ((p->l4.tcp.data_off >> 4)*4);
		return 0;
	}

	return -1;
}

static int is_flow_beg(const struct flow_info *fi, const struct eth_ip4_udp *p)
{
	return fi->ip_proto == IPPROTO_UDP ||
		(fi->ip_proto == IPPROTO_TCP && p->l4.tcp.tcp_flags & TCP_SYN_FLAG);
}

static void *lookup_flow(struct task_fm *task, struct flow_info *fi, uint64_t now_tsc)
{
	struct kv_store_expire_entry *entry;

	entry = kv_store_expire_get(task->kv_store_expire, fi, now_tsc);

	return entry ? entry_value(task->kv_store_expire, entry) : NULL;
}

static void *lookup_or_insert_flow(struct task_fm *task, struct flow_info *fi, uint64_t now_tsc)
{
	struct kv_store_expire_entry *entry;

	entry = kv_store_expire_get_or_put(task->kv_store_expire, fi, now_tsc);

	return entry ? entry_value(task->kv_store_expire, entry) : NULL;
}

static int handle_fm(struct task_fm *task, struct rte_mbuf *mbuf, uint64_t now_tsc)
{
	struct eth_ip4_udp *p;
	struct flow_info fi, fi_flipped;
	void *flow_data;
	uint32_t len;
	uint8_t *payload;
	uint32_t res[2];
	size_t res_len = 2;
	int flow_beg;
	struct dpi_payload dpi_payload;
	int is_upstream = 0;

	p = rte_pktmbuf_mtod(mbuf, struct eth_ip4_udp *);

	if (0 != extract_flow_info(p, &fi, &fi_flipped, &len, &payload)) {
		plogx_err("Unknown packet type\n");
		return OUT_DISCARD;
	}

	/* First, try to see if the flow already exists where the
	   current packet is sent by the server. */
	if (!(flow_data = lookup_flow(task, &fi_flipped, now_tsc))) {
		/* Insert a new flow, only if this is the first packet
		   in the flow. */
		is_upstream = 1;
		if (is_flow_beg(&fi, p))
			flow_data = lookup_or_insert_flow(task, &fi, now_tsc);
		else
			flow_data = lookup_flow(task, &fi, now_tsc);
	}

	if (!flow_data)
		return OUT_DISCARD;
	else if (!len)
		return 0;

	dpi_payload.payload = payload;
	dpi_payload.len = len;
	dpi_payload.client_to_server = is_upstream;
	gettimeofday(&dpi_payload.tv, NULL);
	task->dpi_engine.dpi_process(task->dpi_opaque, is_upstream? &fi : &fi_flipped, flow_data, &dpi_payload, res, &res_len);
	return OUT_HANDLED;
}

static int handle_fm_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
{
	struct task_fm *task = (struct task_fm *)tbase;
	uint64_t now_tsc = rte_rdtsc();
	uint16_t handled = 0;
	uint16_t discard = 0;
	int ret;

	for (uint16_t i = 0; i < n_pkts; ++i) {
		ret = handle_fm(task, mbufs[i], now_tsc);
		if (ret == OUT_DISCARD)
			discard++;
		else if (ret == OUT_HANDLED)
			handled++;
	}

	for (uint16_t i = 0; i < n_pkts; ++i)
		rte_pktmbuf_free(mbufs[i]);

	TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, handled);
	TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, discard);
	return 0;
}

static void load_dpi_engine(const char *dpi_engine_path, struct dpi_engine *dst)
{
	void *handle = prox_sh_find_system(dpi_engine_path);

	if (handle == NULL) {
		plogx_info("Loading DPI engine from '%s'\n", dpi_engine_path);
		handle = dlopen(dpi_engine_path, RTLD_NOW | RTLD_GLOBAL);

		PROX_PANIC(handle == NULL, "Failed to load dpi engine from '%s' with error:\n\t\t%s\n", dpi_engine_path, dlerror());
		prox_sh_add_system(dpi_engine_path, handle);
	}

	struct dpi_engine *(*get_dpi_engine)(void) = dlsym(handle, "get_dpi_engine");

	PROX_PANIC(get_dpi_engine == NULL, "Failed to find get_dpi_engine function from '%s'\n", dpi_engine_path);
	struct dpi_engine *dpi_engine = get_dpi_engine();

	dpi_engine->dpi_print = plog_info;
	rte_memcpy(dst, dpi_engine, sizeof(*dst));
}

static uint32_t count_fm_cores(void)
{
	uint32_t n_cores = 0;
	uint32_t lcore_id = -1;
	struct lcore_cfg *lconf;

	while(prox_core_next(&lcore_id, 0) == 0) {
		lconf = &lcore_cfg[lcore_id];
		for (uint8_t task_id = 0; task_id < lconf->n_tasks_all; ++task_id) {
			if (!strcmp(lconf->targs[task_id].task_init->mode_str, "fm")) {
				n_cores++;
				/* Only intersted in number of cores
				   so break here. */
				break;
			}
		}
	}

	return n_cores;
}

static struct kv_store_expire *get_shared_flow_table(struct task_args *targ, struct dpi_engine *de)
{
	struct kv_store_expire *ret = prox_sh_find_core(targ->lconf->id, "flow_table");
	const int socket_id = rte_lcore_to_socket_id(targ->lconf->id);

	if (!ret) {
		ret = kv_store_expire_create(rte_align32pow2(targ->flow_table_size) * 4,
					     sizeof(struct flow_info),
					     de->dpi_get_flow_entry_size(),
					     socket_id,
					     de->dpi_flow_expire,
					     rte_get_tsc_hz() * 60);
		PROX_PANIC(ret == NULL, "Failed to allocate KV store\n");
		prox_sh_add_core(targ->lconf->id, "flow_table", ret);
	}
	return ret;
}

static struct task_dpi_per_core *get_shared_dpi_shared(struct task_args *targ)
{
	static const char *name = "dpi_shared";
	struct task_dpi_per_core *ret = prox_sh_find_core(targ->lconf->id, name);
	const int socket_id = rte_lcore_to_socket_id(targ->lconf->id);

	if (!ret) {
		ret = prox_zmalloc(sizeof(*ret), socket_id);
		prox_sh_add_core(targ->lconf->id, name, ret);
	}
	return ret;
}

static void init_task_fm(struct task_base *tbase, struct task_args *targ)
{
	struct task_fm *task = (struct task_fm *)tbase;
	static int dpi_inited = 0;

	load_dpi_engine(targ->dpi_engine_path, &task->dpi_engine);

	task->kv_store_expire = get_shared_flow_table(targ, &task->dpi_engine);
	task->dpi_shared = get_shared_dpi_shared(targ);

	if (!dpi_inited) {
		uint32_t n_threads = count_fm_cores();
		const char *dpi_params[16];

		plogx_info("Initializing DPI with %u threads\n", n_threads);
		dpi_inited = 1;

		PROX_PANIC(targ->n_dpi_engine_args > 16, "Too many DPI arguments");
		for (size_t i = 0; i < targ->n_dpi_engine_args && i < 16; ++i)
			dpi_params[i] = targ->dpi_engine_args[i];

		int ret = task->dpi_engine.dpi_init(n_threads, targ->n_dpi_engine_args, dpi_params);

		PROX_PANIC(ret, "Failed to initialize DPI engine\n");
	}
}

static void start_first(struct task_base *tbase)
{
	struct task_fm *task = (struct task_fm *)tbase;
	void *ret = task->dpi_engine.dpi_thread_start();

	task->dpi_shared->dpi_opaque = ret;
	PROX_PANIC(ret == NULL, "dpi_thread_init failed\n");
}

static void start(struct task_base *tbase)
{
	struct task_fm *task = (struct task_fm *)tbase;

	task->dpi_opaque = task->dpi_shared->dpi_opaque;
	PROX_PANIC(task->dpi_opaque == NULL, "dpi_opaque == NULL");
}

static void stop(struct task_base *tbase)
{
	struct task_fm *task = (struct task_fm *)tbase;

	size_t expired = kv_store_expire_expire_all(task->kv_store_expire);
	size_t size = kv_store_expire_size(task->kv_store_expire);

	plogx_info("%zu/%zu\n", expired, size);
}

static void stop_last(struct task_base *tbase)
{
	struct task_fm *task = (struct task_fm *)tbase;

	task->dpi_engine.dpi_thread_stop(task->dpi_shared->dpi_opaque);
	task->dpi_shared->dpi_opaque = NULL;
}

static struct task_init task_init_fm = {
	.mode_str = "fm",
	.init = init_task_fm,
	.handle = handle_fm_bulk,
	.start = start,
	.stop = stop,
	.start_first = start_first,
	.stop_last = stop_last,
	.size = sizeof(struct task_fm)
};

__attribute__((constructor)) static void reg_task_fm(void)
{
	reg_task(&task_init_fm);
}