summaryrefslogtreecommitdiffstats
path: root/VNFs/DPPD-PROX/handle_fm.c
diff options
context:
space:
mode:
Diffstat (limited to 'VNFs/DPPD-PROX/handle_fm.c')
-rw-r--r--VNFs/DPPD-PROX/handle_fm.c373
1 files changed, 373 insertions, 0 deletions
diff --git a/VNFs/DPPD-PROX/handle_fm.c b/VNFs/DPPD-PROX/handle_fm.c
new file mode 100644
index 00000000..c4a10e67
--- /dev/null
+++ b/VNFs/DPPD-PROX/handle_fm.c
@@ -0,0 +1,373 @@
+/*
+// Copyright (c) 2010-2017 Intel Corporation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+*/
+
+#include <dlfcn.h>
+
+#include <rte_ip.h>
+#include <rte_udp.h>
+#include <rte_tcp.h>
+#include <rte_cycles.h>
+#include <rte_ether.h>
+#include <rte_eth_ctrl.h>
+
+#include "log.h"
+#include "quit.h"
+#include "lconf.h"
+#include "task_init.h"
+#include "task_base.h"
+#include "kv_store_expire.h"
+#include "stats.h"
+#include "prox_shared.h"
+#include "etypes.h"
+#include "prox_cfg.h"
+#include "dpi/dpi.h"
+
+struct task_dpi_per_core {
+ void *dpi_opaque;
+};
+
+struct task_fm {
+ struct task_base base;
+ /* FM related fields */
+ struct kv_store_expire *kv_store_expire;
+ void *dpi_opaque;
+
+ struct dpi_engine dpi_engine;
+ struct task_dpi_per_core *dpi_shared; /* Used only during init */
+};
+
+struct eth_ip4_udp {
+ struct ether_hdr l2;
+ struct ipv4_hdr l3;
+ union {
+ struct udp_hdr udp;
+ struct tcp_hdr tcp;
+ } l4;
+} __attribute__((packed));
+
+union pkt_type {
+ struct {
+ uint16_t etype;
+ uint8_t ip_byte;
+ uint8_t next_proto;
+ } __attribute__((packed));
+ uint32_t val;
+};
+
+static union pkt_type pkt_type_udp = {
+ .next_proto = IPPROTO_UDP,
+ .ip_byte = 0x45,
+ .etype = ETYPE_IPv4,
+};
+
+static union pkt_type pkt_type_tcp = {
+ .next_proto = IPPROTO_TCP,
+ .ip_byte = 0x45,
+ .etype = ETYPE_IPv4,
+};
+
+static int extract_flow_info(struct eth_ip4_udp *p, struct flow_info *fi, struct flow_info *fi_flipped, uint32_t *len, uint8_t **payload)
+{
+ union pkt_type pkt_type = {
+ .next_proto = p->l3.next_proto_id,
+ .ip_byte = p->l3.version_ihl,
+ .etype = p->l2.ether_type,
+ };
+
+ memset(fi->reservered, 0, sizeof(fi->reservered));
+ memset(fi_flipped->reservered, 0, sizeof(fi_flipped->reservered));
+
+ if (pkt_type.val == pkt_type_udp.val) {
+ fi->ip_src = p->l3.src_addr;
+ fi->ip_dst = p->l3.dst_addr;
+ fi->ip_proto = p->l3.next_proto_id;
+ fi->port_src = p->l4.udp.src_port;
+ fi->port_dst = p->l4.udp.dst_port;
+
+ fi_flipped->ip_src = p->l3.dst_addr;
+ fi_flipped->ip_dst = p->l3.src_addr;
+ fi_flipped->ip_proto = p->l3.next_proto_id;
+ fi_flipped->port_src = p->l4.udp.dst_port;
+ fi_flipped->port_dst = p->l4.udp.src_port;
+
+ *len = rte_be_to_cpu_16(p->l4.udp.dgram_len) - sizeof(struct udp_hdr);
+ *payload = (uint8_t*)(&p->l4.udp) + sizeof(struct udp_hdr);
+ return 0;
+ }
+ else if (pkt_type.val == pkt_type_tcp.val) {
+ fi->ip_src = p->l3.src_addr;
+ fi->ip_dst = p->l3.dst_addr;
+ fi->ip_proto = p->l3.next_proto_id;
+ fi->port_src = p->l4.tcp.src_port;
+ fi->port_dst = p->l4.tcp.dst_port;
+
+ fi_flipped->ip_src = p->l3.dst_addr;
+ fi_flipped->ip_dst = p->l3.src_addr;
+ fi_flipped->ip_proto = p->l3.next_proto_id;
+ fi_flipped->port_src = p->l4.tcp.dst_port;
+ fi_flipped->port_dst = p->l4.tcp.src_port;
+
+ *len = rte_be_to_cpu_16(p->l3.total_length) - sizeof(struct ipv4_hdr) - ((p->l4.tcp.data_off >> 4)*4);
+ *payload = ((uint8_t*)&p->l4.tcp) + ((p->l4.tcp.data_off >> 4)*4);
+ return 0;
+ }
+
+ return -1;
+}
+
+static int is_flow_beg(const struct flow_info *fi, const struct eth_ip4_udp *p)
+{
+ return fi->ip_proto == IPPROTO_UDP ||
+ (fi->ip_proto == IPPROTO_TCP && p->l4.tcp.tcp_flags & TCP_SYN_FLAG);
+}
+
+static void *lookup_flow(struct task_fm *task, struct flow_info *fi, uint64_t now_tsc)
+{
+ struct kv_store_expire_entry *entry;
+
+ entry = kv_store_expire_get(task->kv_store_expire, fi, now_tsc);
+
+ return entry ? entry_value(task->kv_store_expire, entry) : NULL;
+}
+
+static void *lookup_or_insert_flow(struct task_fm *task, struct flow_info *fi, uint64_t now_tsc)
+{
+ struct kv_store_expire_entry *entry;
+
+ entry = kv_store_expire_get_or_put(task->kv_store_expire, fi, now_tsc);
+
+ return entry ? entry_value(task->kv_store_expire, entry) : NULL;
+}
+
+static int handle_fm(struct task_fm *task, struct rte_mbuf *mbuf, uint64_t now_tsc)
+{
+ struct eth_ip4_udp *p;
+ struct flow_info fi, fi_flipped;
+ void *flow_data;
+ uint32_t len;
+ uint8_t *payload;
+ uint32_t res[2];
+ size_t res_len = 2;
+ int flow_beg;
+ struct dpi_payload dpi_payload;
+ int is_upstream = 0;
+
+ p = rte_pktmbuf_mtod(mbuf, struct eth_ip4_udp *);
+
+ if (0 != extract_flow_info(p, &fi, &fi_flipped, &len, &payload)) {
+ plogx_err("Unknown packet type\n");
+ return OUT_DISCARD;
+ }
+
+ /* First, try to see if the flow already exists where the
+ current packet is sent by the server. */
+ if (!(flow_data = lookup_flow(task, &fi_flipped, now_tsc))) {
+ /* Insert a new flow, only if this is the first packet
+ in the flow. */
+ is_upstream = 1;
+ if (is_flow_beg(&fi, p))
+ flow_data = lookup_or_insert_flow(task, &fi, now_tsc);
+ else
+ flow_data = lookup_flow(task, &fi, now_tsc);
+ }
+
+ if (!flow_data)
+ return OUT_DISCARD;
+ else if (!len)
+ return 0;
+
+ dpi_payload.payload = payload;
+ dpi_payload.len = len;
+ dpi_payload.client_to_server = is_upstream;
+ gettimeofday(&dpi_payload.tv, NULL);
+ task->dpi_engine.dpi_process(task->dpi_opaque, is_upstream? &fi : &fi_flipped, flow_data, &dpi_payload, res, &res_len);
+ return OUT_HANDLED;
+}
+
+static int handle_fm_bulk(struct task_base *tbase, struct rte_mbuf **mbufs, uint16_t n_pkts)
+{
+ struct task_fm *task = (struct task_fm *)tbase;
+ uint64_t now_tsc = rte_rdtsc();
+ uint16_t handled = 0;
+ uint16_t discard = 0;
+ int ret;
+
+ for (uint16_t i = 0; i < n_pkts; ++i) {
+ ret = handle_fm(task, mbufs[i], now_tsc);
+ if (ret == OUT_DISCARD)
+ discard++;
+ else if (ret == OUT_HANDLED)
+ handled++;
+ }
+
+ for (uint16_t i = 0; i < n_pkts; ++i)
+ rte_pktmbuf_free(mbufs[i]);
+
+ TASK_STATS_ADD_DROP_HANDLED(&tbase->aux->stats, handled);
+ TASK_STATS_ADD_DROP_DISCARD(&tbase->aux->stats, discard);
+ return 0;
+}
+
+static void load_dpi_engine(const char *dpi_engine_path, struct dpi_engine *dst)
+{
+ void *handle = prox_sh_find_system(dpi_engine_path);
+
+ if (handle == NULL) {
+ plogx_info("Loading DPI engine from '%s'\n", dpi_engine_path);
+ handle = dlopen(dpi_engine_path, RTLD_NOW | RTLD_GLOBAL);
+
+ PROX_PANIC(handle == NULL, "Failed to load dpi engine from '%s' with error:\n\t\t%s\n", dpi_engine_path, dlerror());
+ prox_sh_add_system(dpi_engine_path, handle);
+ }
+
+ struct dpi_engine *(*get_dpi_engine)(void) = dlsym(handle, "get_dpi_engine");
+
+ PROX_PANIC(get_dpi_engine == NULL, "Failed to find get_dpi_engine function from '%s'\n", dpi_engine_path);
+ struct dpi_engine *dpi_engine = get_dpi_engine();
+
+ dpi_engine->dpi_print = plog_info;
+ rte_memcpy(dst, dpi_engine, sizeof(*dst));
+}
+
+static uint32_t count_fm_cores(void)
+{
+ uint32_t n_cores = 0;
+ uint32_t lcore_id = -1;
+ struct lcore_cfg *lconf;
+
+ while(prox_core_next(&lcore_id, 0) == 0) {
+ lconf = &lcore_cfg[lcore_id];
+ for (uint8_t task_id = 0; task_id < lconf->n_tasks_all; ++task_id) {
+ if (!strcmp(lconf->targs[task_id].task_init->mode_str, "fm")) {
+ n_cores++;
+ /* Only intersted in number of cores
+ so break here. */
+ break;
+ }
+ }
+ }
+
+ return n_cores;
+}
+
+static struct kv_store_expire *get_shared_flow_table(struct task_args *targ, struct dpi_engine *de)
+{
+ struct kv_store_expire *ret = prox_sh_find_core(targ->lconf->id, "flow_table");
+ const int socket_id = rte_lcore_to_socket_id(targ->lconf->id);
+
+ if (!ret) {
+ ret = kv_store_expire_create(rte_align32pow2(targ->flow_table_size) * 4,
+ sizeof(struct flow_info),
+ de->dpi_get_flow_entry_size(),
+ socket_id,
+ de->dpi_flow_expire,
+ rte_get_tsc_hz() * 60);
+ PROX_PANIC(ret == NULL, "Failed to allocate KV store\n");
+ prox_sh_add_core(targ->lconf->id, "flow_table", ret);
+ }
+ return ret;
+}
+
+static struct task_dpi_per_core *get_shared_dpi_shared(struct task_args *targ)
+{
+ static const char *name = "dpi_shared";
+ struct task_dpi_per_core *ret = prox_sh_find_core(targ->lconf->id, name);
+ const int socket_id = rte_lcore_to_socket_id(targ->lconf->id);
+
+ if (!ret) {
+ ret = prox_zmalloc(sizeof(*ret), socket_id);
+ prox_sh_add_core(targ->lconf->id, name, ret);
+ }
+ return ret;
+}
+
+static void init_task_fm(struct task_base *tbase, struct task_args *targ)
+{
+ struct task_fm *task = (struct task_fm *)tbase;
+ static int dpi_inited = 0;
+
+ load_dpi_engine(targ->dpi_engine_path, &task->dpi_engine);
+
+ task->kv_store_expire = get_shared_flow_table(targ, &task->dpi_engine);
+ task->dpi_shared = get_shared_dpi_shared(targ);
+
+ if (!dpi_inited) {
+ uint32_t n_threads = count_fm_cores();
+ const char *dpi_params[16];
+
+ plogx_info("Initializing DPI with %u threads\n", n_threads);
+ dpi_inited = 1;
+
+ PROX_PANIC(targ->n_dpi_engine_args > 16, "Too many DPI arguments");
+ for (size_t i = 0; i < targ->n_dpi_engine_args && i < 16; ++i)
+ dpi_params[i] = targ->dpi_engine_args[i];
+
+ int ret = task->dpi_engine.dpi_init(n_threads, targ->n_dpi_engine_args, dpi_params);
+
+ PROX_PANIC(ret, "Failed to initialize DPI engine\n");
+ }
+}
+
+static void start_first(struct task_base *tbase)
+{
+ struct task_fm *task = (struct task_fm *)tbase;
+ void *ret = task->dpi_engine.dpi_thread_start();
+
+ task->dpi_shared->dpi_opaque = ret;
+ PROX_PANIC(ret == NULL, "dpi_thread_init failed\n");
+}
+
+static void start(struct task_base *tbase)
+{
+ struct task_fm *task = (struct task_fm *)tbase;
+
+ task->dpi_opaque = task->dpi_shared->dpi_opaque;
+ PROX_PANIC(task->dpi_opaque == NULL, "dpi_opaque == NULL");
+}
+
+static void stop(struct task_base *tbase)
+{
+ struct task_fm *task = (struct task_fm *)tbase;
+
+ size_t expired = kv_store_expire_expire_all(task->kv_store_expire);
+ size_t size = kv_store_expire_size(task->kv_store_expire);
+
+ plogx_info("%zu/%zu\n", expired, size);
+}
+
+static void stop_last(struct task_base *tbase)
+{
+ struct task_fm *task = (struct task_fm *)tbase;
+
+ task->dpi_engine.dpi_thread_stop(task->dpi_shared->dpi_opaque);
+ task->dpi_shared->dpi_opaque = NULL;
+}
+
+static struct task_init task_init_fm = {
+ .mode_str = "fm",
+ .init = init_task_fm,
+ .handle = handle_fm_bulk,
+ .start = start,
+ .stop = stop,
+ .start_first = start_first,
+ .stop_last = stop_last,
+ .size = sizeof(struct task_fm)
+};
+
+__attribute__((constructor)) static void reg_task_fm(void)
+{
+ reg_task(&task_init_fm);
+}