diff options
Diffstat (limited to 'nfvbench/traffic_gen')
-rw-r--r-- | nfvbench/traffic_gen/dummy.py | 99 | ||||
-rw-r--r-- | nfvbench/traffic_gen/traffic_base.py | 147 | ||||
-rw-r--r-- | nfvbench/traffic_gen/traffic_utils.py | 53 | ||||
-rw-r--r-- | nfvbench/traffic_gen/trex.py | 460 | ||||
-rw-r--r-- | nfvbench/traffic_gen/trex_gen.py | 1208 |
5 files changed, 1425 insertions, 542 deletions
diff --git a/nfvbench/traffic_gen/dummy.py b/nfvbench/traffic_gen/dummy.py index 6f57f4d..95147ab 100644 --- a/nfvbench/traffic_gen/dummy.py +++ b/nfvbench/traffic_gen/dummy.py @@ -12,8 +12,9 @@ # License for the specific language governing permissions and limitations # under the License. -from traffic_base import AbstractTrafficGenerator -import traffic_utils as utils +from nfvbench.log import LOG +from .traffic_base import AbstractTrafficGenerator +from . import traffic_utils as utils class DummyTG(AbstractTrafficGenerator): @@ -23,28 +24,26 @@ class DummyTG(AbstractTrafficGenerator): Useful for unit testing without actually generating any traffic. """ - def __init__(self, config): - AbstractTrafficGenerator.__init__(self, config) + def __init__(self, traffic_client): + AbstractTrafficGenerator.__init__(self, traffic_client) self.port_handle = [] self.rates = [] self.l2_frame_size = 0 - self.duration_sec = self.config.duration_sec - self.intf_speed = config.generator_config.intf_speed + self.duration_sec = traffic_client.config.duration_sec + self.intf_speed = traffic_client.generator_config.intf_speed self.set_response_curve() + self.packet_list = None def get_version(self): return "0.1" - def init(self): - pass - def get_tx_pps_dropped_pps(self, tx_rate): - '''Get actual tx packets based on requested tx rate + """Get actual tx packets based on requested tx rate. :param tx_rate: requested TX rate with unit ('40%', '1Mbps', '1000pps') :return: the actual TX pps and the dropped pps corresponding to the requested TX rate - ''' + """ dr, tx = self.__get_dr_actual_tx(tx_rate) actual_tx_bps = utils.load_to_bps(tx, self.intf_speed) avg_packet_size = utils.get_average_packet_size(self.l2_frame_size) @@ -56,14 +55,14 @@ class DummyTG(AbstractTrafficGenerator): return int(tx_packets), int(dropped) def set_response_curve(self, lr_dr=0, ndr=100, max_actual_tx=100, max_11_tx=100): - '''Set traffic gen response characteristics + """Set traffic gen response characteristics. 
Specifies the drop rate curve and the actual TX curve :param float lr_dr: The actual drop rate at TX line rate (in %, 0..100) :param float ndr: The true NDR (0 packet drop) in % (0..100) of line rate" :param float max_actual_tx: highest actual TX when requested TX is 100% :param float max_11_tx: highest requested TX that results in same actual TX - ''' + """ self.target_ndr = ndr if ndr < 100: self.dr_slope = float(lr_dr) / (100 - ndr) @@ -77,10 +76,11 @@ class DummyTG(AbstractTrafficGenerator): self.tx_slope = 0 def __get_dr_actual_tx(self, requested_tx_rate): - '''Get drop rate at given requested tx rate + """Get drop rate at given requested tx rate. + :param float requested_tx_rate: requested tx rate in % (0..100) :return: the drop rate and actual tx rate at that requested_tx_rate in % (0..100) - ''' + """ if requested_tx_rate <= self.max_11_tx: actual_tx = requested_tx_rate else: @@ -92,24 +92,18 @@ class DummyTG(AbstractTrafficGenerator): return dr, actual_tx def connect(self): - ports = list(self.config.generator_config.ports) + ports = list(self.traffic_client.generator_config.ports) self.port_handle = ports - def is_arp_successful(self): - return True - - def config_interface(self): - pass - - def create_traffic(self, l2frame_size, rates, bidirectional, latency=True): + def create_traffic(self, l2frame_size, rates, bidirectional, latency=True, e2e=False): self.rates = [utils.to_rate_str(rate) for rate in rates] self.l2_frame_size = l2frame_size def clear_streamblock(self): pass - def get_stats(self): - '''Get stats from current run. + def get_stats(self, ifstats=None): + """Get stats from current run. 
The binary search mainly looks at 2 results to make the decision: actual tx packets @@ -117,7 +111,7 @@ class DummyTG(AbstractTrafficGenerator): From the Requested TX rate - we get the Actual TX rate and the RX drop rate From the Run duration and actual TX rate - we get the actual total tx packets From the Actual tx packets and RX drop rate - we get the RX dropped packets - ''' + """ result = {} total_tx_pps = 0 @@ -153,10 +147,31 @@ class DummyTG(AbstractTrafficGenerator): total_tx_pps += tx_pps # actual total tx rate in pps result['total_tx_rate'] = total_tx_pps + # actual offered tx rate in bps + avg_packet_size = utils.get_average_packet_size(self.l2_frame_size) + total_tx_bps = utils.pps_to_bps(total_tx_pps, avg_packet_size) + result['offered_tx_rate_bps'] = total_tx_bps + + result.update(self.get_theoretical_rates(avg_packet_size)) return result + def get_stream_stats(self, tg_stats, if_stats, latencies, chain_idx): + for port in range(2): + if_stats[port].tx = 1000 + if_stats[port].rx = 1000 + latencies[port].min_usec = 10 + latencies[port].max_usec = 100 + latencies[port].avg_usec = 50 + def get_macs(self): - return ['00.00.00.00.00.01', '00.00.00.00.00.02'] + return ['00:00:00:00:00:01', '00:00:00:00:00:02'] + + def get_port_speed_gbps(self): + """Return the local port speeds. 
+ + return: a list of speed in Gbps indexed by the port# + """ + return [10, 10] def clear_stats(self): pass @@ -164,14 +179,42 @@ class DummyTG(AbstractTrafficGenerator): def start_traffic(self): pass + def fetch_capture_packets(self): + def _get_packet_capture(mac): + # convert text to binary + src_mac = bytearray.fromhex(mac.replace(':', '')).decode() + return {'binary': bytes('SSSSSS' + src_mac, 'ascii')} + + # for packet capture, generate 2*scc random packets + # normally we should generate packets coming from the right dest macs + self.packet_list = [] + for dest_macs in self.traffic_client.generator_config.get_dest_macs(): + for mac in dest_macs: + self.packet_list.append(_get_packet_capture(mac)) + def stop_traffic(self): pass + def start_capture(self): + pass + + def stop_capture(self): + pass + def cleanup(self): pass def set_mode(self): pass + def set_service_mode(self, enabled=True): + pass + def resolve_arp(self): - return True + """Resolve ARP sucessfully.""" + def get_macs(port, scc): + return ['00:00:00:00:%02x:%02x' % (port, chain) for chain in range(scc)] + scc = self.traffic_client.generator_config.service_chain_count + res = [get_macs(port, scc) for port in range(2)] + LOG.info('Dummy TG ARP: %s', str(res)) + return res diff --git a/nfvbench/traffic_gen/traffic_base.py b/nfvbench/traffic_gen/traffic_base.py index 817ecc8..30aec6e 100644 --- a/nfvbench/traffic_gen/traffic_base.py +++ b/nfvbench/traffic_gen/traffic_base.py @@ -13,29 +13,64 @@ # under the License. import abc +import sys from nfvbench.log import LOG -import traffic_utils - +from . import traffic_utils +from hdrh.histogram import HdrHistogram +from functools import reduce + + +class Latency(object): + """A class to hold latency data.""" + + def __init__(self, latency_list=None): + """Create a latency instance. 
+ + latency_list: aggregate all latency values from list if not None + """ + self.min_usec = sys.maxsize + self.max_usec = 0 + self.avg_usec = 0 + self.hdrh = None + if latency_list: + hdrh_list = [] + for lat in latency_list: + if lat.available(): + self.min_usec = min(self.min_usec, lat.min_usec) + self.max_usec = max(self.max_usec, lat.max_usec) + self.avg_usec += lat.avg_usec + if lat.hdrh_available(): + hdrh_list.append(HdrHistogram.decode(lat.hdrh)) + + # aggregate histograms if any + if hdrh_list: + def add_hdrh(x, y): + x.add(y) + return x + decoded_hdrh = reduce(add_hdrh, hdrh_list) + self.hdrh = HdrHistogram.encode(decoded_hdrh).decode('utf-8') + + # round to nearest usec + self.avg_usec = int(round(float(self.avg_usec) / len(latency_list))) + + def available(self): + """Return True if latency information is available.""" + return self.min_usec != sys.maxsize + + def hdrh_available(self): + """Return True if latency histogram information is available.""" + return self.hdrh is not None class TrafficGeneratorException(Exception): - pass - + """Exception for traffic generator.""" class AbstractTrafficGenerator(object): - # src_mac (6) + dst_mac (6) + mac_type (2) + frame_check (4) = 18 - l2_header_size = 18 - - imix_l2_sizes = [64, 594, 1518] - imix_l3_sizes = [size - l2_header_size for size in imix_l2_sizes] - imix_ratios = [7, 4, 1] - imix_avg_l2_size = sum( - [1.0 * imix[0] * imix[1] for imix in zip(imix_l2_sizes, imix_ratios)]) / sum(imix_ratios) - - traffic_utils.imix_avg_l2_size = imix_avg_l2_size - def __init__(self, config): - self.config = config + def __init__(self, traffic_client): + self.traffic_client = traffic_client + self.generator_config = traffic_client.generator_config + self.config = traffic_client.config @abc.abstractmethod def get_version(self): @@ -43,42 +78,28 @@ class AbstractTrafficGenerator(object): return None @abc.abstractmethod - def init(self): - # Must be implemented by sub classes - return None - - @abc.abstractmethod def 
connect(self): # Must be implemented by sub classes return None @abc.abstractmethod - def config_interface(self): - # Must be implemented by sub classes - return None - - @abc.abstractmethod - def create_traffic(self): + def create_traffic(self, l2frame_size, rates, bidirectional, latency=True, e2e=False): # Must be implemented by sub classes return None def modify_rate(self, rate, reverse): + """Change the rate per port. + + rate: new rate in % (0 to 100) + reverse: 0 for port 0, 1 for port 1 + """ port_index = int(reverse) port = self.port_handle[port_index] self.rates[port_index] = traffic_utils.to_rate_str(rate) - LOG.info('Modified traffic stream for %s, new rate=%s.', port, - traffic_utils.to_rate_str(rate)) - - def modify_traffic(self): - # Must be implemented by sub classes - return None + LOG.info('Modified traffic stream for port %s, new rate=%s.', port, self.rates[port_index]) @abc.abstractmethod - def get_stats(self): - # Must be implemented by sub classes - return None - - def clear_traffic(self): + def get_stats(self, ifstats): # Must be implemented by sub classes return None @@ -94,5 +115,53 @@ class AbstractTrafficGenerator(object): @abc.abstractmethod def cleanup(self): - # Must be implemented by sub classes + """Cleanup the traffic generator.""" return None + + def clear_streamblock(self): + """Clear all streams from the traffic generator.""" + + @abc.abstractmethod + def resolve_arp(self): + """Resolve all configured remote IP addresses. + + return: None if ARP failed to resolve for all IP addresses + else a dict of list of dest macs indexed by port# + the dest macs in the list are indexed by the chain id + """ + + @abc.abstractmethod + def get_macs(self): + """Return the local port MAC addresses. + + return: a list of MAC addresses indexed by the port# + """ + + @abc.abstractmethod + def get_port_speed_gbps(self): + """Return the local port speeds. 
+ + return: a list of speed in Gbps indexed by the port# + """ + + def get_theoretical_rates(self, avg_packet_size): + + result = {} + + # actual interface speed? (may be a virtual override) + intf_speed = self.config.intf_speed_used + + if hasattr(self.config, 'user_info') and self.config.user_info is not None: + if "extra_encapsulation_bytes" in self.config.user_info: + frame_size_full_encapsulation = avg_packet_size + self.config.user_info[ + "extra_encapsulation_bytes"] + result['theoretical_tx_rate_pps'] = traffic_utils.bps_to_pps( + intf_speed, frame_size_full_encapsulation) * 2 + result['theoretical_tx_rate_bps'] = traffic_utils.pps_to_bps( + result['theoretical_tx_rate_pps'], avg_packet_size) + else: + result['theoretical_tx_rate_pps'] = traffic_utils.bps_to_pps(intf_speed, + avg_packet_size) * 2 + result['theoretical_tx_rate_bps'] = traffic_utils.pps_to_bps( + result['theoretical_tx_rate_pps'], avg_packet_size) + return result diff --git a/nfvbench/traffic_gen/traffic_utils.py b/nfvbench/traffic_gen/traffic_utils.py index 4a7f855..4366a6c 100644 --- a/nfvbench/traffic_gen/traffic_utils.py +++ b/nfvbench/traffic_gen/traffic_utils.py @@ -14,42 +14,66 @@ import bitmath -from nfvbench.utils import multiplier_map -imix_avg_l2_size = None +# IMIX frame size including the 4-byte FCS field +IMIX_L2_SIZES = [64, 594, 1518] +IMIX_RATIOS = [7, 4, 1] +# weighted average l2 frame size includng the 4-byte FCS +IMIX_AVG_L2_FRAME_SIZE = sum( + [1.0 * imix[0] * imix[1] for imix in zip(IMIX_L2_SIZES, IMIX_RATIOS)]) / sum(IMIX_RATIOS) +multiplier_map = { + 'K': 1000, + 'M': 1000000, + 'G': 1000000000 +} def convert_rates(l2frame_size, rate, intf_speed): + """Convert a given rate unit into the other rate units. 
+ + l2frame_size: size of the L2 frame in bytes (includes 32-bit FCS) or 'IMIX' + rate: a dict that has at least one of the following key: + 'rate_pps', 'rate_bps', 'rate_percent' + with the corresponding input value + intf_speed: the line rate speed in bits per second + """ avg_packet_size = get_average_packet_size(l2frame_size) if 'rate_pps' in rate: + # input = packets/sec initial_rate_type = 'rate_pps' pps = rate['rate_pps'] bps = pps_to_bps(pps, avg_packet_size) load = bps_to_load(bps, intf_speed) elif 'rate_bps' in rate: + # input = bits per second initial_rate_type = 'rate_bps' bps = rate['rate_bps'] load = bps_to_load(bps, intf_speed) pps = bps_to_pps(bps, avg_packet_size) elif 'rate_percent' in rate: + # input = percentage of the line rate (between 0.0 and 100.0) initial_rate_type = 'rate_percent' load = rate['rate_percent'] bps = load_to_bps(load, intf_speed) pps = bps_to_pps(bps, avg_packet_size) else: raise Exception('Traffic config needs to have a rate type key') - return { 'initial_rate_type': initial_rate_type, - 'rate_pps': int(pps), + 'rate_pps': int(float(pps)), 'rate_percent': load, - 'rate_bps': int(bps) + 'rate_bps': int(float(bps)) } def get_average_packet_size(l2frame_size): + """Retrieve the average L2 frame size + + l2frame_size: an L2 frame size in bytes (including FCS) or 'IMIX' + return: average l2 frame size inlcuding the 32-bit FCS + """ if l2frame_size.upper() == 'IMIX': - return imix_avg_l2_size + return IMIX_AVG_L2_FRAME_SIZE return float(l2frame_size) @@ -92,23 +116,22 @@ def parse_rate_str(rate_str): rate_pps = rate_pps[:-1] except KeyError: multiplier = 1 - rate_pps = int(rate_pps.strip()) * multiplier + rate_pps = int(float(rate_pps.strip()) * multiplier) if rate_pps <= 0: raise Exception('%s is out of valid range' % rate_str) return {'rate_pps': str(rate_pps)} - elif rate_str.endswith('ps'): + if rate_str.endswith('ps'): rate = rate_str.replace('ps', '').strip() bit_rate = bitmath.parse_string(rate).bits if bit_rate <= 0: raise 
Exception('%s is out of valid range' % rate_str) return {'rate_bps': str(int(bit_rate))} - elif rate_str.endswith('%'): + if rate_str.endswith('%'): rate_percent = float(rate_str.replace('%', '').strip()) if rate_percent <= 0 or rate_percent > 100.0: raise Exception('%s is out of valid range (must be 1-100%%)' % rate_str) return {'rate_percent': str(rate_percent)} - else: - raise Exception('Unknown rate string format %s' % rate_str) + raise Exception('Unknown rate string format %s' % rate_str) def get_load_from_rate(rate_str, avg_frame_size=64, line_rate='10Gbps'): '''From any rate string (with unit) return the corresponding load (in % unit) @@ -151,10 +174,10 @@ def to_rate_str(rate): if 'rate_pps' in rate: pps = rate['rate_pps'] return '{}pps'.format(pps) - elif 'rate_bps' in rate: + if 'rate_bps' in rate: bps = rate['rate_bps'] return '{}bps'.format(bps) - elif 'rate_percent' in rate: + if 'rate_percent' in rate: load = rate['rate_percent'] return '{}%'.format(load) assert False @@ -164,7 +187,7 @@ def to_rate_str(rate): def nan_replace(d): """Replaces every occurence of 'N/A' with float nan.""" - for k, v in d.iteritems(): + for k, v in d.items(): if isinstance(v, dict): nan_replace(v) elif v == 'N/A': @@ -179,5 +202,5 @@ def mac_to_int(mac): def int_to_mac(i): """Converts integer representation of MAC address to hex string.""" mac = format(i, 'x').zfill(12) - blocks = [mac[x:x + 2] for x in xrange(0, len(mac), 2)] + blocks = [mac[x:x + 2] for x in range(0, len(mac), 2)] return ':'.join(blocks) diff --git a/nfvbench/traffic_gen/trex.py b/nfvbench/traffic_gen/trex.py deleted file mode 100644 index 23faebc..0000000 --- a/nfvbench/traffic_gen/trex.py +++ /dev/null @@ -1,460 +0,0 @@ -# Copyright 2016 Cisco Systems, Inc. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. 
You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import os -import random -import time -import traceback - -from collections import defaultdict -from itertools import count -from nfvbench.log import LOG -from nfvbench.specs import ChainType -from nfvbench.traffic_server import TRexTrafficServer -from nfvbench.utils import cast_integer -from nfvbench.utils import timeout -from nfvbench.utils import TimeoutError -from traffic_base import AbstractTrafficGenerator -from traffic_base import TrafficGeneratorException -import traffic_utils as utils - -# pylint: disable=import-error -from trex_stl_lib.api import CTRexVmInsFixHwCs -from trex_stl_lib.api import Dot1Q -from trex_stl_lib.api import Ether -from trex_stl_lib.api import IP -from trex_stl_lib.api import STLClient -from trex_stl_lib.api import STLError -from trex_stl_lib.api import STLFlowLatencyStats -from trex_stl_lib.api import STLFlowStats -from trex_stl_lib.api import STLPktBuilder -from trex_stl_lib.api import STLScVmRaw -from trex_stl_lib.api import STLStream -from trex_stl_lib.api import STLTXCont -from trex_stl_lib.api import STLVmFixChecksumHw -from trex_stl_lib.api import STLVmFlowVar -from trex_stl_lib.api import STLVmFlowVarRepetableRandom -from trex_stl_lib.api import STLVmWrFlowVar -from trex_stl_lib.api import UDP -from trex_stl_lib.services.trex_stl_service_arp import STLServiceARP -# pylint: enable=import-error - - -class TRex(AbstractTrafficGenerator): - LATENCY_PPS = 1000 - - def __init__(self, runner): - AbstractTrafficGenerator.__init__(self, runner) - self.client = None - self.id = count() - self.latencies = defaultdict(list) - 
self.stream_ids = defaultdict(list) - self.port_handle = [] - self.streamblock = defaultdict(list) - self.rates = [] - self.arps = {} - - def get_version(self): - return self.client.get_server_version() - - def extract_stats(self, in_stats): - utils.nan_replace(in_stats) - LOG.debug(in_stats) - - result = {} - for ph in self.port_handle: - stats = self.__combine_stats(in_stats, ph) - result[ph] = { - 'tx': { - 'total_pkts': cast_integer(stats['tx_pkts']['total']), - 'total_pkt_bytes': cast_integer(stats['tx_bytes']['total']), - 'pkt_rate': cast_integer(stats['tx_pps']['total']), - 'pkt_bit_rate': cast_integer(stats['tx_bps']['total']) - }, - 'rx': { - 'total_pkts': cast_integer(stats['rx_pkts']['total']), - 'total_pkt_bytes': cast_integer(stats['rx_bytes']['total']), - 'pkt_rate': cast_integer(stats['rx_pps']['total']), - 'pkt_bit_rate': cast_integer(stats['rx_bps']['total']), - 'dropped_pkts': cast_integer( - stats['tx_pkts']['total'] - stats['rx_pkts']['total']) - } - } - - lat = self.__combine_latencies(in_stats, ph) - result[ph]['rx']['max_delay_usec'] = cast_integer( - lat['total_max']) if 'total_max' in lat else float('nan') - result[ph]['rx']['min_delay_usec'] = cast_integer( - lat['total_min']) if 'total_min' in lat else float('nan') - result[ph]['rx']['avg_delay_usec'] = cast_integer( - lat['average']) if 'average' in lat else float('nan') - total_tx_pkts = result[0]['tx']['total_pkts'] + result[1]['tx']['total_pkts'] - result["total_tx_rate"] = cast_integer(total_tx_pkts / self.config.duration_sec) - return result - - def __combine_stats(self, in_stats, port_handle): - """Traverses TRex result dictionary and combines stream stats. Used for combining latency - and regular streams together. 
- """ - result = defaultdict(lambda: defaultdict(float)) - - for pg_id in [self.stream_ids[port_handle]] + self.latencies[port_handle]: - record = in_stats['flow_stats'][pg_id] - for stat_type, stat_type_values in record.iteritems(): - for ph, value in stat_type_values.iteritems(): - result[stat_type][ph] += value - - return result - - def __combine_latencies(self, in_stats, port_handle): - """Traverses TRex result dictionary and combines chosen latency stats.""" - if not self.latencies[port_handle]: - return {} - - result = defaultdict(float) - result['total_min'] = float("inf") - for lat_id in self.latencies[port_handle]: - lat = in_stats['latency'][lat_id] - result['dropped_pkts'] += lat['err_cntrs']['dropped'] - result['total_max'] = max(lat['latency']['total_max'], result['total_max']) - result['total_min'] = min(lat['latency']['total_min'], result['total_min']) - result['average'] += lat['latency']['average'] - - result['average'] /= len(self.latencies[port_handle]) - - return result - - def create_pkt(self, stream_cfg, l2frame_size): - # 46 = 14 (Ethernet II) + 4 (CRC Checksum) + 20 (IPv4) + 8 (UDP) - payload = 'x' * (max(64, int(l2frame_size)) - 46) - - pkt_base = Ether(src=stream_cfg['mac_src'], dst=stream_cfg['mac_dst']) - - if stream_cfg['vlan_tag'] is not None: - pkt_base /= Dot1Q(vlan=stream_cfg['vlan_tag']) - - udp_args = {} - if stream_cfg['udp_src_port']: - udp_args['sport'] = int(stream_cfg['udp_src_port']) - if stream_cfg['udp_dst_port']: - udp_args['dport'] = int(stream_cfg['udp_dst_port']) - pkt_base /= IP() / UDP(**udp_args) - - if stream_cfg['ip_addrs_step'] == 'random': - src_fv = STLVmFlowVarRepetableRandom( - name="ip_src", - min_value=stream_cfg['ip_src_addr'], - max_value=stream_cfg['ip_src_addr_max'], - size=4, - seed=random.randint(0, 32767), - limit=stream_cfg['ip_src_count']) - dst_fv = STLVmFlowVarRepetableRandom( - name="ip_dst", - min_value=stream_cfg['ip_dst_addr'], - max_value=stream_cfg['ip_dst_addr_max'], - size=4, - 
seed=random.randint(0, 32767), - limit=stream_cfg['ip_dst_count']) - else: - src_fv = STLVmFlowVar( - name="ip_src", - min_value=stream_cfg['ip_src_addr'], - max_value=stream_cfg['ip_src_addr'], - size=4, - op="inc", - step=stream_cfg['ip_addrs_step']) - dst_fv = STLVmFlowVar( - name="ip_dst", - min_value=stream_cfg['ip_dst_addr'], - max_value=stream_cfg['ip_dst_addr_max'], - size=4, - op="inc", - step=stream_cfg['ip_addrs_step']) - - vm_param = [ - src_fv, - STLVmWrFlowVar(fv_name="ip_src", pkt_offset="IP.src"), - dst_fv, - STLVmWrFlowVar(fv_name="ip_dst", pkt_offset="IP.dst"), - STLVmFixChecksumHw(l3_offset="IP", - l4_offset="UDP", - l4_type=CTRexVmInsFixHwCs.L4_TYPE_UDP) - ] - - return STLPktBuilder(pkt=pkt_base / payload, vm=STLScVmRaw(vm_param)) - - def generate_streams(self, port_handle, stream_cfg, l2frame, isg=0.0, latency=True): - idx_lat = None - streams = [] - if l2frame == 'IMIX': - for t, (ratio, l2_frame_size) in enumerate(zip(self.imix_ratios, self.imix_l2_sizes)): - pkt = self.create_pkt(stream_cfg, l2_frame_size) - streams.append(STLStream(packet=pkt, - isg=0.1 * t, - flow_stats=STLFlowStats( - pg_id=self.stream_ids[port_handle]), - mode=STLTXCont(pps=ratio))) - - if latency: - idx_lat = self.id.next() - sl = STLStream(packet=pkt, - isg=isg, - flow_stats=STLFlowLatencyStats(pg_id=idx_lat), - mode=STLTXCont(pps=self.LATENCY_PPS)) - streams.append(sl) - else: - pkt = self.create_pkt(stream_cfg, l2frame) - streams.append(STLStream(packet=pkt, - flow_stats=STLFlowStats(pg_id=self.stream_ids[port_handle]), - mode=STLTXCont())) - - if latency: - idx_lat = self.id.next() - streams.append(STLStream(packet=pkt, - flow_stats=STLFlowLatencyStats(pg_id=idx_lat), - mode=STLTXCont(pps=self.LATENCY_PPS))) - - if latency: - self.latencies[port_handle].append(idx_lat) - - return streams - - def init(self): - pass - - @timeout(5) - def __connect(self, client): - client.connect() - - def __connect_after_start(self): - # after start, Trex may take a bit of time to 
initialize - # so we need to retry a few times - for it in xrange(self.config.generic_retry_count): - try: - time.sleep(1) - self.client.connect() - break - except Exception as ex: - if it == (self.config.generic_retry_count - 1): - raise ex - LOG.info("Retrying connection to TRex (%s)...", ex.message) - - def connect(self): - LOG.info("Connecting to TRex...") - server_ip = self.config.generator_config.ip - - # Connect to TRex server - self.client = STLClient(server=server_ip) - try: - self.__connect(self.client) - except (TimeoutError, STLError) as e: - if server_ip == '127.0.0.1': - try: - self.__start_server() - self.__connect_after_start() - except (TimeoutError, STLError) as e: - LOG.error('Cannot connect to TRex') - LOG.error(traceback.format_exc()) - logpath = '/tmp/trex.log' - if os.path.isfile(logpath): - # Wait for TRex to finish writing error message - last_size = 0 - for _ in xrange(self.config.generic_retry_count): - size = os.path.getsize(logpath) - if size == last_size: - # probably not writing anymore - break - last_size = size - time.sleep(1) - with open(logpath, 'r') as f: - message = f.read() - else: - message = e.message - raise TrafficGeneratorException(message) - else: - raise TrafficGeneratorException(e.message) - - ports = list(self.config.generator_config.ports) - self.port_handle = ports - # Prepare the ports - self.client.reset(ports) - - def set_mode(self): - if self.config.service_chain == ChainType.EXT and not self.config.no_arp: - self.__set_l3_mode() - else: - self.__set_l2_mode() - - def __set_l3_mode(self): - self.client.set_service_mode(ports=self.port_handle, enabled=True) - for port, device in zip(self.port_handle, self.config.generator_config.devices): - try: - self.client.set_l3_mode(port=port, - src_ipv4=device.tg_gateway_ip, - dst_ipv4=device.dst.gateway_ip, - vlan=device.vlan_tag if device.vlan_tagging else None) - except STLError: - # TRex tries to resolve ARP already, doesn't have to be successful yet - continue - 
self.client.set_service_mode(ports=self.port_handle, enabled=False) - - def __set_l2_mode(self): - self.client.set_service_mode(ports=self.port_handle, enabled=True) - for port, device in zip(self.port_handle, self.config.generator_config.devices): - for cfg in device.get_stream_configs(self.config.generator_config.service_chain): - self.client.set_l2_mode(port=port, dst_mac=cfg['mac_dst']) - self.client.set_service_mode(ports=self.port_handle, enabled=False) - - def __start_server(self): - server = TRexTrafficServer() - server.run_server(self.config.generator_config) - - def resolve_arp(self): - self.client.set_service_mode(ports=self.port_handle) - LOG.info('Polling ARP until successful') - resolved = 0 - attempt = 0 - for port, device in zip(self.port_handle, self.config.generator_config.devices): - ctx = self.client.create_service_ctx(port=port) - - arps = [ - STLServiceARP(ctx, - src_ip=cfg['ip_src_tg_gw'], - dst_ip=cfg['mac_discovery_gw'], - vlan=device.vlan_tag if device.vlan_tagging else None) - for cfg in device.get_stream_configs(self.config.generator_config.service_chain) - ] - - for _ in xrange(self.config.generic_retry_count): - attempt += 1 - try: - ctx.run(arps) - except STLError: - LOG.error(traceback.format_exc()) - continue - - self.arps[port] = [arp.get_record().dst_mac for arp in arps - if arp.get_record().dst_mac is not None] - - if len(self.arps[port]) == self.config.service_chain_count: - resolved += 1 - LOG.info('ARP resolved successfully for port %s', port) - break - else: - failed = [arp.get_record().dst_ip for arp in arps - if arp.get_record().dst_mac is None] - LOG.info('Retrying ARP for: %d (%d / %d)', - failed, attempt, self.config.generic_retry_count) - time.sleep(self.config.generic_poll_sec) - - self.client.set_service_mode(ports=self.port_handle, enabled=False) - return resolved == len(self.port_handle) - - def config_interface(self): - pass - - def __is_rate_enough(self, l2frame_size, rates, bidirectional, latency): - """Check if 
rate provided by user is above requirements. Applies only if latency is True.""" - intf_speed = self.config.generator_config.intf_speed - if latency: - if bidirectional: - mult = 2 - total_rate = 0 - for rate in rates: - r = utils.convert_rates(l2frame_size, rate, intf_speed) - total_rate += int(r['rate_pps']) - else: - mult = 1 - total_rate = utils.convert_rates(l2frame_size, rates[0], intf_speed) - # rate must be enough for latency stream and at least 1 pps for base stream per chain - required_rate = (self.LATENCY_PPS + 1) * self.config.service_chain_count * mult - result = utils.convert_rates(l2frame_size, - {'rate_pps': required_rate}, - intf_speed * mult) - result['result'] = total_rate >= required_rate - return result - - return {'result': True} - - def create_traffic(self, l2frame_size, rates, bidirectional, latency=True): - r = self.__is_rate_enough(l2frame_size, rates, bidirectional, latency) - if not r['result']: - raise TrafficGeneratorException( - 'Required rate in total is at least one of: \n{pps}pps \n{bps}bps \n{load}%.' 
- .format(pps=r['rate_pps'], - bps=r['rate_bps'], - load=r['rate_percent'])) - - stream_cfgs = [d.get_stream_configs(self.config.generator_config.service_chain) - for d in self.config.generator_config.devices] - self.rates = [utils.to_rate_str(rate) for rate in rates] - - for ph in self.port_handle: - # generate one pg_id for each direction - self.stream_ids[ph] = self.id.next() - - for i, (fwd_stream_cfg, rev_stream_cfg) in enumerate(zip(*stream_cfgs)): - if self.config.service_chain == ChainType.EXT and not self.config.no_arp: - fwd_stream_cfg['mac_dst'] = self.arps[self.port_handle[0]][i] - rev_stream_cfg['mac_dst'] = self.arps[self.port_handle[1]][i] - - self.streamblock[0].extend(self.generate_streams(self.port_handle[0], - fwd_stream_cfg, - l2frame_size, - latency=latency)) - if len(self.rates) > 1: - self.streamblock[1].extend(self.generate_streams(self.port_handle[1], - rev_stream_cfg, - l2frame_size, - isg=10.0, - latency=bidirectional and latency)) - - for ph in self.port_handle: - self.client.add_streams(self.streamblock[ph], ports=ph) - LOG.info('Created traffic stream for port %s.', ph) - - def clear_streamblock(self): - self.streamblock = defaultdict(list) - self.latencies = defaultdict(list) - self.stream_ids = defaultdict(list) - self.rates = [] - self.client.reset(self.port_handle) - LOG.info('Cleared all existing streams.') - - def get_stats(self): - stats = self.client.get_pgid_stats() - return self.extract_stats(stats) - - def get_macs(self): - return [self.client.get_port_attr(port=port)['src_mac'] for port in self.port_handle] - - def clear_stats(self): - if self.port_handle: - self.client.clear_stats() - - def start_traffic(self): - for port, rate in zip(self.port_handle, self.rates): - self.client.start(ports=port, mult=rate, duration=self.config.duration_sec, force=True) - - def stop_traffic(self): - self.client.stop(ports=self.port_handle) - - def cleanup(self): - if self.client: - try: - self.client.reset(self.port_handle) - 
self.client.disconnect() - except STLError: - # TRex does not like a reset while in disconnected state - pass diff --git a/nfvbench/traffic_gen/trex_gen.py b/nfvbench/traffic_gen/trex_gen.py new file mode 100644 index 0000000..dff72ac --- /dev/null +++ b/nfvbench/traffic_gen/trex_gen.py @@ -0,0 +1,1208 @@ +# Copyright 2016 Cisco Systems, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Driver module for TRex traffic generator.""" + +import math +import os +import sys +import random +import time +import traceback +from functools import reduce + +from itertools import count +# pylint: disable=import-error +from scapy.contrib.mpls import MPLS # flake8: noqa +# pylint: enable=import-error +from nfvbench.log import LOG +from nfvbench.specs import ChainType +from nfvbench.traffic_server import TRexTrafficServer +from nfvbench.utils import cast_integer +from nfvbench.utils import timeout +from nfvbench.utils import TimeoutError + +from hdrh.histogram import HdrHistogram + +# pylint: disable=import-error +from trex.common.services.trex_service_arp import ServiceARP +from trex.stl.api import ARP +from trex.stl.api import bind_layers +from trex.stl.api import CTRexVmInsFixHwCs +from trex.stl.api import Dot1Q +from trex.stl.api import Ether +from trex.stl.api import FlagsField +from trex.stl.api import IP +from trex.stl.api import Packet +from trex.stl.api import STLClient +from trex.stl.api import STLError +from trex.stl.api import 
STLFlowLatencyStats +from trex.stl.api import STLFlowStats +from trex.stl.api import STLPktBuilder +from trex.stl.api import STLScVmRaw +from trex.stl.api import STLStream +from trex.stl.api import STLTXCont +from trex.stl.api import STLTXMultiBurst +from trex.stl.api import STLVmFixChecksumHw +from trex.stl.api import STLVmFixIpv4 +from trex.stl.api import STLVmFlowVar +from trex.stl.api import STLVmFlowVarRepeatableRandom +from trex.stl.api import STLVmTupleGen +from trex.stl.api import STLVmWrFlowVar +from trex.stl.api import ThreeBytesField +from trex.stl.api import UDP +from trex.stl.api import XByteField + +# pylint: enable=import-error + +from .traffic_base import AbstractTrafficGenerator +from .traffic_base import TrafficGeneratorException +from . import traffic_utils as utils +from .traffic_utils import IMIX_AVG_L2_FRAME_SIZE +from .traffic_utils import IMIX_L2_SIZES +from .traffic_utils import IMIX_RATIOS + +class VXLAN(Packet): + """VxLAN class.""" + + _VXLAN_FLAGS = ['R' * 27] + ['I'] + ['R' * 5] + name = "VXLAN" + fields_desc = [FlagsField("flags", 0x08000000, 32, _VXLAN_FLAGS), + ThreeBytesField("vni", 0), + XByteField("reserved", 0x00)] + + def mysummary(self): + """Summary.""" + return self.sprintf("VXLAN (vni=%VXLAN.vni%)") + +class TRex(AbstractTrafficGenerator): + """TRex traffic generator driver.""" + + LATENCY_PPS = 1000 + CHAIN_PG_ID_MASK = 0x007F + PORT_PG_ID_MASK = 0x0080 + LATENCY_PG_ID_MASK = 0x0100 + + def __init__(self, traffic_client): + """Trex driver.""" + AbstractTrafficGenerator.__init__(self, traffic_client) + self.client = None + self.id = count() + self.port_handle = [] + self.chain_count = self.generator_config.service_chain_count + self.rates = [] + self.capture_id = None + self.packet_list = [] + self.l2_frame_size = 0 + + def get_version(self): + """Get the Trex version.""" + return self.client.get_server_version() if self.client else '' + + def get_pg_id(self, port, chain_id): + """Calculate the packet group IDs to use for a 
given port/stream type/chain_id. + + port: 0 or 1 + chain_id: identifies to which chain the pg_id is associated (0 to 255) + return: pg_id, lat_pg_id + + We use a bit mask to set up the 3 fields: + 0x007F: chain ID (7 bits for a max of 128 chains) + 0x0080: port bit + 0x0100: latency bit + """ + pg_id = port * TRex.PORT_PG_ID_MASK | chain_id + return pg_id, pg_id | TRex.LATENCY_PG_ID_MASK + + def extract_stats(self, in_stats, ifstats): + """Extract stats from dict returned by Trex API. + + :param in_stats: dict as returned by TRex api + """ + utils.nan_replace(in_stats) + # LOG.debug(in_stats) + + result = {} + # port_handles should have only 2 elements: [0, 1] + # so (1 - ph) will be the index for the far end port + for ph in self.port_handle: + stats = in_stats[ph] + far_end_stats = in_stats[1 - ph] + result[ph] = { + 'tx': { + 'total_pkts': cast_integer(stats['opackets']), + 'total_pkt_bytes': cast_integer(stats['obytes']), + 'pkt_rate': cast_integer(stats['tx_pps']), + 'pkt_bit_rate': cast_integer(stats['tx_bps']) + }, + 'rx': { + 'total_pkts': cast_integer(stats['ipackets']), + 'total_pkt_bytes': cast_integer(stats['ibytes']), + 'pkt_rate': cast_integer(stats['rx_pps']), + 'pkt_bit_rate': cast_integer(stats['rx_bps']), + # how many pkts were dropped in RX direction + # need to take the tx counter on the far end port + 'dropped_pkts': cast_integer( + far_end_stats['opackets'] - stats['ipackets']) + } + } + self.__combine_latencies(in_stats, result[ph]['rx'], ph) + + total_tx_pkts = result[0]['tx']['total_pkts'] + result[1]['tx']['total_pkts'] + + # in case of GARP packets we need to base total_tx_pkts value using flow_stats + # as GARP packets have no flow stats and will not be received on the other port + if self.config.periodic_gratuitous_arp: + if not self.config.no_flow_stats and not self.config.no_latency_stats: + global_total_tx_pkts = total_tx_pkts + total_tx_pkts = 0 + if ifstats: + for chain_id, _ in enumerate(ifstats): + for ph in self.port_handle:
+ pg_id, lat_pg_id = self.get_pg_id(ph, chain_id) + flows_tx_pkts = in_stats['flow_stats'][pg_id]['tx_pkts']['total'] + \ + in_stats['flow_stats'][lat_pg_id]['tx_pkts']['total'] + result[ph]['tx']['total_pkts'] = flows_tx_pkts + total_tx_pkts += flows_tx_pkts + else: + for pg_id in in_stats['flow_stats']: + if pg_id != 'global': + total_tx_pkts += in_stats['flow_stats'][pg_id]['tx_pkts']['total'] + result["garp_total_tx_rate"] = cast_integer( + (global_total_tx_pkts - total_tx_pkts) / self.config.duration_sec) + else: + LOG.warning("Gratuitous ARP are not received by the other port so TRex and NFVbench" + " see these packets as dropped. Please do not activate no_flow_stats" + " and no_latency_stats properties to have a better drop rate.") + + result["total_tx_rate"] = cast_integer(total_tx_pkts / self.config.duration_sec) + # actual offered tx rate in bps + avg_packet_size = utils.get_average_packet_size(self.l2_frame_size) + total_tx_bps = utils.pps_to_bps(result["total_tx_rate"], avg_packet_size) + result['offered_tx_rate_bps'] = total_tx_bps + + result.update(self.get_theoretical_rates(avg_packet_size)) + + result["flow_stats"] = in_stats["flow_stats"] + result["latency"] = in_stats["latency"] + + # Merge HDRHistogram to have an overall value for all chains and ports + # (provided that the histogram exists in the stats returned by T-Rex) + # Of course, empty histograms will produce an empty (invalid) histogram. 
+ try: + hdrh_list = [] + if ifstats: + for chain_id, _ in enumerate(ifstats): + for ph in self.port_handle: + _, lat_pg_id = self.get_pg_id(ph, chain_id) + hdrh_list.append( + HdrHistogram.decode(in_stats['latency'][lat_pg_id]['latency']['hdrh'])) + else: + for pg_id in in_stats['latency']: + if pg_id != 'global': + hdrh_list.append( + HdrHistogram.decode(in_stats['latency'][pg_id]['latency']['hdrh'])) + + def add_hdrh(x, y): + x.add(y) + return x + decoded_hdrh = reduce(add_hdrh, hdrh_list) + result["overall_hdrh"] = HdrHistogram.encode(decoded_hdrh).decode('utf-8') + except KeyError: + pass + + return result + + def get_stream_stats(self, trex_stats, if_stats, latencies, chain_idx): + """Extract the aggregated stats for a given chain. + + trex_stats: stats as returned by get_stats() + if_stats: a list of 2 interface stats to update (port 0 and 1) + latencies: a list of 2 Latency instances to update for this chain (port 0 and 1) + latencies[p] is the latency for packets sent on port p + if there are no latency streams, the Latency instances are not modified + chain_idx: chain index of the interface stats + + The packet counts include normal and latency streams. 
+ + Trex returns flows stats as follows: + + 'flow_stats': {0: {'rx_bps': {0: 0, 1: 0, 'total': 0}, + 'rx_bps_l1': {0: 0.0, 1: 0.0, 'total': 0.0}, + 'rx_bytes': {0: nan, 1: nan, 'total': nan}, + 'rx_pkts': {0: 0, 1: 15001, 'total': 15001}, + 'rx_pps': {0: 0, 1: 0, 'total': 0}, + 'tx_bps': {0: 0, 1: 0, 'total': 0}, + 'tx_bps_l1': {0: 0.0, 1: 0.0, 'total': 0.0}, + 'tx_bytes': {0: 1020068, 1: 0, 'total': 1020068}, + 'tx_pkts': {0: 15001, 1: 0, 'total': 15001}, + 'tx_pps': {0: 0, 1: 0, 'total': 0}}, + 1: {'rx_bps': {0: 0, 1: 0, 'total': 0}, + 'rx_bps_l1': {0: 0.0, 1: 0.0, 'total': 0.0}, + 'rx_bytes': {0: nan, 1: nan, 'total': nan}, + 'rx_pkts': {0: 0, 1: 15001, 'total': 15001}, + 'rx_pps': {0: 0, 1: 0, 'total': 0}, + 'tx_bps': {0: 0, 1: 0, 'total': 0}, + 'tx_bps_l1': {0: 0.0, 1: 0.0, 'total': 0.0}, + 'tx_bytes': {0: 1020068, 1: 0, 'total': 1020068}, + 'tx_pkts': {0: 15001, 1: 0, 'total': 15001}, + 'tx_pps': {0: 0, 1: 0, 'total': 0}}, + 128: {'rx_bps': {0: 0, 1: 0, 'total': 0}, + 'rx_bps_l1': {0: 0.0, 1: 0.0, 'total': 0.0}, + 'rx_bytes': {0: nan, 1: nan, 'total': nan}, + 'rx_pkts': {0: 15001, 1: 0, 'total': 15001}, + 'rx_pps': {0: 0, 1: 0, 'total': 0}, + 'tx_bps': {0: 0, 1: 0, 'total': 0}, + 'tx_bps_l1': {0: 0.0, 1: 0.0, 'total': 0.0}, + 'tx_bytes': {0: 0, 1: 1020068, 'total': 1020068}, + 'tx_pkts': {0: 0, 1: 15001, 'total': 15001}, + 'tx_pps': {0: 0, 1: 0, 'total': 0}},etc... + + the pg_id (0, 1, 128,...) is the key of the dict and is obtained using the + get_pg_id() method. 
+ packet counters for a given stream sent on port p are reported as: + - tx_pkts[p] on port p + - rx_pkts[1-p] on the far end port + + This is a tricky/critical counter transposition operation because + the results are grouped by port (not by stream): + tx_pkts_port(p=0) comes from pg_id(port=0, chain_idx)['tx_pkts'][0] + rx_pkts_port(p=0) comes from pg_id(port=1, chain_idx)['rx_pkts'][0] + tx_pkts_port(p=1) comes from pg_id(port=1, chain_idx)['tx_pkts'][1] + rx_pkts_port(p=1) comes from pg_id(port=0, chain_idx)['rx_pkts'][1] + + or using a more generic formula: + tx_pkts_port(p) comes from pg_id(port=p, chain_idx)['tx_pkts'][p] + rx_pkts_port(p) comes from pg_id(port=1-p, chain_idx)['rx_pkts'][p] + + the second formula is equivalent to + rx_pkts_port(1-p) comes from pg_id(port=p, chain_idx)['rx_pkts'][1-p] + + If there are latency streams, those same counters need to be added in the same way + """ + def get_latency(lval): + try: + return int(round(lval)) + except ValueError: + return 0 + + for ifs in if_stats: + ifs.tx = ifs.rx = 0 + for port in range(2): + pg_id, lat_pg_id = self.get_pg_id(port, chain_idx) + for pid in [pg_id, lat_pg_id]: + try: + pg_stats = trex_stats['flow_stats'][pid] + if_stats[port].tx += pg_stats['tx_pkts'][port] + if_stats[1 - port].rx += pg_stats['rx_pkts'][1 - port] + except KeyError: + pass + try: + lat = trex_stats['latency'][lat_pg_id]['latency'] + # dropped_pkts += lat['err_cntrs']['dropped'] + latencies[port].max_usec = get_latency(lat['total_max']) + if math.isnan(lat['total_min']): + latencies[port].min_usec = 0 + latencies[port].avg_usec = 0 + else: + latencies[port].min_usec = get_latency(lat['total_min']) + latencies[port].avg_usec = get_latency(lat['average']) + # pick up the HDR histogram if present (otherwise will raise KeyError) + latencies[port].hdrh = lat['hdrh'] + except KeyError: + pass + + def __combine_latencies(self, in_stats, results, port_handle): + """Traverse TRex result dictionary and combines chosen latency 
stats. + + example of latency dict returned by trex (2 chains): + 'latency': {256: {'err_cntrs': {'dropped': 0, + 'dup': 0, + 'out_of_order': 0, + 'seq_too_high': 0, + 'seq_too_low': 0}, + 'latency': {'average': 26.5, + 'hdrh': u'HISTFAAAAEx4nJNpmSgz...bFRgxi', + 'histogram': {20: 303, + 30: 320, + 40: 300, + 50: 73, + 60: 4, + 70: 1}, + 'jitter': 14, + 'last_max': 63, + 'total_max': 63, + 'total_min': 20}}, + 257: {'err_cntrs': {'dropped': 0, + 'dup': 0, + 'out_of_order': 0, + 'seq_too_high': 0, + 'seq_too_low': 0}, + 'latency': {'average': 29.75, + 'hdrh': u'HISTFAAAAEV4nJN...CALilDG0=', + 'histogram': {20: 261, + 30: 431, + 40: 3, + 50: 80, + 60: 225}, + 'jitter': 23, + 'last_max': 67, + 'total_max': 67, + 'total_min': 20}}, + 384: {'err_cntrs': {'dropped': 0, + 'dup': 0, + 'out_of_order': 0, + 'seq_too_high': 0, + 'seq_too_low': 0}, + 'latency': {'average': 18.0, + 'hdrh': u'HISTFAAAADR4nJNpm...MjCwDDxAZG', + 'histogram': {20: 987, 30: 14}, + 'jitter': 0, + 'last_max': 34, + 'total_max': 34, + 'total_min': 20}}, + 385: {'err_cntrs': {'dropped': 0, + 'dup': 0, + 'out_of_order': 0, + 'seq_too_high': 0, + 'seq_too_low': 0}, + 'latency': {'average': 19.0, + 'hdrh': u'HISTFAAAADR4nJNpm...NkYmJgDdagfK', + 'histogram': {20: 989, 30: 11}, + 'jitter': 0, + 'last_max': 38, + 'total_max': 38, + 'total_min': 20}}, + 'global': {'bad_hdr': 0, 'old_flow': 0}}, + """ + total_max = 0 + average = 0 + total_min = float("inf") + for chain_id in range(self.chain_count): + try: + _, lat_pg_id = self.get_pg_id(port_handle, chain_id) + lat = in_stats['latency'][lat_pg_id]['latency'] + # dropped_pkts += lat['err_cntrs']['dropped'] + total_max = max(lat['total_max'], total_max) + total_min = min(lat['total_min'], total_min) + average += lat['average'] + except KeyError: + pass + if total_min == float("inf"): + total_min = 0 + results['min_delay_usec'] = total_min + results['max_delay_usec'] = total_max + results['avg_delay_usec'] = int(average / self.chain_count) + + def 
_bind_vxlan(self): + bind_layers(UDP, VXLAN, dport=4789) + bind_layers(VXLAN, Ether) + + def _create_pkt(self, stream_cfg, l2frame_size, disable_random_latency_flow=False): + """Create a packet of given size. + + l2frame_size: size of the L2 frame in bytes (including the 32-bit FCS) + """ + # Trex will add the FCS field, so we need to remove 4 bytes from the l2 frame size + frame_size = int(l2frame_size) - 4 + vm_param = [] + if stream_cfg['vxlan'] is True: + self._bind_vxlan() + encap_level = '1' + pkt_base = Ether(src=stream_cfg['vtep_src_mac'], dst=stream_cfg['vtep_dst_mac']) + if stream_cfg['vtep_vlan'] is not None: + pkt_base /= Dot1Q(vlan=stream_cfg['vtep_vlan']) + pkt_base /= IP(src=stream_cfg['vtep_src_ip'], dst=stream_cfg['vtep_dst_ip']) + pkt_base /= UDP(sport=random.randint(1337, 32767), dport=4789) + pkt_base /= VXLAN(vni=stream_cfg['net_vni']) + pkt_base /= Ether(src=stream_cfg['mac_src'], dst=stream_cfg['mac_dst']) + # need to randomize the outer header UDP src port based on flow + vxlan_udp_src_fv = STLVmFlowVar( + name="vxlan_udp_src", + min_value=1337, + max_value=32767, + size=2, + op="random") + vm_param = [vxlan_udp_src_fv, + STLVmWrFlowVar(fv_name="vxlan_udp_src", pkt_offset="UDP.sport")] + elif stream_cfg['mpls'] is True: + encap_level = '0' + pkt_base = Ether(src=stream_cfg['vtep_src_mac'], dst=stream_cfg['vtep_dst_mac']) + if stream_cfg['vtep_vlan'] is not None: + pkt_base /= Dot1Q(vlan=stream_cfg['vtep_vlan']) + if stream_cfg['mpls_outer_label'] is not None: + pkt_base /= MPLS(label=stream_cfg['mpls_outer_label'], cos=1, s=0, ttl=255) + if stream_cfg['mpls_inner_label'] is not None: + pkt_base /= MPLS(label=stream_cfg['mpls_inner_label'], cos=1, s=1, ttl=255) + # Flow stats and MPLS labels randomization TBD + pkt_base /= Ether(src=stream_cfg['mac_src'], dst=stream_cfg['mac_dst']) + else: + encap_level = '0' + pkt_base = Ether(src=stream_cfg['mac_src'], dst=stream_cfg['mac_dst']) + + if stream_cfg['vlan_tag'] is not None: + pkt_base /= 
Dot1Q(vlan=stream_cfg['vlan_tag']) + + udp_args = {} + if stream_cfg['udp_src_port']: + udp_args['sport'] = int(stream_cfg['udp_src_port']) + if stream_cfg['udp_port_step'] == 'random': + step = 1 + else: + step = stream_cfg['udp_port_step'] + udp_args['sport_step'] = int(step) + udp_args['sport_max'] = int(stream_cfg['udp_src_port_max']) + if stream_cfg['udp_dst_port']: + udp_args['dport'] = int(stream_cfg['udp_dst_port']) + if stream_cfg['udp_port_step'] == 'random': + step = 1 + else: + step = stream_cfg['udp_port_step'] + udp_args['dport_step'] = int(step) + udp_args['dport_max'] = int(stream_cfg['udp_dst_port_max']) + + pkt_base /= IP(src=stream_cfg['ip_src_addr'], dst=stream_cfg['ip_dst_addr']) / \ + UDP(dport=udp_args['dport'], sport=udp_args['sport']) + + # STLVmTupleGen need flow count >= cores used by TRex, if FC < cores we used STLVmFlowVar + if stream_cfg['ip_addrs_step'] == '0.0.0.1' and stream_cfg['udp_port_step'] == '1' and \ + stream_cfg['count'] >= self.generator_config.cores: + src_fv = STLVmTupleGen(ip_min=stream_cfg['ip_src_addr'], + ip_max=stream_cfg['ip_src_addr_max'], + port_min=udp_args['sport'], + port_max=udp_args['sport_max'], + name="tuple_src", + limit_flows=stream_cfg['count']) + dst_fv = STLVmTupleGen(ip_min=stream_cfg['ip_dst_addr'], + ip_max=stream_cfg['ip_dst_addr_max'], + port_min=udp_args['dport'], + port_max=udp_args['dport_max'], + name="tuple_dst", + limit_flows=stream_cfg['count']) + vm_param = [ + src_fv, + STLVmWrFlowVar(fv_name="tuple_src.ip", + pkt_offset="IP:{}.src".format(encap_level)), + STLVmWrFlowVar(fv_name="tuple_src.port", + pkt_offset="UDP:{}.sport".format(encap_level)), + dst_fv, + STLVmWrFlowVar(fv_name="tuple_dst.ip", + pkt_offset="IP:{}.dst".format(encap_level)), + STLVmWrFlowVar(fv_name="tuple_dst.port", + pkt_offset="UDP:{}.dport".format(encap_level)), + ] + else: + if disable_random_latency_flow: + src_fv_ip = STLVmFlowVar( + name="ip_src", + min_value=stream_cfg['ip_src_addr'], + 
max_value=stream_cfg['ip_src_addr'], + size=4) + dst_fv_ip = STLVmFlowVar( + name="ip_dst", + min_value=stream_cfg['ip_dst_addr'], + max_value=stream_cfg['ip_dst_addr'], + size=4) + elif stream_cfg['ip_addrs_step'] == 'random': + src_fv_ip = STLVmFlowVarRepeatableRandom( + name="ip_src", + min_value=stream_cfg['ip_src_addr'], + max_value=stream_cfg['ip_src_addr_max'], + size=4, + seed=random.randint(0, 32767), + limit=stream_cfg['ip_src_count']) + dst_fv_ip = STLVmFlowVarRepeatableRandom( + name="ip_dst", + min_value=stream_cfg['ip_dst_addr'], + max_value=stream_cfg['ip_dst_addr_max'], + size=4, + seed=random.randint(0, 32767), + limit=stream_cfg['ip_dst_count']) + else: + src_fv_ip = STLVmFlowVar( + name="ip_src", + min_value=stream_cfg['ip_src_addr'], + max_value=stream_cfg['ip_src_addr_max'], + size=4, + op="inc", + step=stream_cfg['ip_addrs_step']) + dst_fv_ip = STLVmFlowVar( + name="ip_dst", + min_value=stream_cfg['ip_dst_addr'], + max_value=stream_cfg['ip_dst_addr_max'], + size=4, + op="inc", + step=stream_cfg['ip_addrs_step']) + + if disable_random_latency_flow: + src_fv_port = STLVmFlowVar( + name="p_src", + min_value=udp_args['sport'], + max_value=udp_args['sport'], + size=2) + dst_fv_port = STLVmFlowVar( + name="p_dst", + min_value=udp_args['dport'], + max_value=udp_args['dport'], + size=2) + elif stream_cfg['udp_port_step'] == 'random': + src_fv_port = STLVmFlowVarRepeatableRandom( + name="p_src", + min_value=udp_args['sport'], + max_value=udp_args['sport_max'], + size=2, + seed=random.randint(0, 32767), + limit=stream_cfg['udp_src_count']) + dst_fv_port = STLVmFlowVarRepeatableRandom( + name="p_dst", + min_value=udp_args['dport'], + max_value=udp_args['dport_max'], + size=2, + seed=random.randint(0, 32767), + limit=stream_cfg['udp_dst_count']) + else: + src_fv_port = STLVmFlowVar( + name="p_src", + min_value=udp_args['sport'], + max_value=udp_args['sport_max'], + size=2, + op="inc", + step=udp_args['sport_step']) + dst_fv_port = STLVmFlowVar( + 
name="p_dst", + min_value=udp_args['dport'], + max_value=udp_args['dport_max'], + size=2, + op="inc", + step=udp_args['dport_step']) + vm_param = [ + src_fv_ip, + STLVmWrFlowVar(fv_name="ip_src", pkt_offset="IP:{}.src".format(encap_level)), + src_fv_port, + STLVmWrFlowVar(fv_name="p_src", pkt_offset="UDP:{}.sport".format(encap_level)), + dst_fv_ip, + STLVmWrFlowVar(fv_name="ip_dst", pkt_offset="IP:{}.dst".format(encap_level)), + dst_fv_port, + STLVmWrFlowVar(fv_name="p_dst", pkt_offset="UDP:{}.dport".format(encap_level)), + ] + # Use HW Offload to calculate the outer IP/UDP packet + vm_param.append(STLVmFixChecksumHw(l3_offset="IP:0", + l4_offset="UDP:0", + l4_type=CTRexVmInsFixHwCs.L4_TYPE_UDP)) + # Use software to fix the inner IP/UDP payload for VxLAN packets + if int(encap_level): + vm_param.append(STLVmFixIpv4(offset="IP:1")) + pad = max(0, frame_size - len(pkt_base)) * 'x' + + return STLPktBuilder(pkt=pkt_base / pad, + vm=STLScVmRaw(vm_param, cache_size=int(self.config.cache_size))) + + def _create_gratuitous_arp_pkt(self, stream_cfg): + """Create a GARP packet. + + """ + pkt_base = Ether(src=stream_cfg['mac_src'], dst="ff:ff:ff:ff:ff:ff") + + if self.config.vxlan or self.config.mpls: + pkt_base /= Dot1Q(vlan=stream_cfg['vtep_vlan']) + elif stream_cfg['vlan_tag'] is not None: + pkt_base /= Dot1Q(vlan=stream_cfg['vlan_tag']) + + pkt_base /= ARP(psrc=stream_cfg['ip_src_tg_gw'], hwsrc=stream_cfg['mac_src'], + hwdst=stream_cfg['mac_src'], pdst=stream_cfg['ip_src_tg_gw']) + + return STLPktBuilder(pkt=pkt_base) + + def generate_streams(self, port, chain_id, stream_cfg, l2frame, latency=True, + e2e=False): + """Create a list of streams corresponding to a given chain and stream config.
+ + port: port where the streams originate (0 or 1) + chain_id: the chain to which the streams are associated to + stream_cfg: stream configuration + l2frame: L2 frame size (including 4-byte FCS) or 'IMIX' + latency: if True also create a latency stream + e2e: True if performing "end to end" connectivity check + """ + streams = [] + pg_id, lat_pg_id = self.get_pg_id(port, chain_id) + if l2frame == 'IMIX': + for ratio, l2_frame_size in zip(IMIX_RATIOS, IMIX_L2_SIZES): + pkt = self._create_pkt(stream_cfg, l2_frame_size) + if e2e or stream_cfg['mpls']: + streams.append(STLStream(packet=pkt, + mode=STLTXCont(pps=ratio))) + else: + if stream_cfg['vxlan'] is True: + streams.append(STLStream(packet=pkt, + flow_stats=STLFlowStats(pg_id=pg_id, + vxlan=True) + if not self.config.no_flow_stats else None, + mode=STLTXCont(pps=ratio))) + else: + streams.append(STLStream(packet=pkt, + flow_stats=STLFlowStats(pg_id=pg_id) + if not self.config.no_flow_stats else None, + mode=STLTXCont(pps=ratio))) + + if latency: + # for IMIX, the latency packets have the average IMIX packet size + if stream_cfg['ip_addrs_step'] == 'random' or \ + stream_cfg['udp_port_step'] == 'random': + # Force latency flow to only one flow to avoid creating flows + # over requested flow count + pkt = self._create_pkt(stream_cfg, IMIX_AVG_L2_FRAME_SIZE, True) + else: + pkt = self._create_pkt(stream_cfg, IMIX_AVG_L2_FRAME_SIZE) + + else: + l2frame_size = int(l2frame) + pkt = self._create_pkt(stream_cfg, l2frame_size) + if self.config.periodic_gratuitous_arp: + requested_pps = int(utils.parse_rate_str(self.rates[0])[ + 'rate_pps']) - self.config.gratuitous_arp_pps + if latency: + requested_pps -= self.LATENCY_PPS + stltx_cont = STLTXCont(pps=requested_pps) + else: + stltx_cont = STLTXCont() + if e2e or stream_cfg['mpls']: + streams.append(STLStream(packet=pkt, + # Flow stats is disabled for MPLS now + # flow_stats=STLFlowStats(pg_id=pg_id), + mode=stltx_cont)) + else: + if stream_cfg['vxlan'] is True: + 
streams.append(STLStream(packet=pkt, + flow_stats=STLFlowStats(pg_id=pg_id, + vxlan=True) + if not self.config.no_flow_stats else None, + mode=stltx_cont)) + else: + streams.append(STLStream(packet=pkt, + flow_stats=STLFlowStats(pg_id=pg_id) + if not self.config.no_flow_stats else None, + mode=stltx_cont)) + # for the latency stream, the minimum payload is 16 bytes even in case of vlan tagging + # without vlan, the min l2 frame size is 64 + # with vlan it is 68 + # This only applies to the latency stream + if latency: + if stream_cfg['vlan_tag'] and l2frame_size < 68: + l2frame_size = 68 + if stream_cfg['ip_addrs_step'] == 'random' or \ + stream_cfg['udp_port_step'] == 'random': + # Force latency flow to only one flow to avoid creating flows + # over requested flow count + pkt = self._create_pkt(stream_cfg, l2frame_size, True) + else: + pkt = self._create_pkt(stream_cfg, l2frame_size) + + if latency: + if self.config.no_latency_stats: + LOG.info("Latency flow statistics are disabled.") + if stream_cfg['vxlan'] is True: + streams.append(STLStream(packet=pkt, + flow_stats=STLFlowLatencyStats(pg_id=lat_pg_id, + vxlan=True) + if not self.config.no_latency_stats else None, + mode=STLTXCont(pps=self.LATENCY_PPS))) + else: + streams.append(STLStream(packet=pkt, + flow_stats=STLFlowLatencyStats(pg_id=lat_pg_id) + if not self.config.no_latency_stats else None, + mode=STLTXCont(pps=self.LATENCY_PPS))) + + if self.config.periodic_gratuitous_arp and ( + self.config.l3_router or self.config.service_chain == ChainType.EXT): + # In case of L3 router feature or EXT chain with router + # and depending on ARP stale time SUT configuration + # Gratuitous ARP from TG port to the router is needed to keep traffic up + garp_pkt = self._create_gratuitous_arp_pkt(stream_cfg) + ibg = self.config.gratuitous_arp_pps * 1000000.0 + packets_count = int(self.config.duration_sec / self.config.gratuitous_arp_pps) + streams.append( + STLStream(packet=garp_pkt, + mode=STLTXMultiBurst(pkts_per_burst=1, 
count=packets_count, ibg=ibg))) + return streams + + @timeout(5) + def __connect(self, client): + client.connect() + + def __local_server_status(self): + """ The TRex server may have started but failed initializing... and stopped. + This piece of code is especially designed to address + the case when a fatal failure occurs on a DPDK init call. + The TRex algorithm should be revised to include some missing timeouts (?) + status returned: + 0: no error detected + 1: fatal error detected - should lead to exiting the run + 2: error detected that could be solved by starting again + The diagnostic is based on parsing the local trex log file (improvable) + """ + status = 0 + message = None + failure = None + exited = None + cause = None + error = None + before = None + after = None + last = None + try: + with open('/tmp/trex.log', 'r', encoding="utf-8") as trex_log: + for _line in trex_log: + line = _line.strip() + if line.startswith('Usage:'): + break + if 'ports are bound' in line: + continue + if 'please wait' in line: + continue + if 'exit' in line.lower(): + exited = line + elif 'cause' in line.lower(): + cause = line + elif 'fail' in line.lower(): + failure = line + elif 'msg' in line.lower(): + message = line + elif (error is not None) and line: + after = line + elif line.startswith('Error:') or line.startswith('ERROR'): + error = line + before = last + last = line + except FileNotFoundError: + pass + if exited is not None: + status = 1 + LOG.info("\x1b[1m%s\x1b[0m %s", 'TRex failed initializing:', exited) + if cause is not None: + LOG.info("TRex [cont'd] %s", cause) + if failure is not None: + LOG.info("TRex [cont'd] %s", failure) + if message is not None: + LOG.info("TRex [cont'd] %s", message) + if 'not supported yet' in message.lower(): + LOG.info("TRex [cont'd] Try starting again!") + status = 2 + elif error is not None: + status = 1 + LOG.info("\x1b[1m%s\x1b[0m %s", 'TRex failed initializing:', error) + if after is not None: + LOG.info("TRex [cont'd] %s",
after) + elif before is not None: + LOG.info("TRex [cont'd] %s", before) + return status + + def __connect_after_start(self): + # after start, Trex may take a bit of time to initialize + # so we need to retry a few times + # we try to capture recoverable error cases (checking status) + status = 0 + for it in range(self.config.generic_retry_count): + try: + time.sleep(1) + self.client.connect() + break + except Exception as ex: + if it == (self.config.generic_retry_count - 1): + raise + status = self.__local_server_status() + if status > 0: + # No need to wait anymore, something went wrong and TRex exited + if status == 1: + LOG.info("\x1b[1m%s\x1b[0m", 'TRex failed starting!') + print("More information? Try the command: " + + "\x1b[1mnfvbench --show-trex-log\x1b[0m") + sys.exit(0) + if status == 2: + # a new start will follow + return status + LOG.info("Retrying connection to TRex (%s)...", ex.msg) + return status + + def connect(self): + """Connect to the TRex server.""" + status = 0 + server_ip = self.generator_config.ip + LOG.info("Connecting to TRex (%s)...", server_ip) + + # Connect to TRex server + self.client = STLClient(server=server_ip, sync_port=self.generator_config.zmq_rpc_port, + async_port=self.generator_config.zmq_pub_port) + try: + self.__connect(self.client) + if server_ip == '127.0.0.1': + config_updated = self.__check_config() + if config_updated or self.config.restart: + status = self.__restart() + except (TimeoutError, STLError) as e: + if server_ip == '127.0.0.1': + status = self.__start_local_server() + else: + raise TrafficGeneratorException(e.message) from e + + if status == 2: + # Workaround in case of a failed TRex server initialization + # we try to start it again (twice maximum) + # which may allow low level initialization to complete. 
+ if self.__start_local_server() == 2: + self.__start_local_server() + + ports = list(self.generator_config.ports) + self.port_handle = ports + # Prepare the ports + self.client.reset(ports) + # Read HW information from each port + # this returns an array of dict (1 per port) + """ + Example of output for Intel XL710 + [{'arp': '-', 'src_ipv4': '-', u'supp_speeds': [40000], u'is_link_supported': True, + 'grat_arp': 'off', 'speed': 40, u'index': 0, 'link_change_supported': 'yes', + u'rx': {u'counters': 127, u'caps': [u'flow_stats', u'latency']}, + u'is_virtual': 'no', 'prom': 'off', 'src_mac': u'3c:fd:fe:a8:24:48', 'status': 'IDLE', + u'description': u'Ethernet Controller XL710 for 40GbE QSFP+', + 'dest': u'fa:16:3e:3c:63:04', u'is_fc_supported': False, 'vlan': '-', + u'driver': u'net_i40e', 'led_change_supported': 'yes', 'rx_filter_mode': 'hardware match', + 'fc': 'none', 'link': 'UP', u'hw_mac': u'3c:fd:fe:a8:24:48', u'pci_addr': u'0000:5e:00.0', + 'mult': 'off', 'fc_supported': 'no', u'is_led_supported': True, 'rx_queue': 'off', + 'layer_mode': 'Ethernet', u'numa': 0}, ...] 
+ """ + self.port_info = self.client.get_port_info(ports) + LOG.info('Connected to TRex') + for id, port in enumerate(self.port_info): + LOG.info(' Port %d: %s speed=%dGbps mac=%s pci=%s driver=%s', + id, port['description'], port['speed'], port['src_mac'], + port['pci_addr'], port['driver']) + # Make sure the 2 ports have the same speed + if self.port_info[0]['speed'] != self.port_info[1]['speed']: + raise TrafficGeneratorException('Traffic generator ports speed mismatch: %d/%d Gbps' % + (self.port_info[0]['speed'], + self.port_info[1]['speed'])) + + def __start_local_server(self): + try: + LOG.info("Starting TRex ...") + self.__start_server() + status = self.__connect_after_start() + except (TimeoutError, STLError) as e: + LOG.error('Cannot connect to TRex') + LOG.error(traceback.format_exc()) + logpath = '/tmp/trex.log' + if os.path.isfile(logpath): + # Wait for TRex to finish writing error message + last_size = 0 + for _ in range(self.config.generic_retry_count): + size = os.path.getsize(logpath) + if size == last_size: + # probably not writing anymore + break + last_size = size + time.sleep(1) + with open(logpath, 'r', encoding="utf-8") as f: + message = f.read() + else: + message = e.message + raise TrafficGeneratorException(message) from e + return status + + def __start_server(self): + server = TRexTrafficServer() + server.run_server(self.generator_config) + + def __check_config(self): + server = TRexTrafficServer() + return server.check_config_updated(self.generator_config) + + def __restart(self): + LOG.info("Restarting TRex ...") + self.__stop_server() + # Wait for server stopped + for _ in range(self.config.generic_retry_count): + time.sleep(1) + if not self.client.is_connected(): + LOG.info("TRex is stopped...") + break + # Start and report a possible failure + return self.__start_local_server() + + def __stop_server(self): + if self.generator_config.ip == '127.0.0.1': + ports = self.client.get_acquired_ports() + LOG.info('Release ports %s and stopping 
TRex...', ports) + try: + if ports: + self.client.release(ports=ports) + self.client.server_shutdown() + except STLError as e: + LOG.warning('Unable to stop TRex. Error: %s', e) + else: + LOG.info('Using remote TRex. Unable to stop TRex') + + def resolve_arp(self): + """Resolve all configured remote IP addresses. + + return: None if ARP failed to resolve for all IP addresses + else a dict of list of dest macs indexed by port# + the dest macs in the list are indexed by the chain id + """ + self.client.set_service_mode(ports=self.port_handle) + LOG.info('Polling ARP until successful...') + arp_dest_macs = {} + for port, device in zip(self.port_handle, self.generator_config.devices): + # there should be 1 stream config per chain + stream_configs = device.get_stream_configs() + chain_count = len(stream_configs) + ctx = self.client.create_service_ctx(port=port) + # all dest macs on this port indexed by chain ID + dst_macs = [None] * chain_count + dst_macs_count = 0 + # the index in the list is the chain id + if self.config.vxlan or self.config.mpls: + arps = [ + ServiceARP(ctx, + src_ip=device.vtep_src_ip, + dst_ip=device.vtep_dst_ip, + vlan=device.vtep_vlan) + for cfg in stream_configs + ] + else: + arps = [ + ServiceARP(ctx, + src_ip=cfg['ip_src_tg_gw'], + dst_ip=cfg['mac_discovery_gw'], + # will be None if no vlan tagging + vlan=cfg['vlan_tag']) + for cfg in stream_configs + ] + + for attempt in range(self.config.generic_retry_count): + try: + ctx.run(arps) + except STLError: + LOG.error(traceback.format_exc()) + continue + + unresolved = [] + for chain_id, mac in enumerate(dst_macs): + if not mac: + arp_record = arps[chain_id].get_record() + if arp_record.dst_mac: + dst_macs[chain_id] = arp_record.dst_mac + dst_macs_count += 1 + LOG.info(' ARP: port=%d chain=%d src IP=%s dst IP=%s -> MAC=%s', + port, chain_id, + arp_record.src_ip, + arp_record.dst_ip, arp_record.dst_mac) + else: + unresolved.append(arp_record.dst_ip) + if dst_macs_count == chain_count: + 
arp_dest_macs[port] = dst_macs + LOG.info('ARP resolved successfully for port %s', port) + break + + retry = attempt + 1 + LOG.info('Retrying ARP for: %s (retry %d/%d)', + unresolved, retry, self.config.generic_retry_count) + if retry < self.config.generic_retry_count: + time.sleep(self.config.generic_poll_sec) + else: + LOG.error('ARP timed out for port %s (resolved %d out of %d)', + port, + dst_macs_count, + chain_count) + break + + # A traffic capture may have been started (from a T-Rex console) at this time. + # If asked so, we keep the service mode enabled here, and disable it otherwise. + # | Disabling the service mode while a capture is in progress + # | would cause the application to stop/crash with an error. + if not self.config.service_mode: + self.client.set_service_mode(ports=self.port_handle, enabled=False) + if len(arp_dest_macs) == len(self.port_handle): + return arp_dest_macs + return None + + def __is_rate_enough(self, l2frame_size, rates, bidirectional, latency): + """Check if rate provided by user is above requirements. 
Applies only if latency is True.""" + intf_speed = self.generator_config.intf_speed + if latency: + if bidirectional: + mult = 2 + total_rate = 0 + for rate in rates: + r = utils.convert_rates(l2frame_size, rate, intf_speed) + total_rate += int(r['rate_pps']) + else: + mult = 1 + r = utils.convert_rates(l2frame_size, rates[0], intf_speed) + total_rate = int(r['rate_pps']) + # rate must be enough for latency stream and at least 1 pps for base stream per chain + if self.config.periodic_gratuitous_arp: + required_rate = (self.LATENCY_PPS + 1 + self.config.gratuitous_arp_pps) \ + * self.config.service_chain_count * mult + else: + required_rate = (self.LATENCY_PPS + 1) * self.config.service_chain_count * mult + result = utils.convert_rates(l2frame_size, + {'rate_pps': required_rate}, + intf_speed * mult) + result['result'] = total_rate >= required_rate + return result + + return {'result': True} + + def create_traffic(self, l2frame_size, rates, bidirectional, latency=True, e2e=False): + """Program all the streams in Trex server. + + l2frame_size: L2 frame size or IMIX + rates: a list of 2 rates to run each direction + each rate is a dict like {'rate_pps': '10kpps'} + bidirectional: True if bidirectional + latency: True if latency measurement is needed + e2e: True if performing "end to end" connectivity check + """ + if self.config.no_flow_stats: + LOG.info("Traffic flow statistics are disabled.") + r = self.__is_rate_enough(l2frame_size, rates, bidirectional, latency) + if not r['result']: + raise TrafficGeneratorException( + 'Required rate in total is at least one of: \n{pps}pps \n{bps}bps \n{load}%.' 
+ .format(pps=r['rate_pps'], + bps=r['rate_bps'], + load=r['rate_percent'])) + self.l2_frame_size = l2frame_size + # a dict of list of streams indexed by port# + # in case of fixed size, has self.chain_count * 2 * 2 streams + # (1 normal + 1 latency stream per direction per chain) + # for IMIX, has self.chain_count * 2 * 4 streams + # (3 normal + 1 latency stream per direction per chain) + streamblock = {} + for port in self.port_handle: + streamblock[port] = [] + stream_cfgs = [d.get_stream_configs() for d in self.generator_config.devices] + if self.generator_config.ip_addrs_step == 'random' \ + or self.generator_config.gen_config.udp_port_step == 'random': + LOG.warning("Using random step, the number of flows can be less than " + "the requested number of flows due to repeatable multivariate random " + "generation which can reproduce the same pattern of values") + self.rates = [utils.to_rate_str(rate) for rate in rates] + for chain_id, (fwd_stream_cfg, rev_stream_cfg) in enumerate(zip(*stream_cfgs)): + streamblock[0].extend(self.generate_streams(self.port_handle[0], + chain_id, + fwd_stream_cfg, + l2frame_size, + latency=latency, + e2e=e2e)) + if len(self.rates) > 1: + streamblock[1].extend(self.generate_streams(self.port_handle[1], + chain_id, + rev_stream_cfg, + l2frame_size, + latency=bidirectional and latency, + e2e=e2e)) + + for port in self.port_handle: + if self.config.vxlan: + self.client.set_port_attr(ports=port, vxlan_fs=[4789]) + else: + self.client.set_port_attr(ports=port, vxlan_fs=None) + self.client.add_streams(streamblock[port], ports=port) + LOG.info('Created %d traffic streams for port %s.', len(streamblock[port]), port) + + def clear_streamblock(self): + """Clear all streams from TRex.""" + self.rates = [] + self.client.reset(self.port_handle) + LOG.info('Cleared all existing streams') + + def get_stats(self, ifstats=None): + """Get stats from Trex.""" + stats = self.client.get_stats() + return self.extract_stats(stats, ifstats) + + def 
get_macs(self): + """Return the Trex local port MAC addresses. + + return: a list of MAC addresses indexed by the port# + """ + return [port['src_mac'] for port in self.port_info] + + def get_port_speed_gbps(self): + """Return the Trex local port MAC addresses. + + return: a list of speed in Gbps indexed by the port# + """ + return [port['speed'] for port in self.port_info] + + def clear_stats(self): + """Clear all stats in the traffic gneerator.""" + if self.port_handle: + self.client.clear_stats() + + def start_traffic(self): + """Start generating traffic in all ports.""" + for port, rate in zip(self.port_handle, self.rates): + self.client.start(ports=port, mult=rate, duration=self.config.duration_sec, force=True) + + def stop_traffic(self): + """Stop generating traffic.""" + self.client.stop(ports=self.port_handle) + + def start_capture(self): + """Capture all packets on both ports that are unicast to us.""" + if self.capture_id: + self.stop_capture() + # Need to filter out unwanted packets so we do not end up counting + # src MACs of frames that are not unicast to us + src_mac_list = self.get_macs() + bpf_filter = "ether dst %s or ether dst %s" % (src_mac_list[0], src_mac_list[1]) + # ports must be set in service in order to enable capture + self.client.set_service_mode(ports=self.port_handle) + self.capture_id = self.client.start_capture \ + (rx_ports=self.port_handle, bpf_filter=bpf_filter) + + def fetch_capture_packets(self): + """Fetch capture packets in capture mode.""" + if self.capture_id: + self.packet_list = [] + self.client.fetch_capture_packets(capture_id=self.capture_id['id'], + output=self.packet_list) + + def stop_capture(self): + """Stop capturing packets.""" + if self.capture_id: + self.client.stop_capture(capture_id=self.capture_id['id']) + self.capture_id = None + # A traffic capture may have been started (from a T-Rex console) at this time. + # If asked so, we keep the service mode enabled here, and disable it otherwise. 
+ # | Disabling the service mode while a capture is in progress + # | would cause the application to stop/crash with an error. + if not self.config.service_mode: + self.client.set_service_mode(ports=self.port_handle, enabled=False) + + def cleanup(self): + """Cleanup Trex driver.""" + if self.client: + try: + self.client.reset(self.port_handle) + self.client.disconnect() + except STLError: + # TRex does not like a reset while in disconnected state + pass + + def set_service_mode(self, enabled=True): + """Enable/disable the 'service' mode.""" + self.client.set_service_mode(ports=self.port_handle, enabled=enabled) |