aboutsummaryrefslogtreecommitdiffstats
path: root/puppet/services/database/mysql-client.yaml
blob: 4b0201f3129f81b49f1aee84076e315166b8d3c1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
heat_template_version: ocata

description: >
  Mysql client settings

parameters:
  ServiceNetMap:
    default: {}
    description: Mapping of service_name -> network name. Typically set
                 via parameter_defaults in the resource registry.  This
                 mapping overrides those in ServiceNetMapDefaults.
    type: json
  DefaultPasswords:
    default: {}
    type: json
  RoleName:
    default: ''
    description: Role name on which the service is applied
    type: string
  RoleParameters:
    default: {}
    description: Parameters specific to the role
    type: json
  EndpointMap:
    default: {}
    description: Mapping of service endpoint -> protocol. Typically set
                 via parameter_defaults in the resource registry.
    type: json
  EnableInternalTLS:
    type: boolean
    default: false
  InternalTLSCAFile:
    default: '/etc/ipa/ca.crt'
    type: string
    description: Specifies the default CA cert to use if TLS is used for
                 services in the internal network.

outputs:
  role_data:
    description: Role for setting mysql client parameters
    value:
      service_name: mysql_client
      config_settings:
        tripleo::profile::base::database::mysql::client::mysql_client_bind_address: {get_param: [ServiceNetMap, MysqlNetwork]}
        tripleo::profile::base::database::mysql::client::enable_ssl: {get_param: EnableInternalTLS}
        tripleo::profile::base::database::mysql::client::ssl_ca: {get_param: InternalTLSCAFile}
      step_config: |
        include ::tripleo::profile::base::database::mysql::client
'n526' href='#n526'>526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031
/*
// Copyright (c) 2017 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
*/

#include <string.h>

#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <unistd.h>

#include <app.h>
#include <rte_common.h>
#include <rte_malloc.h>
#include <rte_ip.h>
#include <rte_byteorder.h>
#include <rte_table_stub.h>
#include <rte_table_hash.h>
#include <rte_pipeline.h>
#include <rte_arp.h>
#include <rte_icmp.h>
#include <rte_hash.h>
#include <rte_jhash.h>
#include <rte_cycles.h>
#include <rte_hexdump.h>
#include "pipeline_actions_common.h"
#include "hash_func.h"
#include "vnf_common.h"
#include "pipeline_common_be.h"
#include "pipeline_arpicmp_be.h"
#include "parser.h"
#include "hash_func.h"
#include "vnf_common.h"
#include "app.h"

#include"pipeline_common_fe.h"
#include "lib_arp.h"
#include "lib_icmpv6.h"
#include "interface.h"
#include "gateway.h"

/* Shared among all VNFs including LB */
struct app_params *myApp;
struct rte_pipeline *myP;
struct pipeline_arpicmp *gp_arp;
uint8_t num_vnf_threads;

struct pipeline_arpicmp {
	struct pipeline p;
	pipeline_msg_req_handler
		custom_handlers[PIPELINE_ARPICMP_MSG_REQS];
	uint64_t receivedPktCount;
	uint64_t droppedPktCount;
	uint64_t sentPktCount;
	uint8_t links_map[PIPELINE_MAX_PORT_IN];
	uint8_t outport_id[PIPELINE_MAX_PORT_IN];
	uint8_t pipeline_num;
} __rte_cache_aligned;

void pipelines_port_info(void)
{
	struct app_params *app = myApp;
	uint8_t i, pipeline;
	for (pipeline = 0; pipeline < app->n_pipelines; pipeline++) {
		printf("*** PIPELINE %d ***\n\n", pipeline);

		printf("*** OUTPORTs ***\n");
		for (i = 1; i < app->pipeline_params[pipeline].n_pktq_out;
			i++) {
			switch (app->pipeline_params[pipeline].pktq_out[i].
			type) {
			case APP_PKTQ_OUT_SWQ:
				printf("pktq_out[%d]:%s\n", i,
							 app->swq_params[app->pipeline_params
									 [pipeline].
									 pktq_out[i].id].name);
				break;
			case APP_PKTQ_OUT_HWQ:
				printf("pktq_out[%d]:%s\n", i,
							 app->hwq_out_params[app->pipeline_params
								 [pipeline].pktq_out
								 [i].id].name);
				break;
			default:
				printf("Not OUT SWQ or HWQ\n");
			}
		}
		printf("*** INPORTs ***\n");
		for (i = 0; i < app->pipeline_params[pipeline].n_pktq_in; i++) {
			switch (app->pipeline_params[pipeline].pktq_in[i]
			.type) {
			case APP_PKTQ_IN_SWQ:
				printf("pktq_in[%d]:%s\n", i,
							 app->swq_params[app->pipeline_params
									 [pipeline].
									 pktq_in[i].id].name);
				break;
			case APP_PKTQ_IN_HWQ:
				printf("pktq_in[%d]:%s\n", i,
							 app->hwq_in_params[app->pipeline_params
								[pipeline].
								pktq_in[i].id].name);
				break;
			default:
				printf("Not IN SWQ or HWQ\n");
			}
		}
	}                       //for
}

