aboutsummaryrefslogtreecommitdiffstats
path: root/nfvbench/traffic_client.py
diff options
context:
space:
mode:
Diffstat (limited to 'nfvbench/traffic_client.py')
-rwxr-xr-x[-rw-r--r--]nfvbench/traffic_client.py1301
1 files changed, 975 insertions, 326 deletions
diff --git a/nfvbench/traffic_client.py b/nfvbench/traffic_client.py
index 8959cab..47af265 100644..100755
--- a/nfvbench/traffic_client.py
+++ b/nfvbench/traffic_client.py
@@ -12,62 +12,88 @@
# License for the specific language governing permissions and limitations
# under the License.
-from datetime import datetime
+"""Interface to the traffic generator clients including NDR/PDR binary search."""
import socket
import struct
import time
+import sys
from attrdict import AttrDict
import bitmath
+from hdrh.histogram import HdrHistogram
from netaddr import IPNetwork
# pylint: disable=import-error
-from trex_stl_lib.api import STLError
+from trex.stl.api import Ether
+from trex.stl.api import STLError
+from trex.stl.api import UDP
+# pylint: disable=wrong-import-order
+from scapy.contrib.mpls import MPLS # flake8: noqa
+# pylint: enable=wrong-import-order
# pylint: enable=import-error
-from log import LOG
-from network import Interface
-from specs import ChainType
-from stats_collector import IntervalCollector
-from stats_collector import IterationCollector
-import traffic_gen.traffic_utils as utils
-from utils import cast_integer
-
+from .log import LOG
+from .packet_stats import InterfaceStats
+from .packet_stats import PacketPathStats
+from .stats_collector import IntervalCollector
+from .stats_collector import IterationCollector
+from .traffic_gen import traffic_utils as utils
+from .utils import cast_integer, find_max_size, find_tuples_equal_to_lcm_value, get_divisors, lcm
class TrafficClientException(Exception):
- pass
-
+ """Generic traffic client exception."""
class TrafficRunner(object):
- def __init__(self, client, duration_sec, interval_sec=0):
+ """Serialize various steps required to run traffic."""
+
+ def __init__(self, client, duration_sec, interval_sec=0, service_mode=False):
+ """Create a traffic runner."""
self.client = client
self.start_time = None
self.duration_sec = duration_sec
self.interval_sec = interval_sec
+ self.service_mode = service_mode
def run(self):
+ """Clear stats and instruct the traffic generator to start generating traffic."""
+ if self.is_running():
+ return None
LOG.info('Running traffic generator')
self.client.gen.clear_stats()
+ # Debug use only: the service_mode flag may have been set in
+ # the configuration, in order to enable the 'service' mode
+ # in the trex generator, before starting the traffic (run).
+ # From this point, a T-rex console (launched in readonly mode) would
+ # then be able to capture the transmitted and/or received traffic.
+ self.client.gen.set_service_mode(enabled=self.service_mode)
+ LOG.info('Service mode is %sabled', 'en' if self.service_mode else 'dis')
self.client.gen.start_traffic()
self.start_time = time.time()
return self.poll_stats()
def stop(self):
+ """Stop the current run and instruct the traffic generator to stop traffic."""
if self.is_running():
self.start_time = None
self.client.gen.stop_traffic()
def is_running(self):
+ """Check if a run is still pending."""
return self.start_time is not None
def time_elapsed(self):
+ """Return time elapsed since start of run."""
if self.is_running():
return time.time() - self.start_time
return self.duration_sec
def poll_stats(self):
+ """Poll latest stats from the traffic generator at fixed interval - sleeps if necessary.
+
+ return: latest stats or None if traffic is stopped
+ """
if not self.is_running():
return None
- if self.client.skip_sleep:
+ if self.client.skip_sleep():
self.stop()
return self.client.get_stats()
time_elapsed = self.time_elapsed()
@@ -88,22 +114,32 @@ class TrafficRunner(object):
class IpBlock(object):
+ """Manage a block of IP addresses."""
+
def __init__(self, base_ip, step_ip, count_ip):
+ """Create an IP block."""
self.base_ip_int = Device.ip_to_int(base_ip)
+ if step_ip == 'random':
+ step_ip = '0.0.0.1'
self.step = Device.ip_to_int(step_ip)
self.max_available = count_ip
self.next_free = 0
def get_ip(self, index=0):
- '''Return the IP address at given index
- '''
+ """Return the IP address at given index."""
if index < 0 or index >= self.max_available:
- raise IndexError('Index out of bounds')
+ raise IndexError('Index out of bounds: %d (max=%d)' % (index, self.max_available))
return Device.int_to_ip(self.base_ip_int + index * self.step)
+ def get_ip_from_chain_first_ip(self, first_ip, index=0):
+ """Return the IP address at given index starting from chain first ip."""
+ if index < 0 or index >= self.max_available:
+ raise IndexError('Index out of bounds: %d (max=%d)' % (index, self.max_available))
+ return Device.int_to_ip(first_ip + index * self.step)
+
def reserve_ip_range(self, count):
- '''Reserve a range of count consecutive IP addresses spaced by step
- '''
+ """Reserve a range of count consecutive IP addresses spaced by step.
+ """
if self.next_free + count > self.max_available:
raise IndexError('No more IP addresses next free=%d max_available=%d requested=%d' %
(self.next_free,
@@ -115,299 +151,652 @@ class IpBlock(object):
return (first_ip, last_ip)
def reset_reservation(self):
+ """Reset all reservations and restart with a completely unused IP block."""
self.next_free = 0
+class UdpPorts(object):
+
+ def __init__(self, src_min, src_max, dst_min, dst_max, udp_src_size, udp_dst_size, step):
+
+ self.src_min = int(src_min)
+ self.src_max = int(src_max)
+ self.dst_min = int(dst_min)
+ self.dst_max = int(dst_max)
+ self.udp_src_size = udp_src_size
+ self.udp_dst_size = udp_dst_size
+ self.step = step
+
+ def get_src_max(self, index=0):
+ """Return the UDP src port at given index."""
+ return int(self.src_min) + index * int(self.step)
+
+ def get_dst_max(self, index=0):
+ """Return the UDP dst port at given index."""
+ return int(self.dst_min) + index * int(self.step)
+
+
class Device(object):
- def __init__(self, port, pci, switch_port=None, vtep_vlan=None, ip=None, tg_gateway_ip=None,
- gateway_ip=None, ip_addrs_step=None, tg_gateway_ip_addrs_step=None,
- gateway_ip_addrs_step=None, udp_src_port=None, udp_dst_port=None,
- chain_count=1, flow_count=1, vlan_tagging=False):
- self.chain_count = chain_count
- self.flow_count = flow_count
- self.dst = None
+ """Represent a port device and all information associated to it.
+
+ In the curent version we only support 2 port devices for the traffic generator
+ identified as port 0 or port 1.
+ """
+
+ def __init__(self, port, generator_config):
+ """Create a new device for a given port."""
+ self.generator_config = generator_config
+ self.chain_count = generator_config.service_chain_count
+ if generator_config.bidirectional:
+ self.flow_count = generator_config.flow_count / 2
+ else:
+ self.flow_count = generator_config.flow_count
+
self.port = port
- self.switch_port = switch_port
- self.vtep_vlan = vtep_vlan
- self.vlan_tag = None
- self.vlan_tagging = vlan_tagging
- self.pci = pci
+ self.switch_port = generator_config.interfaces[port].get('switch_port', None)
+ self.vtep_vlan = None
+ self.vtep_src_mac = None
+ self.vxlan = False
+ self.mpls = False
+ self.inner_labels = None
+ self.outer_labels = None
+ self.pci = generator_config.interfaces[port].pci
self.mac = None
- self.vm_mac_list = None
- subnet = IPNetwork(ip)
- self.ip = subnet.ip.format()
- self.ip_prefixlen = subnet.prefixlen
- self.ip_addrs_step = ip_addrs_step
- self.tg_gateway_ip_addrs_step = tg_gateway_ip_addrs_step
- self.gateway_ip_addrs_step = gateway_ip_addrs_step
- self.gateway_ip = gateway_ip
- self.tg_gateway_ip = tg_gateway_ip
- self.ip_block = IpBlock(self.ip, ip_addrs_step, flow_count)
- self.gw_ip_block = IpBlock(gateway_ip,
- gateway_ip_addrs_step,
- chain_count)
- self.tg_gw_ip_block = IpBlock(tg_gateway_ip,
- tg_gateway_ip_addrs_step,
- chain_count)
- self.udp_src_port = udp_src_port
- self.udp_dst_port = udp_dst_port
+ self.dest_macs = None
+ self.vtep_dst_mac = None
+ self.vtep_dst_ip = None
+ if generator_config.vteps is None:
+ self.vtep_src_ip = None
+ else:
+ self.vtep_src_ip = generator_config.vteps[port]
+ self.vnis = None
+ self.vlans = None
+ self.ip_addrs = generator_config.ip_addrs[port]
+ self.ip_src_static = generator_config.ip_src_static
+ self.ip_addrs_step = generator_config.ip_addrs_step
+ if self.ip_addrs_step == 'random':
+ # Set step to 1 to calculate the IP range size (see check_range_size below)
+ step = '0.0.0.1'
+ else:
+ step = self.ip_addrs_step
+ self.ip_size = self.check_range_size(IPNetwork(self.ip_addrs).size, Device.ip_to_int(step))
+ self.ip = str(IPNetwork(self.ip_addrs).network)
+ ip_addrs_left = generator_config.ip_addrs[0]
+ ip_addrs_right = generator_config.ip_addrs[1]
+ self.ip_addrs_size = {
+ 'left': self.check_range_size(IPNetwork(ip_addrs_left).size, Device.ip_to_int(step)),
+ 'right': self.check_range_size(IPNetwork(ip_addrs_right).size, Device.ip_to_int(step))}
+ udp_src_port = generator_config.gen_config.udp_src_port
+ if udp_src_port is None:
+ udp_src_port = 53
+ udp_dst_port = generator_config.gen_config.udp_dst_port
+ if udp_dst_port is None:
+ udp_dst_port = 53
+ src_max, src_min = self.define_udp_range(udp_src_port, 'udp_src_port')
+ dst_max, dst_min = self.define_udp_range(udp_dst_port, 'udp_dst_port')
+ if generator_config.gen_config.udp_port_step == 'random':
+ # Set step to 1 to calculate the UDP range size
+ udp_step = 1
+ else:
+ udp_step = int(generator_config.gen_config.udp_port_step)
+ udp_src_size = self.check_range_size(int(src_max) - int(src_min) + 1, udp_step)
+ udp_dst_size = self.check_range_size(int(dst_max) - int(dst_min) + 1, udp_step)
+ lcm_port = lcm(udp_src_size, udp_dst_size)
+ if self.ip_src_static is True:
+ lcm_ip = lcm(1, min(self.ip_addrs_size['left'], self.ip_addrs_size['right']))
+ else:
+ lcm_ip = lcm(self.ip_addrs_size['left'], self.ip_addrs_size['right'])
+ flow_max = lcm(lcm_port, lcm_ip)
+ if self.flow_count > flow_max:
+ raise TrafficClientException('Trying to set unachievable traffic (%d > %d)' %
+ (self.flow_count, flow_max))
+
+ self.udp_ports = UdpPorts(src_min, src_max, dst_min, dst_max, udp_src_size, udp_dst_size,
+ generator_config.gen_config.udp_port_step)
+
+ self.ip_block = IpBlock(self.ip, step, self.ip_size)
+
+ self.gw_ip_block = IpBlock(generator_config.gateway_ips[port],
+ generator_config.gateway_ip_addrs_step,
+ self.chain_count)
+ self.tg_gateway_ip_addrs = generator_config.tg_gateway_ip_addrs[port]
+ self.tg_gw_ip_block = IpBlock(self.tg_gateway_ip_addrs,
+ generator_config.tg_gateway_ip_addrs_step,
+ self.chain_count)
+
+ def limit_ip_udp_ranges(self, peer_ip_size, cur_chain_flow_count):
+ # init to min value in case of no matching values found with lcm calculation
+ new_src_ip_size = 1
+ new_peer_ip_size = 1
+ new_src_udp_size = 1
+ new_dst_udp_size = 1
+
+ if self.ip_src_static is True:
+ src_ip_size = 1
+ else:
+ src_ip_size = self.ip_size
+ ip_src_divisors = list(get_divisors(src_ip_size))
+ ip_dst_divisors = list(get_divisors(peer_ip_size))
+ udp_src_divisors = list(get_divisors(self.udp_ports.udp_src_size))
+ udp_dst_divisors = list(get_divisors(self.udp_ports.udp_dst_size))
+ fc = int(cur_chain_flow_count)
+ tuples_ip = list(find_tuples_equal_to_lcm_value(ip_src_divisors, ip_dst_divisors, fc))
+ tuples_udp = list(find_tuples_equal_to_lcm_value(udp_src_divisors, udp_dst_divisors, fc))
+
+ if tuples_ip:
+ new_src_ip_size = tuples_ip[-1][0]
+ new_peer_ip_size = tuples_ip[-1][1]
+
+ if tuples_udp:
+ new_src_udp_size = tuples_udp[-1][0]
+ new_dst_udp_size = tuples_udp[-1][1]
+
+ tuples_src = []
+ tuples_dst = []
+ if not tuples_ip and not tuples_udp:
+ # in case of not divisors in common matching LCM value (i.e. requested flow count)
+ # try to find an accurate UDP range to fit requested flow count
+ udp_src_int = range(self.udp_ports.src_min, self.udp_ports.src_max)
+ udp_dst_int = range(self.udp_ports.dst_min, self.udp_ports.dst_max)
+ tuples_src = list(find_tuples_equal_to_lcm_value(ip_src_divisors, udp_src_int, fc))
+ tuples_dst = list(find_tuples_equal_to_lcm_value(ip_dst_divisors, udp_dst_int, fc))
+
+ if not tuples_src and not tuples_dst:
+ # iterate IP and UDP ranges to find a tuple that match flow count values
+ src_ip_range = range(1,src_ip_size)
+ dst_ip_range = range(1, peer_ip_size)
+ tuples_src = list(find_tuples_equal_to_lcm_value(src_ip_range, udp_src_int, fc))
+ tuples_dst = list(find_tuples_equal_to_lcm_value(dst_ip_range, udp_dst_int, fc))
+
+ if tuples_src or tuples_dst:
+ if tuples_src:
+ new_src_ip_size = tuples_src[-1][0]
+ new_src_udp_size = tuples_src[-1][1]
+ if tuples_dst:
+ new_peer_ip_size = tuples_dst[-1][0]
+ new_dst_udp_size = tuples_dst[-1][1]
+ else:
+ if not tuples_ip:
+ if src_ip_size != 1:
+ if src_ip_size > fc:
+ new_src_ip_size = fc
+ else:
+ new_src_ip_size = find_max_size(src_ip_size, tuples_udp, fc)
+ if peer_ip_size != 1:
+ if peer_ip_size > fc:
+ new_peer_ip_size = fc
+ else:
+ new_peer_ip_size = find_max_size(peer_ip_size, tuples_udp, fc)
+
+ if not tuples_udp:
+ if self.udp_ports.udp_src_size != 1:
+ if self.udp_ports.udp_src_size > fc:
+ new_src_udp_size = fc
+ else:
+ new_src_udp_size = find_max_size(self.udp_ports.udp_src_size,
+ tuples_ip, fc)
+ if self.udp_ports.udp_dst_size != 1:
+ if self.udp_ports.udp_dst_size > fc:
+ new_dst_udp_size = fc
+ else:
+ new_dst_udp_size = find_max_size(self.udp_ports.udp_dst_size,
+ tuples_ip, fc)
+ max_possible_flows = lcm(lcm(new_src_ip_size, new_peer_ip_size),
+ lcm(new_src_udp_size, new_dst_udp_size))
+
+ LOG.debug("IP dst size: %d", new_peer_ip_size)
+ LOG.debug("LCM IP: %d", lcm(new_src_ip_size, new_peer_ip_size))
+ LOG.debug("LCM UDP: %d", lcm(new_src_udp_size, new_dst_udp_size))
+ LOG.debug("Global LCM: %d", max_possible_flows)
+ LOG.debug("IP src size: %d, IP dst size: %d, UDP src size: %d, UDP dst size: %d",
+ new_src_ip_size, new_peer_ip_size, self.udp_ports.udp_src_size,
+ self.udp_ports.udp_dst_size)
+ if not max_possible_flows == cur_chain_flow_count:
+ if (self.ip_addrs_step != '0.0.0.1' or self.udp_ports.step != '1') and not (
+ self.ip_addrs_step == 'random' and self.udp_ports.step == 'random'):
+ LOG.warning("Current values of ip_addrs_step and/or udp_port_step properties "
+ "do not allow to control an accurate flow count. "
+ "Values will be overridden as follows:")
+ if self.ip_addrs_step != '0.0.0.1':
+ LOG.info("ip_addrs_step='0.0.0.1' (previous value: ip_addrs_step='%s')",
+ self.ip_addrs_step)
+ self.ip_addrs_step = '0.0.0.1'
+
+ if self.udp_ports.step != '1':
+ LOG.info("udp_port_step='1' (previous value: udp_port_step='%s')",
+ self.udp_ports.step)
+ self.udp_ports.step = '1'
+ # override config for not logging random step warning message in trex_gen.py
+ self.generator_config.gen_config.udp_port_step = self.udp_ports.step
+ else:
+ LOG.error("Current values of ip_addrs_step and udp_port_step properties "
+ "do not allow to control an accurate flow count.")
+ else:
+ src_ip_size = new_src_ip_size
+ peer_ip_size = new_peer_ip_size
+ self.udp_ports.udp_src_size = new_src_udp_size
+ self.udp_ports.udp_dst_size = new_dst_udp_size
+ return src_ip_size, peer_ip_size
+
+ @staticmethod
+ def define_udp_range(udp_port, property_name):
+ if isinstance(udp_port, int):
+ min = udp_port
+ max = min
+ elif isinstance(udp_port, tuple):
+ min = udp_port[0]
+ max = udp_port[1]
+ else:
+ raise TrafficClientException('Invalid %s property value (53 or [\'53\',\'1024\'])'
+ % property_name)
+ return max, min
+
+
+ @staticmethod
+ def check_range_size(range_size, step):
+ """Check and set the available IPs or UDP ports, considering the step."""
+ try:
+ if range_size % step == 0:
+ value = range_size // step
+ else:
+ value = range_size // step + 1
+ return value
+ except ZeroDivisionError:
+ raise ZeroDivisionError("step can't be zero !") from ZeroDivisionError
def set_mac(self, mac):
+ """Set the local MAC for this port device."""
if mac is None:
raise TrafficClientException('Trying to set traffic generator MAC address as None')
self.mac = mac
- def set_destination(self, dst):
- self.dst = dst
+ def get_peer_device(self):
+ """Get the peer device (device 0 -> device 1, or device 1 -> device 0)."""
+ return self.generator_config.devices[1 - self.port]
- def set_vm_mac_list(self, vm_mac_list):
- self.vm_mac_list = map(str, vm_mac_list)
+ def set_vtep_dst_mac(self, dest_macs):
+ """Set the list of dest MACs indexed by the chain id.
- def set_vlan_tag(self, vlan_tag):
- if self.vlan_tagging and vlan_tag is None:
- raise TrafficClientException('Trying to set VLAN tag as None')
- self.vlan_tag = vlan_tag
+ This is only called in 2 cases:
+ - VM macs discovered using openstack API
+ - dest MACs provisioned in config file
+ """
+ self.vtep_dst_mac = list(map(str, dest_macs))
+
+ def set_dest_macs(self, dest_macs):
+ """Set the list of dest MACs indexed by the chain id.
+
+ This is only called in 2 cases:
+ - VM macs discovered using openstack API
+ - dest MACs provisioned in config file
+ """
+ self.dest_macs = list(map(str, dest_macs))
+
+ def get_dest_macs(self):
+ """Get the list of dest macs for this device.
+
+ If set_dest_macs was never called, assumes l2-loopback and return
+ a list of peer mac (as many as chains but normally only 1 chain)
+ """
+ if self.dest_macs:
+ return self.dest_macs
+ # assume this is l2-loopback
+ return [self.get_peer_device().mac] * self.chain_count
+
+ def set_vlans(self, vlans):
+ """Set the list of vlans to use indexed by the chain id."""
+ self.vlans = vlans
+ LOG.info("Port %d: VLANs %s", self.port, self.vlans)
+
+ def set_vtep_vlan(self, vlan):
+ """Set the vtep vlan to use indexed by specific port."""
+ self.vtep_vlan = vlan
+ self.vxlan = True
+ self.vlan_tagging = None
+ LOG.info("Port %d: VTEP VLANs %s", self.port, self.vtep_vlan)
+
+ def set_vxlan_endpoints(self, src_ip, dst_ip):
+ self.vtep_dst_ip = dst_ip
+ self.vtep_src_ip = src_ip
+ LOG.info("Port %d: src_vtep %s, dst_vtep %s", self.port,
+ self.vtep_src_ip, self.vtep_dst_ip)
+
+ def set_mpls_peers(self, src_ip, dst_ip):
+ self.mpls = True
+ self.vtep_dst_ip = dst_ip
+ self.vtep_src_ip = src_ip
+ LOG.info("Port %d: src_mpls_vtep %s, mpls_peer_ip %s", self.port,
+ self.vtep_src_ip, self.vtep_dst_ip)
+
+ def set_vxlans(self, vnis):
+ self.vnis = vnis
+ LOG.info("Port %d: VNIs %s", self.port, self.vnis)
+
+ def set_mpls_inner_labels(self, labels):
+ self.inner_labels = labels
+ LOG.info("Port %d: MPLS Inner Labels %s", self.port, self.inner_labels)
+
+ def set_mpls_outer_labels(self, labels):
+ self.outer_labels = labels
+ LOG.info("Port %d: MPLS Outer Labels %s", self.port, self.outer_labels)
+
+ def set_gw_ip(self, gateway_ip):
+ self.gw_ip_block = IpBlock(gateway_ip,
+ self.generator_config.gateway_ip_addrs_step,
+ self.chain_count)
def get_gw_ip(self, chain_index):
- '''Retrieve the IP address assigned for the gateway of a given chain
- '''
+ """Retrieve the IP address assigned for the gateway of a given chain."""
return self.gw_ip_block.get_ip(chain_index)
- def get_stream_configs(self, service_chain):
+ def get_stream_configs(self):
+ """Get the stream config for a given chain on this device.
+
+ Called by the traffic generator driver to program the traffic generator properly
+ before generating traffic
+ """
configs = []
# exact flow count for each chain is calculated as follows:
# - all chains except the first will have the same flow count
# calculated as (total_flows + chain_count - 1) / chain_count
# - the first chain will have the remainder
# example 11 flows and 3 chains => 3, 4, 4
- flows_per_chain = (self.flow_count + self.chain_count - 1) / self.chain_count
- cur_chain_flow_count = self.flow_count - flows_per_chain * (self.chain_count - 1)
+ flows_per_chain = int((self.flow_count + self.chain_count - 1) / self.chain_count)
+ cur_chain_flow_count = int(self.flow_count - flows_per_chain * (self.chain_count - 1))
+ peer = self.get_peer_device()
self.ip_block.reset_reservation()
- self.dst.ip_block.reset_reservation()
+ peer.ip_block.reset_reservation()
+ dest_macs = self.get_dest_macs()
+
+ # limit ranges of UDP ports and IP to avoid overflow of the number of flows
+ peer_size = peer.ip_size // self.chain_count
+
+ for chain_idx in range(self.chain_count):
+ src_ip_size, peer_ip_size = self.limit_ip_udp_ranges(peer_size, cur_chain_flow_count)
+
+ src_ip_first, src_ip_last = self.ip_block.reserve_ip_range \
+ (src_ip_size)
+ dst_ip_first, dst_ip_last = peer.ip_block.reserve_ip_range \
+ (peer_ip_size)
+
+ if self.ip_addrs_step != 'random':
+ src_ip_last = self.ip_block.get_ip_from_chain_first_ip(
+ Device.ip_to_int(src_ip_first), src_ip_size - 1)
+ dst_ip_last = peer.ip_block.get_ip_from_chain_first_ip(
+ Device.ip_to_int(dst_ip_first), peer_ip_size - 1)
+ if self.udp_ports.step != 'random':
+ self.udp_ports.src_max = self.udp_ports.get_src_max(self.udp_ports.udp_src_size - 1)
+ self.udp_ports.dst_max = self.udp_ports.get_dst_max(self.udp_ports.udp_dst_size - 1)
+ if self.ip_src_static:
+ src_ip_last = src_ip_first
+
+ LOG.info("Port %d, chain %d: IP src range [%s,%s]", self.port, chain_idx,
+ src_ip_first, src_ip_last)
+ LOG.info("Port %d, chain %d: IP dst range [%s,%s]", self.port, chain_idx,
+ dst_ip_first, dst_ip_last)
+ LOG.info("Port %d, chain %d: UDP src range [%s,%s]", self.port, chain_idx,
+ self.udp_ports.src_min, self.udp_ports.src_max)
+ LOG.info("Port %d, chain %d: UDP dst range [%s,%s]", self.port, chain_idx,
+ self.udp_ports.dst_min, self.udp_ports.dst_max)
- for chain_idx in xrange(self.chain_count):
- src_ip_first, src_ip_last = self.ip_block.reserve_ip_range(cur_chain_flow_count)
- dst_ip_first, dst_ip_last = self.dst.ip_block.reserve_ip_range(cur_chain_flow_count)
configs.append({
'count': cur_chain_flow_count,
'mac_src': self.mac,
- 'mac_dst': self.dst.mac if service_chain == ChainType.EXT else self.vm_mac_list[
- chain_idx],
+ 'mac_dst': dest_macs[chain_idx],
'ip_src_addr': src_ip_first,
'ip_src_addr_max': src_ip_last,
- 'ip_src_count': cur_chain_flow_count,
+ 'ip_src_count': src_ip_size,
'ip_dst_addr': dst_ip_first,
'ip_dst_addr_max': dst_ip_last,
- 'ip_dst_count': cur_chain_flow_count,
+ 'ip_dst_count': peer_ip_size,
'ip_addrs_step': self.ip_addrs_step,
- 'udp_src_port': self.udp_src_port,
- 'udp_dst_port': self.udp_dst_port,
+ 'ip_src_static': self.ip_src_static,
+ 'udp_src_port': self.udp_ports.src_min,
+ 'udp_src_port_max': self.udp_ports.src_max,
+ 'udp_src_count': self.udp_ports.udp_src_size,
+ 'udp_dst_port': self.udp_ports.dst_min,
+ 'udp_dst_port_max': self.udp_ports.dst_max,
+ 'udp_dst_count': self.udp_ports.udp_dst_size,
+ 'udp_port_step': self.udp_ports.step,
'mac_discovery_gw': self.get_gw_ip(chain_idx),
'ip_src_tg_gw': self.tg_gw_ip_block.get_ip(chain_idx),
- 'ip_dst_tg_gw': self.dst.tg_gw_ip_block.get_ip(chain_idx),
- 'vlan_tag': self.vlan_tag if self.vlan_tagging else None
+ 'ip_dst_tg_gw': peer.tg_gw_ip_block.get_ip(chain_idx),
+ 'vlan_tag': self.vlans[chain_idx] if self.vlans else None,
+ 'vxlan': self.vxlan,
+ 'vtep_vlan': self.vtep_vlan if self.vtep_vlan else None,
+ 'vtep_src_mac': self.mac if (self.vxlan or self.mpls) else None,
+ 'vtep_dst_mac': self.vtep_dst_mac if (self.vxlan or self.mpls) else None,
+ 'vtep_dst_ip': self.vtep_dst_ip if self.vxlan is True else None,
+ 'vtep_src_ip': self.vtep_src_ip if self.vxlan is True else None,
+ 'net_vni': self.vnis[chain_idx] if self.vxlan is True else None,
+ 'mpls': self.mpls,
+ 'mpls_outer_label': self.outer_labels[chain_idx] if self.mpls is True else None,
+ 'mpls_inner_label': self.inner_labels[chain_idx] if self.mpls is True else None
+
})
# after first chain, fall back to the flow count for all other chains
cur_chain_flow_count = flows_per_chain
-
return configs
- def ip_range_overlaps(self):
- '''Check if this device ip range is overlapping with the dst device ip range
- '''
- src_base_ip = Device.ip_to_int(self.ip)
- dst_base_ip = Device.ip_to_int(self.dst.ip)
- src_last_ip = src_base_ip + self.flow_count - 1
- dst_last_ip = dst_base_ip + self.flow_count - 1
- return dst_last_ip >= src_base_ip and src_last_ip >= dst_base_ip
-
- @staticmethod
- def mac_to_int(mac):
- return int(mac.translate(None, ":.- "), 16)
-
- @staticmethod
- def int_to_mac(i):
- mac = format(i, 'x').zfill(12)
- blocks = [mac[x:x + 2] for x in xrange(0, len(mac), 2)]
- return ':'.join(blocks)
-
@staticmethod
def ip_to_int(addr):
+ """Convert an IP address from string to numeric."""
return struct.unpack("!I", socket.inet_aton(addr))[0]
@staticmethod
def int_to_ip(nvalue):
- return socket.inet_ntoa(struct.pack("!I", nvalue))
+ """Convert an IP address from numeric to string."""
+ return socket.inet_ntoa(struct.pack("!I", int(nvalue)))
-class RunningTrafficProfile(object):
+class GeneratorConfig(object):
"""Represents traffic configuration for currently running traffic profile."""
DEFAULT_IP_STEP = '0.0.0.1'
DEFAULT_SRC_DST_IP_STEP = '0.0.0.1'
- def __init__(self, config, generator_profile):
- generator_config = self.__match_generator_profile(config.traffic_generator,
- generator_profile)
- self.generator_config = generator_config
+ def __init__(self, config):
+ """Create a generator config."""
+ self.config = config
+ # name of the generator profile (normally trex or dummy)
+ # pick the default one if not specified explicitly from cli options
+ if not config.generator_profile:
+ config.generator_profile = config.traffic_generator.default_profile
+ # pick up the profile dict based on the name
+ gen_config = self.__match_generator_profile(config.traffic_generator,
+ config.generator_profile)
+ self.gen_config = gen_config
+ # copy over fields from the dict
+ self.tool = gen_config.tool
+ self.ip = gen_config.ip
+ # overrides on config.cores and config.mbuf_factor
+ if config.cores:
+ self.cores = config.cores
+ else:
+ self.cores = gen_config.get('cores', 1)
+ # let's report the value actually used in the end
+ config.cores_used = self.cores
+ self.mbuf_factor = config.mbuf_factor
+ self.mbuf_64 = config.mbuf_64
+ self.hdrh = not config.disable_hdrh
+ if config.intf_speed:
+ # interface speed is overriden from the command line
+ self.intf_speed = config.intf_speed
+ elif gen_config.intf_speed:
+ # interface speed is overriden from the generator config
+ self.intf_speed = gen_config.intf_speed
+ else:
+ self.intf_speed = "auto"
+ if self.intf_speed in ("auto", "0"):
+ # interface speed is discovered/provided by the traffic generator
+ self.intf_speed = 0
+ else:
+ self.intf_speed = bitmath.parse_string(self.intf_speed.replace('ps', '')).bits
+ self.name = gen_config.name
+ self.zmq_pub_port = gen_config.get('zmq_pub_port', 4500)
+ self.zmq_rpc_port = gen_config.get('zmq_rpc_port', 4501)
+ self.limit_memory = gen_config.get('limit_memory', 1024)
+ self.software_mode = gen_config.get('software_mode', False)
+ self.interfaces = gen_config.interfaces
+ if self.interfaces[0].port != 0 or self.interfaces[1].port != 1:
+ raise TrafficClientException('Invalid port order/id in generator_profile.interfaces')
self.service_chain = config.service_chain
self.service_chain_count = config.service_chain_count
self.flow_count = config.flow_count
- self.host_name = generator_config.host_name
- self.name = generator_config.name
- self.tool = generator_config.tool
- self.cores = generator_config.get('cores', 1)
- self.ip_addrs_step = generator_config.ip_addrs_step or self.DEFAULT_SRC_DST_IP_STEP
+ self.host_name = gen_config.host_name
+ self.bidirectional = config.traffic.bidirectional
+ self.tg_gateway_ip_addrs = gen_config.tg_gateway_ip_addrs
+ self.ip_addrs = gen_config.ip_addrs
+ self.ip_addrs_step = gen_config.ip_addrs_step or self.DEFAULT_SRC_DST_IP_STEP
self.tg_gateway_ip_addrs_step = \
- generator_config.tg_gateway_ip_addrs_step or self.DEFAULT_IP_STEP
- self.gateway_ip_addrs_step = generator_config.gateway_ip_addrs_step or self.DEFAULT_IP_STEP
- self.gateway_ips = generator_config.gateway_ip_addrs
- self.ip = generator_config.ip
- self.intf_speed = bitmath.parse_string(generator_config.intf_speed.replace('ps', '')).bits
+ gen_config.tg_gateway_ip_addrs_step or self.DEFAULT_IP_STEP
+ self.gateway_ip_addrs_step = gen_config.gateway_ip_addrs_step or self.DEFAULT_IP_STEP
+ self.gateway_ips = gen_config.gateway_ip_addrs
+ self.ip_src_static = gen_config.ip_src_static
+ self.vteps = gen_config.get('vteps')
+ self.devices = [Device(port, self) for port in [0, 1]]
+ # This should normally always be [0, 1]
+ self.ports = [device.port for device in self.devices]
+
+ # check that pci is not empty
+ if not gen_config.interfaces[0].get('pci', None) or \
+ not gen_config.interfaces[1].get('pci', None):
+ raise TrafficClientException("configuration interfaces pci fields cannot be empty")
+
+ self.pcis = [tgif['pci'] for tgif in gen_config.interfaces]
self.vlan_tagging = config.vlan_tagging
- self.no_arp = config.no_arp
- self.src_device = None
- self.dst_device = None
- self.vm_mac_list = None
- self.__prep_interfaces(generator_config)
-
- def to_json(self):
- return dict(self.generator_config)
- def set_vm_mac_list(self, vm_mac_list):
- self.src_device.set_vm_mac_list(vm_mac_list[0])
- self.dst_device.set_vm_mac_list(vm_mac_list[1])
+ # needed for result/summarizer
+ config['tg-name'] = gen_config.name
+ config['tg-tool'] = self.tool
- @staticmethod
- def __match_generator_profile(traffic_generator, generator_profile):
- generator_config = AttrDict(traffic_generator)
- generator_config.pop('default_profile')
- generator_config.pop('generator_profile')
- matching_profile = [profile for profile in traffic_generator.generator_profile if
- profile.name == generator_profile]
- if len(matching_profile) != 1:
- raise Exception('Traffic generator profile not found: ' + generator_profile)
+ def to_json(self):
+ """Get json form to display the content into the overall result dict."""
+ return dict(self.gen_config)
- generator_config.update(matching_profile[0])
-
- return generator_config
-
- def __prep_interfaces(self, generator_config):
- src_config = {
- 'chain_count': self.service_chain_count,
- 'flow_count': self.flow_count / 2,
- 'ip': generator_config.ip_addrs[0],
- 'ip_addrs_step': self.ip_addrs_step,
- 'gateway_ip': self.gateway_ips[0],
- 'gateway_ip_addrs_step': self.gateway_ip_addrs_step,
- 'tg_gateway_ip': generator_config.tg_gateway_ip_addrs[0],
- 'tg_gateway_ip_addrs_step': self.tg_gateway_ip_addrs_step,
- 'udp_src_port': generator_config.udp_src_port,
- 'udp_dst_port': generator_config.udp_dst_port,
- 'vlan_tagging': self.vlan_tagging
- }
- dst_config = {
- 'chain_count': self.service_chain_count,
- 'flow_count': self.flow_count / 2,
- 'ip': generator_config.ip_addrs[1],
- 'ip_addrs_step': self.ip_addrs_step,
- 'gateway_ip': self.gateway_ips[1],
- 'gateway_ip_addrs_step': self.gateway_ip_addrs_step,
- 'tg_gateway_ip': generator_config.tg_gateway_ip_addrs[1],
- 'tg_gateway_ip_addrs_step': self.tg_gateway_ip_addrs_step,
- 'udp_src_port': generator_config.udp_src_port,
- 'udp_dst_port': generator_config.udp_dst_port,
- 'vlan_tagging': self.vlan_tagging
- }
+ def set_dest_macs(self, port_index, dest_macs):
+ """Set the list of dest MACs indexed by the chain id on given port.
- self.src_device = Device(**dict(src_config, **generator_config.interfaces[0]))
- self.dst_device = Device(**dict(dst_config, **generator_config.interfaces[1]))
- self.src_device.set_destination(self.dst_device)
- self.dst_device.set_destination(self.src_device)
+ port_index: the port for which dest macs must be set
+ dest_macs: a list of dest MACs indexed by chain id
+ """
+ if len(dest_macs) < self.config.service_chain_count:
+ raise TrafficClientException('Dest MAC list %s must have %d entries' %
+ (dest_macs, self.config.service_chain_count))
+ # only pass the first scc dest MACs
+ self.devices[port_index].set_dest_macs(dest_macs[:self.config.service_chain_count])
+ LOG.info('Port %d: dst MAC %s', port_index, [str(mac) for mac in dest_macs])
+
+ def set_vtep_dest_macs(self, port_index, dest_macs):
+ """Set the list of dest MACs indexed by the chain id on given port.
+
+ port_index: the port for which dest macs must be set
+ dest_macs: a list of dest MACs indexed by chain id
+ """
+ if len(dest_macs) != self.config.service_chain_count:
+ raise TrafficClientException('Dest MAC list %s must have %d entries' %
+ (dest_macs, self.config.service_chain_count))
+ self.devices[port_index].set_vtep_dst_mac(dest_macs)
+ LOG.info('Port %d: vtep dst MAC %s', port_index, {str(mac) for mac in dest_macs})
- if self.service_chain == ChainType.EXT and not self.no_arp \
- and self.src_device.ip_range_overlaps():
- raise Exception('Overlapping IP address ranges src=%s dst=%d flows=%d' %
- self.src_device.ip,
- self.dst_device.ip,
- self.flow_count)
+ def get_dest_macs(self):
+ """Return the list of dest macs indexed by port."""
+ return [dev.get_dest_macs() for dev in self.devices]
- @property
- def devices(self):
- return [self.src_device, self.dst_device]
+ def set_vlans(self, port_index, vlans):
+ """Set the list of vlans to use indexed by the chain id on given port.
- @property
- def vlans(self):
- return [self.src_device.vtep_vlan, self.dst_device.vtep_vlan]
+ port_index: the port for which VLANs must be set
+ vlans: a list of vlan lists indexed by chain id
+ """
+ if len(vlans) != self.config.service_chain_count:
+ raise TrafficClientException('VLAN list %s must have %d entries' %
+ (vlans, self.config.service_chain_count))
+ self.devices[port_index].set_vlans(vlans)
- @property
- def ports(self):
- return [self.src_device.port, self.dst_device.port]
+ def set_vxlans(self, port_index, vxlans):
+ """Set the list of vxlans (VNIs) to use indexed by the chain id on given port.
- @property
- def switch_ports(self):
- return [self.src_device.switch_port, self.dst_device.switch_port]
+ port_index: the port for which VXLANs must be set
+ VXLANs: a list of VNIs lists indexed by chain id
+ """
+ if len(vxlans) != self.config.service_chain_count:
+ raise TrafficClientException('VXLAN list %s must have %d entries' %
+ (vxlans, self.config.service_chain_count))
+ self.devices[port_index].set_vxlans(vxlans)
- @property
- def pcis(self):
- return [self.src_device.pci, self.dst_device.pci]
+ def set_mpls_inner_labels(self, port_index, labels):
+ """Set the list of MPLS Labels to use indexed by the chain id on given port.
+ port_index: the port for which Labels must be set
+ Labels: a list of Labels lists indexed by chain id
+ """
+ if len(labels) != self.config.service_chain_count:
+ raise TrafficClientException('Inner MPLS list %s must have %d entries' %
+ (labels, self.config.service_chain_count))
+ self.devices[port_index].set_mpls_inner_labels(labels)
-class TrafficGeneratorFactory(object):
- def __init__(self, config):
- self.config = config
+ def set_mpls_outer_labels(self, port_index, labels):
+ """Set the list of MPLS Labels to use indexed by the chain id on given port.
- def get_tool(self):
- return self.config.generator_config.tool
+ port_index: the port for which Labels must be set
+ Labels: a list of Labels lists indexed by chain id
+ """
+ if len(labels) != self.config.service_chain_count:
+ raise TrafficClientException('Outer MPLS list %s must have %d entries' %
+ (labels, self.config.service_chain_count))
+ self.devices[port_index].set_mpls_outer_labels(labels)
+
+ def set_vtep_vlan(self, port_index, vlan):
+ """Set the vtep vlan to use indexed by the chain id on given port.
+ port_index: the port for which VLAN must be set
+ """
+ self.devices[port_index].set_vtep_vlan(vlan)
- def get_generator_client(self):
- tool = self.get_tool().lower()
- if tool == 'trex':
- from traffic_gen import trex
- return trex.TRex(self.config)
- elif tool == 'dummy':
- from traffic_gen import dummy
- return dummy.DummyTG(self.config)
- return None
+ def set_vxlan_endpoints(self, port_index, src_ip, dst_ip):
+ self.devices[port_index].set_vxlan_endpoints(src_ip, dst_ip)
- def list_generator_profile(self):
- return [profile.name for profile in self.config.traffic_generator.generator_profile]
+ def set_mpls_peers(self, port_index, src_ip, dst_ip):
+ self.devices[port_index].set_mpls_peers(src_ip, dst_ip)
- def get_generator_config(self, generator_profile):
- return RunningTrafficProfile(self.config, generator_profile)
+ @staticmethod
+ def __match_generator_profile(traffic_generator, generator_profile):
+ gen_config = AttrDict(traffic_generator)
+ gen_config.pop('default_profile')
+ gen_config.pop('generator_profile')
+ matching_profile = [profile for profile in traffic_generator.generator_profile if
+ profile.name == generator_profile]
+ if len(matching_profile) != 1:
+ raise Exception('Traffic generator profile not found: ' + generator_profile)
- def get_matching_profile(self, traffic_profile_name):
- matching_profile = [profile for profile in self.config.traffic_profile if
- profile.name == traffic_profile_name]
+ gen_config.update(matching_profile[0])
+ return gen_config
- if len(matching_profile) > 1:
- raise Exception('Multiple traffic profiles with the same name found.')
- elif not matching_profile:
- raise Exception('No traffic profile found.')
- return matching_profile[0]
+class TrafficClient(object):
+ """Traffic generator client with NDR/PDR binary seearch."""
- def get_frame_sizes(self, traffic_profile):
- matching_profile = self.get_matching_profile(traffic_profile)
- return matching_profile.l2frame_size
+ PORTS = [0, 1]
+ def __init__(self, config, notifier=None):
+ """Create a new TrafficClient instance.
-class TrafficClient(object):
- PORTS = [0, 1]
+ config: nfvbench config
+ notifier: notifier (optional)
- def __init__(self, config, notifier=None, skip_sleep=False):
- generator_factory = TrafficGeneratorFactory(config)
- self.gen = generator_factory.get_generator_client()
- self.tool = generator_factory.get_tool()
+ A new instance is created everytime the nfvbench config may have changed.
+ """
self.config = config
+ self.generator_config = GeneratorConfig(config)
+ self.tool = self.generator_config.tool
+ self.gen = self._get_generator()
self.notifier = notifier
self.interval_collector = None
self.iteration_collector = None
- self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec)
- if self.gen is None:
- raise TrafficClientException('%s is not a supported traffic generator' % self.tool)
-
+ self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec,
+ self.config.service_mode)
+ self.config.frame_sizes = self._get_frame_sizes()
self.run_config = {
'l2frame_size': None,
'duration_sec': self.config.duration_sec,
@@ -417,86 +806,185 @@ class TrafficClient(object):
self.current_total_rate = {'rate_percent': '10'}
if self.config.single_run:
self.current_total_rate = utils.parse_rate_str(self.config.rate)
- # UT with dummy TG can bypass all sleeps
- self.skip_sleep = skip_sleep
+ self.ifstats = None
+ # Speed is either discovered when connecting to TG or set from config
+ # This variable is 0 if not yet discovered from TG or must be the speed of
+ # each interface in bits per second
+ self.intf_speed = self.generator_config.intf_speed
+
+ def _get_generator(self):
+ tool = self.tool.lower()
+ if tool == 'trex':
+ from .traffic_gen import trex_gen
+ return trex_gen.TRex(self)
+ if tool == 'dummy':
+ from .traffic_gen import dummy
+ return dummy.DummyTG(self)
+ raise TrafficClientException('Unsupported generator tool name:' + self.tool)
- def set_macs(self):
- for mac, device in zip(self.gen.get_macs(), self.config.generator_config.devices):
- device.set_mac(mac)
+ def skip_sleep(self):
+ """Skip all sleeps when doing unit testing with dummy TG.
+
+ Must be overriden using mock.patch
+ """
+ return False
+
+ def _get_frame_sizes(self):
+ traffic_profile_name = self.config.traffic.profile
+ matching_profiles = [profile for profile in self.config.traffic_profile if
+ profile.name == traffic_profile_name]
+ if len(matching_profiles) > 1:
+ raise TrafficClientException('Multiple traffic profiles with name: ' +
+ traffic_profile_name)
+ if not matching_profiles:
+ raise TrafficClientException('Cannot find traffic profile: ' + traffic_profile_name)
+ return matching_profiles[0].l2frame_size
def start_traffic_generator(self):
- self.gen.init()
+ """Start the traffic generator process (traffic not started yet)."""
self.gen.connect()
+ # pick up the interface speed if it is not set from config
+ intf_speeds = self.gen.get_port_speed_gbps()
+ # convert Gbps unit into bps
+ tg_if_speed = bitmath.parse_string(str(intf_speeds[0]) + 'Gb').bits
+ if self.intf_speed:
+ # interface speed is overriden from config
+ if self.intf_speed != tg_if_speed:
+ # Warn the user if the speed in the config is different
+ LOG.warning(
+ 'Interface speed provided (%g Gbps) is different from actual speed (%d Gbps)',
+ self.intf_speed / 1000000000.0, intf_speeds[0])
+ else:
+ # interface speed not provisioned by config
+ self.intf_speed = tg_if_speed
+ # also update the speed in the tg config
+ self.generator_config.intf_speed = tg_if_speed
+ # let's report detected and actually used interface speed
+ self.config.intf_speed_detected = tg_if_speed
+ self.config.intf_speed_used = self.intf_speed
+
+ # Save the traffic generator local MAC
+ for mac, device in zip(self.gen.get_macs(), self.generator_config.devices):
+ device.set_mac(mac)
def setup(self):
- self.gen.set_mode()
- self.gen.config_interface()
+ """Set up the traffic client."""
self.gen.clear_stats()
def get_version(self):
+ """Get the traffic generator version."""
return self.gen.get_version()
def ensure_end_to_end(self):
- """
- Ensure traffic generator receives packets it has transmitted.
+ """Ensure traffic generator receives packets it has transmitted.
+
This ensures end to end connectivity and also waits until VMs are ready to forward packets.
- At this point all VMs are in active state, but forwarding does not have to work.
- Small amount of traffic is sent to every chain. Then total of sent and received packets
- is compared. If ratio between received and transmitted packets is higher than (N-1)/N,
- N being number of chains, traffic flows through every chain and real measurements can be
- performed.
+ VMs that are started and in active state may not pass traffic yet. It is imperative to make
+ sure that all VMs are passing traffic in both directions before starting any benchmarking.
+ To verify this, we need to send at a low frequency bi-directional packets and make sure
+ that we receive all packets back from all VMs. The number of flows is equal to 2 times
+ the number of chains (1 per direction) and we need to make sure we receive packets coming
+ from exactly 2 x chain count different source MAC addresses.
Example:
PVP chain (1 VM per chain)
N = 10 (number of chains)
- threshold = (N-1)/N = 9/10 = 0.9 (acceptable ratio ensuring working conditions)
- if total_received/total_sent > 0.9, traffic is flowing to more than 9 VMs meaning
- all 10 VMs are in operational state.
+ Flow count = 20 (number of flows)
+ If the number of unique source MAC addresses from received packets is 20 then
+ all 10 VMs 10 VMs are in operational state.
"""
LOG.info('Starting traffic generator to ensure end-to-end connectivity')
- rate_pps = {'rate_pps': str(self.config.service_chain_count * 100)}
- self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False)
-
+ # send 2pps on each chain and each direction
+ rate_pps = {'rate_pps': str(self.config.service_chain_count * 2)}
+ self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False,
+ e2e=True)
# ensures enough traffic is coming back
- threshold = (self.config.service_chain_count - 1) / float(self.config.service_chain_count)
- retry_count = (self.config.check_traffic_time_sec +
- self.config.generic_poll_sec - 1) / self.config.generic_poll_sec
- for it in xrange(retry_count):
+ retry_count = int((self.config.check_traffic_time_sec +
+ self.config.generic_poll_sec - 1) / self.config.generic_poll_sec)
+
+ # we expect to see packets coming from 2 unique MAC per chain
+ # because there can be flooding in the case of shared net
+ # we must verify that packets from the right VMs are received
+ # and not just count unique src MAC
+ # create a dict of (port, chain) tuples indexed by dest mac
+ mac_map = {}
+ for port, dest_macs in enumerate(self.generator_config.get_dest_macs()):
+ for chain, mac in enumerate(dest_macs):
+ mac_map[mac] = (port, chain)
+ unique_src_mac_count = len(mac_map)
+ if self.config.vxlan and self.config.traffic_generator.vtep_vlan:
+ get_mac_id = lambda packet: packet['binary'][60:66]
+ elif self.config.vxlan:
+ get_mac_id = lambda packet: packet['binary'][56:62]
+ elif self.config.mpls:
+ get_mac_id = lambda packet: packet['binary'][24:30]
+ # mpls_transport_label = lambda packet: packet['binary'][14:18]
+ else:
+ get_mac_id = lambda packet: packet['binary'][6:12]
+ for it in range(retry_count):
self.gen.clear_stats()
self.gen.start_traffic()
- LOG.info('Waiting for packets to be received back... (%d / %d)', it + 1, retry_count)
- if not self.skip_sleep:
+ self.gen.start_capture()
+ LOG.info('Captured unique src mac %d/%d, capturing return packets (retry %d/%d)...',
+ unique_src_mac_count - len(mac_map), unique_src_mac_count,
+ it + 1, retry_count)
+ if not self.skip_sleep():
time.sleep(self.config.generic_poll_sec)
self.gen.stop_traffic()
- stats = self.gen.get_stats()
-
- # compute total sent and received traffic on both ports
- total_rx = 0
- total_tx = 0
- for port in self.PORTS:
- total_rx += float(stats[port]['rx'].get('total_pkts', 0))
- total_tx += float(stats[port]['tx'].get('total_pkts', 0))
-
- # how much of traffic came back
- ratio = total_rx / total_tx if total_tx else 0
-
- if ratio > threshold:
- self.gen.clear_stats()
- self.gen.clear_streamblock()
- LOG.info('End-to-end connectivity ensured')
- return
+ self.gen.fetch_capture_packets()
+ self.gen.stop_capture()
+ for packet in self.gen.packet_list:
+ mac_id = get_mac_id(packet).decode('latin-1')
+ src_mac = ':'.join(["%02x" % ord(x) for x in mac_id])
+ if self.config.mpls:
+ if src_mac in mac_map and self.is_mpls(packet):
+ port, chain = mac_map[src_mac]
+ LOG.info('Received mpls packet from mac: %s (chain=%d, port=%d)',
+ src_mac, chain, port)
+ mac_map.pop(src_mac, None)
+ else:
+ if src_mac in mac_map and self.is_udp(packet):
+ port, chain = mac_map[src_mac]
+ LOG.info('Received udp packet from mac: %s (chain=%d, port=%d)',
+ src_mac, chain, port)
+ mac_map.pop(src_mac, None)
+
+ if not mac_map:
+ LOG.info('End-to-end connectivity established')
+ return
+ if self.config.l3_router and not self.config.no_arp:
+ # In case of L3 traffic mode, routers are not able to route traffic
+ # until VM interfaces are up and ARP requests are done
+ LOG.info('Waiting for loopback service completely started...')
+ LOG.info('Sending ARP request to assure end-to-end connectivity established')
+ self.ensure_arp_successful()
+ raise TrafficClientException('End-to-end connectivity cannot be ensured')
- if not self.skip_sleep:
- time.sleep(self.config.generic_poll_sec)
+ def is_udp(self, packet):
+ pkt = Ether(packet['binary'])
+ return UDP in pkt
- raise TrafficClientException('End-to-end connectivity cannot be ensured')
+ def is_mpls(self, packet):
+ pkt = Ether(packet['binary'])
+ return MPLS in pkt
def ensure_arp_successful(self):
- if not self.gen.resolve_arp():
+ """Resolve all IP using ARP and throw an exception in case of failure."""
+ dest_macs = self.gen.resolve_arp()
+ if dest_macs:
+ # all dest macs are discovered, saved them into the generator config
+ if self.config.vxlan or self.config.mpls:
+ self.generator_config.set_vtep_dest_macs(0, dest_macs[0])
+ self.generator_config.set_vtep_dest_macs(1, dest_macs[1])
+ else:
+ self.generator_config.set_dest_macs(0, dest_macs[0])
+ self.generator_config.set_dest_macs(1, dest_macs[1])
+ else:
raise TrafficClientException('ARP cannot be resolved')
def set_traffic(self, frame_size, bidirectional):
+ """Reconfigure the traffic generator for a new frame size."""
self.run_config['bidirectional'] = bidirectional
self.run_config['l2frame_size'] = frame_size
self.run_config['rates'] = [self.get_per_direction_rate()]
@@ -506,11 +994,21 @@ class TrafficClient(object):
unidir_reverse_pps = int(self.config.unidir_reverse_traffic_pps)
if unidir_reverse_pps > 0:
self.run_config['rates'].append({'rate_pps': str(unidir_reverse_pps)})
+ # Fix for [NFVBENCH-67], convert the rate string to PPS
+ for idx, rate in enumerate(self.run_config['rates']):
+ if 'rate_pps' not in rate:
+ self.run_config['rates'][idx] = {'rate_pps': self.__convert_rates(rate)['rate_pps']}
self.gen.clear_streamblock()
- self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional, latency=True)
- def modify_load(self, load):
+ if self.config.no_latency_streams:
+ LOG.info("Latency streams are disabled")
+ # in service mode, we must disable flow stats (e2e=True)
+ self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional,
+ latency=not self.config.no_latency_streams,
+ e2e=self.runner.service_mode)
+
+ def _modify_load(self, load):
self.current_total_rate = {'rate_percent': str(load)}
rate_per_direction = self.get_per_direction_rate()
@@ -521,6 +1019,7 @@ class TrafficClient(object):
self.run_config['rates'][1] = rate_per_direction
def get_ndr_and_pdr(self):
+ """Start the NDR/PDR iteration and return the results."""
dst = 'Bidirectional' if self.run_config['bidirectional'] else 'Unidirectional'
targets = {}
if self.config.ndr_run:
@@ -563,31 +1062,38 @@ class TrafficClient(object):
return float(dropped_pkts) / total_pkts * 100
def get_stats(self):
- stats = self.gen.get_stats()
- retDict = {'total_tx_rate': stats['total_tx_rate']}
- for port in self.PORTS:
- retDict[port] = {'tx': {}, 'rx': {}}
+ """Collect final stats for previous run."""
+ stats = self.gen.get_stats(self.ifstats)
+ retDict = {'total_tx_rate': stats['total_tx_rate'],
+ 'offered_tx_rate_bps': stats['offered_tx_rate_bps'],
+ 'theoretical_tx_rate_bps': stats['theoretical_tx_rate_bps'],
+ 'theoretical_tx_rate_pps': stats['theoretical_tx_rate_pps']}
+
+ if self.config.periodic_gratuitous_arp:
+ retDict['garp_total_tx_rate'] = stats['garp_total_tx_rate']
tx_keys = ['total_pkts', 'total_pkt_bytes', 'pkt_rate', 'pkt_bit_rate']
rx_keys = tx_keys + ['dropped_pkts']
for port in self.PORTS:
+ port_stats = {'tx': {}, 'rx': {}}
for key in tx_keys:
- retDict[port]['tx'][key] = int(stats[port]['tx'][key])
+ port_stats['tx'][key] = int(stats[port]['tx'][key])
for key in rx_keys:
try:
- retDict[port]['rx'][key] = int(stats[port]['rx'][key])
+ port_stats['rx'][key] = int(stats[port]['rx'][key])
except ValueError:
- retDict[port]['rx'][key] = 0
- retDict[port]['rx']['avg_delay_usec'] = cast_integer(
+ port_stats['rx'][key] = 0
+ port_stats['rx']['avg_delay_usec'] = cast_integer(
stats[port]['rx']['avg_delay_usec'])
- retDict[port]['rx']['min_delay_usec'] = cast_integer(
+ port_stats['rx']['min_delay_usec'] = cast_integer(
stats[port]['rx']['min_delay_usec'])
- retDict[port]['rx']['max_delay_usec'] = cast_integer(
+ port_stats['rx']['max_delay_usec'] = cast_integer(
stats[port]['rx']['max_delay_usec'])
- retDict[port]['drop_rate_percent'] = self.__get_dropped_rate(retDict[port])
+ port_stats['drop_rate_percent'] = self.__get_dropped_rate(port_stats)
+ retDict[str(port)] = port_stats
- ports = sorted(retDict.keys())
+ ports = sorted(list(retDict.keys()), key=str)
if self.run_config['bidirectional']:
retDict['overall'] = {'tx': {}, 'rx': {}}
for key in tx_keys:
@@ -613,12 +1119,28 @@ class TrafficClient(object):
else:
retDict['overall'] = retDict[ports[0]]
retDict['overall']['drop_rate_percent'] = self.__get_dropped_rate(retDict['overall'])
+
+ if 'overall_hdrh' in stats:
+ retDict['overall']['hdrh'] = stats.get('overall_hdrh', None)
+ decoded_histogram = HdrHistogram.decode(retDict['overall']['hdrh'])
+ retDict['overall']['rx']['lat_percentile'] = {}
+ # override min max and avg from hdrh (only if histogram is valid)
+ if decoded_histogram.get_total_count() != 0:
+ retDict['overall']['rx']['min_delay_usec'] = decoded_histogram.get_min_value()
+ retDict['overall']['rx']['max_delay_usec'] = decoded_histogram.get_max_value()
+ retDict['overall']['rx']['avg_delay_usec'] = decoded_histogram.get_mean_value()
+ for percentile in self.config.lat_percentiles:
+ retDict['overall']['rx']['lat_percentile'][percentile] = \
+ decoded_histogram.get_value_at_percentile(percentile)
+ else:
+ for percentile in self.config.lat_percentiles:
+ retDict['overall']['rx']['lat_percentile'][percentile] = 'n/a'
return retDict
def __convert_rates(self, rate):
return utils.convert_rates(self.run_config['l2frame_size'],
rate,
- self.config.generator_config.intf_speed)
+ self.intf_speed)
def __ndr_pdr_found(self, tag, load):
rates = self.__convert_rates({'rate_percent': load})
@@ -628,6 +1150,7 @@ class TrafficClient(object):
def __format_output_stats(self, stats):
for key in self.PORTS + ['overall']:
+ key = str(key)
interface = stats[key]
stats[key] = {
'tx_pkts': interface['tx']['total_pkts'],
@@ -639,16 +1162,32 @@ class TrafficClient(object):
'min_delay_usec': interface['rx']['min_delay_usec'],
}
+ if key == 'overall':
+ if 'hdrh' in interface:
+ stats[key]['hdrh'] = interface.get('hdrh', None)
+ decoded_histogram = HdrHistogram.decode(stats[key]['hdrh'])
+ stats[key]['lat_percentile'] = {}
+ # override min max and avg from hdrh (only if histogram is valid)
+ if decoded_histogram.get_total_count() != 0:
+ stats[key]['min_delay_usec'] = decoded_histogram.get_min_value()
+ stats[key]['max_delay_usec'] = decoded_histogram.get_max_value()
+ stats[key]['avg_delay_usec'] = decoded_histogram.get_mean_value()
+ for percentile in self.config.lat_percentiles:
+ stats[key]['lat_percentile'][percentile] = decoded_histogram.\
+ get_value_at_percentile(percentile)
+ else:
+ for percentile in self.config.lat_percentiles:
+ stats[key]['lat_percentile'][percentile] = 'n/a'
return stats
def __targets_found(self, rate, targets, results):
- for tag, target in targets.iteritems():
+ for tag, target in list(targets.items()):
LOG.info('Found %s (%s) load: %s', tag, target, rate)
self.__ndr_pdr_found(tag, rate)
results[tag]['timestamp_sec'] = time.time()
def __range_search(self, left, right, targets, results):
- '''Perform a binary search for a list of targets inside a [left..right] range or rate
+ """Perform a binary search for a list of targets inside a [left..right] range or rate.
left the left side of the range to search as a % the line rate (100 = 100% line rate)
indicating the rate to send on each interface
@@ -657,7 +1196,7 @@ class TrafficClient(object):
targets a dict of drop rates to search (0.1 = 0.1%), indexed by the DR name or "tag"
('ndr', 'pdr')
results a dict to store results
- '''
+ """
if not targets:
return
LOG.info('Range search [%s .. %s] targets: %s', left, right, targets)
@@ -678,7 +1217,7 @@ class TrafficClient(object):
# Split target dicts based on the avg drop rate
left_targets = {}
right_targets = {}
- for tag, target in targets.iteritems():
+ for tag, target in list(targets.items()):
if stats['overall']['drop_rate_percent'] <= target:
# record the best possible rate found for this target
results[tag] = rates
@@ -718,8 +1257,22 @@ class TrafficClient(object):
self.__range_search(middle, right, right_targets, results)
def __run_search_iteration(self, rate):
- # set load
- self.modify_load(rate)
+ """Run one iteration at the given rate level.
+
+ rate: the rate to send on each port in percent (0 to 100)
+ """
+ self._modify_load(rate)
+
+ # There used to be a inconsistency in case of interface speed override.
+ # The emulated 'intf_speed' value is unknown to the T-Rex generator which
+ # refers to the detected line rate for converting relative traffic loads.
+ # Therefore, we need to convert actual rates here, in terms of packets/s.
+
+ for idx, str_rate in enumerate(self.gen.rates):
+ if str_rate.endswith('%'):
+ float_rate = float(str_rate.replace('%', '').strip())
+ pps_rate = self.__convert_rates({'rate_percent': float_rate})['rate_pps']
+ self.gen.rates[idx] = str(pps_rate) + 'pps'
# poll interval stats and collect them
for stats in self.run_traffic():
@@ -727,11 +1280,13 @@ class TrafficClient(object):
time_elapsed_ratio = self.runner.time_elapsed() / self.run_config['duration_sec']
if time_elapsed_ratio >= 1:
self.cancel_traffic()
+ if not self.skip_sleep():
+ time.sleep(self.config.pause_sec)
self.interval_collector.reset()
# get stats from the run
stats = self.runner.client.get_stats()
- current_traffic_config = self.get_traffic_config()
+ current_traffic_config = self._get_traffic_config()
warning = self.compare_tx_rates(current_traffic_config['direction-total']['rate_pps'],
stats['total_tx_rate'])
if warning is not None:
@@ -740,26 +1295,34 @@ class TrafficClient(object):
# save reliable stats from whole iteration
self.iteration_collector.add(stats, current_traffic_config['direction-total']['rate_pps'])
LOG.info('Average drop rate: %f', stats['overall']['drop_rate_percent'])
-
return stats, current_traffic_config['direction-total']
- @staticmethod
- def log_stats(stats):
- report = {
- 'datetime': str(datetime.now()),
- 'tx_packets': stats['overall']['tx']['total_pkts'],
- 'rx_packets': stats['overall']['rx']['total_pkts'],
- 'drop_packets': stats['overall']['rx']['dropped_pkts'],
- 'drop_rate_percent': stats['overall']['drop_rate_percent']
- }
- LOG.info('TX: %(tx_packets)d; '
- 'RX: %(rx_packets)d; '
- 'Dropped: %(drop_packets)d; '
- 'Drop rate: %(drop_rate_percent).4f%%',
- report)
+ def log_stats(self, stats):
+ """Log estimated stats during run."""
+ # Calculate a rolling drop rate based on differential to
+ # the previous reading
+ cur_tx = stats['overall']['tx']['total_pkts']
+ cur_rx = stats['overall']['rx']['total_pkts']
+ delta_tx = cur_tx - self.prev_tx
+ delta_rx = cur_rx - self.prev_rx
+ drops = delta_tx - delta_rx
+ if delta_tx == 0:
+ LOG.info("\x1b[1mConfiguration issue!\x1b[0m (no transmission)")
+ sys.exit(0)
+ drop_rate_pct = 100 * (delta_tx - delta_rx)/delta_tx
+ self.prev_tx = cur_tx
+ self.prev_rx = cur_rx
+ LOG.info('TX: %15s; RX: %15s; (Est.) Dropped: %12s; Drop rate: %8.4f%%',
+ format(cur_tx, ',d'),
+ format(cur_rx, ',d'),
+ format(drops, ',d'),
+ drop_rate_pct)
def run_traffic(self):
+ """Start traffic and return intermediate stats for each interval."""
stats = self.runner.run()
+ self.prev_tx = 0
+ self.prev_rx = 0
while self.runner.is_running:
self.log_stats(stats)
yield stats
@@ -771,18 +1334,10 @@ class TrafficClient(object):
yield stats
def cancel_traffic(self):
+ """Stop traffic."""
self.runner.stop()
- def get_interface(self, port_index):
- port = self.gen.port_handle[port_index]
- tx, rx = 0, 0
- if not self.config.no_traffic:
- stats = self.get_stats()
- if port in stats:
- tx, rx = int(stats[port]['tx']['total_pkts']), int(stats[port]['rx']['total_pkts'])
- return Interface('traffic-generator', self.tool.lower(), tx, rx)
-
- def get_traffic_config(self):
+ def _get_traffic_config(self):
config = {}
load_total = 0.0
bps_total = 0.0
@@ -808,29 +1363,121 @@ class TrafficClient(object):
return config
def get_run_config(self, results):
- """Returns configuration which was used for the last run."""
+ """Return configuration which was used for the last run."""
r = {}
+ # because we want each direction to have the far end RX rates,
+ # use the far end index (1-idx) to retrieve the RX rates
for idx, key in enumerate(["direction-forward", "direction-reverse"]):
- tx_rate = results["stats"][idx]["tx"]["total_pkts"] / self.config.duration_sec
- rx_rate = results["stats"][idx]["rx"]["total_pkts"] / self.config.duration_sec
+ tx_rate = results["stats"][str(idx)]["tx"]["total_pkts"] / self.config.duration_sec
+ rx_rate = results["stats"][str(1 - idx)]["rx"]["total_pkts"] / self.config.duration_sec
+
+ orig_rate = self.run_config['rates'][idx]
+ if self.config.periodic_gratuitous_arp:
+ orig_rate['rate_pps'] = float(
+ orig_rate['rate_pps']) - self.config.gratuitous_arp_pps
+
r[key] = {
- "orig": self.__convert_rates(self.run_config['rates'][idx]),
+ "orig": self.__convert_rates(orig_rate),
"tx": self.__convert_rates({'rate_pps': tx_rate}),
"rx": self.__convert_rates({'rate_pps': rx_rate})
}
+ if self.config.periodic_gratuitous_arp:
+ r['garp-direction-total'] = {
+ "orig": self.__convert_rates({'rate_pps': self.config.gratuitous_arp_pps * 2}),
+ "tx": self.__convert_rates({'rate_pps': results["stats"]["garp_total_tx_rate"]}),
+ "rx": self.__convert_rates({'rate_pps': 0})
+ }
+
total = {}
for direction in ['orig', 'tx', 'rx']:
total[direction] = {}
for unit in ['rate_percent', 'rate_bps', 'rate_pps']:
-
- total[direction][unit] = sum([float(x[direction][unit]) for x in r.values()])
+ total[direction][unit] = sum([float(x[direction][unit]) for x in list(r.values())])
r['direction-total'] = total
+
return r
+ def insert_interface_stats(self, pps_list):
+ """Insert interface stats to a list of packet path stats.
+
+ pps_list: a list of packet path stats instances indexed by chain index
+
+ This function will insert the packet path stats for the traffic gen ports 0 and 1
+ with itemized per chain tx/rx counters.
+ There will be as many packet path stats as chains.
+ Each packet path stats will have exactly 2 InterfaceStats for port 0 and port 1
+ self.pps_list:
+ [
+ PacketPathStats(InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)),
+ PacketPathStats(InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)),
+ ...
+ ]
+ """
+ def get_if_stats(chain_idx):
+ return [InterfaceStats('p' + str(port), self.tool)
+ for port in range(2)]
+ # keep the list of list of interface stats indexed by the chain id
+ self.ifstats = [get_if_stats(chain_idx)
+ for chain_idx in range(self.config.service_chain_count)]
+ # note that we need to make a copy of the ifs list so that any modification in the
+ # list from pps will not change the list saved in self.ifstats
+ self.pps_list = [PacketPathStats(self.config, list(ifs)) for ifs in self.ifstats]
+ # insert the corresponding pps in the passed list
+ pps_list.extend(self.pps_list)
+
+ def update_interface_stats(self, diff=False):
+ """Update all interface stats.
+
+ diff: if False, simply refresh the interface stats values with latest values
+ if True, diff the interface stats with the latest values
+ Make sure that the interface stats inserted in insert_interface_stats() are updated
+ with proper values.
+ self.ifstats:
+ [
+ [InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)],
+ [InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)],
+ ...
+ ]
+ """
+ if diff:
+ stats = self.gen.get_stats(self.ifstats)
+ for chain_idx, ifs in enumerate(self.ifstats):
+ # each ifs has exactly 2 InterfaceStats and 2 Latency instances
+ # corresponding to the
+ # port 0 and port 1 for the given chain_idx
+ # Note that we cannot use self.pps_list[chain_idx].if_stats to pick the
+ # interface stats for the pps because it could have been modified to contain
+ # additional interface stats
+ self.gen.get_stream_stats(stats, ifs, self.pps_list[chain_idx].latencies, chain_idx)
+ # special handling for vxlan
+ # in case of vxlan, flow stats are not available so all rx counters will be
+ # zeros when the total rx port counter is non zero.
+ # in that case,
+ for port in range(2):
+ total_rx = 0
+ for ifs in self.ifstats:
+ total_rx += ifs[port].rx
+ if total_rx == 0:
+ # check if the total port rx from Trex is also zero
+ port_rx = stats[port]['rx']['total_pkts']
+ if port_rx:
+ # the total rx for all chains from port level stats is non zero
+ # which means that the per-chain stats are not available
+ if len(self.ifstats) == 1:
+ # only one chain, simply report the port level rx to the chain rx stats
+ self.ifstats[0][port].rx = port_rx
+ else:
+ for ifs in self.ifstats:
+ # mark this data as unavailable
+ ifs[port].rx = None
+ # pitch in the total rx only in the last chain pps
+ self.ifstats[-1][port].rx_total = port_rx
+
@staticmethod
def compare_tx_rates(required, actual):
+ """Compare the actual TX rate to the required TX rate."""
threshold = 0.9
are_different = False
try:
@@ -849,6 +1496,7 @@ class TrafficClient(object):
return None
def get_per_direction_rate(self):
+ """Get the rate for each direction."""
divisor = 2 if self.run_config['bidirectional'] else 1
if 'rate_percent' in self.current_total_rate:
# don't split rate if it's percentage
@@ -857,6 +1505,7 @@ class TrafficClient(object):
return utils.divide_rate(self.current_total_rate, divisor)
def close(self):
+ """Close this instance."""
try:
self.gen.stop_traffic()
except Exception: