summaryrefslogtreecommitdiffstats
path: root/tools/pkt_gen/xena/xena.py
diff options
context:
space:
mode:
Diffstat (limited to 'tools/pkt_gen/xena/xena.py')
-rwxr-xr-xtools/pkt_gen/xena/xena.py88
1 files changed, 54 insertions, 34 deletions
diff --git a/tools/pkt_gen/xena/xena.py b/tools/pkt_gen/xena/xena.py
index 88dd3700..969a5a88 100755
--- a/tools/pkt_gen/xena/xena.py
+++ b/tools/pkt_gen/xena/xena.py
@@ -23,7 +23,6 @@ Xena Traffic Generator Model
"""
# python imports
-import inspect
import logging
import subprocess
import sys
@@ -74,19 +73,10 @@ class Xena(ITrafficGenerator):
:param root: root dictionary from xml import
:return: Results Ordered dictionary based off ResultsConstants
"""
- throughput_test = False
- back2back_test = False
- # get the calling method so we know how to return the stats
- caller = inspect.stack()[1][3]
- if 'throughput' in caller:
- throughput_test = True
- elif 'back2back' in caller:
- back2back_test = True
- else:
- raise NotImplementedError(
- "Unknown implementation for result return")
+ # get the test type from the report file
+ test_type = root[0][1].get('TestType')
- if throughput_test:
+ if test_type == 'Throughput':
results = OrderedDict()
results[ResultsConstants.THROUGHPUT_RX_FPS] = int(
root[0][1][0][1].get('PortRxPps'))
@@ -122,8 +112,16 @@ class Xena(ITrafficGenerator):
# Stats for latency returned as N/A so just post them
results[ResultsConstants.AVG_LATENCY_NS] = root[0][1][0][0].get(
'AvgLatency')
- elif back2back_test:
- raise NotImplementedError('Back to back results not implemented')
+ elif test_type == 'Back2Back':
+ results = OrderedDict()
+
+ # Just mimic what Ixia does and only return the b2b frame count.
+ # This may change later once its decided the common results stats
+ # to be returned should be.
+ results[ResultsConstants.B2B_FRAMES] = root[0][1][0][0].get(
+ 'TotalTxBurstFrames')
+ else:
+ raise NotImplementedError('Unknown test type in report file.')
return results
@@ -176,7 +174,7 @@ class Xena(ITrafficGenerator):
flows=self._params['traffic']['multistream'],
multistream_layer=self._params['traffic']['stream_type'])
# set duplex mode
- if self._params['traffic']['bidir']:
+ if bool(self._params['traffic']['bidir']):
j_file.set_topology_mesh()
else:
j_file.set_topology_blocks()
@@ -328,36 +326,58 @@ class Xena(ITrafficGenerator):
lossrate=0.0):
"""Send traffic per RFC2544 back2back test specifications.
- Send packets at a fixed rate, using ``traffic``
- configuration, until minimum time at which no packet loss is
- detected is found.
+ See ITrafficGenerator for description
+ """
+ self._duration = duration
- :param traffic: Detailed "traffic" spec, i.e. IP address, VLAN
- tags
- :param trials: Number of trials to execute
- :param duration: Per iteration duration
- :param lossrate: Acceptable loss percentage
+ self._params.clear()
+ self._params['traffic'] = self.traffic_defaults.copy()
+ if traffic:
+ self._params['traffic'] = merge_spec(self._params['traffic'],
+ traffic)
- :returns: Named tuple of Rx Throughput (fps), Rx Throughput (mbps),
- Tx Rate (% linerate), Rx Rate (% linerate), Tx Count (frames),
- Back to Back Count (frames), Frame Loss (frames), Frame Loss (%)
- :rtype: :class:`Back2BackResult`
- """
- raise NotImplementedError('Xena back2back not implemented')
+ self._setup_json_config(trials, lossrate, '2544_b2b')
+
+ args = ["mono", "./tools/pkt_gen/xena/Xena2544.exe", "-c",
+ "./tools/pkt_gen/xena/profiles/2bUsed.x2544", "-e", "-r",
+ "./tools/pkt_gen/xena", "-u",
+ settings.getValue('TRAFFICGEN_XENA_USER')]
+ self.mono_pipe = subprocess.Popen(
+ args, stdout=sys.stdout)
+ self.mono_pipe.communicate()
+ root = ET.parse(r'./tools/pkt_gen/xena/xena2544-report.xml').getroot()
+ return Xena._create_throughput_result(root)
def start_rfc2544_back2back(self, traffic=None, trials=1, duration=20,
lossrate=0.0):
"""Non-blocking version of 'send_rfc2544_back2back'.
- Start transmission and immediately return. Do not wait for
- results.
+ See ITrafficGenerator for description
"""
- raise NotImplementedError('Xena back2back not implemented')
+ self._duration = duration
+
+ self._params.clear()
+ self._params['traffic'] = self.traffic_defaults.copy()
+ if traffic:
+ self._params['traffic'] = merge_spec(self._params['traffic'],
+ traffic)
+
+ self._setup_json_config(trials, lossrate, '2544_b2b')
+
+ args = ["mono", "./tools/pkt_gen/xena/Xena2544.exe", "-c",
+ "./tools/pkt_gen/xena/profiles/2bUsed.x2544", "-e", "-r",
+ "./tools/pkt_gen/xena", "-u",
+ settings.getValue('TRAFFICGEN_XENA_USER')]
+ self.mono_pipe = subprocess.Popen(
+ args, stdout=sys.stdout)
def wait_rfc2544_back2back(self):
"""Wait and set results of RFC2544 test.
"""
- raise NotImplementedError('Xena back2back not implemented')
+ self.mono_pipe.communicate()
+ sleep(2)
+ root = ET.parse(r'./tools/pkt_gen/xena/xena2544-report.xml').getroot()
+ return Xena._create_throughput_result(root)
if __name__ == "__main__":