aboutsummaryrefslogtreecommitdiffstats
path: root/nfvbench/traffic_gen
diff options
context:
space:
mode:
Diffstat (limited to 'nfvbench/traffic_gen')
-rw-r--r--nfvbench/traffic_gen/dummy.py21
-rw-r--r--nfvbench/traffic_gen/traffic_base.py57
-rw-r--r--nfvbench/traffic_gen/traffic_utils.py28
-rw-r--r--nfvbench/traffic_gen/trex.py700
-rw-r--r--nfvbench/traffic_gen/trex_gen.py1208
5 files changed, 1283 insertions, 731 deletions
diff --git a/nfvbench/traffic_gen/dummy.py b/nfvbench/traffic_gen/dummy.py
index 9beea28..95147ab 100644
--- a/nfvbench/traffic_gen/dummy.py
+++ b/nfvbench/traffic_gen/dummy.py
@@ -13,8 +13,8 @@
# under the License.
from nfvbench.log import LOG
-from traffic_base import AbstractTrafficGenerator
-import traffic_utils as utils
+from .traffic_base import AbstractTrafficGenerator
+from . import traffic_utils as utils
class DummyTG(AbstractTrafficGenerator):
@@ -95,14 +95,14 @@ class DummyTG(AbstractTrafficGenerator):
ports = list(self.traffic_client.generator_config.ports)
self.port_handle = ports
- def create_traffic(self, l2frame_size, rates, bidirectional, latency=True):
+ def create_traffic(self, l2frame_size, rates, bidirectional, latency=True, e2e=False):
self.rates = [utils.to_rate_str(rate) for rate in rates]
self.l2_frame_size = l2frame_size
def clear_streamblock(self):
pass
- def get_stats(self):
+ def get_stats(self, ifstats=None):
"""Get stats from current run.
The binary search mainly looks at 2 results to make the decision:
@@ -147,6 +147,12 @@ class DummyTG(AbstractTrafficGenerator):
total_tx_pps += tx_pps
# actual total tx rate in pps
result['total_tx_rate'] = total_tx_pps
+ # actual offered tx rate in bps
+ avg_packet_size = utils.get_average_packet_size(self.l2_frame_size)
+ total_tx_bps = utils.pps_to_bps(total_tx_pps, avg_packet_size)
+ result['offered_tx_rate_bps'] = total_tx_bps
+
+ result.update(self.get_theoretical_rates(avg_packet_size))
return result
def get_stream_stats(self, tg_stats, if_stats, latencies, chain_idx):
@@ -176,8 +182,8 @@ class DummyTG(AbstractTrafficGenerator):
def fetch_capture_packets(self):
def _get_packet_capture(mac):
# convert text to binary
- src_mac = mac.replace(':', '').decode('hex')
- return {'binary': 'SSSSSS' + src_mac}
+ src_mac = bytearray.fromhex(mac.replace(':', '')).decode()
+ return {'binary': bytes('SSSSSS' + src_mac, 'ascii')}
# for packet capture, generate 2*scc random packets
# normally we should generate packets coming from the right dest macs
@@ -201,6 +207,9 @@ class DummyTG(AbstractTrafficGenerator):
def set_mode(self):
pass
+ def set_service_mode(self, enabled=True):
+ pass
+
def resolve_arp(self):
"""Resolve ARP sucessfully."""
def get_macs(port, scc):
diff --git a/nfvbench/traffic_gen/traffic_base.py b/nfvbench/traffic_gen/traffic_base.py
index 0360591..30aec6e 100644
--- a/nfvbench/traffic_gen/traffic_base.py
+++ b/nfvbench/traffic_gen/traffic_base.py
@@ -16,7 +16,9 @@ import abc
import sys
from nfvbench.log import LOG
-import traffic_utils
+from . import traffic_utils
+from hdrh.histogram import HdrHistogram
+from functools import reduce
class Latency(object):
@@ -27,29 +29,42 @@ class Latency(object):
latency_list: aggregate all latency values from list if not None
"""
- self.min_usec = sys.maxint
+ self.min_usec = sys.maxsize
self.max_usec = 0
self.avg_usec = 0
+ self.hdrh = None
if latency_list:
+ hdrh_list = []
for lat in latency_list:
if lat.available():
self.min_usec = min(self.min_usec, lat.min_usec)
self.max_usec = max(self.max_usec, lat.max_usec)
self.avg_usec += lat.avg_usec
+ if lat.hdrh_available():
+ hdrh_list.append(HdrHistogram.decode(lat.hdrh))
+
+ # aggregate histograms if any
+ if hdrh_list:
+ def add_hdrh(x, y):
+ x.add(y)
+ return x
+ decoded_hdrh = reduce(add_hdrh, hdrh_list)
+ self.hdrh = HdrHistogram.encode(decoded_hdrh).decode('utf-8')
+
# round to nearest usec
self.avg_usec = int(round(float(self.avg_usec) / len(latency_list)))
def available(self):
"""Return True if latency information is available."""
- return self.min_usec != sys.maxint
+ return self.min_usec != sys.maxsize
+ def hdrh_available(self):
+ """Return True if latency histogram information is available."""
+ return self.hdrh is not None
class TrafficGeneratorException(Exception):
"""Exception for traffic generator."""
- pass
-
-
class AbstractTrafficGenerator(object):
def __init__(self, traffic_client):
@@ -68,7 +83,7 @@ class AbstractTrafficGenerator(object):
return None
@abc.abstractmethod
- def create_traffic(self, l2frame_size, rates, bidirectional, latency=True):
+ def create_traffic(self, l2frame_size, rates, bidirectional, latency=True, e2e=False):
# Must be implemented by sub classes
return None
@@ -84,7 +99,7 @@ class AbstractTrafficGenerator(object):
LOG.info('Modified traffic stream for port %s, new rate=%s.', port, self.rates[port_index])
@abc.abstractmethod
- def get_stats(self):
+ def get_stats(self, ifstats):
# Must be implemented by sub classes
return None
@@ -105,7 +120,6 @@ class AbstractTrafficGenerator(object):
def clear_streamblock(self):
"""Clear all streams from the traffic generator."""
- pass
@abc.abstractmethod
def resolve_arp(self):
@@ -115,7 +129,6 @@ class AbstractTrafficGenerator(object):
else a dict of list of dest macs indexed by port#
the dest macs in the list are indexed by the chain id
"""
- pass
@abc.abstractmethod
def get_macs(self):
@@ -123,7 +136,6 @@ class AbstractTrafficGenerator(object):
return: a list of MAC addresses indexed by the port#
"""
- pass
@abc.abstractmethod
def get_port_speed_gbps(self):
@@ -131,4 +143,25 @@ class AbstractTrafficGenerator(object):
return: a list of speed in Gbps indexed by the port#
"""
- pass
+
+ def get_theoretical_rates(self, avg_packet_size):
+
+ result = {}
+
+ # actual interface speed? (may be a virtual override)
+ intf_speed = self.config.intf_speed_used
+
+ if hasattr(self.config, 'user_info') and self.config.user_info is not None:
+ if "extra_encapsulation_bytes" in self.config.user_info:
+ frame_size_full_encapsulation = avg_packet_size + self.config.user_info[
+ "extra_encapsulation_bytes"]
+ result['theoretical_tx_rate_pps'] = traffic_utils.bps_to_pps(
+ intf_speed, frame_size_full_encapsulation) * 2
+ result['theoretical_tx_rate_bps'] = traffic_utils.pps_to_bps(
+ result['theoretical_tx_rate_pps'], avg_packet_size)
+ else:
+ result['theoretical_tx_rate_pps'] = traffic_utils.bps_to_pps(intf_speed,
+ avg_packet_size) * 2
+ result['theoretical_tx_rate_bps'] = traffic_utils.pps_to_bps(
+ result['theoretical_tx_rate_pps'], avg_packet_size)
+ return result
diff --git a/nfvbench/traffic_gen/traffic_utils.py b/nfvbench/traffic_gen/traffic_utils.py
index f856267..4366a6c 100644
--- a/nfvbench/traffic_gen/traffic_utils.py
+++ b/nfvbench/traffic_gen/traffic_utils.py
@@ -14,7 +14,6 @@
import bitmath
-from nfvbench.utils import multiplier_map
# IMIX frame size including the 4-byte FCS field
IMIX_L2_SIZES = [64, 594, 1518]
@@ -23,6 +22,11 @@ IMIX_RATIOS = [7, 4, 1]
IMIX_AVG_L2_FRAME_SIZE = sum(
[1.0 * imix[0] * imix[1] for imix in zip(IMIX_L2_SIZES, IMIX_RATIOS)]) / sum(IMIX_RATIOS)
+multiplier_map = {
+ 'K': 1000,
+ 'M': 1000000,
+ 'G': 1000000000
+}
def convert_rates(l2frame_size, rate, intf_speed):
"""Convert a given rate unit into the other rate units.
@@ -54,12 +58,11 @@ def convert_rates(l2frame_size, rate, intf_speed):
pps = bps_to_pps(bps, avg_packet_size)
else:
raise Exception('Traffic config needs to have a rate type key')
-
return {
'initial_rate_type': initial_rate_type,
- 'rate_pps': int(pps),
+ 'rate_pps': int(float(pps)),
'rate_percent': load,
- 'rate_bps': int(bps)
+ 'rate_bps': int(float(bps))
}
@@ -113,23 +116,22 @@ def parse_rate_str(rate_str):
rate_pps = rate_pps[:-1]
except KeyError:
multiplier = 1
- rate_pps = int(rate_pps.strip()) * multiplier
+ rate_pps = int(float(rate_pps.strip()) * multiplier)
if rate_pps <= 0:
raise Exception('%s is out of valid range' % rate_str)
return {'rate_pps': str(rate_pps)}
- elif rate_str.endswith('ps'):
+ if rate_str.endswith('ps'):
rate = rate_str.replace('ps', '').strip()
bit_rate = bitmath.parse_string(rate).bits
if bit_rate <= 0:
raise Exception('%s is out of valid range' % rate_str)
return {'rate_bps': str(int(bit_rate))}
- elif rate_str.endswith('%'):
+ if rate_str.endswith('%'):
rate_percent = float(rate_str.replace('%', '').strip())
if rate_percent <= 0 or rate_percent > 100.0:
raise Exception('%s is out of valid range (must be 1-100%%)' % rate_str)
return {'rate_percent': str(rate_percent)}
- else:
- raise Exception('Unknown rate string format %s' % rate_str)
+ raise Exception('Unknown rate string format %s' % rate_str)
def get_load_from_rate(rate_str, avg_frame_size=64, line_rate='10Gbps'):
'''From any rate string (with unit) return the corresponding load (in % unit)
@@ -172,10 +174,10 @@ def to_rate_str(rate):
if 'rate_pps' in rate:
pps = rate['rate_pps']
return '{}pps'.format(pps)
- elif 'rate_bps' in rate:
+ if 'rate_bps' in rate:
bps = rate['rate_bps']
return '{}bps'.format(bps)
- elif 'rate_percent' in rate:
+ if 'rate_percent' in rate:
load = rate['rate_percent']
return '{}%'.format(load)
assert False
@@ -185,7 +187,7 @@ def to_rate_str(rate):
def nan_replace(d):
"""Replaces every occurence of 'N/A' with float nan."""
- for k, v in d.iteritems():
+ for k, v in d.items():
if isinstance(v, dict):
nan_replace(v)
elif v == 'N/A':
@@ -200,5 +202,5 @@ def mac_to_int(mac):
def int_to_mac(i):
"""Converts integer representation of MAC address to hex string."""
mac = format(i, 'x').zfill(12)
- blocks = [mac[x:x + 2] for x in xrange(0, len(mac), 2)]
+ blocks = [mac[x:x + 2] for x in range(0, len(mac), 2)]
return ':'.join(blocks)
diff --git a/nfvbench/traffic_gen/trex.py b/nfvbench/traffic_gen/trex.py
deleted file mode 100644
index 1f460f6..0000000
--- a/nfvbench/traffic_gen/trex.py
+++ /dev/null
@@ -1,700 +0,0 @@
-# Copyright 2016 Cisco Systems, Inc. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-"""Driver module for TRex traffic generator."""
-
-import math
-import os
-import random
-import time
-import traceback
-
-from itertools import count
-from nfvbench.log import LOG
-from nfvbench.traffic_server import TRexTrafficServer
-from nfvbench.utils import cast_integer
-from nfvbench.utils import timeout
-from nfvbench.utils import TimeoutError
-from traffic_base import AbstractTrafficGenerator
-from traffic_base import TrafficGeneratorException
-import traffic_utils as utils
-from traffic_utils import IMIX_AVG_L2_FRAME_SIZE
-from traffic_utils import IMIX_L2_SIZES
-from traffic_utils import IMIX_RATIOS
-
-# pylint: disable=import-error
-from trex_stl_lib.api import bind_layers
-from trex_stl_lib.api import CTRexVmInsFixHwCs
-from trex_stl_lib.api import Dot1Q
-from trex_stl_lib.api import Ether
-from trex_stl_lib.api import FlagsField
-from trex_stl_lib.api import IP
-from trex_stl_lib.api import Packet
-from trex_stl_lib.api import STLClient
-from trex_stl_lib.api import STLError
-from trex_stl_lib.api import STLFlowLatencyStats
-from trex_stl_lib.api import STLFlowStats
-from trex_stl_lib.api import STLPktBuilder
-from trex_stl_lib.api import STLScVmRaw
-from trex_stl_lib.api import STLStream
-from trex_stl_lib.api import STLTXCont
-from trex_stl_lib.api import STLVmFixChecksumHw
-from trex_stl_lib.api import STLVmFlowVar
-from trex_stl_lib.api import STLVmFlowVarRepetableRandom
-from trex_stl_lib.api import STLVmWrFlowVar
-from trex_stl_lib.api import ThreeBytesField
-from trex_stl_lib.api import UDP
-from trex_stl_lib.api import XByteField
-from trex_stl_lib.services.trex_stl_service_arp import STLServiceARP
-
-
-# pylint: enable=import-error
-
-class VXLAN(Packet):
- """VxLAN class."""
-
- _VXLAN_FLAGS = ['R' * 27] + ['I'] + ['R' * 5]
- name = "VXLAN"
- fields_desc = [FlagsField("flags", 0x08000000, 32, _VXLAN_FLAGS),
- ThreeBytesField("vni", 0),
- XByteField("reserved", 0x00)]
-
- def mysummary(self):
- """Summary."""
- return self.sprintf("VXLAN (vni=%VXLAN.vni%)")
-
-class TRex(AbstractTrafficGenerator):
- """TRex traffic generator driver."""
-
- LATENCY_PPS = 1000
- CHAIN_PG_ID_MASK = 0x007F
- PORT_PG_ID_MASK = 0x0080
- LATENCY_PG_ID_MASK = 0x0100
-
- def __init__(self, traffic_client):
- """Trex driver."""
- AbstractTrafficGenerator.__init__(self, traffic_client)
- self.client = None
- self.id = count()
- self.port_handle = []
- self.chain_count = self.generator_config.service_chain_count
- self.rates = []
- self.capture_id = None
- self.packet_list = []
-
- def get_version(self):
- """Get the Trex version."""
- return self.client.get_server_version() if self.client else ''
-
- def get_pg_id(self, port, chain_id):
- """Calculate the packet group IDs to use for a given port/stream type/chain_id.
-
- port: 0 or 1
- chain_id: identifies to which chain the pg_id is associated (0 to 255)
- return: pg_id, lat_pg_id
-
- We use a bit mask to set up the 3 fields:
- 0x007F: chain ID (8 bits for a max of 128 chains)
- 0x0080: port bit
- 0x0100: latency bit
- """
- pg_id = port * TRex.PORT_PG_ID_MASK | chain_id
- return pg_id, pg_id | TRex.LATENCY_PG_ID_MASK
-
- def extract_stats(self, in_stats):
- """Extract stats from dict returned by Trex API.
-
- :param in_stats: dict as returned by TRex api
- """
- utils.nan_replace(in_stats)
- # LOG.debug(in_stats)
-
- result = {}
- # port_handles should have only 2 elements: [0, 1]
- # so (1 - ph) will be the index for the far end port
- for ph in self.port_handle:
- stats = in_stats[ph]
- far_end_stats = in_stats[1 - ph]
- result[ph] = {
- 'tx': {
- 'total_pkts': cast_integer(stats['opackets']),
- 'total_pkt_bytes': cast_integer(stats['obytes']),
- 'pkt_rate': cast_integer(stats['tx_pps']),
- 'pkt_bit_rate': cast_integer(stats['tx_bps'])
- },
- 'rx': {
- 'total_pkts': cast_integer(stats['ipackets']),
- 'total_pkt_bytes': cast_integer(stats['ibytes']),
- 'pkt_rate': cast_integer(stats['rx_pps']),
- 'pkt_bit_rate': cast_integer(stats['rx_bps']),
- # how many pkts were dropped in RX direction
- # need to take the tx counter on the far end port
- 'dropped_pkts': cast_integer(
- far_end_stats['opackets'] - stats['ipackets'])
- }
- }
- self.__combine_latencies(in_stats, result[ph]['rx'], ph)
-
- total_tx_pkts = result[0]['tx']['total_pkts'] + result[1]['tx']['total_pkts']
- result["total_tx_rate"] = cast_integer(total_tx_pkts / self.config.duration_sec)
- result["flow_stats"] = in_stats["flow_stats"]
- result["latency"] = in_stats["latency"]
- return result
-
- def get_stream_stats(self, trex_stats, if_stats, latencies, chain_idx):
- """Extract the aggregated stats for a given chain.
-
- trex_stats: stats as returned by get_stats()
- if_stats: a list of 2 interface stats to update (port 0 and 1)
- latencies: a list of 2 Latency instances to update for this chain (port 0 and 1)
- latencies[p] is the latency for packets sent on port p
- if there are no latency streams, the Latency instances are not modified
- chain_idx: chain index of the interface stats
-
- The packet counts include normal and latency streams.
-
- Trex returns flows stats as follows:
-
- 'flow_stats': {0: {'rx_bps': {0: 0, 1: 0, 'total': 0},
- 'rx_bps_l1': {0: 0.0, 1: 0.0, 'total': 0.0},
- 'rx_bytes': {0: nan, 1: nan, 'total': nan},
- 'rx_pkts': {0: 0, 1: 15001, 'total': 15001},
- 'rx_pps': {0: 0, 1: 0, 'total': 0},
- 'tx_bps': {0: 0, 1: 0, 'total': 0},
- 'tx_bps_l1': {0: 0.0, 1: 0.0, 'total': 0.0},
- 'tx_bytes': {0: 1020068, 1: 0, 'total': 1020068},
- 'tx_pkts': {0: 15001, 1: 0, 'total': 15001},
- 'tx_pps': {0: 0, 1: 0, 'total': 0}},
- 1: {'rx_bps': {0: 0, 1: 0, 'total': 0},
- 'rx_bps_l1': {0: 0.0, 1: 0.0, 'total': 0.0},
- 'rx_bytes': {0: nan, 1: nan, 'total': nan},
- 'rx_pkts': {0: 0, 1: 15001, 'total': 15001},
- 'rx_pps': {0: 0, 1: 0, 'total': 0},
- 'tx_bps': {0: 0, 1: 0, 'total': 0},
- 'tx_bps_l1': {0: 0.0, 1: 0.0, 'total': 0.0},
- 'tx_bytes': {0: 1020068, 1: 0, 'total': 1020068},
- 'tx_pkts': {0: 15001, 1: 0, 'total': 15001},
- 'tx_pps': {0: 0, 1: 0, 'total': 0}},
- 128: {'rx_bps': {0: 0, 1: 0, 'total': 0},
- 'rx_bps_l1': {0: 0.0, 1: 0.0, 'total': 0.0},
- 'rx_bytes': {0: nan, 1: nan, 'total': nan},
- 'rx_pkts': {0: 15001, 1: 0, 'total': 15001},
- 'rx_pps': {0: 0, 1: 0, 'total': 0},
- 'tx_bps': {0: 0, 1: 0, 'total': 0},
- 'tx_bps_l1': {0: 0.0, 1: 0.0, 'total': 0.0},
- 'tx_bytes': {0: 0, 1: 1020068, 'total': 1020068},
- 'tx_pkts': {0: 0, 1: 15001, 'total': 15001},
- 'tx_pps': {0: 0, 1: 0, 'total': 0}},etc...
-
- the pg_id (0, 1, 128,...) is the key of the dict and is obtained using the
- get_pg_id() method.
- packet counters for a given stream sent on port p are reported as:
- - tx_pkts[p] on port p
- - rx_pkts[1-p] on the far end port
-
- This is a tricky/critical counter transposition operation because
- the results are grouped by port (not by stream):
- tx_pkts_port(p=0) comes from pg_id(port=0, chain_idx)['tx_pkts'][0]
- rx_pkts_port(p=0) comes from pg_id(port=1, chain_idx)['rx_pkts'][0]
- tx_pkts_port(p=1) comes from pg_id(port=1, chain_idx)['tx_pkts'][1]
- rx_pkts_port(p=1) comes from pg_id(port=0, chain_idx)['rx_pkts'][1]
-
- or using a more generic formula:
- tx_pkts_port(p) comes from pg_id(port=p, chain_idx)['tx_pkts'][p]
- rx_pkts_port(p) comes from pg_id(port=1-p, chain_idx)['rx_pkts'][p]
-
- the second formula is equivalent to
- rx_pkts_port(1-p) comes from pg_id(port=p, chain_idx)['rx_pkts'][1-p]
-
- If there are latency streams, those same counters need to be added in the same way
- """
- def get_latency(lval):
- try:
- return int(round(lval))
- except ValueError:
- return 0
-
- for ifs in if_stats:
- ifs.tx = ifs.rx = 0
- for port in range(2):
- pg_id, lat_pg_id = self.get_pg_id(port, chain_idx)
- for pid in [pg_id, lat_pg_id]:
- try:
- pg_stats = trex_stats['flow_stats'][pid]
- if_stats[port].tx += pg_stats['tx_pkts'][port]
- if_stats[1 - port].rx += pg_stats['rx_pkts'][1 - port]
- except KeyError:
- pass
- try:
- lat = trex_stats['latency'][lat_pg_id]['latency']
- # dropped_pkts += lat['err_cntrs']['dropped']
- latencies[port].max_usec = get_latency(lat['total_max'])
- if math.isnan(lat['total_min']):
- latencies[port].min_usec = 0
- latencies[port].avg_usec = 0
- else:
- latencies[port].min_usec = get_latency(lat['total_min'])
- latencies[port].avg_usec = get_latency(lat['average'])
- except KeyError:
- pass
-
- def __combine_latencies(self, in_stats, results, port_handle):
- """Traverse TRex result dictionary and combines chosen latency stats."""
- total_max = 0
- average = 0
- total_min = float("inf")
- for chain_id in range(self.chain_count):
- try:
- _, lat_pg_id = self.get_pg_id(port_handle, chain_id)
- lat = in_stats['latency'][lat_pg_id]['latency']
- # dropped_pkts += lat['err_cntrs']['dropped']
- total_max = max(lat['total_max'], total_max)
- total_min = min(lat['total_min'], total_min)
- average += lat['average']
- except KeyError:
- pass
- if total_min == float("inf"):
- total_min = 0
- results['min_delay_usec'] = total_min
- results['max_delay_usec'] = total_max
- results['avg_delay_usec'] = int(average / self.chain_count)
-
- def _bind_vxlan(self):
- bind_layers(UDP, VXLAN, dport=4789)
- bind_layers(VXLAN, Ether)
-
- def _create_pkt(self, stream_cfg, l2frame_size):
- """Create a packet of given size.
-
- l2frame_size: size of the L2 frame in bytes (including the 32-bit FCS)
- """
- # Trex will add the FCS field, so we need to remove 4 bytes from the l2 frame size
- frame_size = int(l2frame_size) - 4
-
- if stream_cfg['vxlan'] is True:
- self._bind_vxlan()
- encap_level = '1'
- pkt_base = Ether(src=stream_cfg['vtep_src_mac'], dst=stream_cfg['vtep_dst_mac'])
- if stream_cfg['vtep_vlan'] is not None:
- pkt_base /= Dot1Q(vlan=stream_cfg['vtep_vlan'])
- pkt_base /= IP(src=stream_cfg['vtep_src_ip'], dst=stream_cfg['vtep_dst_ip'])
- pkt_base /= UDP(sport=random.randint(1337, 32767), dport=4789)
- pkt_base /= VXLAN(vni=stream_cfg['net_vni'])
- pkt_base /= Ether(src=stream_cfg['mac_src'], dst=stream_cfg['mac_dst'])
- else:
- encap_level = '0'
- pkt_base = Ether(src=stream_cfg['mac_src'], dst=stream_cfg['mac_dst'])
-
- if stream_cfg['vlan_tag'] is not None:
- pkt_base /= Dot1Q(vlan=stream_cfg['vlan_tag'])
-
- udp_args = {}
- if stream_cfg['udp_src_port']:
- udp_args['sport'] = int(stream_cfg['udp_src_port'])
- if stream_cfg['udp_dst_port']:
- udp_args['dport'] = int(stream_cfg['udp_dst_port'])
- pkt_base /= IP() / UDP(**udp_args)
-
- if stream_cfg['ip_addrs_step'] == 'random':
- src_fv = STLVmFlowVarRepetableRandom(
- name="ip_src",
- min_value=stream_cfg['ip_src_addr'],
- max_value=stream_cfg['ip_src_addr_max'],
- size=4,
- seed=random.randint(0, 32767),
- limit=stream_cfg['ip_src_count'])
- dst_fv = STLVmFlowVarRepetableRandom(
- name="ip_dst",
- min_value=stream_cfg['ip_dst_addr'],
- max_value=stream_cfg['ip_dst_addr_max'],
- size=4,
- seed=random.randint(0, 32767),
- limit=stream_cfg['ip_dst_count'])
- else:
- src_fv = STLVmFlowVar(
- name="ip_src",
- min_value=stream_cfg['ip_src_addr'],
- max_value=stream_cfg['ip_src_addr'],
- size=4,
- op="inc",
- step=stream_cfg['ip_addrs_step'])
- dst_fv = STLVmFlowVar(
- name="ip_dst",
- min_value=stream_cfg['ip_dst_addr'],
- max_value=stream_cfg['ip_dst_addr_max'],
- size=4,
- op="inc",
- step=stream_cfg['ip_addrs_step'])
-
- vm_param = [
- src_fv,
- STLVmWrFlowVar(fv_name="ip_src", pkt_offset="IP:{}.src".format(encap_level)),
- dst_fv,
- STLVmWrFlowVar(fv_name="ip_dst", pkt_offset="IP:{}.dst".format(encap_level)),
- STLVmFixChecksumHw(l3_offset="IP:{}".format(encap_level),
- l4_offset="UDP:{}".format(encap_level),
- l4_type=CTRexVmInsFixHwCs.L4_TYPE_UDP)
- ]
- pad = max(0, frame_size - len(pkt_base)) * 'x'
-
- return STLPktBuilder(pkt=pkt_base / pad, vm=STLScVmRaw(vm_param))
-
- def generate_streams(self, port, chain_id, stream_cfg, l2frame, latency=True):
- """Create a list of streams corresponding to a given chain and stream config.
-
- port: port where the streams originate (0 or 1)
- chain_id: the chain to which the streams are associated to
- stream_cfg: stream configuration
- l2frame: L2 frame size (including 4-byte FCS) or 'IMIX'
- latency: if True also create a latency stream
- """
- streams = []
- pg_id, lat_pg_id = self.get_pg_id(port, chain_id)
- if l2frame == 'IMIX':
- for ratio, l2_frame_size in zip(IMIX_RATIOS, IMIX_L2_SIZES):
- pkt = self._create_pkt(stream_cfg, l2_frame_size)
- streams.append(STLStream(packet=pkt,
- flow_stats=STLFlowStats(pg_id=pg_id),
- mode=STLTXCont(pps=ratio)))
-
- if latency:
- # for IMIX, the latency packets have the average IMIX packet size
- pkt = self._create_pkt(stream_cfg, IMIX_AVG_L2_FRAME_SIZE)
-
- else:
- l2frame_size = int(l2frame)
- pkt = self._create_pkt(stream_cfg, l2frame_size)
- streams.append(STLStream(packet=pkt,
- flow_stats=STLFlowStats(pg_id=pg_id),
- mode=STLTXCont()))
- # for the latency stream, the minimum payload is 16 bytes even in case of vlan tagging
- # without vlan, the min l2 frame size is 64
- # with vlan it is 68
- # This only applies to the latency stream
- if latency and stream_cfg['vlan_tag'] and l2frame_size < 68:
- pkt = self._create_pkt(stream_cfg, 68)
-
- if latency:
- streams.append(STLStream(packet=pkt,
- flow_stats=STLFlowLatencyStats(pg_id=lat_pg_id),
- mode=STLTXCont(pps=self.LATENCY_PPS)))
- return streams
-
- @timeout(5)
- def __connect(self, client):
- client.connect()
-
- def __connect_after_start(self):
- # after start, Trex may take a bit of time to initialize
- # so we need to retry a few times
- for it in xrange(self.config.generic_retry_count):
- try:
- time.sleep(1)
- self.client.connect()
- break
- except Exception as ex:
- if it == (self.config.generic_retry_count - 1):
- raise
- LOG.info("Retrying connection to TRex (%s)...", ex.message)
-
- def connect(self):
- """Connect to the TRex server."""
- server_ip = self.generator_config.ip
- LOG.info("Connecting to TRex (%s)...", server_ip)
-
- # Connect to TRex server
- self.client = STLClient(server=server_ip)
- try:
- self.__connect(self.client)
- except (TimeoutError, STLError) as e:
- if server_ip == '127.0.0.1':
- try:
- self.__start_server()
- self.__connect_after_start()
- except (TimeoutError, STLError) as e:
- LOG.error('Cannot connect to TRex')
- LOG.error(traceback.format_exc())
- logpath = '/tmp/trex.log'
- if os.path.isfile(logpath):
- # Wait for TRex to finish writing error message
- last_size = 0
- for _ in xrange(self.config.generic_retry_count):
- size = os.path.getsize(logpath)
- if size == last_size:
- # probably not writing anymore
- break
- last_size = size
- time.sleep(1)
- with open(logpath, 'r') as f:
- message = f.read()
- else:
- message = e.message
- raise TrafficGeneratorException(message)
- else:
- raise TrafficGeneratorException(e.message)
-
- ports = list(self.generator_config.ports)
- self.port_handle = ports
- # Prepare the ports
- self.client.reset(ports)
- # Read HW information from each port
- # this returns an array of dict (1 per port)
- """
- Example of output for Intel XL710
- [{'arp': '-', 'src_ipv4': '-', u'supp_speeds': [40000], u'is_link_supported': True,
- 'grat_arp': 'off', 'speed': 40, u'index': 0, 'link_change_supported': 'yes',
- u'rx': {u'counters': 127, u'caps': [u'flow_stats', u'latency']},
- u'is_virtual': 'no', 'prom': 'off', 'src_mac': u'3c:fd:fe:a8:24:48', 'status': 'IDLE',
- u'description': u'Ethernet Controller XL710 for 40GbE QSFP+',
- 'dest': u'fa:16:3e:3c:63:04', u'is_fc_supported': False, 'vlan': '-',
- u'driver': u'net_i40e', 'led_change_supported': 'yes', 'rx_filter_mode': 'hardware match',
- 'fc': 'none', 'link': 'UP', u'hw_mac': u'3c:fd:fe:a8:24:48', u'pci_addr': u'0000:5e:00.0',
- 'mult': 'off', 'fc_supported': 'no', u'is_led_supported': True, 'rx_queue': 'off',
- 'layer_mode': 'Ethernet', u'numa': 0}, ...]
- """
- self.port_info = self.client.get_port_info(ports)
- LOG.info('Connected to TRex')
- for id, port in enumerate(self.port_info):
- LOG.info(' Port %d: %s speed=%dGbps mac=%s pci=%s driver=%s',
- id, port['description'], port['speed'], port['src_mac'],
- port['pci_addr'], port['driver'])
- # Make sure the 2 ports have the same speed
- if self.port_info[0]['speed'] != self.port_info[1]['speed']:
- raise TrafficGeneratorException('Traffic generator ports speed mismatch: %d/%d Gbps' %
- (self.port_info[0]['speed'],
- self.port_info[1]['speed']))
-
- def __start_server(self):
- server = TRexTrafficServer()
- server.run_server(self.generator_config)
-
- def resolve_arp(self):
- """Resolve all configured remote IP addresses.
-
- return: None if ARP failed to resolve for all IP addresses
- else a dict of list of dest macs indexed by port#
- the dest macs in the list are indexed by the chain id
- """
- self.client.set_service_mode(ports=self.port_handle)
- LOG.info('Polling ARP until successful...')
- arp_dest_macs = {}
- for port, device in zip(self.port_handle, self.generator_config.devices):
- # there should be 1 stream config per chain
- stream_configs = device.get_stream_configs()
- chain_count = len(stream_configs)
- ctx = self.client.create_service_ctx(port=port)
- # all dest macs on this port indexed by chain ID
- dst_macs = [None] * chain_count
- dst_macs_count = 0
- # the index in the list is the chain id
- if self.config.vxlan:
- arps = [
- STLServiceARP(ctx,
- src_ip=device.vtep_src_ip,
- dst_ip=device.vtep_dst_ip,
- vlan=device.vtep_vlan)
- for cfg in stream_configs
- ]
- else:
- arps = [
- STLServiceARP(ctx,
- src_ip=cfg['ip_src_tg_gw'],
- dst_ip=cfg['mac_discovery_gw'],
- # will be None if no vlan tagging
- vlan=cfg['vlan_tag'])
- for cfg in stream_configs
- ]
-
- for attempt in range(self.config.generic_retry_count):
- try:
- ctx.run(arps)
- except STLError:
- LOG.error(traceback.format_exc())
- continue
-
- unresolved = []
- for chain_id, mac in enumerate(dst_macs):
- if not mac:
- arp_record = arps[chain_id].get_record()
- if arp_record.dst_mac:
- dst_macs[chain_id] = arp_record.dst_mac
- dst_macs_count += 1
- LOG.info(' ARP: port=%d chain=%d src IP=%s dst IP=%s -> MAC=%s',
- port, chain_id,
- arp_record.src_ip,
- arp_record.dst_ip, arp_record.dst_mac)
- else:
- unresolved.append(arp_record.dst_ip)
- if dst_macs_count == chain_count:
- arp_dest_macs[port] = dst_macs
- LOG.info('ARP resolved successfully for port %s', port)
- break
- else:
- retry = attempt + 1
- LOG.info('Retrying ARP for: %s (retry %d/%d)',
- unresolved, retry, self.config.generic_retry_count)
- if retry < self.config.generic_retry_count:
- time.sleep(self.config.generic_poll_sec)
- else:
- LOG.error('ARP timed out for port %s (resolved %d out of %d)',
- port,
- dst_macs_count,
- chain_count)
- break
-
- self.client.set_service_mode(ports=self.port_handle, enabled=False)
- if len(arp_dest_macs) == len(self.port_handle):
- return arp_dest_macs
- return None
-
- def __is_rate_enough(self, l2frame_size, rates, bidirectional, latency):
- """Check if rate provided by user is above requirements. Applies only if latency is True."""
- intf_speed = self.generator_config.intf_speed
- if latency:
- if bidirectional:
- mult = 2
- total_rate = 0
- for rate in rates:
- r = utils.convert_rates(l2frame_size, rate, intf_speed)
- total_rate += int(r['rate_pps'])
- else:
- mult = 1
- total_rate = utils.convert_rates(l2frame_size, rates[0], intf_speed)
- # rate must be enough for latency stream and at least 1 pps for base stream per chain
- required_rate = (self.LATENCY_PPS + 1) * self.config.service_chain_count * mult
- result = utils.convert_rates(l2frame_size,
- {'rate_pps': required_rate},
- intf_speed * mult)
- result['result'] = total_rate >= required_rate
- return result
-
- return {'result': True}
-
- def create_traffic(self, l2frame_size, rates, bidirectional, latency=True):
- """Program all the streams in Trex server.
-
- l2frame_size: L2 frame size or IMIX
- rates: a list of 2 rates to run each direction
- each rate is a dict like {'rate_pps': '10kpps'}
- bidirectional: True if bidirectional
- latency: True if latency measurement is needed
- """
- r = self.__is_rate_enough(l2frame_size, rates, bidirectional, latency)
- if not r['result']:
- raise TrafficGeneratorException(
- 'Required rate in total is at least one of: \n{pps}pps \n{bps}bps \n{load}%.'
- .format(pps=r['rate_pps'],
- bps=r['rate_bps'],
- load=r['rate_percent']))
- # a dict of list of streams indexed by port#
- # in case of fixed size, has self.chain_count * 2 * 2 streams
- # (1 normal + 1 latency stream per direction per chain)
- # for IMIX, has self.chain_count * 2 * 4 streams
- # (3 normal + 1 latency stream per direction per chain)
- streamblock = {}
- for port in self.port_handle:
- streamblock[port] = []
- stream_cfgs = [d.get_stream_configs() for d in self.generator_config.devices]
- self.rates = [utils.to_rate_str(rate) for rate in rates]
- for chain_id, (fwd_stream_cfg, rev_stream_cfg) in enumerate(zip(*stream_cfgs)):
- streamblock[0].extend(self.generate_streams(self.port_handle[0],
- chain_id,
- fwd_stream_cfg,
- l2frame_size,
- latency=latency))
- if len(self.rates) > 1:
- streamblock[1].extend(self.generate_streams(self.port_handle[1],
- chain_id,
- rev_stream_cfg,
- l2frame_size,
- latency=bidirectional and latency))
-
- for port in self.port_handle:
- self.client.add_streams(streamblock[port], ports=port)
- LOG.info('Created %d traffic streams for port %s.', len(streamblock[port]), port)
-
- def clear_streamblock(self):
- """Clear all streams from TRex."""
- self.rates = []
- self.client.reset(self.port_handle)
- LOG.info('Cleared all existing streams')
-
- def get_stats(self):
- """Get stats from Trex."""
- stats = self.client.get_stats()
- return self.extract_stats(stats)
-
- def get_macs(self):
- """Return the Trex local port MAC addresses.
-
- return: a list of MAC addresses indexed by the port#
- """
- return [port['src_mac'] for port in self.port_info]
-
- def get_port_speed_gbps(self):
- """Return the Trex local port MAC addresses.
-
- return: a list of speed in Gbps indexed by the port#
- """
- return [port['speed'] for port in self.port_info]
-
- def clear_stats(self):
- """Clear all stats in the traffic gneerator."""
- if self.port_handle:
- self.client.clear_stats()
-
- def start_traffic(self):
- """Start generating traffic in all ports."""
- for port, rate in zip(self.port_handle, self.rates):
- self.client.start(ports=port, mult=rate, duration=self.config.duration_sec, force=True)
-
- def stop_traffic(self):
- """Stop generating traffic."""
- self.client.stop(ports=self.port_handle)
-
- def start_capture(self):
- """Capture all packets on both ports that are unicast to us."""
- if self.capture_id:
- self.stop_capture()
- # Need to filter out unwanted packets so we do not end up counting
- # src MACs of frames that are not unicast to us
- src_mac_list = self.get_macs()
- bpf_filter = "ether dst %s or ether dst %s" % (src_mac_list[0], src_mac_list[1])
- # ports must be set in service in order to enable capture
- self.client.set_service_mode(ports=self.port_handle)
- self.capture_id = self.client.start_capture(rx_ports=self.port_handle,
- bpf_filter=bpf_filter)
-
- def fetch_capture_packets(self):
- """Fetch capture packets in capture mode."""
- if self.capture_id:
- self.packet_list = []
- self.client.fetch_capture_packets(capture_id=self.capture_id['id'],
- output=self.packet_list)
-
- def stop_capture(self):
- """Stop capturing packets."""
- if self.capture_id:
- self.client.stop_capture(capture_id=self.capture_id['id'])
- self.capture_id = None
- self.client.set_service_mode(ports=self.port_handle, enabled=False)
-
- def cleanup(self):
- """Cleanup Trex driver."""
- if self.client:
- try:
- self.client.reset(self.port_handle)
- self.client.disconnect()
- except STLError:
- # TRex does not like a reset while in disconnected state
- pass
diff --git a/nfvbench/traffic_gen/trex_gen.py b/nfvbench/traffic_gen/trex_gen.py
new file mode 100644
index 0000000..dff72ac
--- /dev/null
+++ b/nfvbench/traffic_gen/trex_gen.py
@@ -0,0 +1,1208 @@
+# Copyright 2016 Cisco Systems, Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""Driver module for TRex traffic generator."""
+
+import math
+import os
+import sys
+import random
+import time
+import traceback
+from functools import reduce
+
+from itertools import count
+# pylint: disable=import-error
+from scapy.contrib.mpls import MPLS # flake8: noqa
+# pylint: enable=import-error
+from nfvbench.log import LOG
+from nfvbench.specs import ChainType
+from nfvbench.traffic_server import TRexTrafficServer
+from nfvbench.utils import cast_integer
+from nfvbench.utils import timeout
+from nfvbench.utils import TimeoutError
+
+from hdrh.histogram import HdrHistogram
+
+# pylint: disable=import-error
+from trex.common.services.trex_service_arp import ServiceARP
+from trex.stl.api import ARP
+from trex.stl.api import bind_layers
+from trex.stl.api import CTRexVmInsFixHwCs
+from trex.stl.api import Dot1Q
+from trex.stl.api import Ether
+from trex.stl.api import FlagsField
+from trex.stl.api import IP
+from trex.stl.api import Packet
+from trex.stl.api import STLClient
+from trex.stl.api import STLError
+from trex.stl.api import STLFlowLatencyStats
+from trex.stl.api import STLFlowStats
+from trex.stl.api import STLPktBuilder
+from trex.stl.api import STLScVmRaw
+from trex.stl.api import STLStream
+from trex.stl.api import STLTXCont
+from trex.stl.api import STLTXMultiBurst
+from trex.stl.api import STLVmFixChecksumHw
+from trex.stl.api import STLVmFixIpv4
+from trex.stl.api import STLVmFlowVar
+from trex.stl.api import STLVmFlowVarRepeatableRandom
+from trex.stl.api import STLVmTupleGen
+from trex.stl.api import STLVmWrFlowVar
+from trex.stl.api import ThreeBytesField
+from trex.stl.api import UDP
+from trex.stl.api import XByteField
+
+# pylint: enable=import-error
+
+from .traffic_base import AbstractTrafficGenerator
+from .traffic_base import TrafficGeneratorException
+from . import traffic_utils as utils
+from .traffic_utils import IMIX_AVG_L2_FRAME_SIZE
+from .traffic_utils import IMIX_L2_SIZES
+from .traffic_utils import IMIX_RATIOS
+
+class VXLAN(Packet):
+ """VxLAN class."""
+
+ _VXLAN_FLAGS = ['R' * 27] + ['I'] + ['R' * 5]
+ name = "VXLAN"
+ fields_desc = [FlagsField("flags", 0x08000000, 32, _VXLAN_FLAGS),
+ ThreeBytesField("vni", 0),
+ XByteField("reserved", 0x00)]
+
+ def mysummary(self):
+ """Summary."""
+ return self.sprintf("VXLAN (vni=%VXLAN.vni%)")
+
+class TRex(AbstractTrafficGenerator):
+ """TRex traffic generator driver."""
+
+ LATENCY_PPS = 1000
+ CHAIN_PG_ID_MASK = 0x007F
+ PORT_PG_ID_MASK = 0x0080
+ LATENCY_PG_ID_MASK = 0x0100
+
+ def __init__(self, traffic_client):
+ """Trex driver."""
+ AbstractTrafficGenerator.__init__(self, traffic_client)
+ self.client = None
+ self.id = count()
+ self.port_handle = []
+ self.chain_count = self.generator_config.service_chain_count
+ self.rates = []
+ self.capture_id = None
+ self.packet_list = []
+ self.l2_frame_size = 0
+
+ def get_version(self):
+ """Get the Trex version."""
+ return self.client.get_server_version() if self.client else ''
+
+ def get_pg_id(self, port, chain_id):
+ """Calculate the packet group IDs to use for a given port/stream type/chain_id.
+
+ port: 0 or 1
+ chain_id: identifies to which chain the pg_id is associated (0 to 255)
+ return: pg_id, lat_pg_id
+
+ We use a bit mask to set up the 3 fields:
+ 0x007F: chain ID (8 bits for a max of 128 chains)
+ 0x0080: port bit
+ 0x0100: latency bit
+ """
+ pg_id = port * TRex.PORT_PG_ID_MASK | chain_id
+ return pg_id, pg_id | TRex.LATENCY_PG_ID_MASK
+
+ def extract_stats(self, in_stats, ifstats):
+ """Extract stats from dict returned by Trex API.
+
+ :param in_stats: dict as returned by TRex api
+ """
+ utils.nan_replace(in_stats)
+ # LOG.debug(in_stats)
+
+ result = {}
+ # port_handles should have only 2 elements: [0, 1]
+ # so (1 - ph) will be the index for the far end port
+ for ph in self.port_handle:
+ stats = in_stats[ph]
+ far_end_stats = in_stats[1 - ph]
+ result[ph] = {
+ 'tx': {
+ 'total_pkts': cast_integer(stats['opackets']),
+ 'total_pkt_bytes': cast_integer(stats['obytes']),
+ 'pkt_rate': cast_integer(stats['tx_pps']),
+ 'pkt_bit_rate': cast_integer(stats['tx_bps'])
+ },
+ 'rx': {
+ 'total_pkts': cast_integer(stats['ipackets']),
+ 'total_pkt_bytes': cast_integer(stats['ibytes']),
+ 'pkt_rate': cast_integer(stats['rx_pps']),
+ 'pkt_bit_rate': cast_integer(stats['rx_bps']),
+ # how many pkts were dropped in RX direction
+ # need to take the tx counter on the far end port
+ 'dropped_pkts': cast_integer(
+ far_end_stats['opackets'] - stats['ipackets'])
+ }
+ }
+ self.__combine_latencies(in_stats, result[ph]['rx'], ph)
+
+ total_tx_pkts = result[0]['tx']['total_pkts'] + result[1]['tx']['total_pkts']
+
+ # in case of GARP packets we need to base total_tx_pkts value using flow_stats
+        # as GARP packets have no flow stats and will not be received on the other port
+ if self.config.periodic_gratuitous_arp:
+ if not self.config.no_flow_stats and not self.config.no_latency_stats:
+ global_total_tx_pkts = total_tx_pkts
+ total_tx_pkts = 0
+ if ifstats:
+ for chain_id, _ in enumerate(ifstats):
+ for ph in self.port_handle:
+ pg_id, lat_pg_id = self.get_pg_id(ph, chain_id)
+ flows_tx_pkts = in_stats['flow_stats'][pg_id]['tx_pkts']['total'] + \
+ in_stats['flow_stats'][lat_pg_id]['tx_pkts']['total']
+ result[ph]['tx']['total_pkts'] = flows_tx_pkts
+ total_tx_pkts += flows_tx_pkts
+ else:
+ for pg_id in in_stats['flow_stats']:
+ if pg_id != 'global':
+ total_tx_pkts += in_stats['flow_stats'][pg_id]['tx_pkts']['total']
+ result["garp_total_tx_rate"] = cast_integer(
+ (global_total_tx_pkts - total_tx_pkts) / self.config.duration_sec)
+ else:
+ LOG.warning("Gratuitous ARP are not received by the other port so TRex and NFVbench"
+ " see these packets as dropped. Please do not activate no_flow_stats"
+ " and no_latency_stats properties to have a better drop rate.")
+
+ result["total_tx_rate"] = cast_integer(total_tx_pkts / self.config.duration_sec)
+ # actual offered tx rate in bps
+ avg_packet_size = utils.get_average_packet_size(self.l2_frame_size)
+ total_tx_bps = utils.pps_to_bps(result["total_tx_rate"], avg_packet_size)
+ result['offered_tx_rate_bps'] = total_tx_bps
+
+ result.update(self.get_theoretical_rates(avg_packet_size))
+
+ result["flow_stats"] = in_stats["flow_stats"]
+ result["latency"] = in_stats["latency"]
+
+ # Merge HDRHistogram to have an overall value for all chains and ports
+ # (provided that the histogram exists in the stats returned by T-Rex)
+ # Of course, empty histograms will produce an empty (invalid) histogram.
+ try:
+ hdrh_list = []
+ if ifstats:
+ for chain_id, _ in enumerate(ifstats):
+ for ph in self.port_handle:
+ _, lat_pg_id = self.get_pg_id(ph, chain_id)
+ hdrh_list.append(
+ HdrHistogram.decode(in_stats['latency'][lat_pg_id]['latency']['hdrh']))
+ else:
+ for pg_id in in_stats['latency']:
+ if pg_id != 'global':
+ hdrh_list.append(
+ HdrHistogram.decode(in_stats['latency'][pg_id]['latency']['hdrh']))
+
+ def add_hdrh(x, y):
+ x.add(y)
+ return x
+ decoded_hdrh = reduce(add_hdrh, hdrh_list)
+ result["overall_hdrh"] = HdrHistogram.encode(decoded_hdrh).decode('utf-8')
+ except KeyError:
+ pass
+
+ return result
+
+ def get_stream_stats(self, trex_stats, if_stats, latencies, chain_idx):
+ """Extract the aggregated stats for a given chain.
+
+ trex_stats: stats as returned by get_stats()
+ if_stats: a list of 2 interface stats to update (port 0 and 1)
+ latencies: a list of 2 Latency instances to update for this chain (port 0 and 1)
+ latencies[p] is the latency for packets sent on port p
+ if there are no latency streams, the Latency instances are not modified
+ chain_idx: chain index of the interface stats
+
+ The packet counts include normal and latency streams.
+
+ Trex returns flows stats as follows:
+
+ 'flow_stats': {0: {'rx_bps': {0: 0, 1: 0, 'total': 0},
+ 'rx_bps_l1': {0: 0.0, 1: 0.0, 'total': 0.0},
+ 'rx_bytes': {0: nan, 1: nan, 'total': nan},
+ 'rx_pkts': {0: 0, 1: 15001, 'total': 15001},
+ 'rx_pps': {0: 0, 1: 0, 'total': 0},
+ 'tx_bps': {0: 0, 1: 0, 'total': 0},
+ 'tx_bps_l1': {0: 0.0, 1: 0.0, 'total': 0.0},
+ 'tx_bytes': {0: 1020068, 1: 0, 'total': 1020068},
+ 'tx_pkts': {0: 15001, 1: 0, 'total': 15001},
+ 'tx_pps': {0: 0, 1: 0, 'total': 0}},
+ 1: {'rx_bps': {0: 0, 1: 0, 'total': 0},
+ 'rx_bps_l1': {0: 0.0, 1: 0.0, 'total': 0.0},
+ 'rx_bytes': {0: nan, 1: nan, 'total': nan},
+ 'rx_pkts': {0: 0, 1: 15001, 'total': 15001},
+ 'rx_pps': {0: 0, 1: 0, 'total': 0},
+ 'tx_bps': {0: 0, 1: 0, 'total': 0},
+ 'tx_bps_l1': {0: 0.0, 1: 0.0, 'total': 0.0},
+ 'tx_bytes': {0: 1020068, 1: 0, 'total': 1020068},
+ 'tx_pkts': {0: 15001, 1: 0, 'total': 15001},
+ 'tx_pps': {0: 0, 1: 0, 'total': 0}},
+ 128: {'rx_bps': {0: 0, 1: 0, 'total': 0},
+ 'rx_bps_l1': {0: 0.0, 1: 0.0, 'total': 0.0},
+ 'rx_bytes': {0: nan, 1: nan, 'total': nan},
+ 'rx_pkts': {0: 15001, 1: 0, 'total': 15001},
+ 'rx_pps': {0: 0, 1: 0, 'total': 0},
+ 'tx_bps': {0: 0, 1: 0, 'total': 0},
+ 'tx_bps_l1': {0: 0.0, 1: 0.0, 'total': 0.0},
+ 'tx_bytes': {0: 0, 1: 1020068, 'total': 1020068},
+ 'tx_pkts': {0: 0, 1: 15001, 'total': 15001},
+ 'tx_pps': {0: 0, 1: 0, 'total': 0}},etc...
+
+ the pg_id (0, 1, 128,...) is the key of the dict and is obtained using the
+ get_pg_id() method.
+ packet counters for a given stream sent on port p are reported as:
+ - tx_pkts[p] on port p
+ - rx_pkts[1-p] on the far end port
+
+ This is a tricky/critical counter transposition operation because
+ the results are grouped by port (not by stream):
+ tx_pkts_port(p=0) comes from pg_id(port=0, chain_idx)['tx_pkts'][0]
+ rx_pkts_port(p=0) comes from pg_id(port=1, chain_idx)['rx_pkts'][0]
+ tx_pkts_port(p=1) comes from pg_id(port=1, chain_idx)['tx_pkts'][1]
+ rx_pkts_port(p=1) comes from pg_id(port=0, chain_idx)['rx_pkts'][1]
+
+ or using a more generic formula:
+ tx_pkts_port(p) comes from pg_id(port=p, chain_idx)['tx_pkts'][p]
+ rx_pkts_port(p) comes from pg_id(port=1-p, chain_idx)['rx_pkts'][p]
+
+ the second formula is equivalent to
+ rx_pkts_port(1-p) comes from pg_id(port=p, chain_idx)['rx_pkts'][1-p]
+
+ If there are latency streams, those same counters need to be added in the same way
+ """
+ def get_latency(lval):
+ try:
+ return int(round(lval))
+ except ValueError:
+ return 0
+
+ for ifs in if_stats:
+ ifs.tx = ifs.rx = 0
+ for port in range(2):
+ pg_id, lat_pg_id = self.get_pg_id(port, chain_idx)
+ for pid in [pg_id, lat_pg_id]:
+ try:
+ pg_stats = trex_stats['flow_stats'][pid]
+ if_stats[port].tx += pg_stats['tx_pkts'][port]
+ if_stats[1 - port].rx += pg_stats['rx_pkts'][1 - port]
+ except KeyError:
+ pass
+ try:
+ lat = trex_stats['latency'][lat_pg_id]['latency']
+ # dropped_pkts += lat['err_cntrs']['dropped']
+ latencies[port].max_usec = get_latency(lat['total_max'])
+ if math.isnan(lat['total_min']):
+ latencies[port].min_usec = 0
+ latencies[port].avg_usec = 0
+ else:
+ latencies[port].min_usec = get_latency(lat['total_min'])
+ latencies[port].avg_usec = get_latency(lat['average'])
+ # pick up the HDR histogram if present (otherwise will raise KeyError)
+ latencies[port].hdrh = lat['hdrh']
+ except KeyError:
+ pass
+
+ def __combine_latencies(self, in_stats, results, port_handle):
+ """Traverse TRex result dictionary and combines chosen latency stats.
+
+ example of latency dict returned by trex (2 chains):
+ 'latency': {256: {'err_cntrs': {'dropped': 0,
+ 'dup': 0,
+ 'out_of_order': 0,
+ 'seq_too_high': 0,
+ 'seq_too_low': 0},
+ 'latency': {'average': 26.5,
+ 'hdrh': u'HISTFAAAAEx4nJNpmSgz...bFRgxi',
+ 'histogram': {20: 303,
+ 30: 320,
+ 40: 300,
+ 50: 73,
+ 60: 4,
+ 70: 1},
+ 'jitter': 14,
+ 'last_max': 63,
+ 'total_max': 63,
+ 'total_min': 20}},
+ 257: {'err_cntrs': {'dropped': 0,
+ 'dup': 0,
+ 'out_of_order': 0,
+ 'seq_too_high': 0,
+ 'seq_too_low': 0},
+ 'latency': {'average': 29.75,
+ 'hdrh': u'HISTFAAAAEV4nJN...CALilDG0=',
+ 'histogram': {20: 261,
+ 30: 431,
+ 40: 3,
+ 50: 80,
+ 60: 225},
+ 'jitter': 23,
+ 'last_max': 67,
+ 'total_max': 67,
+ 'total_min': 20}},
+ 384: {'err_cntrs': {'dropped': 0,
+ 'dup': 0,
+ 'out_of_order': 0,
+ 'seq_too_high': 0,
+ 'seq_too_low': 0},
+ 'latency': {'average': 18.0,
+ 'hdrh': u'HISTFAAAADR4nJNpm...MjCwDDxAZG',
+ 'histogram': {20: 987, 30: 14},
+ 'jitter': 0,
+ 'last_max': 34,
+ 'total_max': 34,
+ 'total_min': 20}},
+ 385: {'err_cntrs': {'dropped': 0,
+ 'dup': 0,
+ 'out_of_order': 0,
+ 'seq_too_high': 0,
+ 'seq_too_low': 0},
+ 'latency': {'average': 19.0,
+ 'hdrh': u'HISTFAAAADR4nJNpm...NkYmJgDdagfK',
+ 'histogram': {20: 989, 30: 11},
+ 'jitter': 0,
+ 'last_max': 38,
+ 'total_max': 38,
+ 'total_min': 20}},
+ 'global': {'bad_hdr': 0, 'old_flow': 0}},
+ """
+ total_max = 0
+ average = 0
+ total_min = float("inf")
+ for chain_id in range(self.chain_count):
+ try:
+ _, lat_pg_id = self.get_pg_id(port_handle, chain_id)
+ lat = in_stats['latency'][lat_pg_id]['latency']
+ # dropped_pkts += lat['err_cntrs']['dropped']
+ total_max = max(lat['total_max'], total_max)
+ total_min = min(lat['total_min'], total_min)
+ average += lat['average']
+ except KeyError:
+ pass
+ if total_min == float("inf"):
+ total_min = 0
+ results['min_delay_usec'] = total_min
+ results['max_delay_usec'] = total_max
+ results['avg_delay_usec'] = int(average / self.chain_count)
+
+ def _bind_vxlan(self):
+ bind_layers(UDP, VXLAN, dport=4789)
+ bind_layers(VXLAN, Ether)
+
+ def _create_pkt(self, stream_cfg, l2frame_size, disable_random_latency_flow=False):
+ """Create a packet of given size.
+
+ l2frame_size: size of the L2 frame in bytes (including the 32-bit FCS)
+ """
+ # Trex will add the FCS field, so we need to remove 4 bytes from the l2 frame size
+ frame_size = int(l2frame_size) - 4
+ vm_param = []
+ if stream_cfg['vxlan'] is True:
+ self._bind_vxlan()
+ encap_level = '1'
+ pkt_base = Ether(src=stream_cfg['vtep_src_mac'], dst=stream_cfg['vtep_dst_mac'])
+ if stream_cfg['vtep_vlan'] is not None:
+ pkt_base /= Dot1Q(vlan=stream_cfg['vtep_vlan'])
+ pkt_base /= IP(src=stream_cfg['vtep_src_ip'], dst=stream_cfg['vtep_dst_ip'])
+ pkt_base /= UDP(sport=random.randint(1337, 32767), dport=4789)
+ pkt_base /= VXLAN(vni=stream_cfg['net_vni'])
+ pkt_base /= Ether(src=stream_cfg['mac_src'], dst=stream_cfg['mac_dst'])
+ # need to randomize the outer header UDP src port based on flow
+ vxlan_udp_src_fv = STLVmFlowVar(
+ name="vxlan_udp_src",
+ min_value=1337,
+ max_value=32767,
+ size=2,
+ op="random")
+ vm_param = [vxlan_udp_src_fv,
+ STLVmWrFlowVar(fv_name="vxlan_udp_src", pkt_offset="UDP.sport")]
+ elif stream_cfg['mpls'] is True:
+ encap_level = '0'
+ pkt_base = Ether(src=stream_cfg['vtep_src_mac'], dst=stream_cfg['vtep_dst_mac'])
+ if stream_cfg['vtep_vlan'] is not None:
+ pkt_base /= Dot1Q(vlan=stream_cfg['vtep_vlan'])
+ if stream_cfg['mpls_outer_label'] is not None:
+ pkt_base /= MPLS(label=stream_cfg['mpls_outer_label'], cos=1, s=0, ttl=255)
+ if stream_cfg['mpls_inner_label'] is not None:
+ pkt_base /= MPLS(label=stream_cfg['mpls_inner_label'], cos=1, s=1, ttl=255)
+ # Flow stats and MPLS labels randomization TBD
+ pkt_base /= Ether(src=stream_cfg['mac_src'], dst=stream_cfg['mac_dst'])
+ else:
+ encap_level = '0'
+ pkt_base = Ether(src=stream_cfg['mac_src'], dst=stream_cfg['mac_dst'])
+
+ if stream_cfg['vlan_tag'] is not None:
+ pkt_base /= Dot1Q(vlan=stream_cfg['vlan_tag'])
+
+ udp_args = {}
+ if stream_cfg['udp_src_port']:
+ udp_args['sport'] = int(stream_cfg['udp_src_port'])
+ if stream_cfg['udp_port_step'] == 'random':
+ step = 1
+ else:
+ step = stream_cfg['udp_port_step']
+ udp_args['sport_step'] = int(step)
+ udp_args['sport_max'] = int(stream_cfg['udp_src_port_max'])
+ if stream_cfg['udp_dst_port']:
+ udp_args['dport'] = int(stream_cfg['udp_dst_port'])
+ if stream_cfg['udp_port_step'] == 'random':
+ step = 1
+ else:
+ step = stream_cfg['udp_port_step']
+ udp_args['dport_step'] = int(step)
+ udp_args['dport_max'] = int(stream_cfg['udp_dst_port_max'])
+
+ pkt_base /= IP(src=stream_cfg['ip_src_addr'], dst=stream_cfg['ip_dst_addr']) / \
+ UDP(dport=udp_args['dport'], sport=udp_args['sport'])
+
+ # STLVmTupleGen need flow count >= cores used by TRex, if FC < cores we used STLVmFlowVar
+ if stream_cfg['ip_addrs_step'] == '0.0.0.1' and stream_cfg['udp_port_step'] == '1' and \
+ stream_cfg['count'] >= self.generator_config.cores:
+ src_fv = STLVmTupleGen(ip_min=stream_cfg['ip_src_addr'],
+ ip_max=stream_cfg['ip_src_addr_max'],
+ port_min=udp_args['sport'],
+ port_max=udp_args['sport_max'],
+ name="tuple_src",
+ limit_flows=stream_cfg['count'])
+ dst_fv = STLVmTupleGen(ip_min=stream_cfg['ip_dst_addr'],
+ ip_max=stream_cfg['ip_dst_addr_max'],
+ port_min=udp_args['dport'],
+ port_max=udp_args['dport_max'],
+ name="tuple_dst",
+ limit_flows=stream_cfg['count'])
+ vm_param = [
+ src_fv,
+ STLVmWrFlowVar(fv_name="tuple_src.ip",
+ pkt_offset="IP:{}.src".format(encap_level)),
+ STLVmWrFlowVar(fv_name="tuple_src.port",
+ pkt_offset="UDP:{}.sport".format(encap_level)),
+ dst_fv,
+ STLVmWrFlowVar(fv_name="tuple_dst.ip",
+ pkt_offset="IP:{}.dst".format(encap_level)),
+ STLVmWrFlowVar(fv_name="tuple_dst.port",
+ pkt_offset="UDP:{}.dport".format(encap_level)),
+ ]
+ else:
+ if disable_random_latency_flow:
+ src_fv_ip = STLVmFlowVar(
+ name="ip_src",
+ min_value=stream_cfg['ip_src_addr'],
+ max_value=stream_cfg['ip_src_addr'],
+ size=4)
+ dst_fv_ip = STLVmFlowVar(
+ name="ip_dst",
+ min_value=stream_cfg['ip_dst_addr'],
+ max_value=stream_cfg['ip_dst_addr'],
+ size=4)
+ elif stream_cfg['ip_addrs_step'] == 'random':
+ src_fv_ip = STLVmFlowVarRepeatableRandom(
+ name="ip_src",
+ min_value=stream_cfg['ip_src_addr'],
+ max_value=stream_cfg['ip_src_addr_max'],
+ size=4,
+ seed=random.randint(0, 32767),
+ limit=stream_cfg['ip_src_count'])
+ dst_fv_ip = STLVmFlowVarRepeatableRandom(
+ name="ip_dst",
+ min_value=stream_cfg['ip_dst_addr'],
+ max_value=stream_cfg['ip_dst_addr_max'],
+ size=4,
+ seed=random.randint(0, 32767),
+ limit=stream_cfg['ip_dst_count'])
+ else:
+ src_fv_ip = STLVmFlowVar(
+ name="ip_src",
+ min_value=stream_cfg['ip_src_addr'],
+ max_value=stream_cfg['ip_src_addr_max'],
+ size=4,
+ op="inc",
+ step=stream_cfg['ip_addrs_step'])
+ dst_fv_ip = STLVmFlowVar(
+ name="ip_dst",
+ min_value=stream_cfg['ip_dst_addr'],
+ max_value=stream_cfg['ip_dst_addr_max'],
+ size=4,
+ op="inc",
+ step=stream_cfg['ip_addrs_step'])
+
+ if disable_random_latency_flow:
+ src_fv_port = STLVmFlowVar(
+ name="p_src",
+ min_value=udp_args['sport'],
+ max_value=udp_args['sport'],
+ size=2)
+ dst_fv_port = STLVmFlowVar(
+ name="p_dst",
+ min_value=udp_args['dport'],
+ max_value=udp_args['dport'],
+ size=2)
+ elif stream_cfg['udp_port_step'] == 'random':
+ src_fv_port = STLVmFlowVarRepeatableRandom(
+ name="p_src",
+ min_value=udp_args['sport'],
+ max_value=udp_args['sport_max'],
+ size=2,
+ seed=random.randint(0, 32767),
+ limit=stream_cfg['udp_src_count'])
+ dst_fv_port = STLVmFlowVarRepeatableRandom(
+ name="p_dst",
+ min_value=udp_args['dport'],
+ max_value=udp_args['dport_max'],
+ size=2,
+ seed=random.randint(0, 32767),
+ limit=stream_cfg['udp_dst_count'])
+ else:
+ src_fv_port = STLVmFlowVar(
+ name="p_src",
+ min_value=udp_args['sport'],
+ max_value=udp_args['sport_max'],
+ size=2,
+ op="inc",
+ step=udp_args['sport_step'])
+ dst_fv_port = STLVmFlowVar(
+ name="p_dst",
+ min_value=udp_args['dport'],
+ max_value=udp_args['dport_max'],
+ size=2,
+ op="inc",
+ step=udp_args['dport_step'])
+ vm_param = [
+ src_fv_ip,
+ STLVmWrFlowVar(fv_name="ip_src", pkt_offset="IP:{}.src".format(encap_level)),
+ src_fv_port,
+ STLVmWrFlowVar(fv_name="p_src", pkt_offset="UDP:{}.sport".format(encap_level)),
+ dst_fv_ip,
+ STLVmWrFlowVar(fv_name="ip_dst", pkt_offset="IP:{}.dst".format(encap_level)),
+ dst_fv_port,
+ STLVmWrFlowVar(fv_name="p_dst", pkt_offset="UDP:{}.dport".format(encap_level)),
+ ]
+        # Use HW Offload to calculate the outer IP/UDP packet
+ vm_param.append(STLVmFixChecksumHw(l3_offset="IP:0",
+ l4_offset="UDP:0",
+ l4_type=CTRexVmInsFixHwCs.L4_TYPE_UDP))
+ # Use software to fix the inner IP/UDP payload for VxLAN packets
+ if int(encap_level):
+ vm_param.append(STLVmFixIpv4(offset="IP:1"))
+ pad = max(0, frame_size - len(pkt_base)) * 'x'
+
+ return STLPktBuilder(pkt=pkt_base / pad,
+ vm=STLScVmRaw(vm_param, cache_size=int(self.config.cache_size)))
+
+ def _create_gratuitous_arp_pkt(self, stream_cfg):
+ """Create a GARP packet.
+
+ """
+ pkt_base = Ether(src=stream_cfg['mac_src'], dst="ff:ff:ff:ff:ff:ff")
+
+ if self.config.vxlan or self.config.mpls:
+ pkt_base /= Dot1Q(vlan=stream_cfg['vtep_vlan'])
+ elif stream_cfg['vlan_tag'] is not None:
+ pkt_base /= Dot1Q(vlan=stream_cfg['vlan_tag'])
+
+ pkt_base /= ARP(psrc=stream_cfg['ip_src_tg_gw'], hwsrc=stream_cfg['mac_src'],
+ hwdst=stream_cfg['mac_src'], pdst=stream_cfg['ip_src_tg_gw'])
+
+ return STLPktBuilder(pkt=pkt_base)
+
+ def generate_streams(self, port, chain_id, stream_cfg, l2frame, latency=True,
+ e2e=False):
+ """Create a list of streams corresponding to a given chain and stream config.
+
+ port: port where the streams originate (0 or 1)
+ chain_id: the chain to which the streams are associated to
+ stream_cfg: stream configuration
+ l2frame: L2 frame size (including 4-byte FCS) or 'IMIX'
+ latency: if True also create a latency stream
+ e2e: True if performing "end to end" connectivity check
+ """
+ streams = []
+ pg_id, lat_pg_id = self.get_pg_id(port, chain_id)
+ if l2frame == 'IMIX':
+ for ratio, l2_frame_size in zip(IMIX_RATIOS, IMIX_L2_SIZES):
+ pkt = self._create_pkt(stream_cfg, l2_frame_size)
+ if e2e or stream_cfg['mpls']:
+ streams.append(STLStream(packet=pkt,
+ mode=STLTXCont(pps=ratio)))
+ else:
+ if stream_cfg['vxlan'] is True:
+ streams.append(STLStream(packet=pkt,
+ flow_stats=STLFlowStats(pg_id=pg_id,
+ vxlan=True)
+ if not self.config.no_flow_stats else None,
+ mode=STLTXCont(pps=ratio)))
+ else:
+ streams.append(STLStream(packet=pkt,
+ flow_stats=STLFlowStats(pg_id=pg_id)
+ if not self.config.no_flow_stats else None,
+ mode=STLTXCont(pps=ratio)))
+
+ if latency:
+ # for IMIX, the latency packets have the average IMIX packet size
+ if stream_cfg['ip_addrs_step'] == 'random' or \
+ stream_cfg['udp_port_step'] == 'random':
+ # Force latency flow to only one flow to avoid creating flows
+ # over requested flow count
+ pkt = self._create_pkt(stream_cfg, IMIX_AVG_L2_FRAME_SIZE, True)
+ else:
+ pkt = self._create_pkt(stream_cfg, IMIX_AVG_L2_FRAME_SIZE)
+
+ else:
+ l2frame_size = int(l2frame)
+ pkt = self._create_pkt(stream_cfg, l2frame_size)
+ if self.config.periodic_gratuitous_arp:
+ requested_pps = int(utils.parse_rate_str(self.rates[0])[
+ 'rate_pps']) - self.config.gratuitous_arp_pps
+ if latency:
+ requested_pps -= self.LATENCY_PPS
+ stltx_cont = STLTXCont(pps=requested_pps)
+ else:
+ stltx_cont = STLTXCont()
+ if e2e or stream_cfg['mpls']:
+ streams.append(STLStream(packet=pkt,
+ # Flow stats is disabled for MPLS now
+ # flow_stats=STLFlowStats(pg_id=pg_id),
+ mode=stltx_cont))
+ else:
+ if stream_cfg['vxlan'] is True:
+ streams.append(STLStream(packet=pkt,
+ flow_stats=STLFlowStats(pg_id=pg_id,
+ vxlan=True)
+ if not self.config.no_flow_stats else None,
+ mode=stltx_cont))
+ else:
+ streams.append(STLStream(packet=pkt,
+ flow_stats=STLFlowStats(pg_id=pg_id)
+ if not self.config.no_flow_stats else None,
+ mode=stltx_cont))
+ # for the latency stream, the minimum payload is 16 bytes even in case of vlan tagging
+ # without vlan, the min l2 frame size is 64
+ # with vlan it is 68
+ # This only applies to the latency stream
+ if latency:
+ if stream_cfg['vlan_tag'] and l2frame_size < 68:
+ l2frame_size = 68
+ if stream_cfg['ip_addrs_step'] == 'random' or \
+ stream_cfg['udp_port_step'] == 'random':
+ # Force latency flow to only one flow to avoid creating flows
+ # over requested flow count
+ pkt = self._create_pkt(stream_cfg, l2frame_size, True)
+ else:
+ pkt = self._create_pkt(stream_cfg, l2frame_size)
+
+ if latency:
+ if self.config.no_latency_stats:
+ LOG.info("Latency flow statistics are disabled.")
+ if stream_cfg['vxlan'] is True:
+ streams.append(STLStream(packet=pkt,
+ flow_stats=STLFlowLatencyStats(pg_id=lat_pg_id,
+ vxlan=True)
+ if not self.config.no_latency_stats else None,
+ mode=STLTXCont(pps=self.LATENCY_PPS)))
+ else:
+ streams.append(STLStream(packet=pkt,
+ flow_stats=STLFlowLatencyStats(pg_id=lat_pg_id)
+ if not self.config.no_latency_stats else None,
+ mode=STLTXCont(pps=self.LATENCY_PPS)))
+
+ if self.config.periodic_gratuitous_arp and (
+ self.config.l3_router or self.config.service_chain == ChainType.EXT):
+ # In case of L3 router feature or EXT chain with router
+ # and depending on ARP stale time SUT configuration
+ # Gratuitous ARP from TG port to the router is needed to keep traffic up
+ garp_pkt = self._create_gratuitous_arp_pkt(stream_cfg)
+ ibg = self.config.gratuitous_arp_pps * 1000000.0
+ packets_count = int(self.config.duration_sec / self.config.gratuitous_arp_pps)
+ streams.append(
+ STLStream(packet=garp_pkt,
+ mode=STLTXMultiBurst(pkts_per_burst=1, count=packets_count, ibg=ibg)))
+ return streams
+
+ @timeout(5)
+ def __connect(self, client):
+ client.connect()
+
+ def __local_server_status(self):
+ """ The TRex server may have started but failed initializing... and stopped.
+ This piece of code is especially designed to address
+ the case when a fatal failure occurs on a DPDK init call.
+        The TRex algorithm should be revised to include some missing timeouts (?)
+ status returned:
+ 0: no error detected
+ 1: fatal error detected - should lead to exiting the run
+ 2: error detected that could be solved by starting again
+ The diagnostic is based on parsing the local trex log file (improvable)
+ """
+ status = 0
+ message = None
+ failure = None
+ exited = None
+ cause = None
+ error = None
+ before = None
+ after = None
+ last = None
+ try:
+ with open('/tmp/trex.log', 'r', encoding="utf-8") as trex_log:
+ for _line in trex_log:
+ line = _line.strip()
+ if line.startswith('Usage:'):
+ break
+ if 'ports are bound' in line:
+ continue
+ if 'please wait' in line:
+ continue
+ if 'exit' in line.lower():
+ exited = line
+ elif 'cause' in line.lower():
+ cause = line
+ elif 'fail' in line.lower():
+ failure = line
+ elif 'msg' in line.lower():
+ message = line
+ elif (error is not None) and line:
+ after = line
+ elif line.startswith('Error:') or line.startswith('ERROR'):
+ error = line
+ before = last
+ last = line
+ except FileNotFoundError:
+ pass
+ if exited is not None:
+ status = 1
+ LOG.info("\x1b[1m%s\x1b[0m %s", 'TRex failed initializing:', exited)
+ if cause is not None:
+ LOG.info("TRex [cont'd] %s", cause)
+ if failure is not None:
+ LOG.info("TRex [cont'd] %s", failure)
+ if message is not None:
+ LOG.info("TRex [cont'd] %s", message)
+ if 'not supported yet' in message.lower():
+ LOG.info("TRex [cont'd] Try starting again!")
+ status = 2
+ elif error is not None:
+ status = 1
+ LOG.info("\x1b[1m%s\x1b[0m %s", 'TRex failed initializing:', error)
+ if after is not None:
+ LOG.info("TRex [cont'd] %s", after)
+ elif before is not None:
+ LOG.info("TRex [cont'd] %s", before)
+ return status
+
+ def __connect_after_start(self):
+ # after start, Trex may take a bit of time to initialize
+ # so we need to retry a few times
+ # we try to capture recoverable error cases (checking status)
+ status = 0
+ for it in range(self.config.generic_retry_count):
+ try:
+ time.sleep(1)
+ self.client.connect()
+ break
+ except Exception as ex:
+ if it == (self.config.generic_retry_count - 1):
+ raise
+ status = self.__local_server_status()
+ if status > 0:
+ # No need to wait anymore, something went wrong and TRex exited
+ if status == 1:
+ LOG.info("\x1b[1m%s\x1b[0m", 'TRex failed starting!')
+ print("More information? Try the command: "
+ + "\x1b[1mnfvbench --show-trex-log\x1b[0m")
+ sys.exit(0)
+ if status == 2:
+ # a new start will follow
+ return status
+ LOG.info("Retrying connection to TRex (%s)...", ex.msg)
+ return status
+
+ def connect(self):
+ """Connect to the TRex server."""
+ status = 0
+ server_ip = self.generator_config.ip
+ LOG.info("Connecting to TRex (%s)...", server_ip)
+
+ # Connect to TRex server
+ self.client = STLClient(server=server_ip, sync_port=self.generator_config.zmq_rpc_port,
+ async_port=self.generator_config.zmq_pub_port)
+ try:
+ self.__connect(self.client)
+ if server_ip == '127.0.0.1':
+ config_updated = self.__check_config()
+ if config_updated or self.config.restart:
+ status = self.__restart()
+ except (TimeoutError, STLError) as e:
+ if server_ip == '127.0.0.1':
+ status = self.__start_local_server()
+ else:
+ raise TrafficGeneratorException(e.message) from e
+
+ if status == 2:
+ # Workaround in case of a failed TRex server initialization
+ # we try to start it again (twice maximum)
+ # which may allow low level initialization to complete.
+ if self.__start_local_server() == 2:
+ self.__start_local_server()
+
+ ports = list(self.generator_config.ports)
+ self.port_handle = ports
+ # Prepare the ports
+ self.client.reset(ports)
+ # Read HW information from each port
+ # this returns an array of dict (1 per port)
+ """
+ Example of output for Intel XL710
+ [{'arp': '-', 'src_ipv4': '-', u'supp_speeds': [40000], u'is_link_supported': True,
+ 'grat_arp': 'off', 'speed': 40, u'index': 0, 'link_change_supported': 'yes',
+ u'rx': {u'counters': 127, u'caps': [u'flow_stats', u'latency']},
+ u'is_virtual': 'no', 'prom': 'off', 'src_mac': u'3c:fd:fe:a8:24:48', 'status': 'IDLE',
+ u'description': u'Ethernet Controller XL710 for 40GbE QSFP+',
+ 'dest': u'fa:16:3e:3c:63:04', u'is_fc_supported': False, 'vlan': '-',
+ u'driver': u'net_i40e', 'led_change_supported': 'yes', 'rx_filter_mode': 'hardware match',
+ 'fc': 'none', 'link': 'UP', u'hw_mac': u'3c:fd:fe:a8:24:48', u'pci_addr': u'0000:5e:00.0',
+ 'mult': 'off', 'fc_supported': 'no', u'is_led_supported': True, 'rx_queue': 'off',
+ 'layer_mode': 'Ethernet', u'numa': 0}, ...]
+ """
+ self.port_info = self.client.get_port_info(ports)
+ LOG.info('Connected to TRex')
+ for id, port in enumerate(self.port_info):
+ LOG.info(' Port %d: %s speed=%dGbps mac=%s pci=%s driver=%s',
+ id, port['description'], port['speed'], port['src_mac'],
+ port['pci_addr'], port['driver'])
+ # Make sure the 2 ports have the same speed
+ if self.port_info[0]['speed'] != self.port_info[1]['speed']:
+ raise TrafficGeneratorException('Traffic generator ports speed mismatch: %d/%d Gbps' %
+ (self.port_info[0]['speed'],
+ self.port_info[1]['speed']))
+
+ def __start_local_server(self):
+ try:
+ LOG.info("Starting TRex ...")
+ self.__start_server()
+ status = self.__connect_after_start()
+ except (TimeoutError, STLError) as e:
+ LOG.error('Cannot connect to TRex')
+ LOG.error(traceback.format_exc())
+ logpath = '/tmp/trex.log'
+ if os.path.isfile(logpath):
+ # Wait for TRex to finish writing error message
+ last_size = 0
+ for _ in range(self.config.generic_retry_count):
+ size = os.path.getsize(logpath)
+ if size == last_size:
+ # probably not writing anymore
+ break
+ last_size = size
+ time.sleep(1)
+ with open(logpath, 'r', encoding="utf-8") as f:
+ message = f.read()
+ else:
+ message = e.message
+ raise TrafficGeneratorException(message) from e
+ return status
+
+ def __start_server(self):
+ server = TRexTrafficServer()
+ server.run_server(self.generator_config)
+
+ def __check_config(self):
+ server = TRexTrafficServer()
+ return server.check_config_updated(self.generator_config)
+
+    def __restart(self):
+        """Stop the local TRex server then start it again.
+
+        return: the status of the new server (see __start_local_server)
+        """
+        LOG.info("Restarting TRex ...")
+        self.__stop_server()
+        # Wait for server stopped: poll up to generic_retry_count seconds
+        # until the client reports the connection is gone
+        for _ in range(self.config.generic_retry_count):
+            time.sleep(1)
+            if not self.client.is_connected():
+                LOG.info("TRex is stopped...")
+                break
+        # Start and report a possible failure
+        return self.__start_local_server()
+
+    def __stop_server(self):
+        """Shut down the TRex server, but only when it runs locally.
+
+        Acquired ports are released before shutdown. Any STL error is
+        logged and swallowed (best-effort stop). A remote TRex is never
+        stopped from here.
+        """
+        if self.generator_config.ip == '127.0.0.1':
+            ports = self.client.get_acquired_ports()
+            LOG.info('Release ports %s and stopping TRex...', ports)
+            try:
+                if ports:
+                    self.client.release(ports=ports)
+                self.client.server_shutdown()
+            except STLError as e:
+                LOG.warning('Unable to stop TRex. Error: %s', e)
+        else:
+            LOG.info('Using remote TRex. Unable to stop TRex')
+
+    def resolve_arp(self):
+        """Resolve all configured remote IP addresses.
+
+        return: None if ARP failed to resolve for all IP addresses
+                else a dict of list of dest macs indexed by port#
+                the dest macs in the list are indexed by the chain id
+        """
+        # the ARP service requires the ports to be in service mode
+        self.client.set_service_mode(ports=self.port_handle)
+        LOG.info('Polling ARP until successful...')
+        arp_dest_macs = {}
+        for port, device in zip(self.port_handle, self.generator_config.devices):
+            # there should be 1 stream config per chain
+            stream_configs = device.get_stream_configs()
+            chain_count = len(stream_configs)
+            ctx = self.client.create_service_ctx(port=port)
+            # all dest macs on this port indexed by chain ID
+            dst_macs = [None] * chain_count
+            dst_macs_count = 0
+            # the index in the list is the chain id
+            if self.config.vxlan or self.config.mpls:
+                # overlay mode: resolve the VTEP endpoint instead of the
+                # per-chain gateway (same src/dst for every chain; the loop
+                # variable 'cfg' is intentionally unused, one ARP per chain)
+                arps = [
+                    ServiceARP(ctx,
+                               src_ip=device.vtep_src_ip,
+                               dst_ip=device.vtep_dst_ip,
+                               vlan=device.vtep_vlan)
+                    for cfg in stream_configs
+                ]
+            else:
+                arps = [
+                    ServiceARP(ctx,
+                               src_ip=cfg['ip_src_tg_gw'],
+                               dst_ip=cfg['mac_discovery_gw'],
+                               # will be None if no vlan tagging
+                               vlan=cfg['vlan_tag'])
+                    for cfg in stream_configs
+                ]
+
+            for attempt in range(self.config.generic_retry_count):
+                try:
+                    ctx.run(arps)
+                except STLError:
+                    # NOTE(review): this 'continue' skips the inter-retry
+                    # sleep at the bottom of the loop — confirm intended.
+                    LOG.error(traceback.format_exc())
+                    continue
+
+                # harvest records that resolved in this pass; keep polling
+                # the ones that are still unresolved
+                unresolved = []
+                for chain_id, mac in enumerate(dst_macs):
+                    if not mac:
+                        arp_record = arps[chain_id].get_record()
+                        if arp_record.dst_mac:
+                            dst_macs[chain_id] = arp_record.dst_mac
+                            dst_macs_count += 1
+                            LOG.info('  ARP: port=%d chain=%d src IP=%s dst IP=%s -> MAC=%s',
+                                     port, chain_id,
+                                     arp_record.src_ip,
+                                     arp_record.dst_ip, arp_record.dst_mac)
+                        else:
+                            unresolved.append(arp_record.dst_ip)
+                if dst_macs_count == chain_count:
+                    # all chains on this port are resolved, move to next port
+                    arp_dest_macs[port] = dst_macs
+                    LOG.info('ARP resolved successfully for port %s', port)
+                    break
+
+                retry = attempt + 1
+                LOG.info('Retrying ARP for: %s (retry %d/%d)',
+                         unresolved, retry, self.config.generic_retry_count)
+                if retry < self.config.generic_retry_count:
+                    time.sleep(self.config.generic_poll_sec)
+                else:
+                    LOG.error('ARP timed out for port %s (resolved %d out of %d)',
+                              port,
+                              dst_macs_count,
+                              chain_count)
+                    break
+
+        # A traffic capture may have been started (from a T-Rex console) at this time.
+        # If asked so, we keep the service mode enabled here, and disable it otherwise.
+        #  | Disabling the service mode while a capture is in progress
+        #  | would cause the application to stop/crash with an error.
+        if not self.config.service_mode:
+            self.client.set_service_mode(ports=self.port_handle, enabled=False)
+        if len(arp_dest_macs) == len(self.port_handle):
+            return arp_dest_macs
+        return None
+
+    def __is_rate_enough(self, l2frame_size, rates, bidirectional, latency):
+        """Check if rate provided by user is above requirements. Applies only if latency is True.
+
+        l2frame_size: L2 frame size or IMIX
+        rates: list of rate dicts (one per direction)
+        bidirectional: True if both directions transmit
+        latency: True if latency streams will be programmed
+        return: a dict with at least key 'result' (True if the rate is
+                sufficient); when latency is True it also carries the
+                minimum required rate as 'rate_pps'/'rate_bps'/'rate_percent'
+        """
+        intf_speed = self.generator_config.intf_speed
+        if latency:
+            if bidirectional:
+                mult = 2
+                # aggregate the requested rate over both directions
+                total_rate = 0
+                for rate in rates:
+                    r = utils.convert_rates(l2frame_size, rate, intf_speed)
+                    total_rate += int(r['rate_pps'])
+            else:
+                mult = 1
+                r = utils.convert_rates(l2frame_size, rates[0], intf_speed)
+                total_rate = int(r['rate_pps'])
+            # rate must be enough for latency stream and at least 1 pps for base stream per chain
+            if self.config.periodic_gratuitous_arp:
+                # also reserve rate for the periodic gratuitous ARP stream
+                required_rate = (self.LATENCY_PPS + 1 + self.config.gratuitous_arp_pps) \
+                    * self.config.service_chain_count * mult
+            else:
+                required_rate = (self.LATENCY_PPS + 1) * self.config.service_chain_count * mult
+            # express the required minimum in all units for the error message
+            result = utils.convert_rates(l2frame_size,
+                                         {'rate_pps': required_rate},
+                                         intf_speed * mult)
+            result['result'] = total_rate >= required_rate
+            return result
+
+        # no latency stream requested: any rate is acceptable
+        return {'result': True}
+
+    def create_traffic(self, l2frame_size, rates, bidirectional, latency=True, e2e=False):
+        """Program all the streams in Trex server.
+
+        l2frame_size: L2 frame size or IMIX
+        rates: a list of 2 rates to run each direction
+               each rate is a dict like {'rate_pps': '10kpps'}
+        bidirectional: True if bidirectional
+        latency: True if latency measurement is needed
+        e2e: True if performing "end to end" connectivity check
+        raise: TrafficGeneratorException if the requested rate is too low
+               to fit the latency streams
+        """
+        if self.config.no_flow_stats:
+            LOG.info("Traffic flow statistics are disabled.")
+        # refuse to run if the rate cannot carry the latency streams
+        r = self.__is_rate_enough(l2frame_size, rates, bidirectional, latency)
+        if not r['result']:
+            raise TrafficGeneratorException(
+                'Required rate in total is at least one of: \n{pps}pps \n{bps}bps \n{load}%.'
+                .format(pps=r['rate_pps'],
+                        bps=r['rate_bps'],
+                        load=r['rate_percent']))
+        self.l2_frame_size = l2frame_size
+        # a dict of list of streams indexed by port#
+        # in case of fixed size, has self.chain_count * 2 * 2 streams
+        # (1 normal + 1 latency stream per direction per chain)
+        # for IMIX, has self.chain_count * 2 * 4 streams
+        # (3 normal + 1 latency stream per direction per chain)
+        streamblock = {}
+        for port in self.port_handle:
+            streamblock[port] = []
+        # one (fwd, rev) stream config pair per chain
+        stream_cfgs = [d.get_stream_configs() for d in self.generator_config.devices]
+        if self.generator_config.ip_addrs_step == 'random' \
+                or self.generator_config.gen_config.udp_port_step == 'random':
+            LOG.warning("Using random step, the number of flows can be less than "
+                        "the requested number of flows due to repeatable multivariate random "
+                        "generation which can reproduce the same pattern of values")
+        self.rates = [utils.to_rate_str(rate) for rate in rates]
+        for chain_id, (fwd_stream_cfg, rev_stream_cfg) in enumerate(zip(*stream_cfgs)):
+            streamblock[0].extend(self.generate_streams(self.port_handle[0],
+                                                        chain_id,
+                                                        fwd_stream_cfg,
+                                                        l2frame_size,
+                                                        latency=latency,
+                                                        e2e=e2e))
+            if len(self.rates) > 1:
+                # reverse-direction latency stream only makes sense when
+                # traffic is bidirectional
+                streamblock[1].extend(self.generate_streams(self.port_handle[1],
+                                                            chain_id,
+                                                            rev_stream_cfg,
+                                                            l2frame_size,
+                                                            latency=bidirectional and latency,
+                                                            e2e=e2e))
+
+        for port in self.port_handle:
+            # enable/disable VXLAN flow-stats parsing (UDP dst port 4789)
+            if self.config.vxlan:
+                self.client.set_port_attr(ports=port, vxlan_fs=[4789])
+            else:
+                self.client.set_port_attr(ports=port, vxlan_fs=None)
+            self.client.add_streams(streamblock[port], ports=port)
+            LOG.info('Created %d traffic streams for port %s.', len(streamblock[port]), port)
+
+    def clear_streamblock(self):
+        """Clear all streams from TRex.
+
+        Resets the ports, which also discards the cached per-port rates.
+        """
+        self.rates = []
+        self.client.reset(self.port_handle)
+        LOG.info('Cleared all existing streams')
+
+    def get_stats(self, ifstats=None):
+        """Get stats from Trex.
+
+        ifstats: optional interface stats passed through to extract_stats()
+        return: the stats extracted from the raw TRex stats
+        """
+        stats = self.client.get_stats()
+        return self.extract_stats(stats, ifstats)
+
+    def get_macs(self):
+        """Return the Trex local port MAC addresses.
+
+        return: a list of MAC addresses indexed by the port#
+        """
+        return [port['src_mac'] for port in self.port_info]
+
+    def get_port_speed_gbps(self):
+        """Return the Trex local port speeds.
+
+        return: a list of speed in Gbps indexed by the port#
+        """
+        return [port['speed'] for port in self.port_info]
+
+    def clear_stats(self):
+        """Clear all stats in the traffic generator."""
+        # no-op until the ports have been acquired (connect time)
+        if self.port_handle:
+            self.client.clear_stats()
+
+    def start_traffic(self):
+        """Start generating traffic in all ports.
+
+        Each port is started with its own rate (self.rates is indexed in
+        lockstep with self.port_handle) for the configured duration.
+        """
+        for port, rate in zip(self.port_handle, self.rates):
+            self.client.start(ports=port, mult=rate, duration=self.config.duration_sec, force=True)
+
+    def stop_traffic(self):
+        """Stop generating traffic on all ports."""
+        self.client.stop(ports=self.port_handle)
+
+    def start_capture(self):
+        """Capture all packets on both ports that are unicast to us.
+
+        Any capture already in progress is stopped first. The capture id
+        is kept in self.capture_id for stop/fetch.
+        """
+        if self.capture_id:
+            self.stop_capture()
+        # Need to filter out unwanted packets so we do not end up counting
+        # src MACs of frames that are not unicast to us
+        src_mac_list = self.get_macs()
+        bpf_filter = "ether dst %s or ether dst %s" % (src_mac_list[0], src_mac_list[1])
+        # ports must be set in service in order to enable capture
+        self.client.set_service_mode(ports=self.port_handle)
+        self.capture_id = self.client.start_capture \
+            (rx_ports=self.port_handle, bpf_filter=bpf_filter)
+
+    def fetch_capture_packets(self):
+        """Fetch capture packets in capture mode.
+
+        The fetched packets replace any previous content of
+        self.packet_list; no-op if no capture is in progress.
+        """
+        if self.capture_id:
+            self.packet_list = []
+            self.client.fetch_capture_packets(capture_id=self.capture_id['id'],
+                                              output=self.packet_list)
+
+    def stop_capture(self):
+        """Stop capturing packets and clear the capture id."""
+        if self.capture_id:
+            self.client.stop_capture(capture_id=self.capture_id['id'])
+            self.capture_id = None
+            # A traffic capture may have been started (from a T-Rex console) at this time.
+            # If asked so, we keep the service mode enabled here, and disable it otherwise.
+            #  | Disabling the service mode while a capture is in progress
+            #  | would cause the application to stop/crash with an error.
+            if not self.config.service_mode:
+                self.client.set_service_mode(ports=self.port_handle, enabled=False)
+
+    def cleanup(self):
+        """Cleanup Trex driver: reset ports and disconnect (best effort)."""
+        if self.client:
+            try:
+                self.client.reset(self.port_handle)
+                self.client.disconnect()
+            except STLError:
+                # TRex does not like a reset while in disconnected state
+                pass
+
+    def set_service_mode(self, enabled=True):
+        """Enable/disable the 'service' mode on all ports."""
+        self.client.set_service_mode(ports=self.port_handle, enabled=enabled)