void pipelines_map_info(void)
{
	 int i = 0;

	printf("PIPELINE_MAX_PORT_IN %d\n", PIPELINE_MAX_PORT_IN);
	printf("lb_outport_id[%d", lb_outport_id[0]);
	for (i = 1; i < PIPELINE_MAX_PORT_IN; i++)
		printf(",%d", lb_outport_id[i]);
	printf("]\n");

	printf("vnf_to_loadb_map[%d", vnf_to_loadb_map[0]);
	for (i = 1; i < PIPELINE_MAX_PORT_IN; i++)
		printf(",%d", vnf_to_loadb_map[i]);
	printf("]\n");

	printf("port_to_loadb_map[%d", port_to_loadb_map[0]);
	for (i = 1; i < PIPELINE_MAX_PORT_IN; i++)
		printf(",%d", port_to_loadb_map[i]);
	printf("]\n");

	printf("loadb_pipeline_nums[%d", loadb_pipeline_nums[0]);
	for (i = 1; i < PIPELINE_MAX_PORT_IN; i++)
		printf(",%d", loadb_pipeline_nums[i]);
	printf("]\n");

	printf("loadb_pipeline[%p", loadb_pipeline[0]);
	for (i = 1; i < PIPELINE_MAX_PORT_IN; i++)
		printf(",%p", loadb_pipeline[i]);
	printf("]\n");
}

void register_pipeline_Qs(uint8_t pipeline_num, struct pipeline *p)
{
	struct rte_port_ethdev_reader *hwq;
	struct rte_port_ring_writer *out_swq;
	struct rte_port_ring_reader *in_swq;
	struct rte_pipeline *rte = p->p;
	uint8_t port_count = 0;
	int queue_out = 0xff, queue_in = 0xff;

	printf("Calling register_pipeline_Qs in PIPELINE%d\n", pipeline_num);
	for (port_count = 0; port_count < rte->num_ports_out; port_count++) {

	switch (myApp->pipeline_params[pipeline_num].
				pktq_out[port_count].type){

	case APP_PKTQ_OUT_SWQ:

		if (port_count >= rte->num_ports_in) {

			/* Dont register ARP output Q */
			if (rte->num_ports_out % rte->num_ports_in)
				if (port_count == rte->num_ports_out - 1)
					return;
			int temp;
			temp = ((port_count) % rte->num_ports_in);

			in_swq = rte->ports_in[temp].h_port;
			out_swq = rte->ports_out[port_count].h_port;
			printf("in_swq : %s\n",
				in_swq->ring->name);
			int status =
			sscanf(in_swq->ring->name, "SWQ%d",
					&queue_in);
			if (status < 0) {
				printf("Unable to read SWQ number\n");
				return;
			}
			printf("out_swq: %s\n",
					out_swq->ring->name);
			status =
			sscanf(out_swq->ring->name, "SWQ%d",
					&queue_out);
			if (status < 0) {
				printf("Unable to read SWQ number\n");
				return;
			}
			if (queue_in < 128 && queue_out < 128) {
				SWQ_to_Port_map[queue_out] =
					SWQ_to_Port_map[queue_in];
			 printf("SWQ_to_Port_map[%d]%d\n", queue_out,
				 SWQ_to_Port_map[queue_out]);
                        }
			continue;
		}

		switch (myApp->pipeline_params[pipeline_num].
			 pktq_in[port_count].type){

		case APP_PKTQ_IN_HWQ:
			 hwq = rte->ports_in[port_count].h_port;
			 out_swq = rte->ports_out[port_count].h_port;
			 printf("out_swq: %s\n",
				 out_swq->ring->name);
			int status =
			sscanf(out_swq->ring->name, "SWQ%d",
				 &queue_out);

			if (status < 0) {
				printf("Unable to read SWQ number\n");
				return;
			}
			if (queue_out < 128) {
				SWQ_to_Port_map[queue_out] = hwq->port_id;
				printf("SWQ_to_Port_map[%d]%d\n", queue_out,
					SWQ_to_Port_map[queue_out]);
			}
		break;

		case APP_PKTQ_IN_SWQ:
			 in_swq = rte->ports_in[port_count].h_port;
			 out_swq = rte->ports_out[port_count].h_port;
			 printf("in_swq : %s\n",
				 in_swq->ring->name);
			status =
			sscanf(in_swq->ring->name, "SWQ%d",
					 &queue_in);
			if (status < 0) {
				printf("Unable to read SWQ number\n");
				return;
			}
			printf("out_swq: %s\n",
					 out_swq->ring->name);
			status =
			sscanf(out_swq->ring->name, "SWQ%d",
					 &queue_out);
			if (status < 0) {
				printf("Unable to read SWQ number\n");
				return;
			}
			if (queue_in < 128 && queue_out < 128){
				SWQ_to_Port_map[queue_out] =
					SWQ_to_Port_map[queue_in];
			 printf("SWQ_to_Port_map[%d]%d\n", queue_out,
				 SWQ_to_Port_map[queue_out]);
                          }
		break;

		default:
			 printf("This never hits\n");
		}

	break;

	case APP_PKTQ_OUT_HWQ:
		 printf("This is HWQ\n");
	break;

	default:
		 printf("set_phy_outport_map: This never hits\n");
	}
	}
}

