summaryrefslogtreecommitdiffstats
path: root/tools/pkt_gen/xena/xena_json.py
diff options
context:
space:
mode:
Diffstat (limited to 'tools/pkt_gen/xena/xena_json.py')
-rw-r--r--tools/pkt_gen/xena/xena_json.py643
1 files changed, 0 insertions, 643 deletions
diff --git a/tools/pkt_gen/xena/xena_json.py b/tools/pkt_gen/xena/xena_json.py
deleted file mode 100644
index 50d0e2fe..00000000
--- a/tools/pkt_gen/xena/xena_json.py
+++ /dev/null
@@ -1,643 +0,0 @@
-# Copyright 2016-2017 Red Hat Inc & Xena Networks.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# Contributors:
-# Dan Amzulescu, Xena Networks
-# Christian Trautman, Red Hat Inc.
-#
-# Usage can be seen below in unit test. This implementation is designed for one
-# module two port Xena chassis runs only.
-
-"""
-Xena JSON module
-"""
-
-import base64
-from collections import OrderedDict
-import json
-import locale
-import logging
-import uuid
-
-import scapy.layers.inet as inet
-
-_LOGGER = logging.getLogger(__name__)
-_LOCALE = locale.getlocale()[1]
-
-
-class XenaJSON(object):
- """
- Class to modify and read Xena JSON configuration files.
- """
- def __init__(self, json_path='./profiles/baseconfig.x2544'):
- """
- Constructor
- :param json_path: path to JSON file to read. Expected files must have
- two module ports with each port having its own stream config profile.
- :return: XenaJSON object
- """
- self.json_data = read_json_file(json_path)
-
- self.packet_data = OrderedDict()
- self.packet_data['layer2'] = None
- self.packet_data['vlan'] = None
- self.packet_data['layer3'] = None
- self.packet_data['layer4'] = None
-
- def _add_multistream_layer(self, entity, seg_uuid, stop_value, layer):
- """
- Add the multi stream layers to the json file based on the layer provided
- :param entity: Entity to append the segment to in entity list
- :param seg_uuid: The UUID to attach the multistream layer to
- :param stop_value: The number of flows to configure
- :param layer: the layer that the multistream will be attached to
- :return: None
- """
- field_name = {
- 2: ('Dst MAC addr', 'Src MAC addr'),
- 3: ('Dest IP Addr', 'Src IP Addr'),
- 4: ('Dest Port', 'Src Port')
- }
- segments = [
- {
- "Offset": 0,
- "Mask": "//8=", # mask of 255/255
- "Action": "INC",
- "StartValue": 0,
- "StopValue": stop_value,
- "StepValue": 1,
- "RepeatCount": 1,
- "SegmentId": seg_uuid,
- "FieldName": field_name[int(layer)][0]
- },
- {
- "Offset": 0,
- "Mask": "//8=", # mask of 255/255
- "Action": "INC",
- "StartValue": 0,
- "StopValue": stop_value,
- "StepValue": 1,
- "RepeatCount": 1,
- "SegmentId": seg_uuid,
- "FieldName": field_name[int(layer)][1]
- }
- ]
-
- self.json_data['StreamProfileHandler']['EntityList'][entity][
- 'StreamConfig']['HwModifiers'] = (segments)
-
- def _create_packet_header(self):
- """
- Create the scapy packet header based on what has been built in this
- instance using the set header methods. Return tuple of the two byte
- arrays, one for each port.
- :return: Scapy packet headers as bytearrays
- """
- if not self.packet_data['layer2']:
- _LOGGER.warning('Using dummy info for layer 2 in Xena JSON file')
- self.set_header_layer2()
- packet1, packet2 = (self.packet_data['layer2'][0],
- self.packet_data['layer2'][1])
- for packet_header in list(self.packet_data.copy().values())[1:]:
- if packet_header:
- packet1 /= packet_header[0]
- packet2 /= packet_header[1]
- ret = (bytes(packet1), bytes(packet2))
- return ret
-
- def add_header_segments(self, flows=0, multistream_layer=None):
- """
- Build the header segments to write to the JSON file.
- :param flows: Number of flows to configure for multistream if enabled
- :param multistream_layer: layer to set multistream flows as string.
- Acceptable values are L2, L3 or L4
- :return: None
- """
- packet = self._create_packet_header()
- segment1 = list()
- segment2 = list()
- header_pos = 0
- if self.packet_data['layer2']:
- # slice out the layer 2 bytes from the packet header byte array
- layer2 = packet[0][header_pos: len(self.packet_data['layer2'][0])]
- seg = create_segment(
- "ETHERNET", encode_byte_array(layer2).decode(_LOCALE))
- if multistream_layer == 'L2' and flows > 0:
- self._add_multistream_layer(entity=0, seg_uuid=seg['ItemID'],
- stop_value=flows, layer=2)
- segment1.append(seg)
- # now do the other port data with reversed src, dst info
- layer2 = packet[1][header_pos: len(self.packet_data['layer2'][1])]
- seg = create_segment(
- "ETHERNET", encode_byte_array(layer2).decode(_LOCALE))
- segment2.append(seg)
- if multistream_layer == 'L2' and flows > 0:
- self._add_multistream_layer(entity=1, seg_uuid=seg['ItemID'],
- stop_value=flows, layer=2)
- header_pos = len(layer2)
- if self.packet_data['vlan']:
- # slice out the vlan bytes from the packet header byte array
- vlan = packet[0][header_pos: len(
- self.packet_data['vlan'][0]) + header_pos]
- segment1.append(create_segment(
- "VLAN", encode_byte_array(vlan).decode(_LOCALE)))
- segment2.append(create_segment(
- "VLAN", encode_byte_array(vlan).decode(_LOCALE)))
- header_pos += len(vlan)
- if self.packet_data['layer3']:
- # slice out the layer 3 bytes from the packet header byte array
- layer3 = packet[0][header_pos: len(
- self.packet_data['layer3'][0]) + header_pos]
- seg = create_segment(
- "IP", encode_byte_array(layer3).decode(_LOCALE))
- segment1.append(seg)
- if multistream_layer == 'L3' and flows > 0:
- self._add_multistream_layer(entity=0, seg_uuid=seg['ItemID'],
- stop_value=flows, layer=3)
- # now do the other port data with reversed src, dst info
- layer3 = packet[1][header_pos: len(
- self.packet_data['layer3'][1]) + header_pos]
- seg = create_segment(
- "IP", encode_byte_array(layer3).decode(_LOCALE))
- segment2.append(seg)
- if multistream_layer == 'L3' and flows > 0:
- self._add_multistream_layer(entity=1, seg_uuid=seg['ItemID'],
- stop_value=flows, layer=3)
- header_pos += len(layer3)
- if self.packet_data['layer4']:
- # slice out the layer 4 bytes from the packet header byte array
- layer4 = packet[0][header_pos: len(
- self.packet_data['layer4'][0]) + header_pos]
- seg = create_segment(
- "UDP", encode_byte_array(layer4).decode(_LOCALE))
- segment1.append(seg)
- if multistream_layer == 'L4' and flows > 0:
- self._add_multistream_layer(entity=0, seg_uuid=seg['ItemID'],
- stop_value=flows, layer=4)
- # now do the other port data with reversed src, dst info
- layer4 = packet[1][header_pos: len(
- self.packet_data['layer4'][1]) + header_pos]
- seg = create_segment(
- "UDP", encode_byte_array(layer4).decode(_LOCALE))
- segment2.append(seg)
- if multistream_layer == 'L4' and flows > 0:
- self._add_multistream_layer(entity=1, seg_uuid=seg['ItemID'],
- stop_value=flows, layer=4)
- header_pos += len(layer4)
-
- self.json_data['StreamProfileHandler']['EntityList'][0][
- 'StreamConfig']['HeaderSegments'] = segment1
- self.json_data['StreamProfileHandler']['EntityList'][1][
- 'StreamConfig']['HeaderSegments'] = segment2
-
- def disable_back2back_test(self):
- """
- Disable the rfc2544 back to back test
- :return: None
- """
- self.json_data['TestOptions']['TestTypeOptionMap']['Back2Back'][
- 'Enabled'] = 'false'
-
- def disable_throughput_test(self):
- """
- Disable the rfc2544 throughput test
- :return: None
- """
- self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
- 'Enabled'] = 'false'
-
- def enable_back2back_test(self):
- """
- Enable the rfc2544 back to back test
- :return: None
- """
- self.json_data['TestOptions']['TestTypeOptionMap']['Back2Back'][
- 'Enabled'] = 'true'
-
- def enable_throughput_test(self):
- """
- Enable the rfc2544 throughput test
- :return: None
- """
- self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
- 'Enabled'] = 'true'
- # pylint: disable=too-many-arguments
- def modify_2544_tput_options(self, initial_value, minimum_value,
- maximum_value, value_resolution,
- use_pass_threshhold, pass_threshhold):
- """
- modify_2544_tput_options
- """
- self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
- 'RateIterationOptions']['InitialValue'] = initial_value
- self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
- 'RateIterationOptions']['MinimumValue'] = minimum_value
- self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
- 'RateIterationOptions']['MaximumValue'] = maximum_value
- self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
- 'RateIterationOptions']['ValueResolution'] = value_resolution
- self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
- 'RateIterationOptions']['UsePassThreshold'] = use_pass_threshhold
- self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
- 'RateIterationOptions']['PassThreshold'] = pass_threshhold
-
- def set_chassis_info(self, hostname, pwd):
- """
- Set the chassis info
- :param hostname: hostname as string of ip
- :param pwd: password to chassis as string
- :return: None
- """
- self.json_data['ChassisManager']['ChassisList'][0][
- 'HostName'] = hostname
- self.json_data['ChassisManager']['ChassisList'][0][
- 'Password'] = pwd
-
- def set_header_layer2(self, dst_mac='cc:cc:cc:cc:cc:cc',
- src_mac='bb:bb:bb:bb:bb:bb', **kwargs):
- """
- Build a scapy Ethernet L2 objects inside instance packet_data structure
- :param dst_mac: destination mac as string. Example "aa:aa:aa:aa:aa:aa"
- :param src_mac: source mac as string. Example "bb:bb:bb:bb:bb:bb"
- :param kwargs: Extra params per scapy usage.
- :return: None
- """
- self.packet_data['layer2'] = [
- inet.Ether(dst=dst_mac, src=src_mac, **kwargs),
- inet.Ether(dst=src_mac, src=dst_mac, **kwargs)]
-
- def set_header_layer3(self, src_ip='192.168.0.2', dst_ip='192.168.0.3',
- protocol='UDP', **kwargs):
- """
- Build scapy IPV4 L3 objects inside instance packet_data structure
- :param src_ip: source IP as string in dot notation format
- :param dst_ip: destination IP as string in dot notation format
- :param protocol: protocol for l4
- :param kwargs: Extra params per scapy usage
- :return: None
- """
- self.packet_data['layer3'] = [
- inet.IP(src=src_ip, dst=dst_ip, proto=protocol.lower(), **kwargs),
- inet.IP(src=dst_ip, dst=src_ip, proto=protocol.lower(), **kwargs)]
-
- def set_header_layer4_udp(self, source_port, destination_port, **kwargs):
- """
- Build scapy UDP L4 objects inside instance packet_data structure
- :param source_port: Source port as int
- :param destination_port: Destination port as int
- :param kwargs: Extra params per scapy usage
- :return: None
- """
- self.packet_data['layer4'] = [
- inet.UDP(sport=source_port, dport=destination_port, **kwargs),
- inet.UDP(sport=source_port, dport=destination_port, **kwargs)]
-
- def set_header_vlan(self, vlan_id=1, **kwargs):
- """
- Build a Dot1Q scapy object inside instance packet_data structure
- :param vlan_id: The VLAN ID
- :param kwargs: Extra params per scapy usage
- :return: None
- """
- self.packet_data['vlan'] = [
- inet.Dot1Q(vlan=vlan_id, **kwargs),
- inet.Dot1Q(vlan=vlan_id, **kwargs)]
-
- def set_port(self, index, module, port):
- """
- Set the module and port for the 0 index port to use with the test
- :param index: Index of port to set, 0 = port1, 1=port2, etc..
- :param module: module location as int
- :param port: port location in module as int
- :return: None
- """
- self.json_data['PortHandler']['EntityList'][index]['PortRef'][
- 'ModuleIndex'] = module
- self.json_data['PortHandler']['EntityList'][index]['PortRef'][
- 'PortIndex'] = port
-
- def set_port_ip_v4(self, port, ip_addr, netmask, gateway):
- """
- Set the port IP info
- :param port: port number as int of port to set ip info
- :param ip_addr: ip address in dot notation format as string
- :param netmask: cidr number for netmask (ie 24/16/8) as int
- :param gateway: gateway address in dot notation format
- :return: None
- """
- available_ports = range(len(
- self.json_data['PortHandler']['EntityList']))
- if port not in available_ports:
- raise ValueError("{}{}{}".format(
- 'Port assignment must be an available port ',
- 'number in baseconfig file. Port=', port))
- self.json_data['PortHandler']['EntityList'][
- port]["IpV4Address"] = ip_addr
- self.json_data['PortHandler']['EntityList'][
- port]["IpV4Gateway"] = gateway
- self.json_data['PortHandler']['EntityList'][
- port]["IpV4RoutingPrefix"] = int(netmask)
-
- def set_port_ip_v6(self, port, ip_addr, netmask, gateway):
- """
- Set the port IP info
- :param port: port number as int of port to set ip info
- :param ip_addr: ip address as 8 groups of 4 hexadecimal groups separated
- by a colon.
- :param netmask: cidr number for netmask (ie 24/16/8) as int
- :param gateway: gateway address as string in 8 group of 4 hexadecimal
- groups separated by a colon.
- :return: None
- """
- available_ports = range(len(
- self.json_data['PortHandler']['EntityList']))
- if port not in available_ports:
- raise ValueError("{}{}{}".format(
- 'Port assignment must be an available port ',
- 'number in baseconfig file. Port=', port))
- self.json_data['PortHandler']['EntityList'][
- port]["IpV6Address"] = ip_addr
- self.json_data['PortHandler']['EntityList'][
- port]["IpV6Gateway"] = gateway
- self.json_data['PortHandler']['EntityList'][
- port]["IpV6RoutingPrefix"] = int(netmask)
-
- def set_test_options_tput(self, packet_sizes, duration, iterations,
- loss_rate, micro_tpld=False):
- """
- Set the tput test options
- :param packet_sizes: List of packet sizes to test, single int entry is
- acceptable for one packet size testing
- :param duration: time for each test in seconds as int
- :param iterations: number of iterations of testing as int
- :param loss_rate: acceptable loss rate as float
- :param micro_tpld: boolean if micro_tpld should be enabled or disabled
- :return: None
- """
- if isinstance(packet_sizes, int):
- packet_sizes = [packet_sizes]
- self.json_data['TestOptions']['PacketSizes'][
- 'CustomPacketSizes'] = packet_sizes
- self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
- 'Duration'] = duration
- self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
- 'RateIterationOptions']['AcceptableLoss'] = loss_rate
- self.json_data['TestOptions']['FlowCreationOptions'][
- 'UseMicroTpldOnDemand'] = 'true' if micro_tpld else 'false'
- self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
- 'Iterations'] = iterations
-
- def set_test_options_back2back(self, packet_sizes, duration,
- iterations, startvalue, endvalue,
- micro_tpld=False):
- """
- Set the back2back test options
- :param packet_sizes: List of packet sizes to test, single int entry is
- acceptable for one packet size testing
- :param duration: time for each test in seconds as int
- :param iterations: number of iterations of testing as int
- :param micro_tpld: boolean if micro_tpld should be enabled or disabled
- :param StartValue: start value
- :param EndValue: end value
- :return: None
- """
- if isinstance(packet_sizes, int):
- packet_sizes = [packet_sizes]
- self.json_data['TestOptions']['PacketSizes'][
- 'CustomPacketSizes'] = packet_sizes
- self.json_data['TestOptions']['TestTypeOptionMap']['Back2Back'][
- 'Duration'] = duration
- self.json_data['TestOptions']['FlowCreationOptions'][
- 'UseMicroTpldOnDemand'] = 'true' if micro_tpld else 'false'
- self.json_data['TestOptions']['TestTypeOptionMap']['Back2Back'][
- 'Iterations'] = iterations
- self.json_data['TestOptions']['TestTypeOptionMap']['Back2Back'][
- 'RateSweepOptions']['StartValue'] = startvalue
- self.json_data['TestOptions']['TestTypeOptionMap']['Back2Back'][
- 'RateSweepOptions']['EndValue'] = endvalue
-
- def set_topology_blocks(self):
- """
- Set the test topology to a West to East config for half duplex flow with
- port 0 as the sender and port 1 as the receiver.
- :return: None
- """
- self.json_data['TestOptions']['TopologyConfig']['Topology'] = 'BLOCKS'
- self.json_data['TestOptions']['TopologyConfig'][
- 'Direction'] = 'WEST_EAST'
- self.json_data['PortHandler']['EntityList'][0][
- 'PortGroup'] = "WEST"
- self.json_data['PortHandler']['EntityList'][1][
- 'PortGroup'] = "EAST"
-
- def set_topology_mesh(self):
- """
- Set the test topology to Mesh for bi directional full duplex flow
- :return: None
- """
- self.json_data['TestOptions']['TopologyConfig']['Topology'] = 'MESH'
- self.json_data['TestOptions']['TopologyConfig']['Direction'] = 'BIDIR'
- self.json_data['PortHandler']['EntityList'][0][
- 'PortGroup'] = "UNDEFINED"
- self.json_data['PortHandler']['EntityList'][1][
- 'PortGroup'] = "UNDEFINED"
-
- def write_config(self, path='./2bUsed.x2544'):
- """
- Write the config to out as file
- :param path: Output file to export the json data to
- :return: None
- """
- if not write_json_file(self.json_data, path):
- raise RuntimeError("Could not write out file, please check config")
-
-
-def create_segment(header_type, encode_64_string):
- """
- Create segment for JSON file
- :param header_type: Type of header as string
- :param encode_64_string: 64 byte encoded string value of the hex bytes
- :return: segment as dictionary
- """
- return {
- "SegmentType": header_type.upper(),
- "SegmentValue": encode_64_string,
- "ItemID": str(uuid.uuid4()),
- "ParentID": "",
- "Label": ""}
-
-
-def decode_byte_array(enc_str):
- """ Decodes the base64-encoded string to a byte array
- :param enc_str: The base64-encoded string representing a byte array
- :return: The decoded byte array
- """
- dec_string = base64.b64decode(enc_str)
- barray = bytearray()
- barray.extend(dec_string)
- return barray
-
-
-def encode_byte_array(byte_arr):
- """ Encodes the byte array as a base64-encoded string
- :param byte_arr: A bytearray containing the bytes to convert
- :return: A base64 encoded string
- """
- enc_string = base64.b64encode(bytes(byte_arr))
- return enc_string
-
-
-def print_json_report(json_data):
- """
- Print out info from the json data for testing purposes only.
- :param json_data: json loaded data from json.loads
- :return: None
- """
- print("<<Xena JSON Config Report>>\n")
- try:
- print("### Chassis Info ###")
- print("Chassis IP: {}".format(json_data['ChassisManager'][
- 'ChassisList'][0]['HostName']))
- print("Chassis Password: {}".format(json_data['ChassisManager'][
- 'ChassisList'][0]['Password']))
- print("### Port Configuration ###")
- print("Port 1 IPv4:{}/{} gateway:{}".format(
- json_data['PortHandler']['EntityList'][0]["IpV4Address"],
- json_data['PortHandler']['EntityList'][0]["IpV4RoutingPrefix"],
- json_data['PortHandler']['EntityList'][0]["IpV4Gateway"]))
- print("Port 1 IPv6:{}/{} gateway:{}".format(
- json_data['PortHandler']['EntityList'][0]["IpV6Address"],
- json_data['PortHandler']['EntityList'][0]["IpV6RoutingPrefix"],
- json_data['PortHandler']['EntityList'][0]["IpV6Gateway"]))
- print("Port 2 IPv4:{}/{} gateway:{}".format(
- json_data['PortHandler']['EntityList'][1]["IpV4Address"],
- json_data['PortHandler']['EntityList'][1]["IpV4RoutingPrefix"],
- json_data['PortHandler']['EntityList'][1]["IpV4Gateway"]))
- print("Port 2 IPv6:{}/{} gateway:{}".format(
- json_data['PortHandler']['EntityList'][1]["IpV6Address"],
- json_data['PortHandler']['EntityList'][1]["IpV6RoutingPrefix"],
- json_data['PortHandler']['EntityList'][1]["IpV6Gateway"]))
- print("Port 1: {}/{} group: {}".format(
- json_data['PortHandler']['EntityList'][0]['PortRef']['ModuleIndex'],
- json_data['PortHandler']['EntityList'][0]['PortRef']['PortIndex'],
- json_data['PortHandler']['EntityList'][0]['PortGroup']))
- print("Port 2: {}/{} group: {}".format(
- json_data['PortHandler']['EntityList'][1]['PortRef']['ModuleIndex'],
- json_data['PortHandler']['EntityList'][1]['PortRef']['PortIndex'],
- json_data['PortHandler']['EntityList'][1]['PortGroup']))
- print("### Tests Enabled ###")
- print("Back2Back Enabled: {}".format(json_data['TestOptions'][
- 'TestTypeOptionMap']['Back2Back']['Enabled']))
- print("Throughput Enabled: {}".format(json_data['TestOptions'][
- 'TestTypeOptionMap']['Throughput']['Enabled']))
- print("### Test Options ###")
- print("Test topology: {}/{}".format(
- json_data['TestOptions']['TopologyConfig']['Topology'],
- json_data['TestOptions']['TopologyConfig']['Direction']))
- print("Packet Sizes: {}".format(json_data['TestOptions'][
- 'PacketSizes']['CustomPacketSizes']))
- print("Test duration: {}".format(json_data['TestOptions'][
- 'TestTypeOptionMap']['Throughput']['Duration']))
- print("Acceptable loss rate: {}".format(json_data['TestOptions'][
- 'TestTypeOptionMap']['Throughput']['RateIterationOptions'][
- 'AcceptableLoss']))
- print("Micro TPLD enabled: {}".format(json_data['TestOptions'][
- 'FlowCreationOptions']['UseMicroTpldOnDemand']))
- print("Test iterations: {}".format(json_data['TestOptions'][
- 'TestTypeOptionMap']['Throughput']['Iterations']))
- if 'StreamConfig' in json_data['StreamProfileHandler']['EntityList'][0]:
- print("### Header segments ###")
- for seg in json_data['StreamProfileHandler']['EntityList']:
- for header in seg['StreamConfig']['HeaderSegments']:
- print("Type: {}".format(
- header['SegmentType']))
- print("Value: {}".format(decode_byte_array(
- header['SegmentValue'])))
- print("### Multi Stream config ###")
- for seg in json_data['StreamProfileHandler']['EntityList']:
- for header in seg['StreamConfig']['HwModifiers']:
- print(header)
- except KeyError as exc:
- print("Error setting not found in JSON data: {}".format(exc))
-
-
-def read_json_file(json_file):
- """
- Read the json file path and return a dictionary of the data
- :param json_file: path to json file
- :return: dictionary of json data
- """
- try:
- with open(json_file, 'r', encoding=_LOCALE) as data_file:
- file_data = json.loads(data_file.read())
- except ValueError as exc:
- # general json exception, Python 3.5 adds new exception type
- _LOGGER.exception("Exception with json read: %s", exc)
- raise
- except IOError as exc:
- _LOGGER.exception(
- 'Exception during file open: %s file=%s', exc, json_file)
- raise
- return file_data
-
-
-def write_json_file(json_data, output_path):
- """
- Write out the dictionary of data to a json file
- :param json_data: dictionary of json data
- :param output_path: file path to write output
- :return: Boolean if success
- """
- try:
- with open(output_path, 'w', encoding=_LOCALE) as fileh:
- json.dump(json_data, fileh, indent=2, sort_keys=True,
- ensure_ascii=True)
- return True
- except ValueError as exc:
- # general json exception, Python 3.5 adds new exception type
- _LOGGER.exception(
- "Exception with json write: %s", exc)
- return False
- except IOError as exc:
- _LOGGER.exception(
- 'Exception during file write: %s file=%s', exc, output_path)
- return False
-
-
-if __name__ == "__main__":
- print("Running UnitTest for XenaJSON")
- JSON = XenaJSON()
- print_json_report(JSON.json_data)
- JSON.set_chassis_info('192.168.0.5', 'vsperf')
- JSON.set_port(0, 1, 0)
- JSON.set_port(1, 1, 1)
- JSON.set_port_ip_v4(0, '192.168.240.10', 32, '192.168.240.1')
- JSON.set_port_ip_v4(1, '192.168.240.11', 32, '192.168.240.1')
- JSON.set_port_ip_v6(0, 'a1a1:a2a2:a3a3:a4a4:a5a5:a6a6:a7a7:a8a8', 128,
- 'a1a1:a2a2:a3a3:a4a4:a5a5:a6a6:a7a7:1111')
- JSON.set_port_ip_v6(1, 'b1b1:b2b2:b3b3:b4b4:b5b5:b6b6:b7b7:b8b8', 128,
- 'b1b1:b2b2:b3b3:b4b4:b5b5:b6b6:b7b7:1111')
- JSON.set_header_layer2(dst_mac='dd:dd:dd:dd:dd:dd',
- src_mac='ee:ee:ee:ee:ee:ee')
- JSON.set_header_vlan(vlan_id=5)
- JSON.set_header_layer3(src_ip='192.168.100.2', dst_ip='192.168.100.3',
- protocol='udp')
- JSON.set_header_layer4_udp(source_port=3000, destination_port=3001)
- JSON.set_test_options_tput(packet_sizes=[64], duration=10, iterations=1,
- loss_rate=0.0, micro_tpld=True)
- JSON.add_header_segments(flows=4000, multistream_layer='L4')
- JSON.set_topology_blocks()
- write_json_file(JSON.json_data, './testthis.x2544')
- JSON = XenaJSON('./testthis.x2544')
- print_json_report(JSON.json_data)