diff options
Diffstat (limited to 'common/VIL/pipeline_loadb/pipeline_loadb_be.c')
-rw-r--r-- | common/VIL/pipeline_loadb/pipeline_loadb_be.c | 1417 |
1 files changed, 1417 insertions, 0 deletions
diff --git a/common/VIL/pipeline_loadb/pipeline_loadb_be.c b/common/VIL/pipeline_loadb/pipeline_loadb_be.c new file mode 100644 index 00000000..c7910127 --- /dev/null +++ b/common/VIL/pipeline_loadb/pipeline_loadb_be.c @@ -0,0 +1,1417 @@ +/* +// Copyright (c) 2017 Intel Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +*/ + +#include <stdio.h> +#include <stdlib.h> +#include <stdint.h> +#include <string.h> +#include <unistd.h> + +#include <rte_common.h> +#include <rte_malloc.h> +#include <rte_ip.h> +#include <rte_hash.h> +#include <rte_byteorder.h> +#include <rte_table_lpm.h> +#include <rte_table_hash.h> +#include <rte_jhash.h> +#include <rte_thash.h> +#include <rte_cycles.h> +#include <rte_ethdev.h> +#include <rte_prefetch.h> +#include <rte_table_array.h> +#include "pipeline_loadb_be.h" +#include "pipeline_actions_common.h" +#include "hash_func.h" +#include "pipeline_arpicmp_be.h" +#include "vnf_common.h" +#include "app.h" + +#define BYTES_TO_BITS 8 +#define ROTATE_15_BITS 15 + +#define MAX_VNF_THREADS 16 + +int pkt_burst_cnt; + +uint8_t LOADB_DEBUG; +uint8_t total_vnf_threads; +uint32_t phyport_offset; + +struct pipeline_loadb { + struct pipeline p; + pipeline_msg_req_handler custom_handlers[PIPELINE_LOADB_MSG_REQS]; + + uint8_t n_vnf_threads; + uint8_t n_lb_tuples; + uint32_t outport_offset; + uint64_t receivedLBPktCount; + uint64_t droppedLBPktCount; + uint8_t links_map[PIPELINE_MAX_PORT_IN]; + uint8_t outport_id[PIPELINE_MAX_PORT_IN]; + uint8_t n_prv_Q; + uint8_t n_pub_Q; + uint8_t pipeline_num; +} __rte_cache_aligned; + +uint8_t default_rss_key[] = { + 0x6d, 0x5a, 0x6d, 0x5a, 0x6d, 0x5a, 0x6d, 0x5a, + 0x6d, 0x5a, 0x6d, 0x5a, 0x6d, 0x5a, 0x6d, 0x5a, + 0x6d, 0x5a, 0x6d, 0x5a, 0x6d, 0x5a, 0x6d, 0x5a, + 0x6d, 0x5a, 0x6d, 0x5a, 0x6d, 0x5a, 0x6d, 0x5a, + 0x6d, 0x5a, 0x6d, 0x5a, 0x6d, 0x5a, 0x6d, 0x5a, +}; + +static void *pipeline_loadb_msg_req_custom_handler(struct pipeline *p, + void *msg); + +static pipeline_msg_req_handler handlers[] = { + [PIPELINE_MSG_REQ_PING] = + pipeline_msg_req_ping_handler, + [PIPELINE_MSG_REQ_STATS_PORT_IN] = + pipeline_msg_req_stats_port_in_handler, + [PIPELINE_MSG_REQ_STATS_PORT_OUT] = + pipeline_msg_req_stats_port_out_handler, + [PIPELINE_MSG_REQ_STATS_TABLE] = + pipeline_msg_req_stats_table_handler, + [PIPELINE_MSG_REQ_PORT_IN_ENABLE] = + pipeline_msg_req_port_in_enable_handler, + [PIPELINE_MSG_REQ_PORT_IN_DISABLE] = + pipeline_msg_req_port_in_disable_handler, + [PIPELINE_MSG_REQ_CUSTOM] = + pipeline_loadb_msg_req_custom_handler, + +}; + +static void *pipeline_loadb_msg_req_entry_dbg_handler(struct pipeline *, + void *msg); + +static pipeline_msg_req_handler custom_handlers[] = { + [PIPELINE_LOADB_MSG_REQ_ENTRY_DBG] = + pipeline_loadb_msg_req_entry_dbg_handler, +}; + +/* + * LOADB table + */ +struct loadb_table_entry { + struct rte_pipeline_table_entry head; +} __rte_cache_aligned; + +void *pipeline_loadb_msg_req_custom_handler(struct pipeline *p, void *msg) +{ + struct pipeline_loadb *p_lb = (struct pipeline_loadb *)p; + struct pipeline_custom_msg_req *req = msg; + pipeline_msg_req_handler f_handle; + + f_handle = (req->subtype < PIPELINE_LOADB_MSG_REQS) ? + p_lb->custom_handlers[req->subtype] : + pipeline_msg_req_invalid_handler; + + if (f_handle == NULL) + f_handle = pipeline_msg_req_invalid_handler; + + return f_handle(p, req); +} + +uint32_t lb_pkt_print_count; + +uint8_t calculate_lb_thread_prv(struct rte_mbuf *pkt, void *arg) +{ + uint32_t hash_key[2], hash_ipv4; + uint32_t temp1, temp2, temp3; + uint8_t thread; + struct pipeline_loadb_in_port_h_arg *ap = arg; + struct pipeline_loadb *p_loadb = (struct pipeline_loadb *) ap->p; + uint8_t nthreads = p_loadb->n_vnf_threads; + union rte_thash_tuple tuple; + + uint32_t *src_addr; + uint32_t *dst_addr; + uint16_t *src_port; + uint16_t *dst_port; + uint8_t *protocol; + struct lb_pkt *lb_pkt = (struct lb_pkt *) + RTE_MBUF_METADATA_UINT8_PTR(pkt, MBUF_HDR_ROOM); + + if (rte_be_to_cpu_16(lb_pkt->eth.ether_type) == ETHER_TYPE_IPv6) { + src_addr = (uint32_t *)&lb_pkt->ipv6_port.ipv6.src_addr; + dst_addr = (uint32_t *)&lb_pkt->ipv6_port.ipv6.dst_addr; + src_port = &lb_pkt->ipv6_port.src_port; + dst_port = &lb_pkt->ipv6_port.dst_port; + protocol = &lb_pkt->ipv6_port.ipv6.proto; + } else { + src_addr = &lb_pkt->ipv4_port.ipv4.src_addr; + dst_addr = &lb_pkt->ipv4_port.ipv4.dst_addr; + src_port = &lb_pkt->ipv4_port.src_port; + dst_port = &lb_pkt->ipv4_port.dst_port; + protocol = &lb_pkt->ipv4_port.ipv4.next_proto_id; + } + + switch (p_loadb->n_lb_tuples) { + + case 0: + /* Write */ + /* Egress */ + if (rte_be_to_cpu_16(lb_pkt->eth.ether_type) == ETHER_TYPE_IPv6) + temp1 = rte_bswap32(dst_addr[3]) ^ *dst_port; + else + temp1 = *dst_addr ^ *dst_port; + + temp2 = (temp1 >> 24) ^ (temp1 >> 16) ^ + (temp1 >> 8) ^ temp1; + + temp3 = (temp2 >> 4) ^ (temp2 & 0xf); + + /* To select the thread */ + thread = temp3 % nthreads; + /* To select the Q */ + thread = ap->in_port_id + (p_loadb->p.n_ports_in * + (thread + 1) - p_loadb->p.n_ports_in); + return thread; + + case 1: + /* Write */ + /* Egress */ + if (rte_be_to_cpu_16(lb_pkt->eth.ether_type) == ETHER_TYPE_IPv6) + hash_key[0] = rte_bswap32(dst_addr[3]); + else + hash_key[0] = rte_bswap32(*dst_addr); + + /* Compute */ + hash_ipv4 = rte_jhash(&hash_key[0], 4, 0); + + /* To select the thread */ + thread = (hash_ipv4 % nthreads); + + /* To select the Q */ + thread = ap->in_port_id + (p_loadb->p.n_ports_in * + (thread + 1) - p_loadb->p.n_ports_in); + + if (LOADB_DEBUG > 3) + printf("thread: %u hash: %x hash_key: %x\n", + thread, hash_ipv4, hash_key[0]); + return thread; + + case 2: + /* Write */ + /* Egress */ + if (rte_be_to_cpu_16(lb_pkt->eth.ether_type) == + ETHER_TYPE_IPv6) { + hash_key[0] = rte_bswap32(dst_addr[3]); + hash_key[1] = *dst_port << 16; + } else{ + hash_key[0] = rte_bswap32(*dst_addr); + hash_key[1] = *dst_port << 16; + } + /* Compute */ + hash_ipv4 = rte_jhash(&hash_key[0], 6, 0); + + /* To select the thread */ + thread = (hash_ipv4 % nthreads); + + /* To select the Q */ + thread = ap->in_port_id + (p_loadb->p.n_ports_in * + (thread + 1) - p_loadb->p.n_ports_in); + + if (LOADB_DEBUG > 3) { + printf("public_addr: %x public_port: %x\n", + hash_key[0], *dst_port); + printf("thread: %u hash: %x hash_key0: %x " + "hash_key1: %x\n", thread, hash_ipv4, + hash_key[0], hash_key[1]); + } + return thread; + + case 3: + printf("Invalid n_lb_tuples: %d\n", p_loadb->n_lb_tuples); + return 0; + + case 4: + /* Write */ + if (rte_be_to_cpu_16(lb_pkt->eth.ether_type) == + ETHER_TYPE_IPv6) { + tuple.v4.src_addr = rte_bswap32(src_addr[3]); + tuple.v4.dst_addr = rte_bswap32(dst_addr[3]); + tuple.v4.sport = *src_port; + tuple.v4.dport = *dst_port; + } else{ + tuple.v4.src_addr = rte_bswap32(*src_addr); + tuple.v4.dst_addr = rte_bswap32(*dst_addr); + tuple.v4.sport = *src_port; + tuple.v4.dport = *dst_port; + } + /* Compute */ + hash_ipv4 = rte_softrss((uint32_t *)&tuple, + RTE_THASH_V4_L4_LEN, + default_rss_key); + /* Egress */ + + /* To select the thread */ + thread = (hash_ipv4 % nthreads); + + /* To select the Q */ + thread = ap->in_port_id + (p_loadb->p.n_ports_in * + (thread + 1) - p_loadb->p.n_ports_in); + + if (LOADB_DEBUG > 3) { + printf("src_addr: %x dst_addr: %x src_port: %x " + "dst_port: %x\n", tuple.v4.src_addr, tuple.v4.dst_addr, + tuple.v4.sport, tuple.v4.dport); + printf("thread: %u hash: %x\n", thread, hash_ipv4); + } + + return thread; + + case 5: + + if (rte_be_to_cpu_16(lb_pkt->eth.ether_type) == + ETHER_TYPE_IPv6) { + /* point to last 32 bits of IPv6 addresses*/ + src_addr += 3; + dst_addr += 3; + } + + /* Compute */ + temp1 = *src_addr ^ *dst_addr ^ *src_port ^ + *dst_port ^ *protocol; + + temp2 = (temp1 >> 24) ^ (temp1 >> 16) ^ (temp1 >> 8) ^ temp1; + temp3 = (temp2 >> 4) ^ (temp2 & 0xf); + + /* Egress */ + + /* To select the thread */ + thread = (temp3 % nthreads); + + /* To select the Q */ + thread = ap->in_port_id + (p_loadb->p.n_ports_in * + (thread + 1) - p_loadb->p.n_ports_in); + + if (LOADB_DEBUG > 3) { + printf("thread: %u temp1: %x temp2: %x temp3: %x\n", + thread, temp1, temp2, temp3); + printf("src_addr: %x dst_addr: %x src_port: %x " + "dst_port: %x protocol: %x\n", *src_addr, *dst_addr, + *src_port, *dst_port, *protocol); + } + return thread; + + default: + printf("Invalid n_lb_tuples: %d\n", p_loadb->n_lb_tuples); + return 0; + + } +} + +uint8_t calculate_lb_thread_pub(struct rte_mbuf *pkt, void *arg) +{ + uint32_t hash_key[2], hash_ipv4; + uint32_t temp1, temp2, temp3; + uint8_t thread; + struct pipeline_loadb_in_port_h_arg *ap = arg; + struct pipeline_loadb *p_loadb = (struct pipeline_loadb *) ap->p; + uint8_t nthreads = p_loadb->n_vnf_threads; + union rte_thash_tuple tuple; + + uint32_t *src_addr; + uint32_t *dst_addr; + uint16_t *src_port; + uint16_t *dst_port; + uint8_t *protocol; + struct lb_pkt *lb_pkt = (struct lb_pkt *) + RTE_MBUF_METADATA_UINT8_PTR(pkt, + MBUF_HDR_ROOM); + + if (rte_be_to_cpu_16(lb_pkt->eth.ether_type) == ETHER_TYPE_IPv6) { + src_addr = (uint32_t *)&lb_pkt->ipv6_port.ipv6.src_addr; + dst_addr = (uint32_t *)&lb_pkt->ipv6_port.ipv6.dst_addr; + src_port = &lb_pkt->ipv6_port.src_port; + dst_port = &lb_pkt->ipv6_port.dst_port; + protocol = &lb_pkt->ipv6_port.ipv6.proto; + } else { + src_addr = &lb_pkt->ipv4_port.ipv4.src_addr; + dst_addr = &lb_pkt->ipv4_port.ipv4.dst_addr; + src_port = &lb_pkt->ipv4_port.src_port; + dst_port = &lb_pkt->ipv4_port.dst_port; + protocol = &lb_pkt->ipv4_port.ipv4.next_proto_id; + } + + switch (p_loadb->n_lb_tuples) { + + case 0: + /* Write */ + /* Ingress */ + temp1 = *src_addr ^ *src_port; + temp2 = (temp1 >> 24) ^ (temp1 >> 16) ^ + (temp1 >> 8) ^ temp1; + temp3 = (temp2 >> 4) ^ (temp2 & 0xf); + + /* To select the thread */ + thread = temp3 % nthreads; + /* To select the Q */ + thread = ap->in_port_id + (p_loadb->p.n_ports_in * + (thread + 1) - p_loadb->p.n_ports_in); + + return thread; + + case 1: + /* Write */ + /* Ingress */ + hash_key[0] = rte_bswap32(*src_addr); + + /* Compute */ + hash_ipv4 = rte_jhash(&hash_key[0], 4, 0); + + /* To select the thread */ + thread = hash_ipv4 % nthreads; + /* To select the Q */ + thread = ap->in_port_id + (p_loadb->p.n_ports_in * + (thread + 1) - p_loadb->p.n_ports_in); + + if (LOADB_DEBUG > 3) + printf("thread: %u hash: %x hash_key: %x\n", + thread, hash_ipv4, hash_key[0]); + return thread; + + case 2: + /* Write */ + /* Ingress */ + hash_key[0] = rte_bswap32(*src_addr); + hash_key[1] = *src_port << 16; + + /* Compute */ + hash_ipv4 = rte_jhash(&hash_key[0], 6, 0); + + /* To select the thread */ + thread = hash_ipv4 % nthreads; + /* To select the Q */ + thread = ap->in_port_id + (p_loadb->p.n_ports_in * + (thread + 1) - p_loadb->p.n_ports_in); + + if (LOADB_DEBUG > 3) { + printf("thread: %u hash: %x hash_key0: %x " + "hash_key1: %x\n", thread, hash_ipv4, + hash_key[0], hash_key[1]); + printf("public_addr: %x public_port: %x\n", + hash_key[0], *src_port); + } + return thread; + + case 3: + printf("Invalid n_lb_tuples: %d\n", p_loadb->n_lb_tuples); + return 0; + + case 4: + /* Write */ + tuple.v4.src_addr = rte_bswap32(*src_addr); + tuple.v4.dst_addr = rte_bswap32(*dst_addr); + tuple.v4.sport = *src_port; + tuple.v4.dport = *dst_port; + + /* Compute */ + hash_ipv4 = rte_softrss((uint32_t *)&tuple, + RTE_THASH_V4_L4_LEN, default_rss_key); + + /* Ingress */ + /* To select the thread */ + thread = hash_ipv4 % nthreads; + /* To select the Q */ + thread = ap->in_port_id + (p_loadb->p.n_ports_in * + (thread + 1) - p_loadb->p.n_ports_in); + + if (LOADB_DEBUG > 3) { + printf("src_addr: %x dst_addr: %x src_port: %x " + "dst_port: %x\n", tuple.v4.src_addr, + tuple.v4.dst_addr, tuple.v4.sport, tuple.v4.dport); + + printf("thread: %u hash: %x\n", thread, hash_ipv4); + } + return thread; + + case 5: + + if (rte_be_to_cpu_16(lb_pkt->eth.ether_type) == + ETHER_TYPE_IPv6) { + /* point to last 32 bits of IPv6 addresses*/ + src_addr += 3; + dst_addr += 3; + } + + /* Compute */ + temp1 = *src_addr ^ *dst_addr ^ *src_port ^ + *dst_port ^ *protocol; + temp2 = (temp1 >> 24) ^ (temp1 >> 16) ^ + (temp1 >> 8) ^ temp1; + temp3 = (temp2 >> 4) ^ (temp2 & 0xf); + + /* To select the thread */ + thread = temp3 % nthreads; + /* To select the Q */ + thread = ap->in_port_id + (p_loadb->p.n_ports_in * + (thread + 1) - p_loadb->p.n_ports_in); + + if (LOADB_DEBUG > 3) { + printf("src_addr: %x dst_addr: %x src_port: %x " + "dst_port: %x protocol: %x\n", *src_addr, *dst_addr, + *src_port, *dst_port, *protocol); + + printf("thread: %u temp1: %x temp2: %x temp3: %x\n", + thread, temp1, temp2, temp3); + } + + return thread; + + default: + printf("Invalid n_lb_tuples: %d\n", p_loadb->n_lb_tuples); + return 0; + + } +} + +static inline void +pkt_work_loadb_key_prv( + struct rte_mbuf *pkt, + __rte_unused uint32_t pkt_num, + void *arg) +{ + struct pipeline_loadb_in_port_h_arg *ap = arg; + struct pipeline_loadb *p_loadb = (struct pipeline_loadb *)ap->p; + uint32_t outport_offset = p_loadb->outport_offset; + + struct lb_pkt *lb_pkt = (struct lb_pkt *) + RTE_MBUF_METADATA_UINT8_PTR(pkt, + MBUF_HDR_ROOM); + uint32_t *out_port = RTE_MBUF_METADATA_UINT32_PTR(pkt, + outport_offset); + + #ifdef MY_LOADB_DBG_PRINT + if (LOADB_DEBUG == 3) + printf("Start pkt_work_loadb_key\n"); + #endif + + if ((LOADB_DEBUG > 2) && (lb_pkt_print_count < 10)) { + print_pkt1(pkt); + lb_pkt_print_count++; + printf("\nEth Typ %x, Prot %x, ETH_TYPE_ARP %x, " + "ETH_TYPE_IPV4 %x, IP_PROTOCOL_ICMP %x\n", + rte_be_to_cpu_16(lb_pkt->eth.ether_type), + lb_pkt->ipv4_port.ipv4.next_proto_id, ETH_TYPE_ARP, + ETH_TYPE_IPV4, IP_PROTOCOL_ICMP); + } + + /* Write */ + *out_port = calculate_lb_thread_prv(pkt, arg); + + p_loadb->receivedLBPktCount++; + + #ifdef MY_LOADB_DBG_PRINT + if (LOADB_DEBUG == 3) + printf("End pkt_work_loadb_key\n"); + #endif +} + +static inline void +pkt_work_loadb_key_pub( + struct rte_mbuf *pkt, + __rte_unused uint32_t pkt_num, + void *arg) +{ + struct pipeline_loadb_in_port_h_arg *ap = arg; + struct pipeline_loadb *p_loadb = (struct pipeline_loadb *)ap->p; + uint32_t outport_offset = p_loadb->outport_offset; + + struct lb_pkt *lb_pkt = (struct lb_pkt *) + RTE_MBUF_METADATA_UINT8_PTR(pkt, + MBUF_HDR_ROOM); + uint32_t *out_port = RTE_MBUF_METADATA_UINT32_PTR(pkt, + outport_offset); + + #ifdef MY_LOADB_DBG_PRINT + if (LOADB_DEBUG == 3) + printf("Start pkt_work_loadb_key\n"); + #endif + + if ((LOADB_DEBUG > 2) && (lb_pkt_print_count < 10)) { + print_pkt1(pkt); + lb_pkt_print_count++; + printf("\nEth Typ %x, Prot %x, ETH_TYPE_ARP %x, " + "ETH_TYPE_IPV4 %x, IP_PROTOCOL_ICMP %x\n", + rte_be_to_cpu_16(lb_pkt->eth.ether_type), + lb_pkt->ipv4_port.ipv4.next_proto_id, ETH_TYPE_ARP, + ETH_TYPE_IPV4, IP_PROTOCOL_ICMP); + } + + /* Write */ + *out_port = calculate_lb_thread_pub(pkt, arg); + + p_loadb->receivedLBPktCount++; +#ifdef MY_LOADB_DBG_PRINT + if (LOADB_DEBUG == 3) + printf("End pkt_work_loadb_key\n"); +#endif +} + +static inline void +pkt4_work_loadb_key_prv( + struct rte_mbuf **pkt, + __rte_unused uint32_t pkt_num, + void *arg) +{ + struct pipeline_loadb_in_port_h_arg *ap = arg; + struct pipeline_loadb *p_loadb = (struct pipeline_loadb *)ap->p; + uint32_t outport_offset = p_loadb->outport_offset; + + uint32_t *out_port0 = RTE_MBUF_METADATA_UINT32_PTR(pkt[0], + outport_offset); + uint32_t *out_port1 = RTE_MBUF_METADATA_UINT32_PTR(pkt[1], + outport_offset); + uint32_t *out_port2 = RTE_MBUF_METADATA_UINT32_PTR(pkt[2], + outport_offset); + uint32_t *out_port3 = RTE_MBUF_METADATA_UINT32_PTR(pkt[3], + outport_offset); + + struct lb_pkt *lb_pkt0 = (struct lb_pkt *) + RTE_MBUF_METADATA_UINT8_PTR(pkt[0], + MBUF_HDR_ROOM); + struct lb_pkt *lb_pkt1 = (struct lb_pkt *) + RTE_MBUF_METADATA_UINT8_PTR(pkt[1], + MBUF_HDR_ROOM); + struct lb_pkt *lb_pkt2 = (struct lb_pkt *) + RTE_MBUF_METADATA_UINT8_PTR(pkt[2], + MBUF_HDR_ROOM); + struct lb_pkt *lb_pkt3 = (struct lb_pkt *) + RTE_MBUF_METADATA_UINT8_PTR(pkt[3], + MBUF_HDR_ROOM); + + #ifdef MY_LOADB_DBG_PRINT + if (LOADB_DEBUG == 3) + printf("Start pkt4_work_loadb_key\n"); + #endif + + if ((LOADB_DEBUG > 2) && (lb_pkt_print_count < 10)) { + print_pkt1(pkt[0]); + lb_pkt_print_count++; + + printf("\nEth Typ %x, Prot %x\n", + rte_be_to_cpu_16(lb_pkt0->eth.ether_type), + lb_pkt0->ipv4_port.ipv4.next_proto_id); + + print_pkt1(pkt[1]); + lb_pkt_print_count++; + + printf("\nEth Typ %x, Prot %x\n", + rte_be_to_cpu_16(lb_pkt1->eth.ether_type), + lb_pkt1->ipv4_port.ipv4.next_proto_id); + + print_pkt1(pkt[2]); + lb_pkt_print_count++; + + printf("\nEth Typ %x, Prot %x\n", + rte_be_to_cpu_16(lb_pkt2->eth.ether_type), + lb_pkt2->ipv4_port.ipv4.next_proto_id); + + print_pkt1(pkt[3]); + lb_pkt_print_count++; + + printf("\nEth Typ %x, Prot %x\n", + rte_be_to_cpu_16(lb_pkt3->eth.ether_type), + lb_pkt3->ipv4_port.ipv4.next_proto_id); + } + *out_port0 = calculate_lb_thread_prv(pkt[0], arg); + *out_port1 = calculate_lb_thread_prv(pkt[1], arg); + *out_port2 = calculate_lb_thread_prv(pkt[2], arg); + *out_port3 = calculate_lb_thread_prv(pkt[3], arg); + + p_loadb->receivedLBPktCount += 4; + + #ifdef MY_LOADB_DBG_PRINT + if (LOADB_DEBUG == 3) + printf("End pkt4_work_loadb_key\n"); + #endif + +} + +static inline void +pkt4_work_loadb_key_pub( + struct rte_mbuf **pkt, + __rte_unused uint32_t pkt_num, + void *arg) +{ + struct pipeline_loadb_in_port_h_arg *ap = arg; + struct pipeline_loadb *p_loadb = (struct pipeline_loadb *)ap->p; + uint32_t outport_offset = p_loadb->outport_offset; + + uint32_t *out_port0 = RTE_MBUF_METADATA_UINT32_PTR(pkt[0], + outport_offset); + uint32_t *out_port1 = RTE_MBUF_METADATA_UINT32_PTR(pkt[1], + outport_offset); + uint32_t *out_port2 = RTE_MBUF_METADATA_UINT32_PTR(pkt[2], + outport_offset); + uint32_t *out_port3 = RTE_MBUF_METADATA_UINT32_PTR(pkt[3], + outport_offset); + + struct lb_pkt *lb_pkt0 = (struct lb_pkt *) + RTE_MBUF_METADATA_UINT8_PTR(pkt[0], + MBUF_HDR_ROOM); + struct lb_pkt *lb_pkt1 = (struct lb_pkt *) + RTE_MBUF_METADATA_UINT8_PTR(pkt[1], + MBUF_HDR_ROOM); + struct lb_pkt *lb_pkt2 = (struct lb_pkt *) + RTE_MBUF_METADATA_UINT8_PTR(pkt[2], + MBUF_HDR_ROOM); + struct lb_pkt *lb_pkt3 = (struct lb_pkt *) + RTE_MBUF_METADATA_UINT8_PTR(pkt[3], + MBUF_HDR_ROOM); + + #ifdef MY_LOADB_DBG_PRINT + if (LOADB_DEBUG == 3) + printf("Start pkt4_work_loadb_key\n"); + #endif + + if ((LOADB_DEBUG > 2) && (lb_pkt_print_count < 10)) { + print_pkt1(pkt[0]); + lb_pkt_print_count++; + + printf("\nEth Typ %x, Prot %x\n", + rte_be_to_cpu_16(lb_pkt0->eth.ether_type), + lb_pkt0->ipv4_port.ipv4.next_proto_id); + + print_pkt1(pkt[1]); + lb_pkt_print_count++; + + printf("\nEth Typ %x, Prot %x\n", + rte_be_to_cpu_16(lb_pkt1->eth.ether_type), + lb_pkt1->ipv4_port.ipv4.next_proto_id); + + print_pkt1(pkt[2]); + lb_pkt_print_count++; + + printf("\nEth Typ %x, Prot %x\n", + rte_be_to_cpu_16(lb_pkt2->eth.ether_type), + lb_pkt2->ipv4_port.ipv4.next_proto_id); + + print_pkt1(pkt[3]); + lb_pkt_print_count++; + + printf("\nEth Typ %x, Prot %x\n", + rte_be_to_cpu_16(lb_pkt3->eth.ether_type), + lb_pkt3->ipv4_port.ipv4.next_proto_id); + } + *out_port0 = calculate_lb_thread_prv(pkt[0], arg); + *out_port1 = calculate_lb_thread_pub(pkt[1], arg); + *out_port2 = calculate_lb_thread_pub(pkt[2], arg); + *out_port3 = calculate_lb_thread_pub(pkt[3], arg); + + p_loadb->receivedLBPktCount += 4; +#ifdef MY_LOADB_DBG_PRINT + if (LOADB_DEBUG == 3) + printf("End pkt4_work_loadb_key\n"); +#endif + +} + +PIPELINE_LOADB_KEY_PORT_IN_AH(port_in_ah_loadb_key_prv, + pkt_work_loadb_key_prv, + pkt4_work_loadb_key_prv); + +PIPELINE_LOADB_KEY_PORT_IN_AH(port_in_ah_loadb_key_pub, + pkt_work_loadb_key_pub, + pkt4_work_loadb_key_pub); + +static int +pipeline_loadb_parse_args(struct pipeline_loadb *p, + struct pipeline_params *params) +{ + uint32_t outport_offset_present = 0; + uint32_t n_vnf_threads_present = 0; + uint32_t pktq_in_prv_present = 0; + uint32_t prv_que_handler_present = 0; + uint32_t prv_to_pub_map_present = 0; + uint8_t n_prv_in_port = 0; + uint32_t i; + + /* Default number of tuples */ + p->n_lb_tuples = 0; + + if (LOADB_DEBUG > 2) + printf("LOADB pipeline_loadb_parse_args params->n_args: %d\n", + params->n_args); + + for (i = 0; i < params->n_args; i++) { + char *arg_name = params->args_name[i]; + char *arg_value = params->args_value[i]; + + if (LOADB_DEBUG > 2) + printf("LOADB args[%d]: %s %d, %s\n", i, arg_name, + atoi(arg_value), arg_value); + + /* outport_offset = 128 + 8 */ + if (strcmp(arg_name, "outport_offset") == 0) { + if (outport_offset_present) + return -1; + outport_offset_present = 1; + + p->outport_offset = atoi(arg_value); + if (p->outport_offset <= 0) { + printf("Outport_offset is invalid\n"); + return -1; + } + printf("outport_offset: 0x%x\n", p->outport_offset); + continue; + } + /* n_vnf_threads = 4 */ + if (strcmp(arg_name, "n_vnf_threads") == 0) { + if (n_vnf_threads_present) + return -1; + n_vnf_threads_present = 1; + + p->n_vnf_threads = atoi(arg_value); + + total_vnf_threads += p->n_vnf_threads; + + if ((p->n_vnf_threads <= 0) + || (total_vnf_threads > MAX_VNF_THREADS)) { + printf("n_vnf_threads : MIN->0 MAX->16\n"); + return -1; + } + printf("n_vnf_threads : 0x%x\n", p->n_vnf_threads); + printf("total_vnf_threads: 0x%x\n", total_vnf_threads); + continue; + } + + /* pktq_in_prv */ + if (strcmp(arg_name, "pktq_in_prv") == 0) { + if (pktq_in_prv_present) { + printf("Duplicate pktq_in_prv ... " + "parse failed..\n\n"); + return -1; + } + pktq_in_prv_present = 1; + + int rxport = 0, j = 0; + char phy_port_num[8]; + char *token = strtok(arg_value, "RXQ"); + while (token) { + j = 0; + while ((j < 7) && (token[j] != '.')) { + phy_port_num[j] = token[j]; + j++; + } + phy_port_num[j] = '\0'; + rxport = atoi(phy_port_num); + printf("token: %s, phy_port_str: %s, " + "phy_port_num %d\n", + token, phy_port_num, rxport); + prv_in_port_a[n_prv_in_port++] = rxport; + // set rxport egress + if (rxport < 0xff){ + if(rxport < PIPELINE_MAX_PORT_IN) + in_port_dir_a[rxport] = 1; + } + token = strtok(NULL, "RXQ"); + } + + if (n_prv_in_port == 0) { + printf("VNF common parse error - " + "no prv RX phy port\n"); + return -1; + } + continue; + } + + /* pktq_in_prv_handler */ + + if (strcmp(arg_name, "prv_que_handler") == 0) { + + if (prv_que_handler_present) { + printf("Duplicate pktq_in_prv ..\n\n"); + return -1; + } + prv_que_handler_present = 1; + n_prv_in_port = 0; + + char *token; + int rxport = 0; + /* get the first token */ + token = strtok(arg_value, "("); + token = strtok(token, ")"); + token = strtok(token, ","); + printf("***** prv_que_handler *****\n"); + if (token) + printf("string is :%s\n", token); + + if (token) + //printf("string is null\n"); + printf("string is :%s\n", token); + + /* walk through other tokens */ + while (token != NULL) { + printf(" %s\n", token); + rxport = atoi(token); + prv_que_port_index[n_prv_in_port++] = rxport; + if (rxport < 0xff){ + if(rxport < PIPELINE_MAX_PORT_IN) + in_port_egress_prv[rxport] = 1; + } + p->n_prv_Q++; + token = strtok(NULL, ","); + } + + if (n_prv_in_port == 0) { + printf("VNF common parse err - no prv RX phy port\n"); + return -1; + } + + continue; + } + /* prv_to_pub_map */ + if (strcmp(arg_name, "prv_to_pub_map") == 0) { + if (prv_to_pub_map_present) { + printf("Duplicated prv_to_pub_map ... " + "parse failed ...\n"); + return -1; + } + prv_to_pub_map_present = 1; + + int rxport = 0, txport = 0, j = 0, k = 0; + char rx_phy_port_num[5]; + char tx_phy_port_num[5]; + char *token = strtok(arg_value, "("); + while (token) { + j = 0; + while ((j < 4) && (token[j] != ',')) { + rx_phy_port_num[j] = token[j]; + j++; + } + rx_phy_port_num[j] = '\0'; + rxport = atoi(rx_phy_port_num); + + j++; + k = 0; + while ((k < 4) && (token[j+k] != ')')) { + tx_phy_port_num[k] = token[j+k]; + k++; + } + tx_phy_port_num[k] = '\0'; + txport = atoi(tx_phy_port_num); + + printf("token: %s,rx_phy_port_str: %s, phy_port_num " + "%d, tx_phy_port_str: %s, tx_phy_port_num %d\n", + token, rx_phy_port_num, rxport, + tx_phy_port_num, txport); + if(rxport < PIPELINE_MAX_PORT_IN) + if ((rxport >= PIPELINE_MAX_PORT_IN) || + (txport >= PIPELINE_MAX_PORT_IN) || + (in_port_dir_a[rxport] != 1)) { + printf("CG-NAPT parse error - " + "incorrect prv-pub translation. Rx %d, " + "Tx %d, Rx Dir %d\n", rxport, txport, + in_port_dir_a[rxport]); + + return -1; + } + if (rxport < 0xff){ + if (rxport < PIPELINE_MAX_PORT_IN) + prv_to_pub_map[rxport] = txport; + } + if (txport < 0xff) + if(txport < PIPELINE_MAX_PORT_IN) + pub_to_prv_map[txport] = rxport; + token = strtok(NULL, "("); + } + + continue; + } + /* Set number of tuples if available in config file */ + if (strcmp(arg_name, "n_lb_tuples") == 0) { + p->n_lb_tuples = atoi(arg_value); + printf("n_lb_tuples: 0x%x\n", p->n_lb_tuples); + } + + /* loadb_debug */ + if (strcmp(arg_name, "loadb_debug") == 0) { + LOADB_DEBUG = atoi(arg_value); + continue; + } + + /* any other Unknown argument return -1 */ + } /* for */ + + /* Check that mandatory arguments are present */ + if ((n_vnf_threads_present == 0) || (outport_offset_present == 0)) + return -1; + + return 0; + +} + +int check_loadb_thread( + struct app_params *app, + struct pipeline_params *params, + int32_t n_vnf_threads) +{ + uint32_t i; + int pipeline_num = 0; + int count = 0; + int dont_care = sscanf(params->name, "PIPELINE%d", &pipeline_num); + if (dont_care != 1) + return -1; + /* changed from pipeline_num+1 to +2 */ + for (i = pipeline_num + 2; i < app->n_pipelines; i++) { + struct app_pipeline_params *p = &app->pipeline_params[i]; + if (!strncmp(p->type, "LOADB", strlen(p->type))) + break; + count++; + } + if (n_vnf_threads != count) + return -1; + return 0; + +} + +static void *pipeline_loadb_init( + struct pipeline_params *params, + __rte_unused void *arg) + /* arg is app parameter (struct app_params *app) */ + /*save it for use in port in handler */ +{ + struct pipeline *p; + struct pipeline_loadb *p_loadb; + uint32_t size, i, in_ports_arg_size; + + /* Check input arguments */ + if ((params == NULL) || + (params->n_ports_in == 0) || (params->n_ports_out == 0)) + return NULL; + + /* Memory allocation */ + size = RTE_CACHE_LINE_ROUNDUP(sizeof(struct pipeline_loadb)); + p = rte_zmalloc(NULL, size, RTE_CACHE_LINE_SIZE); + p_loadb = (struct pipeline_loadb *)p; + if (p == NULL) + return NULL; + + strcpy(p->name, params->name); + p->log_level = params->log_level; + + PLOG(p, HIGH, "LOADB"); + + p_loadb->n_vnf_threads = 0; + p_loadb->outport_offset = 0; + p_loadb->receivedLBPktCount = 0; + p_loadb->droppedLBPktCount = 0; + for (i = 0; i < PIPELINE_MAX_PORT_IN; i++) { + p_loadb->links_map[i] = 0xff; + } + p_loadb->pipeline_num = 0xff; + p_loadb->n_prv_Q = 0; + p_loadb->n_pub_Q = 0; + + /* Parse arguments */ + + if (pipeline_loadb_parse_args(p_loadb, params)) + return NULL; + + /* Pipeline */ + { + struct rte_pipeline_params pipeline_params = { + .name = "LOADB", + .socket_id = params->socket_id, + .offset_port_id = 0, + }; + + p->p = rte_pipeline_create(&pipeline_params); + if (p->p == NULL) { + rte_free(p); + return NULL; + } + + printf("Loadb p->p %p, socket %d\n", p->p, + pipeline_params.socket_id); + } + + /* Memory allocation for in_port_h_arg */ + in_ports_arg_size = + RTE_CACHE_LINE_ROUNDUP((sizeof(struct pipeline_loadb_in_port_h_arg)) + * (params->n_ports_in)); + struct pipeline_loadb_in_port_h_arg *ap = + (struct pipeline_loadb_in_port_h_arg *) + rte_zmalloc(NULL, + in_ports_arg_size, + RTE_CACHE_LINE_SIZE); + if (ap == NULL) + return NULL; + + printf("ap pointer %p\n", ap); + + /* Input ports */ + p->n_ports_in = params->n_ports_in; + for (i = 0; i < p->n_ports_in; i++) { + /* passing our loadb pipeline in call back arg */ + (ap[i]).p = p_loadb; + (ap[i]).in_port_id = i; + + struct rte_pipeline_port_in_params port_params = { + .ops = + pipeline_port_in_params_get_ops(¶ms->port_in + [i]), + .arg_create = + pipeline_port_in_params_convert(¶ms->port_in + [i]), + /* Public in-port handler */ + .f_action = NULL, + .arg_ah = &(ap[i]), + .burst_size = params->port_in[i].burst_size, + }; + + /* Private in-port handler */ + if (is_port_index_privte(i)) {/* Multiport changes*/ + printf("LOADB %d port is Prv\n", i); + port_params.f_action = port_in_ah_loadb_key_prv; + } else{ + printf("LOADB %d port is Pub\n", i); + port_params.f_action = port_in_ah_loadb_key_pub; + } + + int status = rte_pipeline_port_in_create(p->p, + &port_params, + &p->port_in_id[i]); + + if (status) { + rte_pipeline_free(p->p); + rte_free(p); + return NULL; + } + + } + + p_loadb->n_pub_Q = p_loadb->p.n_ports_in - p_loadb->n_prv_Q; + printf("LOADB : n_prv_Q - %d n_pub_Q - %d\n", + p_loadb->n_prv_Q, p_loadb->n_pub_Q); + + for (i = 0; i < p->n_ports_in; i++) { + printf("is_port_index_privte(%d): %d\n", i, + is_port_index_privte(i)); + printf("is_phy_port_privte(%d): %d\n", i, + is_phy_port_privte(i)); + printf("action handler of %d:%p\n", i, + p_loadb->p.p->ports_in[i].f_action); + } + + /* Output ports */ + p->n_ports_out = params->n_ports_out; + for (i = 0; i < p->n_ports_out; i++) { + struct rte_pipeline_port_out_params port_params = { + .ops = + pipeline_port_out_params_get_ops(¶ms->port_out + [i]), + .arg_create = + pipeline_port_out_params_convert(¶ms->port_out + [i]), + .f_action = NULL, + .arg_ah = NULL, + }; + + int status = rte_pipeline_port_out_create(p->p, + &port_params, + &p->port_out_id[i]); + + if (status) { + rte_pipeline_free(p->p); + rte_free(p); + return NULL; + } + + printf("Outport p->port_out_id[%d] %p\n", i, + &p->port_out_id[i]); + } + + int pipeline_num = 0; + int dont_care = sscanf(params->name, "PIPELINE%d", &pipeline_num); + if (dont_care != 1) { + printf("Unable to read pipeline number\n"); + return NULL; + } + p_loadb->pipeline_num = pipeline_num; +#if 0 + set_outport_id(pipeline_num, p, lb_outport_id); + set_phy_outport_map(pipeline_num, p_loadb->links_map); + + set_port_to_loadb_map(pipeline_num); + + register_loadb_to_arp(pipeline_num, p, app); +#endif + register_pipeline_Qs(p_loadb->pipeline_num, p); + set_link_map(p_loadb->pipeline_num, p, p_loadb->links_map); + //set_outport_id(p_loadb->pipeline_num, p, p_loadb->outport_id); + + /* Tables */ + p->n_tables = 1; + { + + struct rte_table_array_params table_array_params = { + .n_entries = MAX_VNF_THREADS, + .offset = p_loadb->outport_offset, + }; + struct rte_pipeline_table_params table_params = { + .ops = &rte_table_array_ops, + .arg_create = &table_array_params, + .f_action_hit = NULL, + .f_action_miss = NULL, + .arg_ah = p_loadb, + .action_data_size = 0, + }; + + int status; + + status = rte_pipeline_table_create(p->p, + &table_params, + &p->table_id[0]); + + if (status) { + rte_pipeline_free(p->p); + rte_free(p); + return NULL; + } + } /* Tables */ + + /* Connecting input ports to tables */ + for (i = 0; i < p->n_ports_in; i++) { + int status = rte_pipeline_port_in_connect_to_table( + p->p, + p->port_in_id[i], + p->table_id[0]); + + if (status) { + rte_pipeline_free(p->p); + rte_free(p); + return NULL; + } + } + + /* Enable input ports */ + for (i = 0; i < p->n_ports_in; i++) { + int status = rte_pipeline_port_in_enable(p->p, + p->port_in_id[i]); + + if (status) { + rte_pipeline_free(p->p); + rte_free(p); + return NULL; + } + } + + /* Initialize table entries */ + { + for (i = 0; i < MAX_VNF_THREADS; i++) { + struct rte_table_array_key key = { + .pos = i, + }; + struct loadb_table_entry entry; + entry.head.action = RTE_PIPELINE_ACTION_PORT; + + if (i < p->n_ports_out) { + entry.head.port_id = p->port_out_id[i]; + printf("\ni %d, p->port_out_id[%d] %d", i, i, + p->port_out_id[i]); + } else { + /* First CGNAPT thread */ + entry.head.port_id = p->port_out_id[0]; + entry.head.action = RTE_PIPELINE_ACTION_DROP; + } + + struct rte_pipeline_table_entry *entry_ptr; + int key_found, status; + status = rte_pipeline_table_entry_add( + p->p, + p->table_id[0], + &key, + (struct rte_pipeline_table_entry *) + &entry, + &key_found, + &entry_ptr); + if (status) { + rte_pipeline_free(p->p); + rte_free(p); + return NULL; + } + } + } + /* Add default entry to tables */ + { + struct rte_pipeline_table_entry default_entry = { + .action = RTE_PIPELINE_ACTION_PORT, + /* LB by default forward to 1st cgnat thread */ + .port_id = p->port_out_id[0], + }; + + struct rte_pipeline_table_entry *default_entry_ptr; + + int status = rte_pipeline_table_default_entry_add( + p->p, + p->table_id[0], + &default_entry, + &default_entry_ptr); + + if (status) { + rte_pipeline_free(p->p); + rte_free(p); + return NULL; + } + + } + + /* Check pipeline consistency */ + if (rte_pipeline_check(p->p) < 0) { + rte_pipeline_free(p->p); + rte_free(p); + return NULL; + } + + /* Message queues */ + p->n_msgq = params->n_msgq; + for (i = 0; i < p->n_msgq; i++) + p->msgq_in[i] = params->msgq_in[i]; + for (i = 0; i < p->n_msgq; i++) + p->msgq_out[i] = params->msgq_out[i]; + + /* Message handlers */ + memcpy(p->handlers, handlers, sizeof(p->handlers)); + memcpy(p_loadb->custom_handlers, + custom_handlers, sizeof(p_loadb->custom_handlers)); + + return p; +} + +static int pipeline_loadb_free(void *pipeline) +{ + struct pipeline *p = (struct pipeline *)pipeline; + + /* Check input arguments */ + if (p == NULL) + return -1; + + /* Free resources */ + rte_pipeline_free(p->p); + rte_free(p); + return 0; +} + +static int +pipeline_loadb_track(void *pipeline, + __rte_unused uint32_t port_in, uint32_t *port_out) +{ + struct pipeline *p = (struct pipeline *)pipeline; + + /* Check input arguments */ + if ((p == NULL) || (port_in >= p->n_ports_in) || (port_out == NULL)) + return -1; + + if (p->n_ports_in == 1) { + *port_out = 0; + return 0; + } + + return -1; +} + +static int pipeline_loadb_timer(void *pipeline) +{ + struct pipeline *p = (struct pipeline *)pipeline; + + pipeline_msg_req_handle(p); + rte_pipeline_flush(p->p); + + return 0; +} + +void *pipeline_loadb_msg_req_entry_dbg_handler(struct pipeline *p, void *msg) +{ + struct pipeline_loadb_entry_dbg_msg_rsp *rsp = msg; + uint8_t *Msg = msg; + struct pipeline_loadb *p_loadb = (struct pipeline_loadb *)p; + + rsp->status = 0; + + printf("LoadB debug handler called with args %x %x, offset %d\n", + Msg[LOADB_DBG_CMD_OFST], Msg[LOADB_DBG_CMD_OFST + 1], + LOADB_DBG_CMD_OFST); + + if (Msg[LOADB_DBG_CMD_OFST] == LOADB_DBG_CMD_STATS_SHOW) { + printf("\nLoadB Packet Stats: Received %" PRIu64 "\n", + p_loadb->receivedLBPktCount); + return rsp; + } + if (Msg[LOADB_DBG_CMD_OFST] == LOADB_DBG_CMD_STATS_CLEAR) { + printf("\nLoadB Packet Stats: Received %" PRIu64 "\n", + p_loadb->receivedLBPktCount); + p_loadb->receivedLBPktCount = 0; + return rsp; + } + + if (Msg[LOADB_DBG_CMD_OFST] == LOADB_DBG_CMD_DBG_LEVEL) { + LOADB_DEBUG = Msg[LOADB_DBG_CMD_OFST + 1]; + printf("LOADB Debug level set to %d\n", LOADB_DEBUG); + lb_pkt_print_count = 0; + return rsp; + } + if (Msg[LOADB_DBG_CMD_OFST] == LOADB_DBG_CMD_DBG_SHOW) { + printf("\nLoadB DBG Level: %u\n", LOADB_DEBUG); + return rsp; + } + if (Msg[LOADB_DBG_CMD_OFST] == LOADB_DBG_CMD_IF_STATS) { + printf("\n"); + uint8_t i, j; + + for (i = 0; i < p->n_ports_in; i++) { + struct rte_eth_stats stats; + rte_eth_stats_get(p_loadb->links_map[i], &stats); + if (is_phy_port_privte(i)) + printf("Private Port Stats %d\n", i); + else + printf("Public Port Stats %d\n", i); + printf("\n\tipackets : %" PRIu64 "\n\topackets : %" + PRIu64 "\n\tierrors : %" PRIu64 + "\n\toerrors : %" PRIu64 "\n\trx_nombuf: %" + PRIu64 "\n", stats.ipackets, stats.opackets, + stats.ierrors, stats.oerrors, stats.rx_nombuf); + if (is_phy_port_privte(i)) + printf("Private Q: "); + else + printf("Public Q: "); + for (j = 0; j < RTE_ETHDEV_QUEUE_STAT_CNTRS; j++) + printf(" %" PRIu64 ", %" PRIu64 "|", + stats.q_ipackets[j], + stats.q_opackets[j]); + + printf("\n\n"); + + } + return rsp; + } + + return rsp; + +} + +struct pipeline_be_ops pipeline_loadb_be_ops = { + .f_init = pipeline_loadb_init, + .f_free = pipeline_loadb_free, + .f_run = NULL, + .f_timer = pipeline_loadb_timer, + .f_track = pipeline_loadb_track, +}; |