void set_link_map(uint8_t pipeline_num, struct pipeline *p, uint8_t *map)
{
		struct rte_port_ethdev_writer *hwq;
		struct rte_port_ring_writer *out_swq;
		struct rte_pipeline *rte = p->p;

		uint8_t port_count = 0;
		int index = 0, queue_out = 0xff;

	printf("Calling set_link_map in PIPELINE%d\n", pipeline_num);
	for (port_count = 0; port_count < rte->num_ports_out; port_count++) {

		switch (myApp->pipeline_params[pipeline_num].
				pktq_out[port_count].type){

		case APP_PKTQ_OUT_HWQ:
			hwq = rte->ports_out[port_count].h_port;
			map[index++] = hwq->port_id;
			printf("links_map[%d]:%d\n", index - 1, map[index - 1]);
		break;

		case APP_PKTQ_OUT_SWQ:
			out_swq = rte->ports_out[port_count].h_port;
			printf("set_link_map out_swq: %s\n",
				out_swq->ring->name);
			int status = sscanf(out_swq->ring->name, "SWQ%d",
					&queue_out);
			if (status < 0) {
				printf("Unable to read SWQ number\n");
				return;
			}

			if (queue_out < 128) {
			map[index++] = SWQ_to_Port_map[queue_out];
			printf("links_map[%s]:%d\n", out_swq->ring->name,
					map[index - 1]);
			}
		break;

		default:
			printf("set_phy_outport_map: This never hits\n");
		}
		}
}

void set_outport_id(uint8_t pipeline_num, struct pipeline *p, uint8_t *map)
{
	uint8_t port_count = 0;
	int queue_out = 0xff, index = 0;

	struct rte_port_ethdev_writer *hwq;
	struct rte_port_ring_writer *out_swq;
	struct rte_pipeline *rte = p->p;

	printf("\n**** set_outport_id() with pipeline_num:%d ****\n\n",
		pipeline_num);
	for (port_count = 0;
		port_count < rte->num_ports_out;
		port_count++) {

	switch (myApp->pipeline_params[pipeline_num].
			pktq_out[port_count].type) {

	case APP_PKTQ_OUT_HWQ:
		hwq = rte->ports_out[port_count].h_port;
		//if (index >= 0)
		{
			map[hwq->port_id] = index;
			printf("hwq port_id:%d index:%d\n",
				hwq->port_id, index);
			map[hwq->port_id] = index++;
			printf("hwq port_id:%d index:%d\n",
				hwq->port_id, index-1);
			printf("outport_id[%d]:%d\n", index - 1,
				map[index - 1]);
		}
		break;

	case APP_PKTQ_OUT_SWQ:

		/* Dont register ARP output Q */
		if (port_count >= rte->num_ports_in)
			if (rte->num_ports_out % rte->num_ports_in)
				if (port_count == rte->num_ports_out - 1)
					return;
		 out_swq = rte->ports_out[port_count].h_port;
		 printf("set_outport_id out_swq: %s\n",
			 out_swq->ring->name);
		int temp = sscanf(out_swq->ring->name, "SWQ%d",
				 &queue_out);
		if (temp < 0) {
			printf("Unable to read SWQ number\n");
			return;
		}

		if (queue_out < 128 && index >= 0) {
			map[SWQ_to_Port_map[queue_out]] = index++;
			printf("outport_id[%s]:%d\n", out_swq->ring->name,
					 map[SWQ_to_Port_map[queue_out]]);
		}
		break;

		default:
			 printf(" ");

		}
	}
}

void set_phy_outport_id(uint8_t pipeline_num, struct pipeline *p, uint8_t *map)
{
	uint8_t port_count = 0;
	int index = 0;

	struct rte_port_ethdev_writer *hwq;
	struct rte_pipeline *rte = p->p;

	printf("\n**** set_phy_outport_id() with pipeline_num:%d ****\n\n",
		pipeline_num);
	for (port_count = 0;
		port_count < myApp->pipeline_params[pipeline_num].n_pktq_out;
		port_count++) {

	switch (myApp->pipeline_params[pipeline_num].
			pktq_out[port_count].type) {

	case APP_PKTQ_OUT_HWQ:
		hwq = rte->ports_out[port_count].h_port;
		map[hwq->port_id] = index++;
		printf("outport_id[%d]:%d\n", index - 1,
			map[index - 1]);
	break;

	default:
		 printf(" ");

		}
	}
}

