diff options
author | Martin Klozik <martinx.klozik@intel.com> | 2017-05-16 07:23:02 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@opnfv.org> | 2017-05-16 07:23:02 +0000 |
commit | 6139a65e7100a46a557a70a077e04cffdbb54223 (patch) | |
tree | 8687a9299ccd12085aa93ac90f349da5be7f4df4 /tools/pkt_gen/xena/json | |
parent | 03f38e3c9804f9c5a053a7a3b2c79381fe0deb68 (diff) | |
parent | 7eaeffff1c2976e1022f7716c7eb0c746482824e (diff) |
Merge "xena_pairs: Add pairs topology for 2544 testing"
Diffstat (limited to 'tools/pkt_gen/xena/json')
-rw-r--r-- | tools/pkt_gen/xena/json/__init__.py | 13 | ||||
-rw-r--r-- | tools/pkt_gen/xena/json/json_utilities.py | 105 | ||||
-rw-r--r-- | tools/pkt_gen/xena/json/xena_json.py | 443 | ||||
-rw-r--r-- | tools/pkt_gen/xena/json/xena_json_blocks.py | 50 | ||||
-rw-r--r-- | tools/pkt_gen/xena/json/xena_json_mesh.py | 46 | ||||
-rw-r--r-- | tools/pkt_gen/xena/json/xena_json_pairs.py | 60 |
6 files changed, 717 insertions, 0 deletions
diff --git a/tools/pkt_gen/xena/json/__init__.py b/tools/pkt_gen/xena/json/__init__.py new file mode 100644 index 00000000..54e0301a --- /dev/null +++ b/tools/pkt_gen/xena/json/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2015-2017 Intel Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/tools/pkt_gen/xena/json/json_utilities.py b/tools/pkt_gen/xena/json/json_utilities.py new file mode 100644 index 00000000..857aa102 --- /dev/null +++ b/tools/pkt_gen/xena/json/json_utilities.py @@ -0,0 +1,105 @@ +# Copyright 2017 Red Hat Inc & Xena Networks. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Contributors: +# Dan Amzulescu, Xena Networks +# Christian Trautman, Red Hat Inc. + +"""JSON utility module""" + +import base64 +import json +import locale +import logging +import uuid + +_LOGGER = logging.getLogger(__name__) +_LOCALE = locale.getlocale()[1] + +def create_segment(header_type, encode_64_string): + """ + Create segment for JSON file + :param header_type: Type of header as string + :param encode_64_string: 64 byte encoded string value of the hex bytes + :return: segment as dictionary + """ + return { + "SegmentType": header_type.upper(), + "SegmentValue": encode_64_string, + "ItemID": str(uuid.uuid4()), + "ParentID": "", + "Label": ""} + + +def decode_byte_array(enc_str): + """ Decodes the base64-encoded string to a byte array + :param enc_str: The base64-encoded string representing a byte array + :return: The decoded byte array + """ + dec_string = base64.b64decode(enc_str) + barray = bytearray() + barray.extend(dec_string) + return barray + + +def encode_byte_array(byte_arr): + """ Encodes the byte array as a base64-encoded string + :param byte_arr: A bytearray containing the bytes to convert + :return: A base64 encoded string + """ + enc_string = base64.b64encode(bytes(byte_arr)) + return enc_string + + +def read_json_file(json_file): + """ + Read the json file path and return a dictionary of the data + :param json_file: path to json file + :return: dictionary of json data + """ + try: + with open(json_file, 'r', encoding=_LOCALE) as data_file: + file_data = json.loads(data_file.read()) + except ValueError as exc: + # general json exception, Python 3.5 adds new exception type + _LOGGER.exception("Exception with json read: %s", exc) + raise + except IOError as exc: + _LOGGER.exception( + 'Exception during file open: %s file=%s', exc, json_file) + raise + return file_data + + +def write_json_file(json_data, output_path): + """ + Write out the dictionary of data to a json file + :param json_data: dictionary of json data + :param output_path: file path to write output + :return: Boolean if success + """ + try: + with open(output_path, 'w', encoding=_LOCALE) as fileh: + json.dump(json_data, fileh, indent=2, sort_keys=True, + ensure_ascii=True) + return True + except ValueError as exc: + # general json exception, Python 3.5 adds new exception type + _LOGGER.exception( + "Exception with json write: %s", exc) + return False + except IOError as exc: + _LOGGER.exception( + 'Exception during file write: %s file=%s', exc, output_path) + return False diff --git a/tools/pkt_gen/xena/json/xena_json.py b/tools/pkt_gen/xena/json/xena_json.py new file mode 100644 index 00000000..b1eed720 --- /dev/null +++ b/tools/pkt_gen/xena/json/xena_json.py @@ -0,0 +1,443 @@ +# Copyright 2016-2017 Red Hat Inc & Xena Networks. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Contributors: +# Dan Amzulescu, Xena Networks +# Christian Trautman, Red Hat Inc. +# +# Usage can be seen below in unit test. This implementation is designed for one +# module two port Xena chassis runs only. + +""" +Xena JSON module +""" + +from collections import OrderedDict +import locale +import logging +import os + +import scapy.layers.inet as inet + +from tools.pkt_gen.xena.json import json_utilities + +_LOGGER = logging.getLogger(__name__) +_LOCALE = locale.getlocale()[1] + +_CURR_DIR = os.path.dirname(os.path.realpath(__file__)) + + +class XenaJSON(object): + """ + Parent Class to modify and read Xena JSON configuration files. + """ + def __init__(self, + json_path=os.path.join( + _CURR_DIR, '../profiles/baseconfig.x2544')): + """ + Constructor + :param json_path: path to JSON file to read. Expected files must have + two module ports with each port having its own stream config profile. + :return: XenaJSON object + """ + self.json_data = json_utilities.read_json_file(json_path) + + self.packet_data = OrderedDict() + self.packet_data['layer2'] = None + self.packet_data['vlan'] = None + self.packet_data['layer3'] = None + self.packet_data['layer4'] = None + + def _add_multistream_layer(self, entity, seg_uuid, stop_value, layer): + """ + Add the multi stream layers to the json file based on the layer provided + :param entity: Entity to append the segment to in entity list + :param seg_uuid: The UUID to attach the multistream layer to + :param stop_value: The number of flows to configure + :param layer: the layer that the multistream will be attached to + :return: None + """ + field_name = { + 2: ('Dst MAC addr', 'Src MAC addr'), + 3: ('Dest IP Addr', 'Src IP Addr'), + 4: ('Dest Port', 'Src Port') + } + segments = [ + { + "Offset": 0, + "Mask": "//8=", # mask of 255/255 + "Action": "INC", + "StartValue": 0, + "StopValue": stop_value, + "StepValue": 1, + "RepeatCount": 1, + "SegmentId": seg_uuid, + "FieldName": field_name[int(layer)][0] + }, + { + "Offset": 0, + "Mask": "//8=", # mask of 255/255 + "Action": "INC", + "StartValue": 0, + "StopValue": stop_value, + "StepValue": 1, + "RepeatCount": 1, + "SegmentId": seg_uuid, + "FieldName": field_name[int(layer)][1] + } + ] + + self.json_data['StreamProfileHandler']['EntityList'][entity][ + 'StreamConfig']['HwModifiers'] = (segments) + + def _create_packet_header(self): + """ + Create the scapy packet header based on what has been built in this + instance using the set header methods. Return tuple of the two byte + arrays, one for each port. + :return: Scapy packet headers as bytearrays + """ + if not self.packet_data['layer2']: + _LOGGER.warning('Using dummy info for layer 2 in Xena JSON file') + self.set_header_layer2() + packet1, packet2 = (self.packet_data['layer2'][0], + self.packet_data['layer2'][1]) + for packet_header in list(self.packet_data.copy().values())[1:]: + if packet_header: + packet1 /= packet_header[0] + packet2 /= packet_header[1] + ret = (bytes(packet1), bytes(packet2)) + return ret + + def add_header_segments(self, flows=0, multistream_layer=None): + """ + Build the header segments to write to the JSON file. + :param flows: Number of flows to configure for multistream if enabled + :param multistream_layer: layer to set multistream flows as string. + Acceptable values are L2, L3 or L4 + :return: None + """ + packet = self._create_packet_header() + segment1 = list() + segment2 = list() + header_pos = 0 + if self.packet_data['layer2']: + # slice out the layer 2 bytes from the packet header byte array + layer2 = packet[0][header_pos: len(self.packet_data['layer2'][0])] + seg = json_utilities.create_segment( + "ETHERNET", json_utilities.encode_byte_array(layer2).decode( + _LOCALE)) + if multistream_layer == 'L2' and flows > 0: + self._add_multistream_layer(entity=0, seg_uuid=seg['ItemID'], + stop_value=flows, layer=2) + segment1.append(seg) + # now do the other port data with reversed src, dst info + layer2 = packet[1][header_pos: len(self.packet_data['layer2'][1])] + seg = json_utilities.create_segment( + "ETHERNET", json_utilities.encode_byte_array(layer2).decode( + _LOCALE)) + segment2.append(seg) + if multistream_layer == 'L2' and flows > 0: + self._add_multistream_layer(entity=1, seg_uuid=seg['ItemID'], + stop_value=flows, layer=2) + header_pos = len(layer2) + if self.packet_data['vlan']: + # slice out the vlan bytes from the packet header byte array + vlan = packet[0][header_pos: len( + self.packet_data['vlan'][0]) + header_pos] + segment1.append(json_utilities.create_segment( + "VLAN", json_utilities.encode_byte_array(vlan).decode(_LOCALE))) + segment2.append(json_utilities.create_segment( + "VLAN", json_utilities.encode_byte_array(vlan).decode(_LOCALE))) + header_pos += len(vlan) + if self.packet_data['layer3']: + # slice out the layer 3 bytes from the packet header byte array + layer3 = packet[0][header_pos: len( + self.packet_data['layer3'][0]) + header_pos] + seg = json_utilities.create_segment( + "IP", json_utilities.encode_byte_array(layer3).decode(_LOCALE)) + segment1.append(seg) + if multistream_layer == 'L3' and flows > 0: + self._add_multistream_layer(entity=0, seg_uuid=seg['ItemID'], + stop_value=flows, layer=3) + # now do the other port data with reversed src, dst info + layer3 = packet[1][header_pos: len( + self.packet_data['layer3'][1]) + header_pos] + seg = json_utilities.create_segment( + "IP", json_utilities.encode_byte_array(layer3).decode(_LOCALE)) + segment2.append(seg) + if multistream_layer == 'L3' and flows > 0: + self._add_multistream_layer(entity=1, seg_uuid=seg['ItemID'], + stop_value=flows, layer=3) + header_pos += len(layer3) + if self.packet_data['layer4']: + # slice out the layer 4 bytes from the packet header byte array + layer4 = packet[0][header_pos: len( + self.packet_data['layer4'][0]) + header_pos] + seg = json_utilities.create_segment( + "UDP", json_utilities.encode_byte_array(layer4).decode(_LOCALE)) + segment1.append(seg) + if multistream_layer == 'L4' and flows > 0: + self._add_multistream_layer(entity=0, seg_uuid=seg['ItemID'], + stop_value=flows, layer=4) + # now do the other port data with reversed src, dst info + layer4 = packet[1][header_pos: len( + self.packet_data['layer4'][1]) + header_pos] + seg = json_utilities.create_segment( + "UDP", json_utilities.encode_byte_array(layer4).decode(_LOCALE)) + segment2.append(seg) + if multistream_layer == 'L4' and flows > 0: + self._add_multistream_layer(entity=1, seg_uuid=seg['ItemID'], + stop_value=flows, layer=4) + header_pos += len(layer4) + + self.json_data['StreamProfileHandler']['EntityList'][0][ + 'StreamConfig']['HeaderSegments'] = segment1 + self.json_data['StreamProfileHandler']['EntityList'][1][ + 'StreamConfig']['HeaderSegments'] = segment2 + + def disable_back2back_test(self): + """ + Disable the rfc2544 back to back test + :return: None + """ + self.json_data['TestOptions']['TestTypeOptionMap']['Back2Back'][ + 'Enabled'] = 'false' + + def disable_throughput_test(self): + """ + Disable the rfc2544 throughput test + :return: None + """ + self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][ + 'Enabled'] = 'false' + + def enable_back2back_test(self): + """ + Enable the rfc2544 back to back test + :return: None + """ + self.json_data['TestOptions']['TestTypeOptionMap']['Back2Back'][ + 'Enabled'] = 'true' + + def enable_throughput_test(self): + """ + Enable the rfc2544 throughput test + :return: None + """ + self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][ + 'Enabled'] = 'true' + # pylint: disable=too-many-arguments + def modify_2544_tput_options(self, initial_value, minimum_value, + maximum_value, value_resolution, + use_pass_threshhold, pass_threshhold): + """ + modify_2544_tput_options + """ + self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][ + 'RateIterationOptions']['InitialValue'] = initial_value + self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][ + 'RateIterationOptions']['MinimumValue'] = minimum_value + self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][ + 'RateIterationOptions']['MaximumValue'] = maximum_value + self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][ + 'RateIterationOptions']['ValueResolution'] = value_resolution + self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][ + 'RateIterationOptions']['UsePassThreshold'] = use_pass_threshhold + self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][ + 'RateIterationOptions']['PassThreshold'] = pass_threshhold + + def set_chassis_info(self, hostname, pwd): + """ + Set the chassis info + :param hostname: hostname as string of ip + :param pwd: password to chassis as string + :return: None + """ + self.json_data['ChassisManager']['ChassisList'][0][ + 'HostName'] = hostname + self.json_data['ChassisManager']['ChassisList'][0][ + 'Password'] = pwd + + def set_header_layer2(self, dst_mac='cc:cc:cc:cc:cc:cc', + src_mac='bb:bb:bb:bb:bb:bb', **kwargs): + """ + Build a scapy Ethernet L2 objects inside instance packet_data structure + :param dst_mac: destination mac as string. Example "aa:aa:aa:aa:aa:aa" + :param src_mac: source mac as string. Example "bb:bb:bb:bb:bb:bb" + :param kwargs: Extra params per scapy usage. + :return: None + """ + self.packet_data['layer2'] = [ + inet.Ether(dst=dst_mac, src=src_mac, **kwargs), + inet.Ether(dst=src_mac, src=dst_mac, **kwargs)] + + def set_header_layer3(self, src_ip='192.168.0.2', dst_ip='192.168.0.3', + protocol='UDP', **kwargs): + """ + Build scapy IPV4 L3 objects inside instance packet_data structure + :param src_ip: source IP as string in dot notation format + :param dst_ip: destination IP as string in dot notation format + :param protocol: protocol for l4 + :param kwargs: Extra params per scapy usage + :return: None + """ + self.packet_data['layer3'] = [ + inet.IP(src=src_ip, dst=dst_ip, proto=protocol.lower(), **kwargs), + inet.IP(src=dst_ip, dst=src_ip, proto=protocol.lower(), **kwargs)] + + def set_header_layer4_udp(self, source_port, destination_port, **kwargs): + """ + Build scapy UDP L4 objects inside instance packet_data structure + :param source_port: Source port as int + :param destination_port: Destination port as int + :param kwargs: Extra params per scapy usage + :return: None + """ + self.packet_data['layer4'] = [ + inet.UDP(sport=source_port, dport=destination_port, **kwargs), + inet.UDP(sport=source_port, dport=destination_port, **kwargs)] + + def set_header_vlan(self, vlan_id=1, **kwargs): + """ + Build a Dot1Q scapy object inside instance packet_data structure + :param vlan_id: The VLAN ID + :param kwargs: Extra params per scapy usage + :return: None + """ + self.packet_data['vlan'] = [ + inet.Dot1Q(vlan=vlan_id, **kwargs), + inet.Dot1Q(vlan=vlan_id, **kwargs)] + + def set_port(self, index, module, port): + """ + Set the module and port for the 0 index port to use with the test + :param index: Index of port to set, 0 = port1, 1=port2, etc.. + :param module: module location as int + :param port: port location in module as int + :return: None + """ + self.json_data['PortHandler']['EntityList'][index]['PortRef'][ + 'ModuleIndex'] = module + self.json_data['PortHandler']['EntityList'][index]['PortRef'][ + 'PortIndex'] = port + + def set_port_ip_v4(self, port, ip_addr, netmask, gateway): + """ + Set the port IP info + :param port: port number as int of port to set ip info + :param ip_addr: ip address in dot notation format as string + :param netmask: cidr number for netmask (ie 24/16/8) as int + :param gateway: gateway address in dot notation format + :return: None + """ + available_ports = range(len( + self.json_data['PortHandler']['EntityList'])) + if port not in available_ports: + raise ValueError("{}{}{}".format( + 'Port assignment must be an available port ', + 'number in baseconfig file. Port=', port)) + self.json_data['PortHandler']['EntityList'][ + port]["IpV4Address"] = ip_addr + self.json_data['PortHandler']['EntityList'][ + port]["IpV4Gateway"] = gateway + self.json_data['PortHandler']['EntityList'][ + port]["IpV4RoutingPrefix"] = int(netmask) + + def set_port_ip_v6(self, port, ip_addr, netmask, gateway): + """ + Set the port IP info + :param port: port number as int of port to set ip info + :param ip_addr: ip address as 8 groups of 4 hexadecimal groups separated + by a colon. + :param netmask: cidr number for netmask (ie 24/16/8) as int + :param gateway: gateway address as string in 8 group of 4 hexadecimal + groups separated by a colon. + :return: None + """ + available_ports = range(len( + self.json_data['PortHandler']['EntityList'])) + if port not in available_ports: + raise ValueError("{}{}{}".format( + 'Port assignment must be an available port ', + 'number in baseconfig file. Port=', port)) + self.json_data['PortHandler']['EntityList'][ + port]["IpV6Address"] = ip_addr + self.json_data['PortHandler']['EntityList'][ + port]["IpV6Gateway"] = gateway + self.json_data['PortHandler']['EntityList'][ + port]["IpV6RoutingPrefix"] = int(netmask) + + def set_test_options_tput(self, packet_sizes, duration, iterations, + loss_rate, micro_tpld=False): + """ + Set the tput test options + :param packet_sizes: List of packet sizes to test, single int entry is + acceptable for one packet size testing + :param duration: time for each test in seconds as int + :param iterations: number of iterations of testing as int + :param loss_rate: acceptable loss rate as float + :param micro_tpld: boolean if micro_tpld should be enabled or disabled + :return: None + """ + if isinstance(packet_sizes, int): + packet_sizes = [packet_sizes] + self.json_data['TestOptions']['PacketSizes'][ + 'CustomPacketSizes'] = packet_sizes + self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][ + 'Duration'] = duration + self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][ + 'RateIterationOptions']['AcceptableLoss'] = loss_rate + self.json_data['TestOptions']['FlowCreationOptions'][ + 'UseMicroTpldOnDemand'] = 'true' if micro_tpld else 'false' + self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][ + 'Iterations'] = iterations + + def set_test_options_back2back(self, packet_sizes, duration, + iterations, startvalue, endvalue, + micro_tpld=False): + """ + Set the back2back test options + :param packet_sizes: List of packet sizes to test, single int entry is + acceptable for one packet size testing + :param duration: time for each test in seconds as int + :param iterations: number of iterations of testing as int + :param micro_tpld: boolean if micro_tpld should be enabled or disabled + :param StartValue: start value + :param EndValue: end value + :return: None + """ + if isinstance(packet_sizes, int): + packet_sizes = [packet_sizes] + self.json_data['TestOptions']['PacketSizes'][ + 'CustomPacketSizes'] = packet_sizes + self.json_data['TestOptions']['TestTypeOptionMap']['Back2Back'][ + 'Duration'] = duration + self.json_data['TestOptions']['FlowCreationOptions'][ + 'UseMicroTpldOnDemand'] = 'true' if micro_tpld else 'false' + self.json_data['TestOptions']['TestTypeOptionMap']['Back2Back'][ + 'Iterations'] = iterations + self.json_data['TestOptions']['TestTypeOptionMap']['Back2Back'][ + 'RateSweepOptions']['StartValue'] = startvalue + self.json_data['TestOptions']['TestTypeOptionMap']['Back2Back'][ + 'RateSweepOptions']['EndValue'] = endvalue + + def write_config(self, path='./2bUsed.x2544'): + """ + Write the config to out as file + :param path: Output file to export the json data to + :return: None + """ + if not json_utilities.write_json_file(self.json_data, path): + raise RuntimeError("Could not write out file, please check config") diff --git a/tools/pkt_gen/xena/json/xena_json_blocks.py b/tools/pkt_gen/xena/json/xena_json_blocks.py new file mode 100644 index 00000000..d0a0864c --- /dev/null +++ b/tools/pkt_gen/xena/json/xena_json_blocks.py @@ -0,0 +1,50 @@ +# Copyright 2017 Red Hat Inc & Xena Networks. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Contributors: +# Dan Amzulescu, Xena Networks +# Christian Trautman, Red Hat Inc. +# +# Usage can be seen below in unit test. This implementation is designed for one +# module two port Xena chassis runs only. + +""" +Xena JSON module for blocks topology +""" + +from tools.pkt_gen.xena.json.xena_json import XenaJSON + + +class XenaJSONBlocks(XenaJSON): + """ + Class to modify and read Xena JSON configuration files. Topology will be set + as Blocks + """ + def __init__(self): + super(XenaJSONBlocks, self).__init__() + self.set_topology_blocks() + + def set_topology_blocks(self): + """ + Set the test topology to a West to East config for half duplex flow with + port 0 as the sender and port 1 as the receiver. + :return: None + """ + self.json_data['TestOptions']['TopologyConfig']['Topology'] = 'BLOCKS' + self.json_data['TestOptions']['TopologyConfig'][ + 'Direction'] = 'WEST_EAST' + self.json_data['PortHandler']['EntityList'][0][ + 'PortGroup'] = "WEST" + self.json_data['PortHandler']['EntityList'][1][ + 'PortGroup'] = "EAST" diff --git a/tools/pkt_gen/xena/json/xena_json_mesh.py b/tools/pkt_gen/xena/json/xena_json_mesh.py new file mode 100644 index 00000000..5d6a1f13 --- /dev/null +++ b/tools/pkt_gen/xena/json/xena_json_mesh.py @@ -0,0 +1,46 @@ +# Copyright 2017 Red Hat Inc & Xena Networks. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Contributors: +# Dan Amzulescu, Xena Networks +# Christian Trautman, Red Hat Inc. +# +# Usage can be seen below in unit test. This implementation is designed for one +# module two port Xena chassis runs only. + +""" +Xena JSON module for mesh topology. +""" + +from tools.pkt_gen.xena.json.xena_json import XenaJSON + + +class XenaJSONMesh(XenaJSON): + """ + Class to modify and read Xena JSON configuration files. Topology will be set + as Mesh. + """ + def __init__(self): + super(XenaJSONMesh, self).__init__() + self.set_topology_mesh() + + def set_topology_mesh(self): + """ + Set the test topology to Mesh for bi directional full duplex flow + :return: None + """ + self.json_data['TestOptions']['TopologyConfig']['Topology'] = 'MESH' + self.json_data['TestOptions']['TopologyConfig']['Direction'] = 'BIDIR' + self.json_data['PortHandler']['EntityList'][0]['PortGroup'] = "UNDEFINED" + self.json_data['PortHandler']['EntityList'][1]['PortGroup'] = "UNDEFINED" diff --git a/tools/pkt_gen/xena/json/xena_json_pairs.py b/tools/pkt_gen/xena/json/xena_json_pairs.py new file mode 100644 index 00000000..4f97f0ba --- /dev/null +++ b/tools/pkt_gen/xena/json/xena_json_pairs.py @@ -0,0 +1,60 @@ +# Copyright 2017 Red Hat Inc & Xena Networks. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Contributors: +# Dan Amzulescu, Xena Networks +# Christian Trautman, Red Hat Inc. +# +# Usage can be seen below in unit test. This implementation is designed for one +# module two port Xena chassis runs only. + +""" +Xena JSON module for pairs topology +""" + +from tools.pkt_gen.xena.json.xena_json import XenaJSON + + +class XenaJSONPairs(XenaJSON): + """ + Class to modify and read Xena JSON configuration files. Topology will be set + as Blocks + """ + def __init__(self): + super(XenaJSONPairs, self).__init__() + self.set_topology_pairs() + + def set_topology_pairs(self): + """ + Set the test topology to loopback pairs for packets that need to return + to the same port they are sent from + :return: None + """ + self.json_data['TestOptions']['TopologyConfig']['Topology'] = 'PAIRS' + self.json_data['TestOptions']['TopologyConfig'][ + 'Direction'] = 'WEST_EAST' + self.json_data['PortHandler']['EntityList'][0][ + 'PortGroup'] = "WEST" + self.json_data['PortHandler']['EntityList'][1][ + 'PortGroup'] = "WEST" + + def remove_port(self, port_number): + """ + Remove a port from the config. + :param port_number: port to remove from json config as int + :return: None + """ + del self.json_data['PortHandler']['EntityList'][port_number] + del self.json_data['StreamHandler']['StreamConnectionList'][0][ + 'Port{}Id'.format(port_number + 1)] |