summaryrefslogtreecommitdiffstats
path: root/nfvbench/traffic_gen/trex.py
diff options
context:
space:
mode:
Diffstat (limited to 'nfvbench/traffic_gen/trex.py')
-rw-r--r--nfvbench/traffic_gen/trex.py456
1 files changed, 456 insertions, 0 deletions
diff --git a/nfvbench/traffic_gen/trex.py b/nfvbench/traffic_gen/trex.py
new file mode 100644
index 0000000..6c2a304
--- /dev/null
+++ b/nfvbench/traffic_gen/trex.py
@@ -0,0 +1,456 @@
+# Copyright 2016 Cisco Systems, Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from collections import defaultdict
+from itertools import count
+from nfvbench.log import LOG
+from nfvbench.specs import ChainType
+from nfvbench.traffic_server import TRexTrafficServer
+from nfvbench.utils import timeout
+from nfvbench.utils import TimeoutError
+import os
+import random
+import time
+import traceback
+from traffic_base import AbstractTrafficGenerator
+from traffic_base import TrafficGeneratorException
+import traffic_utils as utils
+
+
+from trex_stl_lib.api import CTRexVmInsFixHwCs
+from trex_stl_lib.api import Dot1Q
+from trex_stl_lib.api import Ether
+from trex_stl_lib.api import IP
+from trex_stl_lib.api import STLClient
+from trex_stl_lib.api import STLError
+from trex_stl_lib.api import STLFlowLatencyStats
+from trex_stl_lib.api import STLFlowStats
+from trex_stl_lib.api import STLPktBuilder
+from trex_stl_lib.api import STLScVmRaw
+from trex_stl_lib.api import STLStream
+from trex_stl_lib.api import STLTXCont
+from trex_stl_lib.api import STLVmFixChecksumHw
+from trex_stl_lib.api import STLVmFlowVar
+from trex_stl_lib.api import STLVmFlowVarRepetableRandom
+from trex_stl_lib.api import STLVmWrFlowVar
+from trex_stl_lib.api import UDP
+from trex_stl_lib.services.trex_stl_service_arp import STLServiceARP
+
+
+class TRex(AbstractTrafficGenerator):
+
+ LATENCY_PPS = 1000
+
+ def __init__(self, runner):
+ AbstractTrafficGenerator.__init__(self, runner)
+ self.client = None
+ self.id = count()
+ self.latencies = defaultdict(list)
+ self.stream_ids = defaultdict(list)
+ self.port_handle = []
+ self.streamblock = defaultdict(list)
+ self.rates = []
+ self.arps = {}
+
+ def get_version(self):
+ return self.client.get_server_version()
+
+ def extract_stats(self, in_stats):
+ utils.nan_replace(in_stats)
+ LOG.debug(in_stats)
+
+ result = {}
+ for ph in self.port_handle:
+ stats = self.__combine_stats(in_stats, ph)
+ result[ph] = {
+ 'tx': {
+ 'total_pkts': stats['tx_pkts']['total'],
+ 'total_pkt_bytes': stats['tx_bytes']['total'],
+ 'pkt_rate': stats['tx_pps']['total'],
+ 'pkt_bit_rate': stats['tx_bps']['total']
+ },
+ 'rx': {
+ 'total_pkts': stats['rx_pkts']['total'],
+ 'total_pkt_bytes': stats['rx_bytes']['total'],
+ 'pkt_rate': stats['rx_pps']['total'],
+ 'pkt_bit_rate': stats['rx_bps']['total'],
+ 'dropped_pkts': stats['tx_pkts']['total'] - stats['rx_pkts']['total']
+ }
+ }
+
+ lat = self.__combine_latencies(in_stats, ph)
+ result[ph]['rx']['max_delay_usec'] = lat.get('total_max', float('nan'))
+ result[ph]['rx']['min_delay_usec'] = lat.get('total_min', float('nan'))
+ result[ph]['rx']['avg_delay_usec'] = lat.get('average', float('nan'))
+
+ total_tx_pkts = result[0]['tx']['total_pkts'] + result[1]['tx']['total_pkts']
+ result["total_tx_rate"] = total_tx_pkts / self.config.duration_sec
+ return result
+
+ def __combine_stats(self, in_stats, port_handle):
+ """Traverses TRex result dictionary and combines stream stats. Used for combining latency
+ and regular streams together.
+ """
+ result = defaultdict(lambda: defaultdict(float))
+
+ for pg_id in [self.stream_ids[port_handle]] + self.latencies[port_handle]:
+ record = in_stats['flow_stats'][pg_id]
+ for stat_type, stat_type_values in record.iteritems():
+ for ph, value in stat_type_values.iteritems():
+ result[stat_type][ph] += value
+
+ return result
+
+ def __combine_latencies(self, in_stats, port_handle):
+ """Traverses TRex result dictionary and combines chosen latency stats."""
+ if not len(self.latencies[port_handle]):
+ return {}
+
+ result = defaultdict(float)
+ result['total_min'] = float("inf")
+ for lat_id in self.latencies[port_handle]:
+ lat = in_stats['latency'][lat_id]
+ result['dropped_pkts'] += lat['err_cntrs']['dropped']
+ result['total_max'] = max(lat['latency']['total_max'], result['total_max'])
+ result['total_min'] = min(lat['latency']['total_min'], result['total_min'])
+ result['average'] += lat['latency']['average']
+
+ result['average'] /= len(self.latencies[port_handle])
+
+ return result
+
+ def create_pkt(self, stream_cfg, l2frame_size):
+ # 46 = 14 (Ethernet II) + 4 (CRC Checksum) + 20 (IPv4) + 8 (UDP)
+ payload = 'x' * (max(64, int(l2frame_size)) - 46)
+
+ pkt_base = Ether(src=stream_cfg['mac_src'], dst=stream_cfg['mac_dst'])
+
+ if stream_cfg['vlan_tag'] is not None:
+ pkt_base /= Dot1Q(vlan=stream_cfg['vlan_tag'])
+
+ pkt_base /= IP() / UDP()
+
+ if stream_cfg['ip_addrs_step'] == 'random':
+ src_fv = STLVmFlowVarRepetableRandom(
+ name="ip_src",
+ min_value=stream_cfg['ip_src_addr'],
+ max_value=stream_cfg['ip_src_addr_max'],
+ size=4,
+ seed=random.randint(0, 32767),
+ limit=stream_cfg['ip_src_count'])
+ dst_fv = STLVmFlowVarRepetableRandom(
+ name="ip_dst",
+ min_value=stream_cfg['ip_dst_addr'],
+ max_value=stream_cfg['ip_dst_addr_max'],
+ size=4,
+ seed=random.randint(0, 32767),
+ limit=stream_cfg['ip_dst_count'])
+ else:
+ src_fv = STLVmFlowVar(
+ name="ip_src",
+ min_value=stream_cfg['ip_src_addr'],
+ max_value=stream_cfg['ip_src_addr'],
+ size=4,
+ op="inc",
+ step=stream_cfg['ip_addrs_step'])
+ dst_fv = STLVmFlowVar(
+ name="ip_dst",
+ min_value=stream_cfg['ip_dst_addr'],
+ max_value=stream_cfg['ip_dst_addr_max'],
+ size=4,
+ op="inc",
+ step=stream_cfg['ip_addrs_step'])
+
+ vm_param = [
+ src_fv,
+ STLVmWrFlowVar(fv_name="ip_src", pkt_offset="IP.src"),
+ dst_fv,
+ STLVmWrFlowVar(fv_name="ip_dst", pkt_offset="IP.dst"),
+ STLVmFixChecksumHw(l3_offset="IP",
+ l4_offset="UDP",
+ l4_type=CTRexVmInsFixHwCs.L4_TYPE_UDP)
+ ]
+
+ return STLPktBuilder(pkt=pkt_base / payload, vm=STLScVmRaw(vm_param))
+
+ def generate_streams(self, port_handle, stream_cfg, l2frame, isg=0.0, latency=True):
+ idx_lat = None
+ streams = []
+ if l2frame == 'IMIX':
+ for t, (ratio, l2_frame_size) in enumerate(zip(self.imix_ratios, self.imix_l2_sizes)):
+ pkt = self.create_pkt(stream_cfg, l2_frame_size)
+ streams.append(STLStream(packet=pkt,
+ isg=0.1 * t,
+ flow_stats=STLFlowStats(
+ pg_id=self.stream_ids[port_handle]),
+ mode=STLTXCont(pps=ratio)))
+
+ if latency:
+ idx_lat = self.id.next()
+ sl = STLStream(packet=pkt,
+ isg=isg,
+ flow_stats=STLFlowLatencyStats(pg_id=idx_lat),
+ mode=STLTXCont(pps=self.LATENCY_PPS))
+ streams.append(sl)
+ else:
+ pkt = self.create_pkt(stream_cfg, l2frame)
+ streams.append(STLStream(packet=pkt,
+ flow_stats=STLFlowStats(pg_id=self.stream_ids[port_handle]),
+ mode=STLTXCont()))
+
+ if latency:
+ idx_lat = self.id.next()
+ streams.append(STLStream(packet=pkt,
+ flow_stats=STLFlowLatencyStats(pg_id=idx_lat),
+ mode=STLTXCont(pps=self.LATENCY_PPS)))
+
+ if latency:
+ self.latencies[port_handle].append(idx_lat)
+
+ return streams
+
+ def init(self):
+ pass
+
+ @timeout(5)
+ def __connect(self, client):
+ client.connect()
+
+ def __connect_after_start(self):
+ # after start, Trex may take a bit of time to initialize
+ # so we need to retry a few times
+ for it in xrange(self.config.generic_retry_count):
+ try:
+ time.sleep(1)
+ self.client.connect()
+ break
+ except Exception as ex:
+ if it == (self.config.generic_retry_count - 1):
+ raise ex
+ LOG.info("Retrying connection to TRex (%s)...", ex.message)
+
+ def connect(self):
+ LOG.info("Connecting to TRex...")
+ server_ip = self.config.generator_config.ip
+
+ # Connect to TRex server
+ self.client = STLClient(server=server_ip)
+ try:
+ self.__connect(self.client)
+ except (TimeoutError, STLError) as e:
+ if server_ip == '127.0.0.1':
+ try:
+ self.__start_server()
+ self.__connect_after_start()
+ except (TimeoutError, STLError) as e:
+ LOG.error('Cannot connect to TRex')
+ LOG.error(traceback.format_exc())
+ logpath = '/tmp/trex.log'
+ if os.path.isfile(logpath):
+ # Wait for TRex to finish writing error message
+ last_size = 0
+ for it in xrange(self.config.generic_retry_count):
+ size = os.path.getsize(logpath)
+ if size == last_size:
+ # probably not writing anymore
+ break
+ last_size = size
+ time.sleep(1)
+ with open(logpath, 'r') as f:
+ message = f.read()
+ else:
+ message = e.message
+ raise TrafficGeneratorException(message)
+ else:
+ raise TrafficGeneratorException(e.message)
+
+ ports = list(self.config.generator_config.ports)
+ self.port_handle = ports
+ # Prepare the ports
+ self.client.reset(ports)
+
+ def set_mode(self):
+ if self.config.service_chain == ChainType.EXT and not self.config.no_arp:
+ self.__set_l3_mode()
+ else:
+ self.__set_l2_mode()
+
+ def __set_l3_mode(self):
+ self.client.set_service_mode(ports=self.port_handle, enabled=True)
+ for port, device in zip(self.port_handle, self.config.generator_config.devices):
+ try:
+ self.client.set_l3_mode(port=port,
+ src_ipv4=device.tg_gateway_ip,
+ dst_ipv4=device.dst.gateway_ip,
+ vlan=device.vlan_tag if device.vlan_tagging else None)
+ except STLError:
+ # TRex tries to resolve ARP already, doesn't have to be successful yet
+ continue
+ self.client.set_service_mode(ports=self.port_handle, enabled=False)
+
+ def __set_l2_mode(self):
+ self.client.set_service_mode(ports=self.port_handle, enabled=True)
+ for port, device in zip(self.port_handle, self.config.generator_config.devices):
+ for cfg in device.get_stream_configs(self.config.generator_config.service_chain):
+ self.client.set_l2_mode(port=port, dst_mac=cfg['mac_dst'])
+ self.client.set_service_mode(ports=self.port_handle, enabled=False)
+
+ def __start_server(self):
+ server = TRexTrafficServer()
+ server.run_server(self.config.generator_config)
+
+ def resolve_arp(self):
+ self.client.set_service_mode(ports=self.port_handle)
+ LOG.info('Polling ARP until successful')
+ resolved = 0
+ attempt = 0
+ for port, device in zip(self.port_handle, self.config.generator_config.devices):
+ ctx = self.client.create_service_ctx(port=port)
+
+ arps = [
+ STLServiceARP(ctx,
+ src_ip=cfg['ip_src_tg_gw'],
+ dst_ip=cfg['mac_discovery_gw'],
+ vlan=device.vlan_tag if device.vlan_tagging else None)
+ for cfg in device.get_stream_configs(self.config.generator_config.service_chain)
+ ]
+
+ for _ in xrange(self.config.generic_retry_count):
+ attempt += 1
+ try:
+ ctx.run(arps)
+ except STLError:
+ LOG.error(traceback.format_exc())
+ continue
+
+ self.arps[port] = [arp.get_record().dst_mac for arp in arps
+ if arp.get_record().dst_mac is not None]
+
+ if len(self.arps[port]) == self.config.service_chain_count:
+ resolved += 1
+ LOG.info('ARP resolved successfully for port {}'.format(port))
+ break
+ else:
+ failed = [arp.get_record().dst_ip for arp in arps
+ if arp.get_record().dst_mac is None]
+ LOG.info('Retrying ARP for: {} ({} / {})'.format(
+ failed, attempt, self.config.generic_retry_count))
+ time.sleep(self.config.generic_poll_sec)
+
+ self.client.set_service_mode(ports=self.port_handle, enabled=False)
+ return resolved == len(self.port_handle)
+
+ def config_interface(self):
+ pass
+
+ def __is_rate_enough(self, l2frame_size, rates, bidirectional, latency):
+ """Check if rate provided by user is above requirements. Applies only if latency is True."""
+ intf_speed = self.config.generator_config.intf_speed
+ if latency:
+ if bidirectional:
+ mult = 2
+ total_rate = 0
+ for rate in rates:
+ r = utils.convert_rates(l2frame_size, rate, intf_speed)
+ total_rate += int(r['rate_pps'])
+ else:
+ mult = 1
+ total_rate = utils.convert_rates(l2frame_size, rates[0], intf_speed)
+ # rate must be enough for latency stream and at least 1 pps for base stream per chain
+ required_rate = (self.LATENCY_PPS + 1) * self.config.service_chain_count * mult
+ result = utils.convert_rates(l2frame_size,
+ {'rate_pps': required_rate},
+ intf_speed * mult)
+ result['result'] = total_rate >= required_rate
+ return result
+
+ return {'result': True}
+
+ def create_traffic(self, l2frame_size, rates, bidirectional, latency=True):
+ r = self.__is_rate_enough(l2frame_size, rates, bidirectional, latency)
+ if not r['result']:
+ raise TrafficGeneratorException(
+ 'Required rate in total is at least one of: \n{pps}pps \n{bps}bps \n{load}%.'
+ .format(pps=r['rate_pps'],
+ bps=r['rate_bps'],
+ load=r['rate_percent']))
+
+ stream_cfgs = [d.get_stream_configs(self.config.generator_config.service_chain)
+ for d in self.config.generator_config.devices]
+ self.rates = map(lambda rate: utils.to_rate_str(rate), rates)
+
+ for ph in self.port_handle:
+ # generate one pg_id for each direction
+ self.stream_ids[ph] = self.id.next()
+
+ for i, (fwd_stream_cfg, rev_stream_cfg) in enumerate(zip(*stream_cfgs)):
+ if self.config.service_chain == ChainType.EXT and not self.config.no_arp:
+ fwd_stream_cfg['mac_dst'] = self.arps[self.port_handle[0]][i]
+ rev_stream_cfg['mac_dst'] = self.arps[self.port_handle[1]][i]
+
+ self.streamblock[0].extend(self.generate_streams(self.port_handle[0],
+ fwd_stream_cfg,
+ l2frame_size,
+ latency=latency))
+ if len(self.rates) > 1:
+ self.streamblock[1].extend(self.generate_streams(self.port_handle[1],
+ rev_stream_cfg,
+ l2frame_size,
+ isg=10.0,
+ latency=bidirectional and latency))
+
+ for ph in self.port_handle:
+ self.client.add_streams(self.streamblock[ph], ports=ph)
+ LOG.info('Created traffic stream for port %s.' % ph)
+
+ def modify_rate(self, rate, reverse):
+ port_index = int(reverse)
+ port = self.port_handle[port_index]
+ self.rates[port_index] = utils.to_rate_str(rate)
+ LOG.info('Modified traffic stream for %s, new rate=%s.' % (port, utils.to_rate_str(rate)))
+
+ def clear_streamblock(self):
+ self.streamblock = defaultdict(list)
+ self.latencies = defaultdict(list)
+ self.stream_ids = defaultdict(list)
+ self.rates = []
+ self.client.reset(self.port_handle)
+ LOG.info('Cleared all existing streams.')
+
+ def get_stats(self):
+ stats = self.client.get_pgid_stats()
+ return self.extract_stats(stats)
+
+ def get_macs(self):
+ return [self.client.get_port_attr(port=port)['src_mac'] for port in self.port_handle]
+
+ def clear_stats(self):
+ if self.port_handle:
+ self.client.clear_stats()
+
+ def start_traffic(self):
+ for port, rate in zip(self.port_handle, self.rates):
+ self.client.start(ports=port, mult=rate, duration=self.config.duration_sec, force=True)
+
+ def stop_traffic(self):
+ self.client.stop(ports=self.port_handle)
+
+ def cleanup(self):
+ if self.client:
+ try:
+ self.client.reset(self.port_handle)
+ self.client.disconnect()
+ except STLError:
+ # TRex does not like a reset while in disconnected state
+ pass