void set_phy_inport_id(uint8_t pipeline_num, struct pipeline *p, uint8_t *map)
{
	uint8_t port_count = 0;
	int index = 0;

	struct rte_port_ethdev_reader *hwq;
	struct rte_pipeline *rte = p->p;

	printf("\n**** set_phy_inport_id() with pipeline_num:%d ****\n\n",
				 pipeline_num);
	for (port_count = 0;
		port_count < myApp->pipeline_params[pipeline_num].n_pktq_in;
		port_count++) {

		switch (myApp->pipeline_params[pipeline_num].
			pktq_in[port_count].type) {

		case APP_PKTQ_IN_HWQ:
			hwq = rte->ports_in[port_count].h_port;
			map[hwq->port_id] = index++;
			printf("outport_id[%d]:%d\n", index - 1,
				map[index - 1]);
		break;

		default:
			printf(" ");

		}
	}
}

static void *pipeline_arpicmp_msg_req_custom_handler(struct pipeline *p,
							void *msg);

static pipeline_msg_req_handler handlers[] = {
	[PIPELINE_MSG_REQ_PING] =
		pipeline_msg_req_ping_handler,
	[PIPELINE_MSG_REQ_STATS_PORT_IN] =
		pipeline_msg_req_stats_port_in_handler,
	[PIPELINE_MSG_REQ_STATS_PORT_OUT] =
		pipeline_msg_req_stats_port_out_handler,
	[PIPELINE_MSG_REQ_STATS_TABLE] =
		pipeline_msg_req_stats_table_handler,
	[PIPELINE_MSG_REQ_PORT_IN_ENABLE] =
		pipeline_msg_req_port_in_enable_handler,
	[PIPELINE_MSG_REQ_PORT_IN_DISABLE] =
		pipeline_msg_req_port_in_disable_handler,
	[PIPELINE_MSG_REQ_CUSTOM] =
		pipeline_arpicmp_msg_req_custom_handler,

};

static void *pipeline_arpicmp_msg_req_entry_dbg_handler(struct pipeline *p,
								 void *msg);
static void *pipeline_arpicmp_msg_req_entry_dbg_handler(
	__rte_unused struct pipeline *p,
	__rte_unused void *msg)
{
	/*have to handle dbg commands*/
	return NULL;
}

static __rte_unused pipeline_msg_req_handler custom_handlers[] = {
	[PIPELINE_ARPICMP_MSG_REQ_ENTRY_DBG] =
			pipeline_arpicmp_msg_req_entry_dbg_handler,
};

/**
 * Function for pipeline custom handlers
 *
 * @param pipeline
 *  A void pointer to pipeline
 * @param msg
 *  void pointer for incoming data
 *
 * @return
 *  void pointer of response
 */
void *pipeline_arpicmp_msg_req_custom_handler(struct pipeline *p, void *msg)
{
	struct pipeline_arpicmp *p_arp = (struct pipeline_arpicmp *)p;
	struct pipeline_custom_msg_req *req = msg;
	pipeline_msg_req_handler f_handle;

	f_handle = (req->subtype < PIPELINE_ARPICMP_MSG_REQS) ?
			p_arp->custom_handlers[req->subtype] :
			pipeline_msg_req_invalid_handler;

	if (f_handle == NULL)
		f_handle = pipeline_msg_req_invalid_handler;

	return f_handle(p, req);
}

