aboutsummaryrefslogtreecommitdiffstats
path: root/tools/pkt_gen/xena/json
diff options
context:
space:
mode:
authorMartin Klozik <martinx.klozik@intel.com>2017-05-16 07:23:02 +0000
committerGerrit Code Review <gerrit@opnfv.org>2017-05-16 07:23:02 +0000
commit6139a65e7100a46a557a70a077e04cffdbb54223 (patch)
tree8687a9299ccd12085aa93ac90f349da5be7f4df4 /tools/pkt_gen/xena/json
parent03f38e3c9804f9c5a053a7a3b2c79381fe0deb68 (diff)
parent7eaeffff1c2976e1022f7716c7eb0c746482824e (diff)
Merge "xena_pairs: Add pairs topology for 2544 testing"
Diffstat (limited to 'tools/pkt_gen/xena/json')
-rw-r--r--tools/pkt_gen/xena/json/__init__.py13
-rw-r--r--tools/pkt_gen/xena/json/json_utilities.py105
-rw-r--r--tools/pkt_gen/xena/json/xena_json.py443
-rw-r--r--tools/pkt_gen/xena/json/xena_json_blocks.py50
-rw-r--r--tools/pkt_gen/xena/json/xena_json_mesh.py46
-rw-r--r--tools/pkt_gen/xena/json/xena_json_pairs.py60
6 files changed, 717 insertions, 0 deletions
diff --git a/tools/pkt_gen/xena/json/__init__.py b/tools/pkt_gen/xena/json/__init__.py
new file mode 100644
index 00000000..54e0301a
--- /dev/null
+++ b/tools/pkt_gen/xena/json/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2015-2017 Intel Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/tools/pkt_gen/xena/json/json_utilities.py b/tools/pkt_gen/xena/json/json_utilities.py
new file mode 100644
index 00000000..857aa102
--- /dev/null
+++ b/tools/pkt_gen/xena/json/json_utilities.py
@@ -0,0 +1,105 @@
+# Copyright 2017 Red Hat Inc & Xena Networks.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Contributors:
+# Dan Amzulescu, Xena Networks
+# Christian Trautman, Red Hat Inc.
+
+"""JSON utility module"""
+
+import base64
+import json
+import locale
+import logging
+import uuid
+
+_LOGGER = logging.getLogger(__name__)
+_LOCALE = locale.getlocale()[1]
+
+def create_segment(header_type, encode_64_string):
+ """
+ Create segment for JSON file
+ :param header_type: Type of header as string
+ :param encode_64_string: 64 byte encoded string value of the hex bytes
+ :return: segment as dictionary
+ """
+ return {
+ "SegmentType": header_type.upper(),
+ "SegmentValue": encode_64_string,
+ "ItemID": str(uuid.uuid4()),
+ "ParentID": "",
+ "Label": ""}
+
+
+def decode_byte_array(enc_str):
+ """ Decodes the base64-encoded string to a byte array
+ :param enc_str: The base64-encoded string representing a byte array
+ :return: The decoded byte array
+ """
+ dec_string = base64.b64decode(enc_str)
+ barray = bytearray()
+ barray.extend(dec_string)
+ return barray
+
+
+def encode_byte_array(byte_arr):
+ """ Encodes the byte array as a base64-encoded string
+ :param byte_arr: A bytearray containing the bytes to convert
+ :return: A base64 encoded string
+ """
+ enc_string = base64.b64encode(bytes(byte_arr))
+ return enc_string
+
+
+def read_json_file(json_file):
+ """
+ Read the json file path and return a dictionary of the data
+ :param json_file: path to json file
+ :return: dictionary of json data
+ """
+ try:
+ with open(json_file, 'r', encoding=_LOCALE) as data_file:
+ file_data = json.loads(data_file.read())
+ except ValueError as exc:
+ # general json exception, Python 3.5 adds new exception type
+ _LOGGER.exception("Exception with json read: %s", exc)
+ raise
+ except IOError as exc:
+ _LOGGER.exception(
+ 'Exception during file open: %s file=%s', exc, json_file)
+ raise
+ return file_data
+
+
+def write_json_file(json_data, output_path):
+ """
+ Write out the dictionary of data to a json file
+ :param json_data: dictionary of json data
+ :param output_path: file path to write output
+ :return: Boolean if success
+ """
+ try:
+ with open(output_path, 'w', encoding=_LOCALE) as fileh:
+ json.dump(json_data, fileh, indent=2, sort_keys=True,
+ ensure_ascii=True)
+ return True
+ except ValueError as exc:
+ # general json exception, Python 3.5 adds new exception type
+ _LOGGER.exception(
+ "Exception with json write: %s", exc)
+ return False
+ except IOError as exc:
+ _LOGGER.exception(
+ 'Exception during file write: %s file=%s', exc, output_path)
+ return False
diff --git a/tools/pkt_gen/xena/json/xena_json.py b/tools/pkt_gen/xena/json/xena_json.py
new file mode 100644
index 00000000..b1eed720
--- /dev/null
+++ b/tools/pkt_gen/xena/json/xena_json.py
@@ -0,0 +1,443 @@
+# Copyright 2016-2017 Red Hat Inc & Xena Networks.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Contributors:
+# Dan Amzulescu, Xena Networks
+# Christian Trautman, Red Hat Inc.
+#
+# Usage can be seen below in unit test. This implementation is designed for one
+# module two port Xena chassis runs only.
+
+"""
+Xena JSON module
+"""
+
+from collections import OrderedDict
+import locale
+import logging
+import os
+
+import scapy.layers.inet as inet
+
+from tools.pkt_gen.xena.json import json_utilities
+
+_LOGGER = logging.getLogger(__name__)
+_LOCALE = locale.getlocale()[1]
+
+_CURR_DIR = os.path.dirname(os.path.realpath(__file__))
+
+
+class XenaJSON(object):
+ """
+ Parent Class to modify and read Xena JSON configuration files.
+ """
+ def __init__(self,
+ json_path=os.path.join(
+ _CURR_DIR, '../profiles/baseconfig.x2544')):
+ """
+ Constructor
+ :param json_path: path to JSON file to read. Expected files must have
+ two module ports with each port having its own stream config profile.
+ :return: XenaJSON object
+ """
+ self.json_data = json_utilities.read_json_file(json_path)
+
+ self.packet_data = OrderedDict()
+ self.packet_data['layer2'] = None
+ self.packet_data['vlan'] = None
+ self.packet_data['layer3'] = None
+ self.packet_data['layer4'] = None
+
+ def _add_multistream_layer(self, entity, seg_uuid, stop_value, layer):
+ """
+ Add the multi stream layers to the json file based on the layer provided
+ :param entity: Entity to append the segment to in entity list
+ :param seg_uuid: The UUID to attach the multistream layer to
+ :param stop_value: The number of flows to configure
+ :param layer: the layer that the multistream will be attached to
+ :return: None
+ """
+ field_name = {
+ 2: ('Dst MAC addr', 'Src MAC addr'),
+ 3: ('Dest IP Addr', 'Src IP Addr'),
+ 4: ('Dest Port', 'Src Port')
+ }
+ segments = [
+ {
+ "Offset": 0,
+ "Mask": "//8=", # mask of 255/255
+ "Action": "INC",
+ "StartValue": 0,
+ "StopValue": stop_value,
+ "StepValue": 1,
+ "RepeatCount": 1,
+ "SegmentId": seg_uuid,
+ "FieldName": field_name[int(layer)][0]
+ },
+ {
+ "Offset": 0,
+ "Mask": "//8=", # mask of 255/255
+ "Action": "INC",
+ "StartValue": 0,
+ "StopValue": stop_value,
+ "StepValue": 1,
+ "RepeatCount": 1,
+ "SegmentId": seg_uuid,
+ "FieldName": field_name[int(layer)][1]
+ }
+ ]
+
+ self.json_data['StreamProfileHandler']['EntityList'][entity][
+ 'StreamConfig']['HwModifiers'] = (segments)
+
+ def _create_packet_header(self):
+ """
+ Create the scapy packet header based on what has been built in this
+ instance using the set header methods. Return tuple of the two byte
+ arrays, one for each port.
+ :return: Scapy packet headers as bytearrays
+ """
+ if not self.packet_data['layer2']:
+ _LOGGER.warning('Using dummy info for layer 2 in Xena JSON file')
+ self.set_header_layer2()
+ packet1, packet2 = (self.packet_data['layer2'][0],
+ self.packet_data['layer2'][1])
+ for packet_header in list(self.packet_data.copy().values())[1:]:
+ if packet_header:
+ packet1 /= packet_header[0]
+ packet2 /= packet_header[1]
+ ret = (bytes(packet1), bytes(packet2))
+ return ret
+
+ def add_header_segments(self, flows=0, multistream_layer=None):
+ """
+ Build the header segments to write to the JSON file.
+ :param flows: Number of flows to configure for multistream if enabled
+ :param multistream_layer: layer to set multistream flows as string.
+ Acceptable values are L2, L3 or L4
+ :return: None
+ """
+ packet = self._create_packet_header()
+ segment1 = list()
+ segment2 = list()
+ header_pos = 0
+ if self.packet_data['layer2']:
+ # slice out the layer 2 bytes from the packet header byte array
+ layer2 = packet[0][header_pos: len(self.packet_data['layer2'][0])]
+ seg = json_utilities.create_segment(
+ "ETHERNET", json_utilities.encode_byte_array(layer2).decode(
+ _LOCALE))
+ if multistream_layer == 'L2' and flows > 0:
+ self._add_multistream_layer(entity=0, seg_uuid=seg['ItemID'],
+ stop_value=flows, layer=2)
+ segment1.append(seg)
+ # now do the other port data with reversed src, dst info
+ layer2 = packet[1][header_pos: len(self.packet_data['layer2'][1])]
+ seg = json_utilities.create_segment(
+ "ETHERNET", json_utilities.encode_byte_array(layer2).decode(
+ _LOCALE))
+ segment2.append(seg)
+ if multistream_layer == 'L2' and flows > 0:
+ self._add_multistream_layer(entity=1, seg_uuid=seg['ItemID'],
+ stop_value=flows, layer=2)
+ header_pos = len(layer2)
+ if self.packet_data['vlan']:
+ # slice out the vlan bytes from the packet header byte array
+ vlan = packet[0][header_pos: len(
+ self.packet_data['vlan'][0]) + header_pos]
+ segment1.append(json_utilities.create_segment(
+ "VLAN", json_utilities.encode_byte_array(vlan).decode(_LOCALE)))
+ segment2.append(json_utilities.create_segment(
+ "VLAN", json_utilities.encode_byte_array(vlan).decode(_LOCALE)))
+ header_pos += len(vlan)
+ if self.packet_data['layer3']:
+ # slice out the layer 3 bytes from the packet header byte array
+ layer3 = packet[0][header_pos: len(
+ self.packet_data['layer3'][0]) + header_pos]
+ seg = json_utilities.create_segment(
+ "IP", json_utilities.encode_byte_array(layer3).decode(_LOCALE))
+ segment1.append(seg)
+ if multistream_layer == 'L3' and flows > 0:
+ self._add_multistream_layer(entity=0, seg_uuid=seg['ItemID'],
+ stop_value=flows, layer=3)
+ # now do the other port data with reversed src, dst info
+ layer3 = packet[1][header_pos: len(
+ self.packet_data['layer3'][1]) + header_pos]
+ seg = json_utilities.create_segment(
+ "IP", json_utilities.encode_byte_array(layer3).decode(_LOCALE))
+ segment2.append(seg)
+ if multistream_layer == 'L3' and flows > 0:
+ self._add_multistream_layer(entity=1, seg_uuid=seg['ItemID'],
+ stop_value=flows, layer=3)
+ header_pos += len(layer3)
+ if self.packet_data['layer4']:
+ # slice out the layer 4 bytes from the packet header byte array
+ layer4 = packet[0][header_pos: len(
+ self.packet_data['layer4'][0]) + header_pos]
+ seg = json_utilities.create_segment(
+ "UDP", json_utilities.encode_byte_array(layer4).decode(_LOCALE))
+ segment1.append(seg)
+ if multistream_layer == 'L4' and flows > 0:
+ self._add_multistream_layer(entity=0, seg_uuid=seg['ItemID'],
+ stop_value=flows, layer=4)
+ # now do the other port data with reversed src, dst info
+ layer4 = packet[1][header_pos: len(
+ self.packet_data['layer4'][1]) + header_pos]
+ seg = json_utilities.create_segment(
+ "UDP", json_utilities.encode_byte_array(layer4).decode(_LOCALE))
+ segment2.append(seg)
+ if multistream_layer == 'L4' and flows > 0:
+ self._add_multistream_layer(entity=1, seg_uuid=seg['ItemID'],
+ stop_value=flows, layer=4)
+ header_pos += len(layer4)
+
+ self.json_data['StreamProfileHandler']['EntityList'][0][
+ 'StreamConfig']['HeaderSegments'] = segment1
+ self.json_data['StreamProfileHandler']['EntityList'][1][
+ 'StreamConfig']['HeaderSegments'] = segment2
+
+ def disable_back2back_test(self):
+ """
+ Disable the rfc2544 back to back test
+ :return: None
+ """
+ self.json_data['TestOptions']['TestTypeOptionMap']['Back2Back'][
+ 'Enabled'] = 'false'
+
+ def disable_throughput_test(self):
+ """
+ Disable the rfc2544 throughput test
+ :return: None
+ """
+ self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
+ 'Enabled'] = 'false'
+
+ def enable_back2back_test(self):
+ """
+ Enable the rfc2544 back to back test
+ :return: None
+ """
+ self.json_data['TestOptions']['TestTypeOptionMap']['Back2Back'][
+ 'Enabled'] = 'true'
+
+ def enable_throughput_test(self):
+ """
+ Enable the rfc2544 throughput test
+ :return: None
+ """
+ self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
+ 'Enabled'] = 'true'
+ # pylint: disable=too-many-arguments
+ def modify_2544_tput_options(self, initial_value, minimum_value,
+ maximum_value, value_resolution,
+ use_pass_threshhold, pass_threshhold):
+ """
+ modify_2544_tput_options
+ """
+ self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
+ 'RateIterationOptions']['InitialValue'] = initial_value
+ self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
+ 'RateIterationOptions']['MinimumValue'] = minimum_value
+ self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
+ 'RateIterationOptions']['MaximumValue'] = maximum_value
+ self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
+ 'RateIterationOptions']['ValueResolution'] = value_resolution
+ self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
+ 'RateIterationOptions']['UsePassThreshold'] = use_pass_threshhold
+ self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
+ 'RateIterationOptions']['PassThreshold'] = pass_threshhold
+
+ def set_chassis_info(self, hostname, pwd):
+ """
+ Set the chassis info
+ :param hostname: hostname as string of ip
+ :param pwd: password to chassis as string
+ :return: None
+ """
+ self.json_data['ChassisManager']['ChassisList'][0][
+ 'HostName'] = hostname
+ self.json_data['ChassisManager']['ChassisList'][0][
+ 'Password'] = pwd
+
+ def set_header_layer2(self, dst_mac='cc:cc:cc:cc:cc:cc',
+ src_mac='bb:bb:bb:bb:bb:bb', **kwargs):
+ """
+ Build a scapy Ethernet L2 objects inside instance packet_data structure
+ :param dst_mac: destination mac as string. Example "aa:aa:aa:aa:aa:aa"
+ :param src_mac: source mac as string. Example "bb:bb:bb:bb:bb:bb"
+ :param kwargs: Extra params per scapy usage.
+ :return: None
+ """
+ self.packet_data['layer2'] = [
+ inet.Ether(dst=dst_mac, src=src_mac, **kwargs),
+ inet.Ether(dst=src_mac, src=dst_mac, **kwargs)]
+
+ def set_header_layer3(self, src_ip='192.168.0.2', dst_ip='192.168.0.3',
+ protocol='UDP', **kwargs):
+ """
+ Build scapy IPV4 L3 objects inside instance packet_data structure
+ :param src_ip: source IP as string in dot notation format
+ :param dst_ip: destination IP as string in dot notation format
+ :param protocol: protocol for l4
+ :param kwargs: Extra params per scapy usage
+ :return: None
+ """
+ self.packet_data['layer3'] = [
+ inet.IP(src=src_ip, dst=dst_ip, proto=protocol.lower(), **kwargs),
+ inet.IP(src=dst_ip, dst=src_ip, proto=protocol.lower(), **kwargs)]
+
+ def set_header_layer4_udp(self, source_port, destination_port, **kwargs):
+ """
+ Build scapy UDP L4 objects inside instance packet_data structure
+ :param source_port: Source port as int
+ :param destination_port: Destination port as int
+ :param kwargs: Extra params per scapy usage
+ :return: None
+ """
+ self.packet_data['layer4'] = [
+ inet.UDP(sport=source_port, dport=destination_port, **kwargs),
+ inet.UDP(sport=source_port, dport=destination_port, **kwargs)]
+
+ def set_header_vlan(self, vlan_id=1, **kwargs):
+ """
+ Build a Dot1Q scapy object inside instance packet_data structure
+ :param vlan_id: The VLAN ID
+ :param kwargs: Extra params per scapy usage
+ :return: None
+ """
+ self.packet_data['vlan'] = [
+ inet.Dot1Q(vlan=vlan_id, **kwargs),
+ inet.Dot1Q(vlan=vlan_id, **kwargs)]
+
+ def set_port(self, index, module, port):
+ """
+ Set the module and port for the 0 index port to use with the test
+ :param index: Index of port to set, 0 = port1, 1=port2, etc..
+ :param module: module location as int
+ :param port: port location in module as int
+ :return: None
+ """
+ self.json_data['PortHandler']['EntityList'][index]['PortRef'][
+ 'ModuleIndex'] = module
+ self.json_data['PortHandler']['EntityList'][index]['PortRef'][
+ 'PortIndex'] = port
+
+ def set_port_ip_v4(self, port, ip_addr, netmask, gateway):
+ """
+ Set the port IP info
+ :param port: port number as int of port to set ip info
+ :param ip_addr: ip address in dot notation format as string
+ :param netmask: cidr number for netmask (ie 24/16/8) as int
+ :param gateway: gateway address in dot notation format
+ :return: None
+ """
+ available_ports = range(len(
+ self.json_data['PortHandler']['EntityList']))
+ if port not in available_ports:
+ raise ValueError("{}{}{}".format(
+ 'Port assignment must be an available port ',
+ 'number in baseconfig file. Port=', port))
+ self.json_data['PortHandler']['EntityList'][
+ port]["IpV4Address"] = ip_addr
+ self.json_data['PortHandler']['EntityList'][
+ port]["IpV4Gateway"] = gateway
+ self.json_data['PortHandler']['EntityList'][
+ port]["IpV4RoutingPrefix"] = int(netmask)
+
+ def set_port_ip_v6(self, port, ip_addr, netmask, gateway):
+ """
+ Set the port IP info
+ :param port: port number as int of port to set ip info
+ :param ip_addr: ip address as 8 groups of 4 hexadecimal groups separated
+ by a colon.
+ :param netmask: cidr number for netmask (ie 24/16/8) as int
+ :param gateway: gateway address as string in 8 group of 4 hexadecimal
+ groups separated by a colon.
+ :return: None
+ """
+ available_ports = range(len(
+ self.json_data['PortHandler']['EntityList']))
+ if port not in available_ports:
+ raise ValueError("{}{}{}".format(
+ 'Port assignment must be an available port ',
+ 'number in baseconfig file. Port=', port))
+ self.json_data['PortHandler']['EntityList'][
+ port]["IpV6Address"] = ip_addr
+ self.json_data['PortHandler']['EntityList'][
+ port]["IpV6Gateway"] = gateway
+ self.json_data['PortHandler']['EntityList'][
+ port]["IpV6RoutingPrefix"] = int(netmask)
+
+ def set_test_options_tput(self, packet_sizes, duration, iterations,
+ loss_rate, micro_tpld=False):
+ """
+ Set the tput test options
+ :param packet_sizes: List of packet sizes to test, single int entry is
+ acceptable for one packet size testing
+ :param duration: time for each test in seconds as int
+ :param iterations: number of iterations of testing as int
+ :param loss_rate: acceptable loss rate as float
+ :param micro_tpld: boolean if micro_tpld should be enabled or disabled
+ :return: None
+ """
+ if isinstance(packet_sizes, int):
+ packet_sizes = [packet_sizes]
+ self.json_data['TestOptions']['PacketSizes'][
+ 'CustomPacketSizes'] = packet_sizes
+ self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
+ 'Duration'] = duration
+ self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
+ 'RateIterationOptions']['AcceptableLoss'] = loss_rate
+ self.json_data['TestOptions']['FlowCreationOptions'][
+ 'UseMicroTpldOnDemand'] = 'true' if micro_tpld else 'false'
+ self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
+ 'Iterations'] = iterations
+
+ def set_test_options_back2back(self, packet_sizes, duration,
+ iterations, startvalue, endvalue,
+ micro_tpld=False):
+ """
+ Set the back2back test options
+ :param packet_sizes: List of packet sizes to test, single int entry is
+ acceptable for one packet size testing
+ :param duration: time for each test in seconds as int
+ :param iterations: number of iterations of testing as int
+ :param micro_tpld: boolean if micro_tpld should be enabled or disabled
+ :param StartValue: start value
+ :param EndValue: end value
+ :return: None
+ """
+ if isinstance(packet_sizes, int):
+ packet_sizes = [packet_sizes]
+ self.json_data['TestOptions']['PacketSizes'][
+ 'CustomPacketSizes'] = packet_sizes
+ self.json_data['TestOptions']['TestTypeOptionMap']['Back2Back'][
+ 'Duration'] = duration
+ self.json_data['TestOptions']['FlowCreationOptions'][
+ 'UseMicroTpldOnDemand'] = 'true' if micro_tpld else 'false'
+ self.json_data['TestOptions']['TestTypeOptionMap']['Back2Back'][
+ 'Iterations'] = iterations
+ self.json_data['TestOptions']['TestTypeOptionMap']['Back2Back'][
+ 'RateSweepOptions']['StartValue'] = startvalue
+ self.json_data['TestOptions']['TestTypeOptionMap']['Back2Back'][
+ 'RateSweepOptions']['EndValue'] = endvalue
+
+ def write_config(self, path='./2bUsed.x2544'):
+ """
+ Write the config to out as file
+ :param path: Output file to export the json data to
+ :return: None
+ """
+ if not json_utilities.write_json_file(self.json_data, path):
+ raise RuntimeError("Could not write out file, please check config")
diff --git a/tools/pkt_gen/xena/json/xena_json_blocks.py b/tools/pkt_gen/xena/json/xena_json_blocks.py
new file mode 100644
index 00000000..d0a0864c
--- /dev/null
+++ b/tools/pkt_gen/xena/json/xena_json_blocks.py
@@ -0,0 +1,50 @@
+# Copyright 2017 Red Hat Inc & Xena Networks.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Contributors:
+# Dan Amzulescu, Xena Networks
+# Christian Trautman, Red Hat Inc.
+#
+# Usage can be seen below in unit test. This implementation is designed for one
+# module two port Xena chassis runs only.
+
+"""
+Xena JSON module for blocks topology
+"""
+
+from tools.pkt_gen.xena.json.xena_json import XenaJSON
+
+
+class XenaJSONBlocks(XenaJSON):
+ """
+ Class to modify and read Xena JSON configuration files. Topology will be set
+ as Blocks
+ """
+ def __init__(self):
+ super(XenaJSONBlocks, self).__init__()
+ self.set_topology_blocks()
+
+ def set_topology_blocks(self):
+ """
+ Set the test topology to a West to East config for half duplex flow with
+ port 0 as the sender and port 1 as the receiver.
+ :return: None
+ """
+ self.json_data['TestOptions']['TopologyConfig']['Topology'] = 'BLOCKS'
+ self.json_data['TestOptions']['TopologyConfig'][
+ 'Direction'] = 'WEST_EAST'
+ self.json_data['PortHandler']['EntityList'][0][
+ 'PortGroup'] = "WEST"
+ self.json_data['PortHandler']['EntityList'][1][
+ 'PortGroup'] = "EAST"
diff --git a/tools/pkt_gen/xena/json/xena_json_mesh.py b/tools/pkt_gen/xena/json/xena_json_mesh.py
new file mode 100644
index 00000000..5d6a1f13
--- /dev/null
+++ b/tools/pkt_gen/xena/json/xena_json_mesh.py
@@ -0,0 +1,46 @@
+# Copyright 2017 Red Hat Inc & Xena Networks.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Contributors:
+# Dan Amzulescu, Xena Networks
+# Christian Trautman, Red Hat Inc.
+#
+# Usage can be seen below in unit test. This implementation is designed for one
+# module two port Xena chassis runs only.
+
+"""
+Xena JSON module for mesh topology.
+"""
+
+from tools.pkt_gen.xena.json.xena_json import XenaJSON
+
+
+class XenaJSONMesh(XenaJSON):
+ """
+ Class to modify and read Xena JSON configuration files. Topology will be set
+ as Mesh.
+ """
+ def __init__(self):
+ super(XenaJSONMesh, self).__init__()
+ self.set_topology_mesh()
+
+ def set_topology_mesh(self):
+ """
+ Set the test topology to Mesh for bi directional full duplex flow
+ :return: None
+ """
+ self.json_data['TestOptions']['TopologyConfig']['Topology'] = 'MESH'
+ self.json_data['TestOptions']['TopologyConfig']['Direction'] = 'BIDIR'
+ self.json_data['PortHandler']['EntityList'][0]['PortGroup'] = "UNDEFINED"
+ self.json_data['PortHandler']['EntityList'][1]['PortGroup'] = "UNDEFINED"
diff --git a/tools/pkt_gen/xena/json/xena_json_pairs.py b/tools/pkt_gen/xena/json/xena_json_pairs.py
new file mode 100644
index 00000000..4f97f0ba
--- /dev/null
+++ b/tools/pkt_gen/xena/json/xena_json_pairs.py
@@ -0,0 +1,60 @@
+# Copyright 2017 Red Hat Inc & Xena Networks.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Contributors:
+# Dan Amzulescu, Xena Networks
+# Christian Trautman, Red Hat Inc.
+#
+# Usage can be seen below in unit test. This implementation is designed for one
+# module two port Xena chassis runs only.
+
+"""
+Xena JSON module for pairs topology
+"""
+
+from tools.pkt_gen.xena.json.xena_json import XenaJSON
+
+
+class XenaJSONPairs(XenaJSON):
+ """
+ Class to modify and read Xena JSON configuration files. Topology will be set
+ as Blocks
+ """
+ def __init__(self):
+ super(XenaJSONPairs, self).__init__()
+ self.set_topology_pairs()
+
+ def set_topology_pairs(self):
+ """
+ Set the test topology to loopback pairs for packets that need to return
+ to the same port they are sent from
+ :return: None
+ """
+ self.json_data['TestOptions']['TopologyConfig']['Topology'] = 'PAIRS'
+ self.json_data['TestOptions']['TopologyConfig'][
+ 'Direction'] = 'WEST_EAST'
+ self.json_data['PortHandler']['EntityList'][0][
+ 'PortGroup'] = "WEST"
+ self.json_data['PortHandler']['EntityList'][1][
+ 'PortGroup'] = "WEST"
+
+ def remove_port(self, port_number):
+ """
+ Remove a port from the config.
+ :param port_number: port to remove from json config as int
+ :return: None
+ """
+ del self.json_data['PortHandler']['EntityList'][port_number]
+ del self.json_data['StreamHandler']['StreamConnectionList'][0][
+ 'Port{}Id'.format(port_number + 1)]