summaryrefslogtreecommitdiffstats
path: root/nfvbench/traffic_client.py
diff options
context:
space:
mode:
authorahothan <ahothan@cisco.com>2017-07-28 17:08:46 -0700
committerahothan <ahothan@cisco.com>2017-07-31 12:34:00 -0700
commit04a7de082bd221eae3c7004f4e0b99dfa4f8be91 (patch)
treec9fb7beaedc80479772ba24c3b47c85d49c22f76 /nfvbench/traffic_client.py
parentb8f02ed4e72399840a93aceb02b8c53831bbe68a (diff)
Initial code drop from Cisco1.0.0
Change-Id: Ie2993886dc8e95c5f73ccdb871add8b96ffcc849 Signed-off-by: ahothan <ahothan@cisco.com>
Diffstat (limited to 'nfvbench/traffic_client.py')
-rw-r--r--nfvbench/traffic_client.py790
1 files changed, 790 insertions, 0 deletions
diff --git a/nfvbench/traffic_client.py b/nfvbench/traffic_client.py
new file mode 100644
index 0000000..8bfcd76
--- /dev/null
+++ b/nfvbench/traffic_client.py
@@ -0,0 +1,790 @@
+# Copyright 2016 Cisco Systems, Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from attrdict import AttrDict
+import bitmath
+from datetime import datetime
+from log import LOG
+from netaddr import IPNetwork
+from network import Interface
+import socket
+from specs import ChainType
+from stats_collector import IntervalCollector
+from stats_collector import IterationCollector
+import struct
+import time
+import traffic_gen.traffic_utils as utils
+
+
+class TrafficClientException(Exception):
+ pass
+
+
+class TrafficRunner(object):
+
+ def __init__(self, client, duration_sec, interval_sec=0):
+ self.client = client
+ self.start_time = None
+ self.duration_sec = duration_sec
+ self.interval_sec = interval_sec
+
+ def run(self):
+ LOG.info('Running traffic generator')
+ self.client.gen.clear_stats()
+ self.client.gen.start_traffic()
+ self.start_time = time.time()
+ return self.poll_stats()
+
+ def stop(self):
+ if self.is_running():
+ self.start_time = None
+ self.client.gen.stop_traffic()
+
+ def is_running(self):
+ return self.start_time is not None
+
+ def time_elapsed(self):
+ if self.is_running():
+ return time.time() - self.start_time
+ else:
+ return self.duration_sec
+
+ def poll_stats(self):
+ if not self.is_running():
+ return None
+ time_elapsed = self.time_elapsed()
+ if time_elapsed > self.duration_sec:
+ self.stop()
+ return None
+ time_left = self.duration_sec - time_elapsed
+ if self.interval_sec > 0.0:
+ if time_left <= self.interval_sec:
+ time.sleep(time_left)
+ self.stop()
+ else:
+ time.sleep(self.interval_sec)
+ else:
+ time.sleep(self.duration_sec)
+ self.stop()
+ return self.client.get_stats()
+
+
+class Device(object):
+
+ def __init__(self, port, pci, switch_port=None, vtep_vlan=None, ip=None, tg_gateway_ip=None,
+ gateway_ip=None, ip_addrs_step=None, tg_gateway_ip_addrs_step=None,
+ gateway_ip_addrs_step=None, chain_count=1, flow_count=1, vlan_tagging=False):
+ self.chain_count = chain_count
+ self.flow_count = flow_count
+ self.dst = None
+ self.port = port
+ self.switch_port = switch_port
+ self.vtep_vlan = vtep_vlan
+ self.vlan_tag = None
+ self.vlan_tagging = vlan_tagging
+ self.pci = pci
+ self.mac = None
+ self.vm_mac_list = None
+ subnet = IPNetwork(ip)
+ self.ip = subnet.ip.format()
+ self.ip_prefixlen = subnet.prefixlen
+ self.ip_addrs_step = ip_addrs_step
+ self.tg_gateway_ip_addrs_step = tg_gateway_ip_addrs_step
+ self.gateway_ip_addrs_step = gateway_ip_addrs_step
+ self.ip_list = self.expand_ip(self.ip, self.ip_addrs_step, self.flow_count)
+ self.gateway_ip = gateway_ip
+ self.gateway_ip_list = self.expand_ip(self.gateway_ip,
+ self.gateway_ip_addrs_step,
+ self.chain_count)
+ self.tg_gateway_ip = tg_gateway_ip
+ self.tg_gateway_ip_list = self.expand_ip(self.tg_gateway_ip,
+ self.tg_gateway_ip_addrs_step,
+ self.chain_count)
+
+ def set_mac(self, mac):
+ if mac is None:
+ raise TrafficClientException('Trying to set traffic generator MAC address as None')
+ self.mac = mac
+
+ def set_destination(self, dst):
+ self.dst = dst
+
+ def set_vm_mac_list(self, vm_mac_list):
+ self.vm_mac_list = map(str, vm_mac_list)
+
+ def set_vlan_tag(self, vlan_tag):
+ if self.vlan_tagging and vlan_tag is None:
+ raise TrafficClientException('Trying to set VLAN tag as None')
+ self.vlan_tag = vlan_tag
+
+ def get_stream_configs(self, service_chain):
+ configs = []
+ flow_idx = 0
+ for chain_idx in xrange(self.chain_count):
+ current_flow_count = (self.flow_count - flow_idx) / (self.chain_count - chain_idx)
+ max_idx = flow_idx + current_flow_count - 1
+ ip_src_count = self.ip_to_int(self.ip_list[max_idx]) - \
+ self.ip_to_int(self.ip_list[flow_idx]) + 1
+ ip_dst_count = self.ip_to_int(self.dst.ip_list[max_idx]) - \
+ self.ip_to_int(self.dst.ip_list[flow_idx]) + 1
+
+ configs.append({
+ 'count': current_flow_count,
+ 'mac_src': self.mac,
+ 'mac_dst': self.dst.mac if service_chain == ChainType.EXT
+ else self.vm_mac_list[chain_idx],
+ 'ip_src_addr': self.ip_list[flow_idx],
+ 'ip_src_addr_max': self.ip_list[max_idx],
+ 'ip_src_count': ip_src_count,
+ 'ip_dst_addr': self.dst.ip_list[flow_idx],
+ 'ip_dst_addr_max': self.dst.ip_list[max_idx],
+ 'ip_dst_count': ip_dst_count,
+ 'ip_addrs_step': self.ip_addrs_step,
+ 'mac_discovery_gw': self.gateway_ip_list[chain_idx],
+ 'ip_src_tg_gw': self.tg_gateway_ip_list[chain_idx],
+ 'ip_dst_tg_gw': self.dst.tg_gateway_ip_list[chain_idx],
+ 'vlan_tag': self.vlan_tag if self.vlan_tagging else None
+ })
+
+ flow_idx += current_flow_count
+ return configs
+
+ @classmethod
+ def expand_ip(cls, ip, step_ip, count):
+ if step_ip == 'random':
+ # Repeatable Random will used in the stream src/dst IP pairs, but we still need
+ # to expand the IP based on the number of chains and flows configured. So we use
+ # "0.0.0.1" as the step to have the exact IP flow ranges for every chain.
+ step_ip = '0.0.0.1'
+
+ step_ip_in_int = cls.ip_to_int(step_ip)
+ subnet = IPNetwork(ip)
+ ip_list = []
+ for _ in xrange(count):
+ ip_list.append(subnet.ip.format())
+ subnet = subnet.next(step_ip_in_int)
+ return ip_list
+
+ @staticmethod
+ def mac_to_int(mac):
+ return int(mac.translate(None, ":.- "), 16)
+
+ @staticmethod
+ def int_to_mac(i):
+ mac = format(i, 'x').zfill(12)
+ blocks = [mac[x:x + 2] for x in xrange(0, len(mac), 2)]
+ return ':'.join(blocks)
+
+ @staticmethod
+ def ip_to_int(addr):
+ return struct.unpack("!I", socket.inet_aton(addr))[0]
+
+
+class RunningTrafficProfile(object):
+ """Represents traffic configuration for currently running traffic profile."""
+
+ DEFAULT_IP_STEP = '0.0.0.1'
+ DEFAULT_SRC_DST_IP_STEP = '0.0.0.1'
+
+ def __init__(self, config, generator_profile):
+ generator_config = self.__match_generator_profile(config.traffic_generator,
+ generator_profile)
+ self.generator_config = generator_config
+ self.service_chain = config.service_chain
+ self.service_chain_count = config.service_chain_count
+ self.flow_count = config.flow_count
+ self.host_name = generator_config.host_name
+ self.name = generator_config.name
+ self.tool = generator_config.tool
+ self.cores = generator_config.get('cores', 1)
+ self.ip_addrs_step = generator_config.ip_addrs_step or self.DEFAULT_SRC_DST_IP_STEP
+ self.tg_gateway_ip_addrs_step = \
+ generator_config.tg_gateway_ip_addrs_step or self.DEFAULT_IP_STEP
+ self.gateway_ip_addrs_step = generator_config.gateway_ip_addrs_step or self.DEFAULT_IP_STEP
+ self.gateway_ips = generator_config.gateway_ip_addrs
+ self.ip = generator_config.ip
+ self.intf_speed = bitmath.parse_string(generator_config.intf_speed.replace('ps', '')).bits
+ self.vlan_tagging = config.vlan_tagging
+ self.no_arp = config.no_arp
+ self.src_device = None
+ self.dst_device = None
+ self.vm_mac_list = None
+ self.__prep_interfaces(generator_config)
+
+ def to_json(self):
+ return dict(self.generator_config)
+
+ def set_vm_mac_list(self, vm_mac_list):
+ self.src_device.set_vm_mac_list(vm_mac_list[0])
+ self.dst_device.set_vm_mac_list(vm_mac_list[1])
+
+ @staticmethod
+ def __match_generator_profile(traffic_generator, generator_profile):
+ generator_config = AttrDict(traffic_generator)
+ generator_config.pop('default_profile')
+ generator_config.pop('generator_profile')
+ matching_profile = filter(lambda profile: profile.name == generator_profile,
+ traffic_generator.generator_profile)
+ if len(matching_profile) != 1:
+ raise Exception('Traffic generator profile not found: ' + generator_profile)
+
+ generator_config.update(matching_profile[0])
+
+ return generator_config
+
+ def __prep_interfaces(self, generator_config):
+ src_config = {
+ 'chain_count': self.service_chain_count,
+ 'flow_count': self.flow_count / 2,
+ 'ip': generator_config.ip_addrs[0],
+ 'ip_addrs_step': self.ip_addrs_step,
+ 'gateway_ip': self.gateway_ips[0],
+ 'gateway_ip_addrs_step': self.gateway_ip_addrs_step,
+ 'tg_gateway_ip': generator_config.tg_gateway_ip_addrs[0],
+ 'tg_gateway_ip_addrs_step': self.tg_gateway_ip_addrs_step,
+ 'vlan_tagging': self.vlan_tagging
+ }
+ dst_config = {
+ 'chain_count': self.service_chain_count,
+ 'flow_count': self.flow_count / 2,
+ 'ip': generator_config.ip_addrs[1],
+ 'ip_addrs_step': self.ip_addrs_step,
+ 'gateway_ip': self.gateway_ips[1],
+ 'gateway_ip_addrs_step': self.gateway_ip_addrs_step,
+ 'tg_gateway_ip': generator_config.tg_gateway_ip_addrs[1],
+ 'tg_gateway_ip_addrs_step': self.tg_gateway_ip_addrs_step,
+ 'vlan_tagging': self.vlan_tagging
+ }
+
+ self.src_device = Device(**dict(src_config, **generator_config.interfaces[0]))
+ self.dst_device = Device(**dict(dst_config, **generator_config.interfaces[1]))
+ self.src_device.set_destination(self.dst_device)
+ self.dst_device.set_destination(self.src_device)
+
+ if self.service_chain == ChainType.EXT and not self.no_arp \
+ and not self.__are_unique(self.src_device.ip_list, self.dst_device.ip_list):
+ raise Exception('Computed IP addresses are not unique, choose different base. '
+ 'Start IPs: {start}. End IPs: {end}'
+ .format(start=self.src_device.ip_list,
+ end=self.dst_device.ip_list))
+
+ def __are_unique(self, list1, list2):
+ return set(list1).isdisjoint(set(list2))
+
+ @property
+ def devices(self):
+ return [self.src_device, self.dst_device]
+
+ @property
+ def vlans(self):
+ return [self.src_device.vtep_vlan, self.dst_device.vtep_vlan]
+
+ @property
+ def ports(self):
+ return [self.src_device.port, self.dst_device.port]
+
+ @property
+ def switch_ports(self):
+ return [self.src_device.switch_port, self.dst_device.switch_port]
+
+ @property
+ def pcis(self):
+ return [self.src_device.pci, self.dst_device.pci]
+
+
+class TrafficGeneratorFactory(object):
+
+ def __init__(self, config):
+ self.config = config
+
+ def get_tool(self):
+ return self.config.generator_config.tool
+
+ def get_generator_client(self):
+ tool = self.get_tool().lower()
+ if tool == 'trex':
+ from traffic_gen import trex
+ return trex.TRex(self.config)
+ elif tool == 'dummy':
+ from traffic_gen import dummy
+ return dummy.DummyTG(self.config)
+ else:
+ return None
+
+ def list_generator_profile(self):
+ return [profile.name for profile in self.config.traffic_generator.generator_profile]
+
+ def get_generator_config(self, generator_profile):
+ return RunningTrafficProfile(self.config, generator_profile)
+
+ def get_matching_profile(self, traffic_profile_name):
+ matching_profile = filter(lambda profile: profile.name == traffic_profile_name,
+ self.config.traffic_profile)
+
+ if len(matching_profile) > 1:
+ raise Exception('Multiple traffic profiles with the same name found.')
+ elif len(matching_profile) == 0:
+ raise Exception('No traffic profile found.')
+
+ return matching_profile[0]
+
+ def get_frame_sizes(self, traffic_profile):
+ matching_profile = self.get_matching_profile(traffic_profile)
+ return matching_profile.l2frame_size
+
+
+class TrafficClient(object):
+
+ PORTS = [0, 1]
+
+ def __init__(self, config, notifier=None):
+ generator_factory = TrafficGeneratorFactory(config)
+ self.gen = generator_factory.get_generator_client()
+ self.tool = generator_factory.get_tool()
+ self.config = config
+ self.notifier = notifier
+ self.interval_collector = None
+ self.iteration_collector = None
+ self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec)
+ if self.gen is None:
+ raise TrafficClientException('%s is not a supported traffic generator' % self.tool)
+
+ self.run_config = {
+ 'l2frame_size': None,
+ 'duration_sec': self.config.duration_sec,
+ 'bidirectional': True,
+ 'rates': None
+ }
+ self.current_total_rate = {'rate_percent': '10'}
+ if self.config.single_run:
+ self.current_total_rate = utils.parse_rate_str(self.config.rate)
+
+ def set_macs(self):
+ for mac, device in zip(self.gen.get_macs(), self.config.generator_config.devices):
+ device.set_mac(mac)
+
+ def start_traffic_generator(self):
+ self.gen.init()
+ self.gen.connect()
+
+ def setup(self):
+ self.gen.set_mode()
+ self.gen.config_interface()
+ self.gen.clear_stats()
+
+ def get_version(self):
+ return self.gen.get_version()
+
+ def ensure_end_to_end(self):
+ """
+ Ensure traffic generator receives packets it has transmitted.
+ This ensures end to end connectivity and also waits until VMs are ready to forward packets.
+
+ At this point all VMs are in active state, but forwarding does not have to work.
+ Small amount of traffic is sent to every chain. Then total of sent and received packets
+ is compared. If ratio between received and transmitted packets is higher than (N-1)/N,
+ N being number of chains, traffic flows through every chain and real measurements can be
+ performed.
+
+ Example:
+ PVP chain (1 VM per chain)
+ N = 10 (number of chains)
+ threshold = (N-1)/N = 9/10 = 0.9 (acceptable ratio ensuring working conditions)
+ if total_received/total_sent > 0.9, traffic is flowing to more than 9 VMs meaning
+ all 10 VMs are in operational state.
+ """
+ LOG.info('Starting traffic generator to ensure end-to-end connectivity')
+ rate_pps = {'rate_pps': str(self.config.service_chain_count * 100)}
+ self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False)
+
+ # ensures enough traffic is coming back
+ threshold = (self.config.service_chain_count - 1) / float(self.config.service_chain_count)
+
+ for it in xrange(self.config.generic_retry_count):
+ self.gen.clear_stats()
+ self.gen.start_traffic()
+ LOG.info('Waiting for packets to be received back... ({} / {})'.format(it + 1,
+ self.config.generic_retry_count))
+ time.sleep(self.config.generic_poll_sec)
+ self.gen.stop_traffic()
+ stats = self.gen.get_stats()
+
+ # compute total sent and received traffic on both ports
+ total_rx = 0
+ total_tx = 0
+ for port in self.PORTS:
+ total_rx += float(stats[port]['rx'].get('total_pkts', 0))
+ total_tx += float(stats[port]['tx'].get('total_pkts', 0))
+
+ # how much of traffic came back
+ ratio = total_rx / total_tx if total_tx else 0
+
+ if ratio > threshold:
+ self.gen.clear_stats()
+ self.gen.clear_streamblock()
+ LOG.info('End-to-end connectivity ensured')
+ return
+
+ time.sleep(self.config.generic_poll_sec)
+
+ raise TrafficClientException('End-to-end connectivity cannot be ensured')
+
+ def ensure_arp_successful(self):
+ if not self.gen.resolve_arp():
+ raise TrafficClientException('ARP cannot be resolved')
+
+ def set_traffic(self, frame_size, bidirectional):
+ self.run_config['bidirectional'] = bidirectional
+ self.run_config['l2frame_size'] = frame_size
+ self.run_config['rates'] = [self.get_per_direction_rate()]
+ if bidirectional:
+ self.run_config['rates'].append(self.get_per_direction_rate())
+ else:
+ unidir_reverse_pps = int(self.config.unidir_reverse_traffic_pps)
+ if unidir_reverse_pps > 0:
+ self.run_config['rates'].append({'rate_pps': str(unidir_reverse_pps)})
+
+ self.gen.clear_streamblock()
+ self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional, latency=True)
+
+ def modify_load(self, load):
+ self.current_total_rate = {'rate_percent': str(load)}
+ rate_per_direction = self.get_per_direction_rate()
+
+ self.gen.modify_rate(rate_per_direction, False)
+ self.run_config['rates'][0] = rate_per_direction
+ if self.run_config['bidirectional']:
+ self.gen.modify_rate(rate_per_direction, True)
+ self.run_config['rates'][1] = rate_per_direction
+
+ def get_ndr_and_pdr(self):
+ dst = 'Bidirectional' if self.run_config['bidirectional'] else 'Unidirectional'
+ targets = {}
+ if self.config.ndr_run:
+ LOG.info('*** Searching NDR for %s (%s)...', self.run_config['l2frame_size'], dst)
+ targets['ndr'] = self.config.measurement.NDR
+ if self.config.pdr_run:
+ LOG.info('*** Searching PDR for %s (%s)...', self.run_config['l2frame_size'], dst)
+ targets['pdr'] = self.config.measurement.PDR
+
+ self.run_config['start_time'] = time.time()
+ self.interval_collector = IntervalCollector(self.run_config['start_time'])
+ self.interval_collector.attach_notifier(self.notifier)
+ self.iteration_collector = IterationCollector(self.run_config['start_time'])
+ results = {}
+ self.__range_search(0.0, 200.0, targets, results)
+
+ results['iteration_stats'] = {
+ 'ndr_pdr': self.iteration_collector.get()
+ }
+
+ if self.config.ndr_run:
+ LOG.info('NDR load: %s', results['ndr']['rate_percent'])
+ results['ndr']['time_taken_sec'] = \
+ results['ndr']['timestamp_sec'] - self.run_config['start_time']
+ if self.config.pdr_run:
+ LOG.info('PDR load: %s', results['pdr']['rate_percent'])
+ results['pdr']['time_taken_sec'] = \
+ results['pdr']['timestamp_sec'] - results['ndr']['timestamp_sec']
+ else:
+ LOG.info('PDR load: %s', results['pdr']['rate_percent'])
+ results['pdr']['time_taken_sec'] = \
+ results['pdr']['timestamp_sec'] - self.run_config['start_time']
+ return results
+
+ def __get_dropped_rate(self, result):
+ dropped_pkts = result['rx']['dropped_pkts']
+ total_pkts = result['tx']['total_pkts']
+ if not total_pkts:
+ return float('inf')
+ else:
+ return float(dropped_pkts) / total_pkts * 100
+
+ def get_stats(self):
+ stats = self.gen.get_stats()
+ retDict = {'total_tx_rate': stats['total_tx_rate']}
+ for port in self.PORTS:
+ retDict[port] = {'tx': {}, 'rx': {}}
+
+ tx_keys = ['total_pkts', 'total_pkt_bytes', 'pkt_rate', 'pkt_bit_rate']
+ rx_keys = tx_keys + ['dropped_pkts']
+
+ for port in self.PORTS:
+ for key in tx_keys:
+ retDict[port]['tx'][key] = int(stats[port]['tx'][key])
+ for key in rx_keys:
+ try:
+ retDict[port]['rx'][key] = int(stats[port]['rx'][key])
+ except ValueError:
+ retDict[port]['rx'][key] = 0
+ retDict[port]['rx']['avg_delay_usec'] = float(stats[port]['rx']['avg_delay_usec'])
+ retDict[port]['rx']['min_delay_usec'] = float(stats[port]['rx']['min_delay_usec'])
+ retDict[port]['rx']['max_delay_usec'] = float(stats[port]['rx']['max_delay_usec'])
+ retDict[port]['drop_rate_percent'] = self.__get_dropped_rate(retDict[port])
+
+ ports = sorted(retDict.keys())
+ if self.run_config['bidirectional']:
+ retDict['overall'] = {'tx': {}, 'rx': {}}
+ for key in tx_keys:
+ retDict['overall']['tx'][key] = \
+ retDict[ports[0]]['tx'][key] + retDict[ports[1]]['tx'][key]
+ for key in rx_keys:
+ retDict['overall']['rx'][key] = \
+ retDict[ports[0]]['rx'][key] + retDict[ports[1]]['rx'][key]
+ total_pkts = [retDict[ports[0]]['rx']['total_pkts'],
+ retDict[ports[1]]['rx']['total_pkts']]
+ avg_delays = [retDict[ports[0]]['rx']['avg_delay_usec'],
+ retDict[ports[1]]['rx']['avg_delay_usec']]
+ max_delays = [retDict[ports[0]]['rx']['max_delay_usec'],
+ retDict[ports[1]]['rx']['max_delay_usec']]
+ min_delays = [retDict[ports[0]]['rx']['min_delay_usec'],
+ retDict[ports[1]]['rx']['min_delay_usec']]
+ retDict['overall']['rx']['avg_delay_usec'] = utils.weighted_avg(total_pkts, avg_delays)
+ retDict['overall']['rx']['min_delay_usec'] = min(min_delays)
+ retDict['overall']['rx']['max_delay_usec'] = max(max_delays)
+ for key in ['pkt_bit_rate', 'pkt_rate']:
+ for dirc in ['tx', 'rx']:
+ retDict['overall'][dirc][key] /= 2.0
+ else:
+ retDict['overall'] = retDict[ports[0]]
+ retDict['overall']['drop_rate_percent'] = self.__get_dropped_rate(retDict['overall'])
+ return retDict
+
+ def __convert_rates(self, rate):
+ return utils.convert_rates(self.run_config['l2frame_size'],
+ rate,
+ self.config.generator_config.intf_speed)
+
+ def __ndr_pdr_found(self, tag, load):
+ rates = self.__convert_rates({'rate_percent': load})
+ self.iteration_collector.add_ndr_pdr(tag, rates['rate_pps'])
+ last_stats = self.iteration_collector.peek()
+ self.interval_collector.add_ndr_pdr(tag, last_stats)
+
+ def __format_output_stats(self, stats):
+ for key in (self.PORTS + ['overall']):
+ interface = stats[key]
+ stats[key] = {
+ 'tx_pkts': interface['tx']['total_pkts'],
+ 'rx_pkts': interface['rx']['total_pkts'],
+ 'drop_percentage': interface['drop_rate_percent'],
+ 'drop_pct': interface['rx']['dropped_pkts'],
+ 'avg_delay_usec': interface['rx']['avg_delay_usec'],
+ 'max_delay_usec': interface['rx']['max_delay_usec'],
+ 'min_delay_usec': interface['rx']['min_delay_usec'],
+ }
+
+ return stats
+
+ def __targets_found(self, rate, targets, results):
+ for tag, target in targets.iteritems():
+ LOG.info('Found {} ({}) load: {}'.format(tag, target, rate))
+ self.__ndr_pdr_found(tag, rate)
+ results[tag]['timestamp_sec'] = time.time()
+
+ def __range_search(self, left, right, targets, results):
+ '''Perform a binary search for a list of targets inside a [left..right] range or rate
+
+ left the left side of the range to search as a % the line rate (100 = 100% line rate)
+ indicating the rate to send on each interface
+ right the right side of the range to search as a % of line rate
+ indicating the rate to send on each interface
+ targets a dict of drop rates to search (0.1 = 0.1%), indexed by the DR name or "tag" ('ndr', 'pdr')
+ results a dict to store results
+ '''
+ if len(targets) == 0:
+ return
+ LOG.info('Range search [{} .. {}] targets: {}'.format(left, right, targets))
+
+ # Terminate search when gap is less than load epsilon
+ if right - left < self.config.measurement.load_epsilon:
+ self.__targets_found(left, targets, results)
+ return
+
+ # Obtain the average drop rate in for middle load
+ middle = (left + right) / 2.0
+ stats, rates = self.__run_search_iteration(middle)
+
+ # Split target dicts based on the avg drop rate
+ left_targets = {}
+ right_targets = {}
+ for tag, target in targets.iteritems():
+ if stats['overall']['drop_rate_percent'] <= target:
+ # record the best possible rate found for this target
+ results[tag] = rates
+ results[tag].update({
+ 'load_percent_per_direction': middle,
+ 'stats': self.__format_output_stats(dict(stats)),
+ 'timestamp_sec': None
+ })
+ right_targets[tag] = target
+ else:
+ left_targets[tag] = target
+
+ # search lower half
+ self.__range_search(left, middle, left_targets, results)
+
+ # search upper half only if the upper rate does not exceed
+ # 100%, this only happens when the first search at 100%
+ # yields a DR that is < target DR
+ if middle >= 100:
+ self.__targets_found(100, right_targets, results)
+ else:
+ self.__range_search(middle, right, right_targets, results)
+
+ def __run_search_iteration(self, rate):
+ # set load
+ self.modify_load(rate)
+
+ # poll interval stats and collect them
+ for stats in self.run_traffic():
+ self.interval_collector.add(stats)
+ time_elapsed_ratio = self.runner.time_elapsed() / self.run_config['duration_sec']
+ if time_elapsed_ratio >= 1:
+ self.cancel_traffic()
+ self.interval_collector.reset()
+
+ # get stats from the run
+ stats = self.runner.client.get_stats()
+ current_traffic_config = self.get_traffic_config()
+ warning = self.compare_tx_rates(current_traffic_config['direction-total']['rate_pps'],
+ stats['total_tx_rate'])
+ if warning is not None:
+ stats['warning'] = warning
+
+ # save reliable stats from whole iteration
+ self.iteration_collector.add(stats, current_traffic_config['direction-total']['rate_pps'])
+ LOG.info('Average drop rate: {}'.format(stats['overall']['drop_rate_percent']))
+
+ return stats, current_traffic_config['direction-total']
+
+ @staticmethod
+ def log_stats(stats):
+ report = {
+ 'datetime': str(datetime.now()),
+ 'tx_packets': stats['overall']['tx']['total_pkts'],
+ 'rx_packets': stats['overall']['rx']['total_pkts'],
+ 'drop_packets': stats['overall']['rx']['dropped_pkts'],
+ 'drop_rate_percent': stats['overall']['drop_rate_percent']
+ }
+ LOG.info('TX: %(tx_packets)d; '
+ 'RX: %(rx_packets)d; '
+ 'Dropped: %(drop_packets)d; '
+ 'Drop rate: %(drop_rate_percent).4f%%',
+ report)
+
+ def run_traffic(self):
+ stats = self.runner.run()
+ while self.runner.is_running:
+ self.log_stats(stats)
+ yield stats
+ stats = self.runner.poll_stats()
+ if stats is None:
+ return
+ self.log_stats(stats)
+ LOG.info('Drop rate: {}'.format(stats['overall']['drop_rate_percent']))
+ yield stats
+
+ def cancel_traffic(self):
+ self.runner.stop()
+
+ def get_interface(self, port_index):
+ port = self.gen.port_handle[port_index]
+ tx, rx = 0, 0
+ if not self.config.no_traffic:
+ stats = self.get_stats()
+ if port in stats:
+ tx, rx = int(stats[port]['tx']['total_pkts']), int(stats[port]['rx']['total_pkts'])
+ return Interface('traffic-generator', self.tool.lower(), tx, rx)
+
+ def get_traffic_config(self):
+ config = {}
+ load_total = 0.0
+ bps_total = 0.0
+ pps_total = 0.0
+ for idx, rate in enumerate(self.run_config['rates']):
+ key = 'direction-forward' if idx == 0 else 'direction-reverse'
+ config[key] = {
+ 'l2frame_size': self.run_config['l2frame_size'],
+ 'duration_sec': self.run_config['duration_sec']
+ }
+ config[key].update(rate)
+ config[key].update(self.__convert_rates(rate))
+ load_total += float(config[key]['rate_percent'])
+ bps_total += float(config[key]['rate_bps'])
+ pps_total += float(config[key]['rate_pps'])
+ config['direction-total'] = dict(config['direction-forward'])
+ config['direction-total'].update({
+ 'rate_percent': load_total,
+ 'rate_pps': pps_total,
+ 'rate_bps': bps_total
+ })
+
+ return config
+
+ def get_run_config(self, results):
+ """Returns configuration which was used for the last run."""
+ r = {}
+ for idx, key in enumerate(["direction-forward", "direction-reverse"]):
+ tx_rate = results["stats"][idx]["tx"]["total_pkts"] / self.config.duration_sec
+ rx_rate = results["stats"][idx]["rx"]["total_pkts"] / self.config.duration_sec
+ r[key] = {
+ "orig": self.__convert_rates(self.run_config['rates'][idx]),
+ "tx": self.__convert_rates({'rate_pps': tx_rate}),
+ "rx": self.__convert_rates({'rate_pps': rx_rate})
+ }
+
+ total = {}
+ for direction in ['orig', 'tx', 'rx']:
+ total[direction] = {}
+ for unit in ['rate_percent', 'rate_bps', 'rate_pps']:
+ total[direction][unit] = sum(map(lambda x: float(x[direction][unit]), r.values()))
+
+ r['direction-total'] = total
+ return r
+
+ @staticmethod
+ def compare_tx_rates(required, actual):
+ threshold = 0.9
+ are_different = False
+ try:
+ if float(actual) / required < threshold:
+ are_different = True
+ except ZeroDivisionError:
+ are_different = True
+
+ if are_different:
+ msg = "WARNING: There is a significant difference between requested TX rate ({r}) " \
+ "and actual TX rate ({a}). The traffic generator may not have sufficient CPU " \
+ "to achieve the requested TX rate.".format(r=required, a=actual)
+ LOG.info(msg)
+ return msg
+
+ return None
+
+ def get_per_direction_rate(self):
+ divisor = 2 if self.run_config['bidirectional'] else 1
+ if 'rate_percent' in self.current_total_rate:
+ # don't split rate if it's percentage
+ divisor = 1
+
+ return utils.divide_rate(self.current_total_rate, divisor)
+
+ def close(self):
+ try:
+ self.gen.stop_traffic()
+ except Exception:
+ pass
+ self.gen.clear_stats()
+ self.gen.cleanup()