uint32_t arpicmp_pkt_print_count;
static inline void
pkt_key_arpicmp(struct rte_mbuf *pkt, uint32_t pkt_num, void *arg)
{

	struct pipeline_arpicmp_in_port_h_arg *ap = arg;
	struct pipeline_arpicmp *p_arp = (struct pipeline_arpicmp *)ap->p;

	p_arp->receivedPktCount++;

	uint8_t in_port_id = pkt->port;
	uint8_t *protocol;
	uint32_t pkt_mask = 1 << pkt_num;
	uint32_t eth_proto_offset = MBUF_HDR_ROOM + 12;

	uint32_t prot_offset =
			MBUF_HDR_ROOM + ETH_HDR_SIZE + IP_HDR_PROTOCOL_OFST;

	uint16_t *eth_proto =
		RTE_MBUF_METADATA_UINT16_PTR(pkt, eth_proto_offset);

	/* header room + eth hdr size + src_aadr offset in ip header */
	#ifdef IPV6
	 uint32_t prot_offset_ipv6 =
			 MBUF_HDR_ROOM + ETH_HDR_SIZE + IPV6_HDR_PROTOCOL_OFST;

	if (rte_be_to_cpu_16(*eth_proto) == ETHER_TYPE_IPv6)
		protocol = RTE_MBUF_METADATA_UINT8_PTR(pkt, prot_offset_ipv6);
	else
		protocol = RTE_MBUF_METADATA_UINT8_PTR(pkt, prot_offset);
	#else
	protocol = RTE_MBUF_METADATA_UINT8_PTR(pkt, prot_offset);
	#endif

	if ((ARPICMP_DEBUG > 2) && (arpicmp_pkt_print_count < 10)) {
		print_pkt1(pkt);
		arpicmp_pkt_print_count++;
		printf("\nEth Typ %x, Prot %x, ETH_TYPE_ARP %x, "
			"ETH_TYPE_IPV4 %x, IP_PROTOCOL_ICMP %x\n",
				 rte_be_to_cpu_16(*eth_proto), *protocol, ETH_TYPE_ARP,
				 ETH_TYPE_IPV4, IP_PROTOCOL_ICMP);
	}

	/* Classifier for ICMP pass-through*/
	if ((rte_be_to_cpu_16(*eth_proto) == ETH_TYPE_ARP) ||
			((rte_be_to_cpu_16(*eth_proto) == ETH_TYPE_IPV4)
			 && (*protocol == IP_PROTOCOL_ICMP)
		)) {
		process_arpicmp_pkt(pkt, ifm_get_port(in_port_id));
		return;
	}
	#ifdef IPV6
	else if ((rte_be_to_cpu_16(*eth_proto) == ETH_TYPE_IPV6)
		&& (*protocol == ICMPV6_PROTOCOL_ID)) {
		process_icmpv6_pkt(pkt, ifm_get_port(in_port_id));
		return;
	}
	#endif

	/* Drop the pkt if not ARP/ICMP */
	rte_pipeline_ah_packet_drop(p_arp->p.p, pkt_mask);
	p_arp->droppedPktCount++;

}

