summaryrefslogtreecommitdiffstats
path: root/tools/pkt_gen/xena/xena.py
diff options
context:
space:
mode:
authorChristian Trautman <ctrautma@redhat.com>2016-05-04 15:29:22 -0400
committerChristian Trautman <ctrautma@redhat.com>2016-06-07 09:28:59 -0400
commit1641271a83022cca9d4548839c0b54debf9c520f (patch)
tree841e463757c60757ed8cefb2bf3ec274ff12f016 /tools/pkt_gen/xena/xena.py
parent5e3b6ae0427963520357453728411327ac8efafe (diff)
xena_cont: Add Xena continuous traffic functionality
* Adds XenaDriver.py module to allow direct communication to Xena chassis through socket API and collect stats. * Adds implementation into xena.py for continuous traffic flow. JIRA: VSPERF-262 Change-Id: I6f975bc205e3c954215eb16466f11298e3fcdfaf Signed-off-by: Christian Trautman <ctrautma@redhat.com>
Diffstat (limited to 'tools/pkt_gen/xena/xena.py')
-rwxr-xr-xtools/pkt_gen/xena/xena.py330
1 files changed, 296 insertions, 34 deletions
diff --git a/tools/pkt_gen/xena/xena.py b/tools/pkt_gen/xena/xena.py
index d29fc362..99da4584 100755
--- a/tools/pkt_gen/xena/xena.py
+++ b/tools/pkt_gen/xena/xena.py
@@ -23,6 +23,7 @@ Xena Traffic Generator Model
"""
# python imports
+import binascii
import logging
import subprocess
import sys
@@ -40,6 +41,15 @@ from tools.pkt_gen.trafficgen.trafficgen import ITrafficGenerator
# Xena module imports
from tools.pkt_gen.xena.xena_json import XenaJSON
+from tools.pkt_gen.xena.XenaDriver import (
+ aggregate_stats,
+ line_percentage,
+ XenaSocketDriver,
+ XenaManager,
+ )
+
+# scapy imports
+import scapy.layers.inet as inet
class Xena(ITrafficGenerator):
@@ -51,8 +61,12 @@ class Xena(ITrafficGenerator):
def __init__(self):
self.mono_pipe = None
+ self.xmanager = None
self._params = {}
+ self._xsocket = None
self._duration = None
+ self.tx_stats = None
+ self.rx_stats = None
@property
def traffic_defaults(self):
@@ -127,6 +141,125 @@ class Xena(ITrafficGenerator):
return results
+ def _build_packet_header(self, reverse=False):
+ """
+ Build a packet header based on traffic profile using scapy external
+ libraries.
+ :param reverse: Swap source and destination info when building header
+ :return: packet header in hex
+ """
+ srcmac = self._params['traffic']['l2'][
+ 'srcmac'] if not reverse else self._params['traffic']['l2'][
+ 'dstmac']
+ dstmac = self._params['traffic']['l2'][
+ 'dstmac'] if not reverse else self._params['traffic']['l2'][
+ 'srcmac']
+ srcip = self._params['traffic']['l3'][
+ 'srcip'] if not reverse else self._params['traffic']['l3']['dstip']
+ dstip = self._params['traffic']['l3'][
+ 'dstip'] if not reverse else self._params['traffic']['l3']['srcip']
+ layer2 = inet.Ether(src=srcmac, dst=dstmac)
+ layer3 = inet.IP(src=srcip, dst=dstip,
+ proto=self._params['traffic']['l3']['proto'])
+ layer4 = inet.UDP(sport=self._params['traffic']['l4']['srcport'],
+ dport=self._params['traffic']['l4']['dstport'])
+ if self._params['traffic']['vlan']['enabled']:
+ vlan = inet.Dot1Q(vlan=self._params['traffic']['vlan']['id'],
+ prio=self._params['traffic']['vlan']['priority'],
+ id=self._params['traffic']['vlan']['cfi'])
+ else:
+ vlan = None
+ packet = layer2/vlan/layer3/layer4 if vlan else layer2/layer3/layer4
+ packet_bytes = bytes(packet)
+ packet_hex = '0x' + binascii.hexlify(packet_bytes).decode('utf-8')
+ return packet_hex
+
+ def _create_api_result(self):
+ """
+ Create result dictionary per trafficgen specifications from socket API
+ stats. If stats are not available return values of 0.
+ :return: ResultsConstants as dictionary
+ """
+ # Handle each case of statistics based on if the data is available.
+ # This prevents uncaught exceptions when the stats aren't available.
+ result_dict = OrderedDict()
+ if self.tx_stats.data.get(self.tx_stats.pt_stream_keys[0]):
+ result_dict[ResultsConstants.TX_FRAMES] = self.tx_stats.data[
+ self.tx_stats.pt_stream_keys[0]]['packets']
+ result_dict[ResultsConstants.TX_RATE_FPS] = self.tx_stats.data[
+ self.tx_stats.pt_stream_keys[0]]['pps']
+ result_dict[ResultsConstants.TX_RATE_MBPS] = self.tx_stats.data[
+ self.tx_stats.pt_stream_keys[0]]['bps'] / 1000000
+ result_dict[ResultsConstants.TX_BYTES] = self.tx_stats.data[
+ self.tx_stats.pt_stream_keys[0]]['bytes']
+ # tx rate percent may need to be halved if bi directional
+ result_dict[ResultsConstants.TX_RATE_PERCENT] = line_percentage(
+ self.xmanager.ports[0], self.tx_stats, self._duration,
+ self._params['traffic']['l2']['framesize']) if \
+ self._params['traffic']['bidir'] == 'False' else\
+ line_percentage(
+ self.xmanager.ports[0], self.tx_stats, self._duration,
+ self._params['traffic']['l2']['framesize']) / 2
+ else:
+ self._logger.error('Transmit stats not available.')
+ result_dict[ResultsConstants.TX_FRAMES] = 0
+ result_dict[ResultsConstants.TX_RATE_FPS] = 0
+ result_dict[ResultsConstants.TX_RATE_MBPS] = 0
+ result_dict[ResultsConstants.TX_BYTES] = 0
+ result_dict[ResultsConstants.TX_RATE_PERCENT] = 0
+
+ if self.rx_stats.data.get('pr_tpldstraffic'):
+ result_dict[ResultsConstants.RX_FRAMES] = self.rx_stats.data[
+ 'pr_tpldstraffic']['0']['packets']
+ result_dict[
+ ResultsConstants.THROUGHPUT_RX_FPS] = self.rx_stats.data[
+ 'pr_tpldstraffic']['0']['pps']
+ result_dict[
+ ResultsConstants.THROUGHPUT_RX_MBPS] = self.rx_stats.data[
+ 'pr_tpldstraffic']['0']['bps'] / 1000000
+ result_dict[ResultsConstants.RX_BYTES] = self.rx_stats.data[
+ 'pr_tpldstraffic']['0']['bytes']
+ # throughput percent may need to be halved if bi directional
+ result_dict[
+ ResultsConstants.THROUGHPUT_RX_PERCENT] = line_percentage(
+ self.xmanager.ports[1], self.rx_stats, self._duration,
+ self._params['traffic']['l2']['framesize']) if \
+ self._params['traffic']['bidir'] == 'False' else \
+ line_percentage(
+ self.xmanager.ports[1], self.rx_stats, self._duration,
+ self._params['traffic']['l2']['framesize']) / 2
+
+ else:
+ self._logger.error('Receive stats not available.')
+ result_dict[ResultsConstants.RX_FRAMES] = 0
+ result_dict[ResultsConstants.THROUGHPUT_RX_FPS] = 0
+ result_dict[ResultsConstants.THROUGHPUT_RX_MBPS] = 0
+ result_dict[ResultsConstants.RX_BYTES] = 0
+ result_dict[ResultsConstants.THROUGHPUT_RX_PERCENT] = 0
+
+ if self.rx_stats.data.get('pr_tplderrors'):
+ result_dict[ResultsConstants.PAYLOAD_ERR] = self.rx_stats.data[
+ 'pr_tplderrors']['0']['pld']
+ result_dict[ResultsConstants.SEQ_ERR] = self.rx_stats.data[
+ 'pr_tplderrors']['0']['seq']
+ else:
+ result_dict[ResultsConstants.PAYLOAD_ERR] = 0
+ result_dict[ResultsConstants.SEQ_ERR] = 0
+
+ if self.rx_stats.data.get('pr_tpldlatency'):
+ result_dict[ResultsConstants.MIN_LATENCY_NS] = self.rx_stats.data[
+ 'pr_tpldlatency']['0']['min']
+ result_dict[ResultsConstants.MAX_LATENCY_NS] = self.rx_stats.data[
+ 'pr_tpldlatency']['0']['max']
+ result_dict[ResultsConstants.AVG_LATENCY_NS] = self.rx_stats.data[
+ 'pr_tpldlatency']['0']['avg']
+ else:
+ result_dict[ResultsConstants.MIN_LATENCY_NS] = 0
+ result_dict[ResultsConstants.MAX_LATENCY_NS] = 0
+ result_dict[ResultsConstants.AVG_LATENCY_NS] = 0
+
+ return result_dict
+
def _setup_json_config(self, trials, loss_rate, testtype=None):
"""
Create a 2bUsed json file that will be used for xena2544.exe execution.
@@ -192,31 +325,160 @@ class Xena(ITrafficGenerator):
self._logger.exception("Error during Xena JSON setup: %s", exc)
raise
- def connect(self):
- """Connect to the traffic generator.
+ def _start_traffic_api(self, packet_limit):
+ """
+ Start the Xena traffic using the socket API driver
+ :param packet_limit: packet limit for stream, set to -1 for no limit
+ :return: None
+ """
+ if not self.xmanager:
+ self._xsocket = XenaSocketDriver(
+ settings.getValue('TRAFFICGEN_XENA_IP'))
+ self.xmanager = XenaManager(
+ self._xsocket, settings.getValue('TRAFFICGEN_XENA_USER'),
+ settings.getValue('TRAFFICGEN_XENA_PASSWORD'))
+
+ # for the report file version info ask the chassis directly for its
+ # software versions
+ settings.setValue('XENA_VERSION', 'XENA Socket API - {}'.format(
+ self.xmanager.get_version()))
+
+ if not len(self.xmanager.ports):
+ self.xmanager.ports[0] = self.xmanager.add_module_port(
+ settings.getValue('TRAFFICGEN_XENA_MODULE1'),
+ settings.getValue('TRAFFICGEN_XENA_PORT1'))
+ if not self.xmanager.ports[0].reserve_port():
+ self._logger.error(
+ 'Unable to reserve port 0. Please release Xena Port')
+
+ if len(self.xmanager.ports) < 2:
+ self.xmanager.ports[1] = self.xmanager.add_module_port(
+ settings.getValue('TRAFFICGEN_XENA_MODULE2'),
+ settings.getValue('TRAFFICGEN_XENA_PORT2'))
+ if not self.xmanager.ports[1].reserve_port():
+ self._logger.error(
+ 'Unable to reserve port 1. Please release Xena Port')
+
+ # Clear port configuration for a clean start
+ self.xmanager.ports[0].reset_port()
+ self.xmanager.ports[1].reset_port()
+ self.xmanager.ports[0].clear_stats()
+ self.xmanager.ports[1].clear_stats()
+
+ # set the port IP from the conf file
+ self.xmanager.ports[0].set_port_ip(
+ settings.getValue('TRAFFICGEN_XENA_PORT0_IP'),
+ settings.getValue('TRAFFICGEN_XENA_PORT0_CIDR'),
+ settings.getValue('TRAFFICGEN_XENA_PORT0_GATEWAY'))
+ self.xmanager.ports[1].set_port_ip(
+ settings.getValue('TRAFFICGEN_XENA_PORT1_IP'),
+ settings.getValue('TRAFFICGEN_XENA_PORT1_CIDR'),
+ settings.getValue('TRAFFICGEN_XENA_PORT1_GATEWAY'))
+
+ def setup_stream(stream, port, payload_id, flip_addr=False):
+ """
+ Helper function to configure streams.
+ :param stream: Stream object from XenaDriver module
+ :param port: Port object from XenaDriver module
+ :param payload_id: payload ID as int
+ :param flip_addr: Boolean if the source and destination addresses
+ should be flipped.
+ :return: None
+ """
+ stream.set_on()
+ stream.set_packet_limit(packet_limit)
+
+ stream.set_rate_fraction(
+ 10000 * self._params['traffic']['frame_rate'])
+ stream.set_packet_header(self._build_packet_header(
+ reverse=flip_addr))
+ stream.set_header_protocol(
+ 'ETHERNET VLAN IP UDP' if self._params['traffic']['vlan'][
+ 'enabled'] else 'ETHERNET IP UDP')
+ stream.set_packet_length(
+ 'fixed', self._params['traffic']['l2']['framesize'], 16383)
+ stream.set_packet_payload('incrementing', '0x00')
+ stream.set_payload_id(payload_id)
+ port.set_port_time_limit(self._duration * 1000000)
+
+ if self._params['traffic']['l2']['framesize'] == 64:
+ # set micro tpld
+ port.micro_tpld_enable()
+
+ if self._params['traffic']['multistream']:
+ stream.enable_multistream(
+ flows=self._params['traffic']['multistream'],
+ layer=self._params['traffic']['stream_type'])
+
+ s1_p0 = self.xmanager.ports[0].add_stream()
+ setup_stream(s1_p0, self.xmanager.ports[0], 0)
+
+ if self._params['traffic']['bidir'] == 'True':
+ s1_p1 = self.xmanager.ports[1].add_stream()
+ setup_stream(s1_p1, self.xmanager.ports[1], 1, flip_addr=True)
+
+ if not self.xmanager.ports[0].traffic_on():
+ self._logger.error(
+ "Failure to start port 0. Check settings and retry.")
+ if self._params['traffic']['bidir'] == 'True':
+ if not self.xmanager.ports[1].traffic_on():
+ self._logger.error(
+ "Failure to start port 1. Check settings and retry.")
+ sleep(self._duration)
+ # getting results
+ if self._params['traffic']['bidir'] == 'True':
+ # need to aggregate out both ports stats and assign that data
+ self.rx_stats = self.xmanager.ports[1].get_rx_stats()
+ self.tx_stats = self.xmanager.ports[0].get_tx_stats()
+ self.tx_stats.data = aggregate_stats(
+ self.tx_stats.data,
+ self.xmanager.ports[1].get_tx_stats().data)
+ self.rx_stats.data = aggregate_stats(
+ self.rx_stats.data,
+ self.xmanager.ports[0].get_rx_stats().data)
+ else:
+ # no need to aggregate, just grab the appropriate port stats
+ self.tx_stats = self.xmanager.ports[0].get_tx_stats()
+ self.rx_stats = self.xmanager.ports[1].get_rx_stats()
+ sleep(1)
- This is an optional function, designed for traffic generators
- which must be "connected to" (i.e. via SSH or an API) before
- they can be used. If not required, simply do nothing here.
+ def _stop_api_traffic(self):
+ """
+ Stop traffic through the socket API
+ :return: Return results from _create_api_result method
+ """
+ self.xmanager.ports[0].traffic_off()
+ if self._params['traffic']['bidir'] == 'True':
+ self.xmanager.ports[1].traffic_off()
+ sleep(5)
- Where implemented, this function should raise an exception on
- failure.
+ stat = self._create_api_result()
+ self.disconnect()
+ return stat
- :returns: None
- """
- pass
+ def connect(self):
+ self._logger.debug('Connect')
+ return self
def disconnect(self):
"""Disconnect from the traffic generator.
As with :func:`connect`, this function is optional.
+
Where implemented, this function should raise an exception on
failure.
:returns: None
"""
- pass
+ self._logger.debug('disconnect')
+ if self.xmanager:
+ self.xmanager.disconnect()
+ self.xmanager = None
+
+ if self._xsocket:
+ self._xsocket.disconnect()
+ self._xsocket = None
def send_burst_traffic(self, traffic=None, numpkts=100, duration=20):
"""Send a burst of traffic.
@@ -239,40 +501,40 @@ class Xena(ITrafficGenerator):
raise NotImplementedError('Xena burst traffic not implemented')
def send_cont_traffic(self, traffic=None, duration=20):
- """Send a continuous flow of traffic.r
-
- Send packets at ``framerate``, using ``traffic`` configuration,
- until timeout ``time`` occurs.
+ """Send a continuous flow of traffic.
- :param traffic: Detailed "traffic" spec, i.e. IP address, VLAN tags
- :param duration: Time to wait to receive packets (secs)
- :returns: dictionary of strings with following data:
- - Tx Throughput (fps),
- - Rx Throughput (fps),
- - Tx Throughput (mbps),
- - Rx Throughput (mbps),
- - Tx Throughput (% linerate),
- - Rx Throughput (% linerate),
- - Min Latency (ns),
- - Max Latency (ns),
- - Avg Latency (ns)
+ See ITrafficGenerator for description
"""
- raise NotImplementedError('Xena continuous traffic not implemented')
+ self._duration = duration
+
+ self._params.clear()
+ self._params['traffic'] = self.traffic_defaults.copy()
+ if traffic:
+ self._params['traffic'] = merge_spec(self._params['traffic'],
+ traffic)
+
+ self._start_traffic_api(-1)
+ return self._stop_api_traffic()
def start_cont_traffic(self, traffic=None, duration=20):
"""Non-blocking version of 'send_cont_traffic'.
- Start transmission and immediately return. Do not wait for
- results.
- :param traffic: Detailed "traffic" spec, i.e. IP address, VLAN tags
- :param duration: Time to wait to receive packets (secs)
+ See ITrafficGenerator for description
"""
- raise NotImplementedError('Xena continuous traffic not implemented')
+ self._duration = duration
+
+ self._params.clear()
+ self._params['traffic'] = self.traffic_defaults.copy()
+ if traffic:
+ self._params['traffic'] = merge_spec(self._params['traffic'],
+ traffic)
+
+ self._start_traffic_api(-1)
def stop_cont_traffic(self):
"""Stop continuous transmission and return results.
"""
- raise NotImplementedError('Xena continuous traffic not implemented')
+ return self._stop_api_traffic()
def send_rfc2544_throughput(self, traffic=None, trials=3, duration=20,
lossrate=0.0):