static inline void
pkt4_key_arpicmp(struct rte_mbuf **pkt, uint32_t pkt_num, void *arg)
{

	struct pipeline_arpicmp_in_port_h_arg *ap = arg;
	struct pipeline_arpicmp *p_arp = (struct pipeline_arpicmp *)ap->p;
	p_arp->receivedPktCount += 4;

	uint32_t eth_proto_offset = MBUF_HDR_ROOM + 12;
	uint8_t in_port_id = pkt[0]->port;

	uint32_t prot_offset =
			MBUF_HDR_ROOM + ETH_HDR_SIZE + IP_HDR_PROTOCOL_OFST;

	/* header room + eth hdr size + src_aadr offset in ip header */
	uint32_t pkt_mask0 = 1 << pkt_num;
	uint32_t pkt_mask1 = 1 << (pkt_num + 1);
	uint32_t pkt_mask2 = 1 << (pkt_num + 2);
	uint32_t pkt_mask3 = 1 << (pkt_num + 3);

	uint16_t *eth_proto0 =
		RTE_MBUF_METADATA_UINT16_PTR(pkt[0], eth_proto_offset);
	uint16_t *eth_proto1 =
		RTE_MBUF_METADATA_UINT16_PTR(pkt[1], eth_proto_offset);
	uint16_t *eth_proto2 =
		RTE_MBUF_METADATA_UINT16_PTR(pkt[2], eth_proto_offset);
	uint16_t *eth_proto3 =
		RTE_MBUF_METADATA_UINT16_PTR(pkt[3], eth_proto_offset);

	uint8_t *protocol0;
	uint8_t *protocol1;
	uint8_t *protocol2;
	uint8_t *protocol3;

	#ifdef IPV6
	uint32_t prot_offset_ipv6 =
			MBUF_HDR_ROOM + ETH_HDR_SIZE + IPV6_HDR_PROTOCOL_OFST;

	#endif

	#ifdef IPV6
/* --0-- */
	if (rte_be_to_cpu_16(*eth_proto0) == ETHER_TYPE_IPv6)
		protocol0 =
				RTE_MBUF_METADATA_UINT8_PTR(pkt[0], prot_offset_ipv6);
	else
		protocol0 = RTE_MBUF_METADATA_UINT8_PTR(pkt[0], prot_offset);

/* --1-- */
	if (rte_be_to_cpu_16(*eth_proto1) == ETHER_TYPE_IPv6)
		protocol1 =
				RTE_MBUF_METADATA_UINT8_PTR(pkt[1], prot_offset_ipv6);
	else
		protocol1 = RTE_MBUF_METADATA_UINT8_PTR(pkt[1], prot_offset);

/* --2-- */
	if (rte_be_to_cpu_16(*eth_proto2) == ETHER_TYPE_IPv6)
		protocol2 =
				RTE_MBUF_METADATA_UINT8_PTR(pkt[2], prot_offset_ipv6);
	else
		protocol2 = RTE_MBUF_METADATA_UINT8_PTR(pkt[2], prot_offset);

/* --3-- */
	if (rte_be_to_cpu_16(*eth_proto3) == ETHER_TYPE_IPv6)
		protocol3 =
				RTE_MBUF_METADATA_UINT8_PTR(pkt[3], prot_offset_ipv6);
	else
		protocol3 = RTE_MBUF_METADATA_UINT8_PTR(pkt[3], prot_offset);
	#else
	protocol0 = RTE_MBUF_METADATA_UINT8_PTR(pkt[0], prot_offset);
	protocol1 = RTE_MBUF_METADATA_UINT8_PTR(pkt[1], prot_offset);
	protocol2 = RTE_MBUF_METADATA_UINT8_PTR(pkt[2], prot_offset);
	protocol3 = RTE_MBUF_METADATA_UINT8_PTR(pkt[3], prot_offset);
	#endif

	if ((ARPICMP_DEBUG > 2) && (arpicmp_pkt_print_count < 10)) {
		print_pkt1(pkt[0]);
		arpicmp_pkt_print_count++;
		printf("\nEth Typ %x, Prot %x, ETH_TYPE_ARP %x, "
			"ETH_TYPE_IPV4 %x, IP_PROTOCOL_ICMP %x\n",
				 rte_be_to_cpu_16(*eth_proto0), *protocol0, ETH_TYPE_ARP,
				 ETH_TYPE_IPV4, IP_PROTOCOL_ICMP);
	}


	if ((rte_be_to_cpu_16(*eth_proto0) == ETH_TYPE_ARP) ||
			((rte_be_to_cpu_16(*eth_proto0) == ETH_TYPE_IPV4)
			 && (*protocol0 == IP_PROTOCOL_ICMP)
		)) {
		process_arpicmp_pkt(pkt[0], ifm_get_port(in_port_id));

		goto PKT1;
	}
	#ifdef IPV6
	else if ((rte_be_to_cpu_16(*eth_proto0) == ETH_TYPE_IPV6)
			 && (*protocol0 == ICMPV6_PROTOCOL_ID)) {
		process_icmpv6_pkt(pkt[0], ifm_get_port(in_port_id));

		goto PKT1;
	}
	#endif

	/* Drop the pkt if not ARP/ICMP */
	rte_pipeline_ah_packet_drop(p_arp->p.p, pkt_mask0);
	p_arp->droppedPktCount++;

PKT1:
	if ((ARPICMP_DEBUG > 2) && (arpicmp_pkt_print_count < 10)) {
		print_pkt1(pkt[1]);
		arpicmp_pkt_print_count++;
		printf("\nEth Typ %x, Prot %x, ETH_TYPE_ARP %x, "
			"ETH_TYPE_IPV4 %x, IP_PROTOCOL_ICMP %x\n",
				 rte_be_to_cpu_16(*eth_proto1), *protocol1, ETH_TYPE_ARP,
				 ETH_TYPE_IPV4, IP_PROTOCOL_ICMP);
	}

	if ((rte_be_to_cpu_16(*eth_proto1) == ETH_TYPE_ARP) ||
			((rte_be_to_cpu_16(*eth_proto1) == ETH_TYPE_IPV4)
			 && (*protocol1 == IP_PROTOCOL_ICMP)
		)) {
		process_arpicmp_pkt(pkt[1], ifm_get_port(in_port_id));
		goto PKT2;
	}
	#ifdef IPV6
	else if ((rte_be_to_cpu_16(*eth_proto1) == ETH_TYPE_IPV6)
		&& (*protocol1 == ICMPV6_PROTOCOL_ID)) {
		process_icmpv6_pkt(pkt[1], ifm_get_port(in_port_id));

		goto PKT2;
	}
	#endif

	/* Drop the pkt if not ARP/ICMP */
	rte_pipeline_ah_packet_drop(p_arp->p.p, pkt_mask1);
	p_arp->droppedPktCount++;

PKT2:
	if ((ARPICMP_DEBUG > 2) && (arpicmp_pkt_print_count < 10)) {
		print_pkt1(pkt[2]);
		arpicmp_pkt_print_count++;
		printf("\nEth Typ %x, Prot %x, ETH_TYPE_ARP %x, "
			"ETH_TYPE_IPV4 %x, IP_PROTOCOL_ICMP %x\n",
				 rte_be_to_cpu_16(*eth_proto2), *protocol2, ETH_TYPE_ARP,
				 ETH_TYPE_IPV4, IP_PROTOCOL_ICMP);
	}

	if ((rte_be_to_cpu_16(*eth_proto2) == ETH_TYPE_ARP) ||
			((rte_be_to_cpu_16(*eth_proto2) == ETH_TYPE_IPV4)
			 && (*protocol2 == IP_PROTOCOL_ICMP)
		)) {
		process_arpicmp_pkt(pkt[2], ifm_get_port(in_port_id));
		goto PKT3;
	}
	#ifdef IPV6
	else if ((rte_be_to_cpu_16(*eth_proto2) == ETH_TYPE_IPV6)
		&& (*protocol2 == ICMPV6_PROTOCOL_ID)) {
		process_icmpv6_pkt(pkt[2], ifm_get_port(in_port_id));

		goto PKT3;
	}
	#endif

	/* Drop the pkt if not ARP/ICMP */
	rte_pipeline_ah_packet_drop(p_arp->p.p, pkt_mask2);
	p_arp->droppedPktCount++;

PKT3:
	if ((ARPICMP_DEBUG > 2) && (arpicmp_pkt_print_count < 10)) {
		print_pkt1(pkt[3]);
		arpicmp_pkt_print_count++;
		printf("\nEth Typ %x, Prot %x, ETH_TYPE_ARP %x, "
			"ETH_TYPE_IPV4 %x, IP_PROTOCOL_ICMP %x\n",
				 rte_be_to_cpu_16(*eth_proto3), *protocol3, ETH_TYPE_ARP,
				 ETH_TYPE_IPV4, IP_PROTOCOL_ICMP);
	}

	if ((rte_be_to_cpu_16(*eth_proto3) == ETH_TYPE_ARP) ||
			((rte_be_to_cpu_16(*eth_proto3) == ETH_TYPE_IPV4)
			 && (*protocol3 == IP_PROTOCOL_ICMP)
		)) {

		process_arpicmp_pkt(pkt[3], ifm_get_port(in_port_id));

		return;
	}
	#ifdef IPV6
	else if ((rte_be_to_cpu_16(*eth_proto3) == ETH_TYPE_IPV6)
		&& (*protocol3 == ICMPV6_PROTOCOL_ID)) {

		process_icmpv6_pkt(pkt[3], ifm_get_port(in_port_id));
		return;
	}
	#endif

	/* Drop the pkt if not ARP/ICMP */
	rte_pipeline_ah_packet_drop(p_arp->p.p, pkt_mask3);
	p_arp->droppedPktCount++;


}

PIPELINE_ARPICMP_KEY_PORT_IN_AH(
	port_in_ah_arpicmp,
	pkt_key_arpicmp,
	pkt4_key_arpicmp);

static void *pipeline_arpicmp_init(struct pipeline_params *params,
				__rte_unused void *arg)
{
	struct pipeline *p;
	struct pipeline_arpicmp *p_arp;
	uint32_t size, i, in_ports_arg_size;

	printf("Start pipeline_arpicmp_init\n");

	/* Check input arguments */
	if ((params == NULL) ||
			(params->n_ports_in == 0) ||
			(params->n_ports_out == 0))
		return NULL;

	/* Memory allocation */
	size = RTE_CACHE_LINE_ROUNDUP(sizeof(struct pipeline_arpicmp));
	p = rte_zmalloc(NULL, size, RTE_CACHE_LINE_SIZE);
	p_arp = (struct pipeline_arpicmp *)p;
	if (p == NULL)
		return NULL;

	//gp_arp = p_arp;
	struct app_params *app = (struct app_params *)arg;
	myApp = arg;

	PLOG(p, HIGH, "ARPICMP");
	strcpy(p->name, params->name);
	p->log_level = params->log_level;

	p_arp->receivedPktCount = 0;
	p_arp->droppedPktCount = 0;
	gw_init(rte_eth_dev_count());
	lib_arp_init(params, app);

	/* Pipeline */
	{
		struct rte_pipeline_params pipeline_params = {
			.name = "ARPICMP",
			.socket_id = params->socket_id,
			.offset_port_id = 0,
			//.offset_port_id = arp_meta_offset,
		};

		p->p = rte_pipeline_create(&pipeline_params);
		if (p->p == NULL) {
			rte_free(p);
			return NULL;
		}
	}

	p->n_ports_in = params->n_ports_in;
	p->n_ports_out = params->n_ports_out;
	p->n_tables = 1;

	/* Memory allocation for in_port_h_arg */
	in_ports_arg_size = RTE_CACHE_LINE_ROUNDUP(
		(sizeof(struct pipeline_arpicmp_in_port_h_arg)) *
				(params->n_ports_in));
	struct pipeline_arpicmp_in_port_h_arg *ap =
		(struct pipeline_arpicmp_in_port_h_arg *)rte_zmalloc(NULL,
				in_ports_arg_size,
				RTE_CACHE_LINE_SIZE);
	if (ap == NULL)
		return NULL;

	/*Input ports */
	for (i = 0; i < p->n_ports_in; i++) {
		/* passing our txrx pipeline in call back arg */
		(ap[i]).p = p_arp;
		(ap[i]).in_port_id = i;
		struct rte_pipeline_port_in_params port_params = {
			.ops =
					pipeline_port_in_params_get_ops(&params->
									port_in[i]),
			.arg_create =
					pipeline_port_in_params_convert(&params->
									port_in[i]),
			.f_action = NULL,
			.arg_ah = &(ap[i]),
			.burst_size = params->port_in[i].burst_size,
		};

			port_params.f_action = port_in_ah_arpicmp;

		int status = rte_pipeline_port_in_create(p->p,
							 &port_params,
							 &p->port_in_id[i]);

		if (status) {
			rte_pipeline_free(p->p);
			rte_free(p);
			return NULL;
		}
	}

	/* Output ports */
	for (i = 0; i < p->n_ports_out; i++) {
		struct rte_pipeline_port_out_params port_params = {
			.ops =
					pipeline_port_out_params_get_ops(&params->
									 port_out[i]),
			.arg_create =
					pipeline_port_out_params_convert(&params->
									 port_out[i]),
			.f_action = NULL,
			.arg_ah = NULL,
		};

		int status = rte_pipeline_port_out_create(p->p,
								&port_params,
								&p->port_out_id[i]);

		if (status) {
			rte_pipeline_free(p->p);
			rte_free(p);
			return NULL;
		}
	}
	int pipeline_num = 0;

	int status = sscanf(params->name, "PIPELINE%d", &pipeline_num);

	if (status < 0) {
		return NULL;
		printf("Unable to read pipeline number\n");
	}

	p_arp->pipeline_num = (uint8_t) pipeline_num;

	register_pipeline_Qs(p_arp->pipeline_num, p);
	set_phy_outport_id(p_arp->pipeline_num, p, p_arp->outport_id);

	/* Tables */
	{
		struct rte_pipeline_table_params table_params = {
			.ops = &rte_table_stub_ops,
			.arg_create = NULL,
			.f_action_hit = NULL,
			.f_action_miss = NULL,
			.arg_ah = NULL,
			.action_data_size = 0,
		};

		int status = rte_pipeline_table_create(p->p,
									 &table_params,
									 &p->table_id[0]);

		if (status) {
			rte_pipeline_free(p->p);
			rte_free(p);
			return NULL;
		}
	}

	/* Connecting input ports to tables */
	for (i = 0; i < p->n_ports_in; i++) {

		int status = rte_pipeline_port_in_connect_to_table(p->p,
									 p->
									 port_in_id
									 [i],
									 p->
									 table_id[0]);

		if (status) {
			rte_pipeline_free(p->p);
			rte_free(p);
			return NULL;
		}

	}

	/* Enable input ports */
	for (i = 0; i < p->n_ports_in; i++) {
		int status = rte_pipeline_port_in_enable(p->p,
							 p->port_in_id[i]);

		if (status) {
			rte_pipeline_free(p->p);
			rte_free(p);
			return NULL;
		}
	}

	/* Check pipeline consistency */
	if (rte_pipeline_check(p->p) < 0) {
		rte_pipeline_free(p->p);
		rte_free(p);
		return NULL;
	}

	/* Message queues */
	p->n_msgq = params->n_msgq;
	for (i = 0; i < p->n_msgq; i++)
		p->msgq_in[i] = params->msgq_in[i];
	for (i = 0; i < p->n_msgq; i++)
		p->msgq_out[i] = params->msgq_out[i];

	/* Message handlers */
	memcpy(p->handlers, handlers, sizeof(p->handlers));

	return p;
}

static int pipeline_arpicmp_free(void *pipeline)
{
	struct pipeline *p = (struct pipeline *)pipeline;

	/* Check input arguments */
	if (p == NULL)
		return -1;

	/* Free resources */
	rte_pipeline_free(p->p);
	rte_free(p);
	return 0;
}

static int pipeline_arpicmp_timer(void *pipeline)
{
	struct pipeline *p = (struct pipeline *)pipeline;

	pipeline_msg_req_handle(p);
	rte_pipeline_flush(p->p);

	return 0;
}

static int
pipeline_arpicmp_track(void *pipeline, uint32_t port_in, uint32_t *port_out)
{
	struct pipeline *p = (struct pipeline *)pipeline;

	/* Check input arguments */
	if ((p == NULL) || (port_in >= p->n_ports_in) || (port_out == NULL))
		return -1;

	*port_out = port_in / p->n_ports_in;
	return 0;
}

struct pipeline_be_ops pipeline_arpicmp_be_ops = {
	.f_init = pipeline_arpicmp_init,
	.f_free = pipeline_arpicmp_free,
	.f_run = NULL,
	.f_timer = pipeline_arpicmp_timer,
	.f_track = pipeline_arpicmp_track,
};