diff options
Diffstat (limited to 'nfvbench')
28 files changed, 4152 insertions, 1448 deletions
diff --git a/nfvbench/cfg.default.yaml b/nfvbench/cfg.default.yaml index fa3d807..c76e738 100755..100644 --- a/nfvbench/cfg.default.yaml +++ b/nfvbench/cfg.default.yaml @@ -25,8 +25,30 @@ # The only case where this field can be empty is when measuring a system that does not run # OpenStack or when OpenStack APIs are not accessible or OpenStack APis use is not # desirable. In that case the EXT service chain must be used. +# +# If openrc is not admin some parameters are mandatory and must be filled with valid values in config file such as : +# - availability_zone +# - hypervisor_hostname +# - vlans +# WARNING: Not used if clouds_detail is sets openrc_file: +# The OpenStack clouds configuration from clouds.yaml file to use. +# clouds.yaml file must be in one of the following paths: +# - ~/.config/openstack +# - /etc/openstack +# Note: If running in a container, this path must be valid in the container. +# The only case where this field can be empty is when measuring a system that does not run +# OpenStack or when OpenStack APIs are not accessible or OpenStack APis use is not +# desirable. In that case the EXT service chain must be used. +# +# If user is not admin some parameters are mandatory and must be filled with valid values in config file such as : +# - availability_zone +# - hypervisor_hostname +# - vlans +# If a value is sets, this parameter disable the use of openrc file +clouds_detail: + # Forwarder to use in nfvbenchvm image. Available options: ['vpp', 'testpmd'] vm_forwarder: testpmd @@ -46,16 +68,16 @@ vm_image_file: # Otherwise, a new flavor will be created with attributes listed below. flavor_type: 'nfvbench.medium' -# Custom flavor attributes +# Custom flavor attributes for the test VM flavor: - # Number of vCPUs for the flavor + # Number of vCPUs for the flavor, must be at least 2! vcpus: 2 # Memory for the flavor in MB ram: 4096 # Size of local disk in GB disk: 0 # metadata are supported and can be added if needed, optional - # note that if your openstack does not have NUMA optimization + # note that if your OpenStack does not have NUMA optimization # (cpu pinning and huge pages) # you must comment out extra_specs completely otherwise # loopback VM creation will fail @@ -63,14 +85,38 @@ flavor: "hw:cpu_policy": dedicated "hw:mem_page_size": large +# Enable multiqueue for all test VM interfaces (PVP and PVVP only). +# When enabled, the test VM image will get added the property to enable +# multiqueue (hw_vif_multiqueue_enabled='true'). +# The number of queues per interace will be set to the number of vCPUs configured for +# the VM. +# By default there is only 1 queue per interface +# The max allowed queue per interface is 8. +# The valid range for this parameter is [1..min(8, vcpu_count)] +# When multiqueue is used the recommended setting is to set it to same value as the +# number of vCPU used - up to a max of 8 queues. +# Setting to a lower value than vCPU should also work. For example if using 4 vCPU and +# vif_multiqueue_size is set to 2, OpenStack will create 4 queues per interface but the +# test VM will only use the first 2 queues. +vif_multiqueue_size: 1 + +# Increase number of buffers allocated for VPP VM forwarder. May be needed in scenarios with large +# number of interfaces and worker threads, or a lot of physical interfaces with multiple RSS queues. +# Value is per CPU socket. Default is 16384. +num_mbufs: 16384 + # Name of the availability zone to use for the test VMs # Must be one of the zones listed by 'nova availability-zone-list' # availability_zone: 'nova' +# If openrc is not admin set a valid value availability_zone: # To force placement on a given hypervisor, set the name here # (if multiple names are provided, the first will be used) -# Leave empty to let openstack pick the hypervisor +# Leave empty to let OpenStack pick the hypervisor compute_nodes: +# If openrc is not admin set a valid value for hypervisor hostname +# Example of value: hypervisor_hostname: "server1" +hypervisor_hostname: # Type of service chain to run, possible options are PVP, PVVP and EXT # PVP - port to VM to port @@ -99,10 +145,16 @@ flow_count: 10000 sriov: false # Perform port to port loopback (direct or through switch) -# Should be used with EXT service chain and no ARP (no_arp: true) -# When enabled, the vlans property must contain the same VLAN id for all chains. -# Can be overriden by --l2-loopback +# e.g. for unitary testing of the switch or the bench itself. +# When selected, this mode forces EXT service chain and no ARP mode +# Destination MAC for each port is set to the other (peer) port MAC. +# VLAN tagging is defined by 'vlans' & 'vlan_tagging' properties. +# Can be overriden by --l2-loopback (including vlan tagging spec). l2_loopback: false +# No assumption is made about the loop implementation. +# Multiple L2 vlan tagged service chains are allowed, +# the vlan ID lists' size must be at least service_chain_count. +# If not vlan tagging, the service chain count is forced to 1. # Resources created by NFVbench will not be removed # Can be overriden by --no-cleanup @@ -134,10 +186,17 @@ traffic_generator: # `ip_addrs_step`: step for generating IP sequence. Use "random" for random patterns, default is 0.0.0.1. ip_addrs: ['10.0.0.0/8', '20.0.0.0/8'] ip_addrs_step: 0.0.0.1 + + #'ip_src_static': an attribute to precise the state of source IP during the generation of traffic, It indicates whether + # the IP source variate or remain constant. Use True for constant IP and False for varying IPs. + # default value is True + ip_src_static: True + # `tg_gateway_ip_addrs` base IP for traffic generator ports in the left and right networks to the VNFs # chain count consecutive IP addresses spaced by tg_gateway_ip_addrs_step will be used # `tg_gateway_ip_addrs__step`: step for generating traffic generator gateway sequences. default is 0.0.0.1 - tg_gateway_ip_addrs: ['1.1.0.100', '2.2.0.100'] + tg_gateway_ip_addrs: ['192.168.1.100', '192.168.2.100'] + tg_gateway_ip_cidrs: ['192.168.1.0/24','192.168.2.0/24'] tg_gateway_ip_addrs_step: 0.0.0.1 # `gateway_ip_addrs`: base IPs of VNF router gateways (left and right), quantity used depends on chain count # must correspond to the public IP on the left and right networks @@ -145,23 +204,43 @@ traffic_generator: # must be the same subnet but not same IP as tg_gateway_ip_addrs. # chain count consecutive IP addresses spaced by gateway_ip_addrs_step will be used # `gateway_ip_addrs_step`: step for generating router gateway sequences. default is 0.0.0.1 - gateway_ip_addrs: ['1.1.0.2', '2.2.0.2'] + gateway_ip_addrs: ['192.168.1.1', '192.168.2.1'] gateway_ip_addrs_step: 0.0.0.1 + + # UDP DEFINED VARIABLES + # TRex pick default UDP port (53) but the range of UDP source and destination ports are also + # defined from configuration file by using the following attributes: + # # `udp_src_port`: the source port for sending UDP traffic, default is picked by TRex (53) # `udp_dst_port`: the destination port for sending UDP traffic, default is picked by TRex (53) + # `udp_src_port` and `udp_dst_port` can be defined by a single port or a range. Example: + # udp_src_port: 80 + # udp_dst_port: ['1024','65000'] + # `udp_port_step`: the step between two generated ports, default is equal to '1' + # + # NOTICE: + # Following TRex functionalities, incrementation and decrementation of source port and destination + # port values occur simultaneously. + # So, in order to reach the highest possible number of packets, it's recommended that the range of source ports + # minus the range of destination ports should be different of 1 + # i.e: |range[source_port] - range[destination_port]| = 1 udp_src_port: udp_dst_port: + udp_port_step: '1' # VxLAN only: optionally specify what VLAN tag to use for the VxLAN overlay # This is used if the vxlan tunnels are running on a specific VLAN. # Leave empty if there is no VLAN tagging required, or specify the VLAN id to use # for all VxLAN tunneled traffic vtep_vlan: - # VxLAN only: local/source vteps IP addresses for port 0 and 1 ['10.1.1.230', '10.1.1.231'] + # VxLAN and MPLS only: local/source vteps IP addresses for port 0 and 1 ['10.1.1.230', '10.1.1.231'] src_vteps: # VxLAN only: remote IP address of the remote VTEPs that terminate all tunnels originating from local VTEPs dst_vtep: - + # The encapsulated L3/MPLS packet needs to traverse L3 or MPLS fabric to reach to its final dst_vtep. + # This parameter is required to resolve first next-hop MAC address if it next-hop is not its final dst_vtep. + # This parameter is mandatory for MPLS only + vtep_gateway_ips: # L2 ADDRESSING OF UDP PACKETS # Lists of dest MAC addresses to use on each traffic generator port (one dest MAC per chain) # Leave empty for PVP, PVVP, EXT with ARP @@ -192,7 +271,7 @@ traffic_generator: # # Generator profiles are listed in the following format: # `name`: Traffic generator profile name (use a unique name, no space or special character) - # DFo not change this field + # Do not change this field # `tool`: Traffic generator tool to be used (currently supported is `TRex`). # Do not change this field # `ip`: IP address of the traffic generator. @@ -205,6 +284,13 @@ traffic_generator: # software mode, therefore the performance of TRex will be significantly # lower. ONLY applies to trex-local. # Recommended to leave the default value (false) + # `limit_memory`: Specify the memory reserved for running the TRex traffic generator (in MB). Limit the amount + # of packet memory used. (Passed to dpdk as -m arg) + # ONLY applies to trex-local. + # `zmq_pub_port`: Specify the ZMQ pub port number for the TRex traffic generator instance (default value is 4500). + # ONLY applies to trex-local. + # `zmq_rpc_port`: Specify the ZMQ rpc port for the TRex traffic generator instance (default value is 4501). + # ONLY applies to trex-local. # `interfaces`: Configuration of traffic generator interfaces. # `interfaces.port`: The port of the traffic generator to be used (leave as 0 and 1 resp.) # `interfaces.switch_port`: Leave empty (deprecated) @@ -218,12 +304,33 @@ traffic_generator: # Do not use unless you want to override the speed discovered by the # traffic generator. Expected format: 10Gbps # + # `platform`: Optional. Used to tune the performance and allocate the cores to the right NUMA. + # See https://trex-tgn.cisco.com/trex/doc/trex_manual.html (6.2.3. Platform section configuration) + # for more details + # `platform.master_thread_id`: Hardware thread_id for control thread. (Valid value is mandatory if platform property is set) + # `platform.latency_thread_id`: Hardware thread_id for RX thread. (Valid value is mandatory if platform property is set) + # `platform.dual_if`: Section defines info for interface pairs (according to the order in “interfaces” list). (Valid value is mandatory if platform property is set) + # Each section, starting with “- socket” defines info for different interface pair. (Valid value is mandatory if platform property is set) + # `platform.dual_if.socket`: The NUMA node from which memory will be allocated for use by the interface pair. (Valid value is mandatory if platform property is set) + # `platform.dual_if.threads`: Hardware threads to be used for sending packets for the interface pair. (Valid value is mandatory if platform property is set) + # Threads are pinned to cores, so specifying threads actually determines the hardware cores. + # Example of values: + # platform: + # master_thread_id: 0 + # latency_thread_id: 2 + # dual_if: + # - socket: 0 + # threads: [1] + # generator_profile: - name: trex-local tool: TRex ip: 127.0.0.1 cores: 4 software_mode: false + limit_memory: 1024 + zmq_pub_port: 4500 + zmq_rpc_port: 4501 interfaces: - port: 0 pci: @@ -232,14 +339,78 @@ traffic_generator: pci: switch_port: intf_speed: + platform: + master_thread_id: + latency_thread_id: + dual_if: + - socket: + threads: + +# Use 'true' to force restart of local TRex server before next run +# TRex local server will be restarted even if restart property is false in case of generator config changes between runs +restart: false # Simpler override for trex core count and mbuf multilier factor # if empty defaults to the one specified in generator_profile.cores cores: +# Simpler override for the interface speed +# if empty, the current generator_profile.intf_speed parameter applies +# if value = 'auto' the auto-detection is forced +intf_speed: + +# 'cores' and 'intf_speed' parameters can be overriden themselves +# by respective options --cores and --intf-speed on the command-line. + +# By default, the real ports line rate is detected and used as +# the reference for computing the theoretical maximum traffic load (100%). +# Note that specifying 'intf_speed' allows to artificially lower this +# reference while not modifying the actual transmission bit rate. + +# The values of the following parameters are ignored on entry +# they are defined here in order to appear in the reported configuration. +# They will reflect the value active at run-time (after overriding or detection) +cores_used: +intf_speed_used: +intf_speed_detected: + +# A cache size value is passed to the TRex field engine (FE) at packet generation. +# Can be overridden by --cache-size +# More information for TRex performance: +# https://trex-tgn.cisco.com/trex/doc/trex_stateless.html#_tutorial_field_engine_significantly_improve_performance +# If cache_size = 0 (or empty): no cache will be used by TRex (default) +# If cache_size < 0: cache_size will be set to flow count value +cache_size: 0 +# The cache size is actually limited by the number of 64B mbufs configured in the trex platform configuration (see Trex manual 6.2.2. Memory section configuration) +# Note that the resulting value is finally capped to 10000, whatever the requested size is (by design limitation). + +# Specification of the TRex behaviour dealing with the i40e network card driver issue: Trex-528 +# see https://trex-tgn.cisco.com/youtrack/issue/trex-528 +# This issue states that if other ports, in the same card, +# are in kernel mode, they could impair traffic counting. +# Can be overridden by --i40e-mixed +# Values can be: +# ignore - don't consider the case (default) +# exit - should the case arise, exit (TRex default behaviour) +# unbind - unbind kernel bound ports (the former NFVbench behaviour) +# The 'ignore' option might be OK as soon as the issue has been fixed in the driver. +# The 'unbind' option should not be used! who knows the current use of other ports? +i40e_mixed: + +# Trex will use 1 x 64B mbuf per pre-built cached packet, assuming 1 pre-built cached packet per flow, it means for very large number of flows, the number of configured mbuf_64 will need to be set accordingly. +mbuf_64: + # mbuffer ratio to use for TRex (see TRex documentation for more details) mbuf_factor: 0.2 +# A switch to disable hdrh +# hdrh is enabled by default and requires TRex v2.58 or higher +disable_hdrh: false + +# List of latency percentiles values returned using hdrh +# elements should be int or float between 0.0 and 100.0 +lat_percentiles: [25, 75, 99] + # ----------------------------------------------------------------------------- # These variables are not likely to be changed @@ -255,7 +426,7 @@ generic_poll_sec: 2 # name of the loop VM loop_vm_name: 'nfvbench-loop-vm' -# Default names, subnets and CIDRs for PVP/PVVP networks (openstack only) +# Default names, subnets and CIDRs for PVP/PVVP networks (OpenStack only) # # If a network with given name already exists it will be reused. # - PVP only uses left and right @@ -287,7 +458,7 @@ loop_vm_name: 'nfvbench-loop-vm' # segmentation_id: 2001 # physical_network: phys_sriov1 # -# For multi-chaining and non shared network mode (VLAN, SRIOV, VxLAN): +# For multi-chaining and non shared network mode (VLAN, SRIOV, VxLAN, MPLS): # - the segmentation_id field if provided must be a list of values (as many as chains) # - segmentation_id auto-indexing: # the segmentation_id field can also be a single value that represents the base value from which @@ -297,23 +468,42 @@ loop_vm_name: 'nfvbench-loop-vm' # - the physical_network can be a single name (all VFs to be allocated on same physnet) # of a list of physnet names to use different PFs # -# Example of 2-chain configuration: -# internal_networks: -# left: -# segmentation_id: [2000, 2001] -# physical_network: phys_sriov0 -# right: -# segmentation_id: [2010, 2011] -# physical_network: phys_sriov1 +# Example of 2-chain VLAN configuration: +# internal_networks: +# left: +# segmentation_id: [2000, 2001] +# physical_network: phys_sriov0 +# right: +# segmentation_id: [2010, 2011] +# physical_network: phys_sriov1 +# Equivalent to (using auto-indexing): +# internal_networks: +# left: +# segmentation_id: 2000 +# physical_network: phys_sriov0 +# right: +# segmentation_id: 2010 +# physical_network: phys_sriov1 # -# Equivalent to (using auto-indexing): -# internal_networks: -# left: -# segmentation_id: 2000 -# physical_network: phys_sriov0 -# right: -# segmentation_id: 2010 -# physical_network: phys_sriov1 +# - mpls_transport_labels is used only when MPLS encapsulation is enabled (mpls: true) +# this parameter doesn't support auto-indexing because this is not a typical scenario +# expected the list of values in a range 256-1048575, one value per chain is expected +# +# In the bellow configuration example 'segmentation_id; contains the inner MPLS label for each chain +# and 'mpls_transport_labels' contains the outer transport MPLS label for each chain +# Example of 2-chain MPLS configuration: +# internal_networks: +# left: +# network_type: mpls +# segmentation_id: [2000, 2001] +# mpls_transport_labels: [10000, 10000] +# physical_network: phys_sriov0 +# right: +# network_type: mpls +# segmentation_id: [2010, 2011] +# mpls_transport_labels: [11000, 11000] +# physical_network: phys_sriov1 + internal_networks: left: @@ -323,6 +513,7 @@ internal_networks: network_type: 'vlan' segmentation_id: physical_network: + mpls_transport_labels: right: name: 'nfvbench-rnet' subnet: 'nfvbench-rsubnet' @@ -330,6 +521,7 @@ internal_networks: network_type: 'vlan' segmentation_id: physical_network: + mpls_transport_labels: middle: name: 'nfvbench-mnet' subnet: 'nfvbench-msubnet' @@ -337,51 +529,201 @@ internal_networks: network_type: 'vlan' segmentation_id: physical_network: + mpls_transport_labels: + +# IDLE INTERFACES: PVP, PVVP and non shared net only. +# By default each test VM will have 2 virtual interfaces for looping traffic. +# If service_chain_shared_net is false, additional virtual interfaces can be +# added at VM creation time, these interfaces will not carry any traffic and +# can be used to test the impact of idle interfaces in the overall performance. +# All these idle interfaces will use normal ports (not direct). +# Number of idle interfaces per VM (none by default) +idle_interfaces_per_vm: 0 + +# A new network is created for each idle interface. +# If service_chain_shared_net is true, the options below will be ignored +# and no idle interfaces will be added. +idle_networks: + # Prefix for all idle networks, the final name will append the chain ID and idle index + # e.g. "nfvbench-idle-net.0.4" chain 0 idle index 4 + name: 'nfvbench-idle-net' + # Subnet name to use for all idle subnetworks + subnet: 'nfvbench-idle-subnet' + # CIDR to use for all idle networks (value should not matter) + cidr: '192.169.1.0/24' + # Type of network associated to the idle virtual interfaces (vlan or vxlan) + network_type: 'vlan' + # segmentation ID to use for the network attached to the idle virtual interfaces + # vlan: leave empty to let neutron pick the segmentation ID + # vxlan: must specify the starting VNI value to be used (cannot be empty) + # Note that NFVbench will use as many consecutive segmentation IDs as needed. + # For example, for 4 PVP chains and 8 idle + # interfaces per VM, NFVbench will use 32 consecutive values of segmentation ID + # starting from the value provided. + segmentation_id: + # physnet name to use for all idle interfaces + physical_network: + +# MANAGEMENT INTERFACE +# By default each test VM will have 2 virtual interfaces for looping traffic. +# If use_management_port is true, additional virtual interface can be +# added at VM creation time, this interface will be used for VM management over SSH. +# This will be helpful for debug (forwarder config, capture traffic...) +# or to emulate VNF with management interface +use_management_port: false + +# If a network with given name already exists it will be reused. +# Otherwise a new network is created for management interface. +# If use_management_port is false, the options below will be ignored +# and no management interface will be added. +management_network: + name: 'nfvbench-management-net' + # Subnet name to use for management subnetwork + subnet: 'nfvbench-management-subnet' + # CIDR to use for management network + cidr: '192.168.0.0/24' + gateway: '192.168.0.254' + # Type of network associated to the management virtual interface (vlan or vxlan) + network_type: 'vlan' + # segmentation ID to use for the network attached to the management virtual interface + # vlan: leave empty to let neutron pick the segmentation ID + # vxlan: must specify the starting VNI value to be used (cannot be empty) + segmentation_id: + # physnet name to use for all idle interfaces + physical_network: + +# Floating IP for management interface +# If use_floating_ip is true, floating IP will be set on management interface port +# One floating IP by loop VM will be used (floating ips are often limited, +# use them on limited context mainly for debug). If there are 10 PVP chains, this will require 10 +# floating IPs. If 10 PVVP chains, it will require 20 floating IPs +use_floating_ip: false + +# If a network with given name already exists it will be reused. +# Set same name as management_network if you want to use a floating IP from this network +# Otherwise set name, subnet and CIDR information from your floating IP pool network +# Floating network used to set floating IP on management port. +# Only 1 floating network will be used for all VMs and chains (shared network). +# If use_floating_ip is false, the options below will be ignored +# and no floating IP will be added. +floating_network: + name: 'nfvbench-floating-net' + # Subnet name to use for floating subnetwork + subnet: 'nfvbench-floating-subnet' + # CIDR to use for floating network + cidr: '192.168.0.0/24' + # Type of network associated to the management virtual interface (vlan or vxlan) + network_type: 'vlan' + # segmentation ID to use for the network attached to the management virtual interface + # vlan: leave empty to let neutron pick the segmentation ID + # vxlan: must specify the starting VNI value to be used (cannot be empty) + segmentation_id: + # physnet name to use for all idle interfaces + physical_network: # In the scenario of PVVP + SRIOV, there is choice of how the traffic will be # handled in the middle network. The default (false) will use vswitch, while # SRIOV can be used by toggling below setting. use_sriov_middle_net: false -# EXT chain only. Prefix names of edge networks which will be used to send traffic via traffic generator. +# EXT chain only. Prefix names of edge networks or list of edge network names +# used to send traffic via traffic generator. # # If service_chain_shared_net is true, the left and right networks must pre-exist and match exactly by name. # # If service_chain_shared_net is false, each chain must have its own pre-existing left and right networks. -# An index will be appended to each network name to form the final name: +# left and right can take either a string prefix or a list of arbitrary network names +# If a string prefix is passed, an index will be appended to each network name to form the final name. +# Example: +# external_networks: +# left: 'ext-lnet' +# right: 'ext-rnet' # ext-lnet0 ext-rnet0 for chain #0 # ext-lnet1 ext-rnet1 for chain #1 # etc... +# If a list of strings is passed, each string in the list must be the name of the network used for the +# chain indexed by the entry position in the list. +# The list must have at least as many entries as there are chains +# Example: +# external_networks: +# left: ['ext-lnet', 'ext-lnet2'] +# right: ['ext-rnet', 'ext-rnet2'] +# external_networks: - left: 'ext-lnet' - right: 'ext-rnet' + left: + right: + +# PVP with L3 router in the packet path only. +# Only use when l3_router option is True (see l3_router) +# Prefix names of edge networks which will be used to send traffic via traffic generator. +# If a network with given name already exists it will be reused. +# Otherwise a new edge network will be created with that name, subnet and CIDR. +# +# gateway can be set in case of L3 traffic with edge networks - refer to edge_networks +# +# segmentation_id can be set to enforce a specific VLAN id - by default (empty) the VLAN id +# will be assigned by Neutron. +# Must be unique for each network +# physical_network can be set to pick a specific phsyical network - by default (empty) the +# default physical network will be picked +# +edge_networks: + left: + name: 'nfvbench-net2' + router_name: 'router_left' + subnet: 'nfvbench-subnet2' + cidr: '192.168.3.0/24' + gateway: + network_type: + segmentation_id: + physical_network: + right: + name: 'nfvbench-net3' + router_name: 'router_right' + subnet: 'nfvbench-subnet3' + cidr: '192.168.4.0/24' + gateway: + network_type: + segmentation_id: + physical_network: # Use 'true' to enable VXLAN encapsulation support and sent by the traffic generator # When this option enabled internal networks 'network type' parameter value should be 'vxlan' +# VxLAN and MPLS encapsulations are mutual exclusive if 'vxlan' is true then 'mpls' should be false +# and vise versa vxlan: false - +# Use 'true' to enable MPLS encapsulation support and sent by the traffic generator +# When this option enabled internal networks 'network type' parameter value should be 'mpls' +# MPLS and VxLAN encapsulations are mutual exclusive if 'mpls' is 'true' then 'vxlan' should be set to 'false' +# and vise versa. no_flow_stats, no_latency_stats, no_latency_streams should be set to 'true' because these +# features are not supported at the moment. In future when these features will be supported they will require +# special NIC hardware. Only 2 label stack supported at the moment where one label is transport and another +# is VPN for more details please refer to 'mpls_transport_labels' and 'segmentation_id' in networks configuration +mpls: false # Use 'true' to enable VLAN tagging of packets generated and sent by the traffic generator # Leave empty or set to false if you do not want the traffic generator to insert the VLAN tag (this is # needed for example if VLAN tagging is enabled on switch (access mode) or if you want to hook # directly to a NIC). # By default is set to true (which is the nominal use case with TOR and trunk mode to Trex ports) -# If VxLAN is enabled, this option should be set to false (vlan tagging for encapsulated packets +# If VxLAN or MPLS are enabled, this option should be set to false (vlan tagging for encapsulated packets # is not supported). Use the vtep_vlan option to enable vlan tagging for the VxLAN overlay network. vlan_tagging: true -# Used only in the case of EXT chain and no openstack to specify the VLAN IDs to use. -# This property is ignored when OpenStakc is used or in the case of l2-loopback. +# Used only in the case of EXT chain and no OpenStack or not admin access to specify the VLAN IDs to use. +# This property is ignored when OpenStack is used or when 'vlan_tagging' is disabled. # If OpenStack is used leave the list empty, VLAN IDs are retrieved from OpenStack networks using Neutron API. # If networks are shared across all chains (service_chain_shared_net=true), the list should have exactly 2 values # If networks are not shared across chains (service_chain_shared_net=false), the list should have # 2 list of vlan IDs -# In the special case of l2-loopback the list should have the same VLAN id for all chains # Examples: # [1998, 1999] left network uses vlan 1998 right network uses vlan 1999 # [[1,2],[3,4]] chain 0 left vlan 1, right vlan 2 - chain 1 left vlan 3 right vlan 4 -# [1010, 1010] same VLAN id with l2-loopback enabled -# +# [1010, 1010] same vlan ID on both sides, for a typical l2-loopback test (*) +# The vlan lists may be oversized, compared to the actual service chain count +# (lowest indexes are used) but an exception is raised if they are too short. vlans: [] +# (*) actually there is no restriction, left/right IDs may differ +# for some exotic purpose - see also the l2_loopback parameter. # ARP is used to discover the MAC address of VNFs that run L3 routing. # Used only with EXT chain. @@ -390,6 +732,11 @@ vlans: [] # (see mac_addrs_left and mac_addrs_right) no_arp: false +# Loop VM (VPP forwarder) can use ARP to discover next hop mac address +# False (default): do not send ARP but use static config devices macs instead (TRex gratuitous ARP are not interpreted by VPP) +# True: ARP requests are sent to find out next hop MAC addresses (for instance SDN-GW) +loop_vm_arp: false + # Traffic Profiles # You can add here more profiles as needed # `l2frame_size` can be specified in any none zero integer value to represent the size in bytes @@ -416,6 +763,19 @@ traffic: # Can be overriden by --no-traffic no_traffic: false +# Use an L3 router in the packet path. This option if set will create or reuse an OpenStack neutron +# router (PVP, PVVP) or reuse an existing L3 router (EXT) to route traffic to the destination VM. +# Can be overriden by --l3-router +l3_router: false + +# If l3_router is true and depending on ARP stale time SUT configuration +# Gratuitous ARP (GARP) from TG port to the router is needed to keep traffic up +# Default value: 1 packet per second +# This value needs to be defined inferior to SUT ARP stale time to avoid GARP packets drop +# in case of high load traffic +periodic_gratuitous_arp: false +gratuitous_arp_pps: 1 + # Test configuration # The rate pps for traffic going in reverse direction in case of unidirectional flow. Default to 1. @@ -482,6 +842,26 @@ debug: false # Defaults to disabled log_file: +# One can specify a user ID for changing ownership of output log/json files +# - empty: depends on file existency +# . yes? replacement, owner is unchanged +# . no ? creation with root as user +# - 0: this is the root user ID +# - other: will corresponds (or not) to an existing user/group in the host +# (the current user ID can be obtained with the command 'id -u') +# Can be overriden by --user-id +# Consider also that the default value below is overridable by a USER_ID env variable, +# if nfvbench is run into a container, this information can be passed at its creation. +# The overall precedence rule is: 'default_config (this) < env < config < command_line' +user_id: + +# Similarly, the group ID is defined +# Can be overriden by --group-id +# Default may be set through env GROUP_ID +# Caveat: user and group with a same name may have different numerical IDs +# (the current group ID can be obtained with the command 'id -g') +group_id: + # When enabled, all results and/or logs will be sent to a fluentd servers at the requested IPs and ports # A list of one or more fluentd servers identified by their IPs and port numbers should be given. # For each recipient it is possible to enable both sending logs and performance @@ -517,10 +897,57 @@ factory_class: 'BasicFactory' # Can be overriden by --user-label user_label: +# Custom information to be passed to results post-processing, +# they will be included as is in the json report 'config' branch. +# Useful for documenting or automating further treatments. +# The value is any yaml object (=> open usage) - example: +# |user_info: +# | status: explore +# | description: +# | generator: VM +# | attachment: direct +# | target: lab-pf +# | switch: qfx3500 +# Keys may be merged/overriden using the --user-info command line option +# (the command-line parameter value is expressed as a json object string) +user_info: -# THESE FIELDS SHOULD BE USED VERY RARELY + +# THESE FIELDS SHOULD BE USED VERY RARELY OR ON PURPOSE # Skip vswitch configuration and retrieving of stats # Can be overriden by --no-vswitch-access # Should be left to the default value (false) no_vswitch_access: false + +# Enable service mode for trafic capture from TRex console (for debugging purpose) +# Can be overriden by --service-mode +# Should be left to the default value (false) +service_mode: false + +# Disable extra flow stats (on high load traffic) +# Can be overriden by --no-flow-stats +# Should be left to the default value (false) +no_flow_stats: false + +# Disable flow stats for latency traffic +# Can be overriden by --no-latency-stats +# Should be left to the default value (false) +no_latency_stats: false + +# Disable latency measurements (no streams) +# Can be overriden by --no-latency-streams +# Should be left to the default value (false) +no_latency_streams: false + +# Skip "end to end" connectivity check on traffic setup +# Can be overriden by --no-e2e-check +# Should be left to the default value (false) +# This flag is usable for traffic generation only +no_e2e_check: false + +# General purpose register (debugging flags) +# Can be overriden by --debug-mask +# Designed for development needs +# The hexadecimal notation (0x...) is accepted. +debug_mask: 0x00000000 diff --git a/nfvbench/chain_router.py b/nfvbench/chain_router.py new file mode 100644 index 0000000..99114e0 --- /dev/null +++ b/nfvbench/chain_router.py @@ -0,0 +1,190 @@ +#!/usr/bin/env python +# Copyright 2018 Cisco Systems, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +# This module takes care of chaining routers +# +"""NFVBENCH CHAIN DISCOVERY/STAGING. + +This module takes care of staging/discovering resources that are participating in a +L3 benchmarking session: routers, networks, ports, routes. +If a resource is discovered with the same name, it will be reused. +Otherwise it will be created. + +Once created/discovered, instances are checked to be in the active state (ready to pass traffic) +Configuration parameters that will influence how these resources are staged/related: +- openstack or no openstack +- chain type +- number of chains +- number of VNF in each chain (PVP, PVVP) +- SRIOV and middle port SRIOV for port types +- whether networks are shared across chains or not + +There is not traffic generation involved in this module. +""" +import time + +from netaddr import IPAddress +from netaddr import IPNetwork + +from .log import LOG + + +class ChainException(Exception): + """Exception while operating the chains.""" + +class ChainRouter(object): + """Could be a shared router across all chains or a chain private router.""" + + def __init__(self, manager, name, subnets, routes): + """Create a router for given chain.""" + self.manager = manager + self.subnets = subnets + self.routes = routes + self.name = name + self.ports = [None, None] + self.reuse = False + self.router = None + try: + self._setup() + except Exception: + LOG.error("Error creating router %s", self.name) + self.delete() + raise + + def _setup(self): + # Lookup if there is a matching router with same name + routers = self.manager.neutron_client.list_routers(name=self.name) + + if routers['routers']: + router = routers['routers'][0] + # a router of same name already exists, we need to verify it has the same + # characteristics + if self.subnets: + for subnet in self.subnets: + if not self.get_router_interface(router['id'], subnet.network['subnets'][0]): + raise ChainException("Mismatch of 'subnet_id' for reused " + "router '{router}'.Router has no subnet id '{sub_id}'." + .format(router=self.name, + sub_id=subnet.network['subnets'][0])) + interfaces = self.manager.neutron_client.list_ports(device_id=router['id'])['ports'] + # This string filters nfvbench networks in case when some other specific networks + # created and attached to the test nfvebnch router manually or automatically + # like in case of HA when neutron router virtually present on several network nodes + interfaces = [x for x in interfaces if x['fixed_ips'][0]['subnet_id'] in + [s.network['subnets'][0] for s in self.subnets]] + for interface in interfaces: + if self.is_ip_in_network( + interface['fixed_ips'][0]['ip_address'], + self.manager.config.traffic_generator.tg_gateway_ip_cidrs[0]) \ + or self.is_ip_in_network( + interface['fixed_ips'][0]['ip_address'], + self.manager.config.traffic_generator.tg_gateway_ip_cidrs[1]): + self.ports[0] = interface + else: + self.ports[1] = interface + if self.routes: + for route in self.routes: + if route not in router['routes']: + LOG.info("Mismatch of 'router' for reused router '%s'." + "Router has no existing route destination '%s', " + "and nexthop '%s'.", self.name, + route['destination'], + route['nexthop']) + LOG.info("New route added to router %s for reused ", self.name) + body = { + 'router': { + 'routes': self.routes + } + } + self.manager.neutron_client.update_router(router['id'], body) + + LOG.info('Reusing existing router: %s', self.name) + self.reuse = True + self.router = router + return + + body = { + 'router': { + 'name': self.name, + 'admin_state_up': True + } + } + router = self.manager.neutron_client.create_router(body)['router'] + router_id = router['id'] + + if self.subnets: + for subnet in self.subnets: + router_interface = {'subnet_id': subnet.network['subnets'][0]} + self.manager.neutron_client.add_interface_router(router_id, router_interface) + interfaces = self.manager.neutron_client.list_ports(device_id=router_id)['ports'] + interfaces = [x for x in interfaces if x['fixed_ips'][0]['subnet_id'] in + [s.network['subnets'][0] for s in self.subnets]] + for interface in interfaces: + itf = interface['fixed_ips'][0]['ip_address'] + cidr0 = self.manager.config.traffic_generator.tg_gateway_ip_cidrs[0] + cidr1 = self.manager.config.traffic_generator.tg_gateway_ip_cidrs[1] + if self.is_ip_in_network(itf, cidr0) or self.is_ip_in_network(itf, cidr1): + self.ports[0] = interface + else: + self.ports[1] = interface + + if self.routes: + body = { + 'router': { + 'routes': self.routes + } + } + self.manager.neutron_client.update_router(router_id, body) + + LOG.info('Created router: %s.', self.name) + self.router = self.manager.neutron_client.show_router(router_id) + + def get_uuid(self): + """ + Extract UUID of this router. + + :return: UUID of this router + """ + return self.router['id'] + + def get_router_interface(self, router_id, subnet_id): + interfaces = self.manager.neutron_client.list_ports(device_id=router_id)['ports'] + matching_interface = None + for interface in interfaces: + if interface['fixed_ips'][0]['subnet_id'] == subnet_id: + matching_interface = interface + return matching_interface + + def is_ip_in_network(self, interface_ip, cidr): + return IPAddress(interface_ip) in IPNetwork(cidr) + + def delete(self): + """Delete this router.""" + if not self.reuse and self.router: + retry = 0 + while retry < self.manager.config.generic_retry_count: + try: + self.manager.neutron_client.delete_router(self.router['id']) + LOG.info("Deleted router: %s", self.name) + return + except Exception: + retry += 1 + LOG.info('Error deleting router %s (retry %d/%d)...', + self.name, + retry, + self.manager.config.generic_retry_count) + time.sleep(self.manager.config.generic_poll_sec) + LOG.error('Unable to delete router: %s', self.name) diff --git a/nfvbench/chain_runner.py b/nfvbench/chain_runner.py index a4e461d..f045528 100644 --- a/nfvbench/chain_runner.py +++ b/nfvbench/chain_runner.py @@ -23,11 +23,11 @@ The ChainRunner class is in charge of coordinating: from collections import OrderedDict -from chaining import ChainManager -from log import LOG -from specs import ChainType -from stats_manager import StatsManager -from traffic_client import TrafficClient +from .chaining import ChainManager +from .log import LOG +from .specs import ChainType +from .stats_manager import StatsManager +from .traffic_client import TrafficClient class ChainRunner(object): @@ -71,11 +71,16 @@ class ChainRunner(object): # VLAN is discovered from the networks gen_config.set_vlans(0, self.chain_manager.get_chain_vlans(0)) gen_config.set_vlans(1, self.chain_manager.get_chain_vlans(1)) + else: + LOG.info("Ports: untagged") # the only case we do not need to set the dest MAC is in the case of # l2-loopback (because the traffic gen will default to use the peer MAC) - # or EXT+ARP (because dest MAC will be discovered by TRex ARP) - if not config.l2_loopback and (config.service_chain != ChainType.EXT or config.no_arp): + # or EXT+ARP+VLAN (because dest MAC will be discovered by TRex ARP) + # Note that in the case of EXT+ARP+VxLAN, the dest MACs need to be loaded + # because ARP only operates on the dest VTEP IP not on the VM dest MAC + if not config.l2_loopback and \ + (config.service_chain != ChainType.EXT or config.no_arp or config.vxlan): gen_config.set_dest_macs(0, self.chain_manager.get_dest_macs(0)) gen_config.set_dest_macs(1, self.chain_manager.get_dest_macs(1)) @@ -93,16 +98,37 @@ class ChainRunner(object): gen_config.set_vxlan_endpoints(1, src_vteps[1], dst_vtep) self.config['vxlan_gen_config'] = gen_config + if config.mpls: + # MPLS VPN is discovered from the networks + src_vteps = gen_config.gen_config.src_vteps + vtep_gateway_ips = gen_config.gen_config.vtep_gateway_ips + gen_config.set_mpls_inner_labels(0, self.chain_manager.get_chain_mpls_inner_labels(0)) + gen_config.set_mpls_inner_labels(1, self.chain_manager.get_chain_mpls_inner_labels(1)) + outer_mpls_labels_left = self.config.internal_networks.left.mpls_transport_labels + outer_mpls_labels_right = self.config.internal_networks.right.mpls_transport_labels + if outer_mpls_labels_left or outer_mpls_labels_right: + gen_config.set_mpls_outer_labels(0, outer_mpls_labels_left) + gen_config.set_mpls_outer_labels(1, outer_mpls_labels_right) + # Configuring source an remote VTEPs on TREx interfaces + gen_config.set_mpls_peers(0, src_vteps[0], vtep_gateway_ips[0]) + gen_config.set_mpls_peers(1, src_vteps[1], vtep_gateway_ips[1]) + self.config['mpls_gen_config'] = gen_config + # get an instance of the stats manager self.stats_manager = StatsManager(self) LOG.info('ChainRunner initialized') def __setup_traffic(self): + # possibly skip connectivity check + if self.config.no_e2e_check: + LOG.info('Skipping end to end connectivity check') + return self.traffic_client.setup() if not self.config.no_traffic: - # ARP is needed for EXT chain or VxLAN overlay unless disabled explicitly - if (self.config.service_chain == ChainType.EXT or self.config.vxlan) and \ - not self.config.no_arp: + # ARP is needed for EXT chain or VxLAN overlay or MPLS unless disabled explicitly + if (self.config.service_chain == ChainType.EXT or self.config.mpls or + self.config.vxlan or self.config.l3_router or self.config.loop_vm_arp)\ + and not self.config.no_arp: self.traffic_client.ensure_arp_successful() self.traffic_client.ensure_end_to_end() @@ -129,7 +155,10 @@ class ChainRunner(object): if self.config.single_run: result['run_config'] = self.traffic_client.get_run_config(result) required = result['run_config']['direction-total']['orig']['rate_pps'] - actual = result['stats']['total_tx_rate'] + if self.config.periodic_gratuitous_arp: + actual = result['stats']['total_tx_rate'] + self.config.gratuitous_arp_pps + else: + actual = result['stats']['total_tx_rate'] warning = self.traffic_client.compare_tx_rates(required, actual) if warning is not None: result['run_config']['warning'] = warning @@ -163,10 +192,9 @@ class ChainRunner(object): LOG.info('Starting %dx%s benchmark...', self.config.service_chain_count, self.chain_name) self.stats_manager.create_worker() - if self.config.vxlan: - # Configure vxlan tunnels + if self.config.vxlan or self.config.mpls: + # Configure vxlan or mpls tunnels self.stats_manager.worker.config_interfaces() - self.__setup_traffic() results[self.chain_name] = {'result': self.__get_chain_result()} @@ -185,8 +213,8 @@ class ChainRunner(object): LOG.info('Clean up skipped.') try: self.traffic_client.close() - except Exception: - LOG.exception() + except Exception as exc: + LOG.exception(exc) if self.stats_manager: self.stats_manager.close() except Exception: diff --git a/nfvbench/chain_workers.py b/nfvbench/chain_workers.py index 0ed2648..e332d7b 100644 --- a/nfvbench/chain_workers.py +++ b/nfvbench/chain_workers.py @@ -43,7 +43,6 @@ class BasicWorker(object): Specialized workers can insert their own interface stats inside each existing packet path stats for every chain. """ - pass def update_interface_stats(self, diff=False): """Update all interface stats. @@ -53,4 +52,3 @@ class BasicWorker(object): Make sure that the interface stats inserted in insert_interface_stats() are updated with proper values """ - pass diff --git a/nfvbench/chaining.py b/nfvbench/chaining.py index a97cd0b..d6f67f9 100644 --- a/nfvbench/chaining.py +++ b/nfvbench/chaining.py @@ -49,18 +49,21 @@ import os import re import time -from glanceclient.v2 import client as glanceclient +import glanceclient from neutronclient.neutron import client as neutronclient from novaclient.client import Client from attrdict import AttrDict -import compute -from log import LOG -from specs import ChainType - +from .chain_router import ChainRouter +from . import compute +from .log import LOG +from .specs import ChainType # Left and right index for network and port lists LEFT = 0 RIGHT = 1 +# L3 traffic edge networks are at the end of networks list +EDGE_LEFT = -2 +EDGE_RIGHT = -1 # Name of the VM config file NFVBENCH_CFG_FILENAME = 'nfvbenchvm.conf' # full pathame of the VM config in the VM @@ -74,8 +77,6 @@ BOOT_SCRIPT_PATHNAME = os.path.join(os.path.dirname(os.path.abspath(__file__)), class ChainException(Exception): """Exception while operating the chains.""" - pass - class NetworkEncaps(object): """Network encapsulation.""" @@ -131,6 +132,7 @@ class ChainVnfPort(object): self.manager = vnf.manager self.reuse = False self.port = None + self.floating_ip = None if vnf.instance: # VNF instance is reused, we need to find an existing port that matches this instance # and network @@ -156,6 +158,10 @@ class ChainVnfPort(object): 'binding:vnic_type': vnic_type } } + subnet_id = chain_network.get_subnet_uuid() + if subnet_id: + body['port']['fixed_ips'] = [{'subnet_id': subnet_id}] + port = self.manager.neutron_client.create_port(body) self.port = port['port'] LOG.info('Created port %s', name) @@ -174,18 +180,39 @@ class ChainVnfPort(object): """Get the MAC address for this port.""" return self.port['mac_address'] + def get_ip(self): + """Get the IP address for this port.""" + return self.port['fixed_ips'][0]['ip_address'] + + def set_floating_ip(self, chain_network): + # create and add floating ip to port + try: + self.floating_ip = self.manager.neutron_client.create_floatingip({ + 'floatingip': { + 'floating_network_id': chain_network.get_uuid(), + 'port_id': self.port['id'], + 'description': 'nfvbench floating ip for port:' + self.port['name'], + }})['floatingip'] + LOG.info('Floating IP %s created and associated on port %s', + self.floating_ip['floating_ip_address'], self.name) + return self.floating_ip['floating_ip_address'] + except Exception: + LOG.info('Failed to created and associated floating ip on port %s (ignored)', self.name) + return self.port['fixed_ips'][0]['ip_address'] + def delete(self): """Delete this port instance.""" if self.reuse or not self.port: return - retry = 0 - while retry < self.manager.config.generic_retry_count: + for _ in range(0, self.manager.config.generic_retry_count): try: self.manager.neutron_client.delete_port(self.port['id']) LOG.info("Deleted port %s", self.name) + if self.floating_ip: + self.manager.neutron_client.delete_floatingip(self.floating_ip['id']) + LOG.info("Deleted floating IP %s", self.floating_ip['description']) return except Exception: - retry += 1 time.sleep(self.manager.config.generic_poll_sec) LOG.error('Unable to delete port: %s', self.name) @@ -193,24 +220,39 @@ class ChainVnfPort(object): class ChainNetwork(object): """Could be a shared network across all chains or a chain private network.""" - def __init__(self, manager, network_config, chain_id=None, lookup_only=False): + def __init__(self, manager, network_config, chain_id=None, lookup_only=False, + suffix=None): """Create a network for given chain. network_config: a dict containing the network properties - (segmentation_id and physical_network) + (name, segmentation_id and physical_network) chain_id: to which chain the networks belong. a None value will mean that these networks are shared by all chains + suffix: a suffix to add to the network name (if not None) """ self.manager = manager - self.name = network_config.name + if chain_id is None: + self.name = network_config.name + else: + # the name itself can be either a string or a list of names indexed by chain ID + if isinstance(network_config.name, tuple): + self.name = network_config.name[chain_id] + else: + # network_config.name is a prefix string + self.name = network_config.name + str(chain_id) + if suffix: + self.name = self.name + suffix self.segmentation_id = self._get_item(network_config.segmentation_id, chain_id, auto_index=True) + self.subnet_name = self._get_item(network_config.subnet, chain_id) self.physical_network = self._get_item(network_config.physical_network, chain_id) - if chain_id is not None: - self.name += str(chain_id) + self.reuse = False self.network = None self.vlan = None + self.router_name = None + if manager.config.l3_router and hasattr(network_config, 'router_name'): + self.router_name = network_config.router_name try: self._setup(network_config, lookup_only) except Exception: @@ -242,7 +284,7 @@ class ChainNetwork(object): return item_field[index] except IndexError: raise ChainException("List %s is too short for chain index %d" % - (str(item_field), index)) + (str(item_field), index)) from IndexError # single value is configured if auto_index: return item_field + index @@ -283,7 +325,7 @@ class ChainNetwork(object): 'network': { 'name': self.name, 'admin_state_up': True - } + } } if network_config.network_type: body['network']['provider:network_type'] = network_config.network_type @@ -292,6 +334,8 @@ class ChainNetwork(object): if self.physical_network: body['network']['provider:physical_network'] = self.physical_network self.network = self.manager.neutron_client.create_network(body)['network'] + # create associated subnet, all subnets have the same name (which is ok since + # we do not need to address them directly by name) body = { 'subnet': {'name': network_config.subnet, 'cidr': network_config.cidr, @@ -313,6 +357,18 @@ class ChainNetwork(object): """ return self.network['id'] + def get_subnet_uuid(self): + """ + Extract UUID of this subnet network. + + :return: UUID of this subnet network + """ + for subnet in self.network['subnets']: + if self.subnet_name == self.manager.neutron_client \ + .show_subnet(subnet)['subnet']['name']: + return subnet + return None + def get_vlan(self): """ Extract vlan for this network. @@ -329,24 +385,30 @@ class ChainNetwork(object): :return: VNI ID for this network """ - if 'vxlan' not in self.network['provider:network_type']: - raise ChainException('Trying to retrieve VNI for non VXLAN network') + + return self.network['provider:segmentation_id'] + + def get_mpls_inner_label(self): + """ + Extract MPLS VPN Label for this network. + + :return: MPLS VPN Label for this network + """ + return self.network['provider:segmentation_id'] def delete(self): """Delete this network.""" if not self.reuse and self.network: - retry = 0 - while retry < self.manager.config.generic_retry_count: + for retry in range(0, self.manager.config.generic_retry_count): try: self.manager.neutron_client.delete_network(self.network['id']) LOG.info("Deleted network: %s", self.name) return except Exception: - retry += 1 LOG.info('Error deleting network %s (retry %d/%d)...', self.name, - retry, + retry + 1, self.manager.config.generic_retry_count) time.sleep(self.manager.config.generic_poll_sec) LOG.error('Unable to delete network: %s', self.name) @@ -369,15 +431,27 @@ class ChainVnf(object): if len(networks) > 2: # we will have more than 1 VM in each chain self.name += '-' + str(vnf_id) + # A list of ports for this chain + # There are normally 2 ports carrying traffic (index 0, and index 1) and + # potentially multiple idle ports not carrying traffic (index 2 and up) + # For example if 7 idle interfaces are requested, the corresp. ports will be + # at index 2 to 8 self.ports = [] + self.management_port = None + self.routers = [] self.status = None self.instance = None self.reuse = False self.host_ip = None + self.idle_networks = [] + self.idle_ports = [] try: # the vnf_id is conveniently also the starting index in networks # for the left and right networks associated to this VNF - self._setup(networks[vnf_id:vnf_id + 2]) + if self.manager.config.l3_router: + self._setup(networks[vnf_id:vnf_id + 4]) + else: + self._setup(networks[vnf_id:vnf_id + 2]) except Exception: LOG.error("Error creating VNF %s", self.name) self.delete() @@ -386,29 +460,85 @@ class ChainVnf(object): def _get_vm_config(self, remote_mac_pair): config = self.manager.config devices = self.manager.generator_config.devices - with open(BOOT_SCRIPT_PATHNAME, 'r') as boot_script: + + if config.l3_router: + tg_gateway1_ip = self.routers[LEFT].ports[1]['fixed_ips'][0][ + 'ip_address'] # router edge ip left + tg_gateway2_ip = self.routers[RIGHT].ports[1]['fixed_ips'][0][ + 'ip_address'] # router edge ip right + tg_mac1 = self.routers[LEFT].ports[1]['mac_address'] # router edge mac left + tg_mac2 = self.routers[RIGHT].ports[1]['mac_address'] # router edge mac right + # edge cidr mask left + vnf_gateway1_cidr = \ + self.ports[LEFT].get_ip() + self.__get_network_mask( + self.manager.config.edge_networks.left.cidr) + # edge cidr mask right + vnf_gateway2_cidr = \ + self.ports[RIGHT].get_ip() + self.__get_network_mask( + self.manager.config.edge_networks.right.cidr) + if config.vm_forwarder != 'vpp': + raise ChainException( + 'L3 router mode imply to set VPP as VM forwarder.' + 'Please update your config file with: vm_forwarder: vpp') + else: + tg_gateway1_ip = devices[LEFT].tg_gateway_ip_addrs + tg_gateway2_ip = devices[RIGHT].tg_gateway_ip_addrs + if not config.loop_vm_arp: + tg_mac1 = remote_mac_pair[0] + tg_mac2 = remote_mac_pair[1] + else: + tg_mac1 = "" + tg_mac2 = "" + + g1cidr = devices[LEFT].get_gw_ip( + self.chain.chain_id) + self.__get_network_mask( + self.manager.config.internal_networks.left.cidr) + g2cidr = devices[RIGHT].get_gw_ip( + self.chain.chain_id) + self.__get_network_mask( + self.manager.config.internal_networks.right.cidr) + + vnf_gateway1_cidr = g1cidr + vnf_gateway2_cidr = g2cidr + + with open(BOOT_SCRIPT_PATHNAME, 'r', encoding="utf-8") as boot_script: content = boot_script.read() - g1cidr = devices[LEFT].get_gw_ip(self.chain.chain_id) + '/8' - g2cidr = devices[RIGHT].get_gw_ip(self.chain.chain_id) + '/8' vm_config = { 'forwarder': config.vm_forwarder, 'intf_mac1': self.ports[LEFT].get_mac(), 'intf_mac2': self.ports[RIGHT].get_mac(), - 'tg_gateway1_ip': devices[LEFT].tg_gateway_ip_addrs, - 'tg_gateway2_ip': devices[RIGHT].tg_gateway_ip_addrs, + 'tg_gateway1_ip': tg_gateway1_ip, + 'tg_gateway2_ip': tg_gateway2_ip, 'tg_net1': devices[LEFT].ip_addrs, 'tg_net2': devices[RIGHT].ip_addrs, - 'vnf_gateway1_cidr': g1cidr, - 'vnf_gateway2_cidr': g2cidr, - 'tg_mac1': remote_mac_pair[0], - 'tg_mac2': remote_mac_pair[1] + 'vnf_gateway1_cidr': vnf_gateway1_cidr, + 'vnf_gateway2_cidr': vnf_gateway2_cidr, + 'tg_mac1': tg_mac1, + 'tg_mac2': tg_mac2, + 'vif_mq_size': config.vif_multiqueue_size, + 'num_mbufs': config.num_mbufs } + if self.manager.config.use_management_port: + mgmt_ip = self.management_port.port['fixed_ips'][0]['ip_address'] + mgmt_mask = self.__get_network_mask(self.manager.config.management_network.cidr) + vm_config['intf_mgmt_cidr'] = mgmt_ip + mgmt_mask + vm_config['intf_mgmt_ip_gw'] = self.manager.config.management_network.gateway + vm_config['intf_mac_mgmt'] = self.management_port.port['mac_address'] + else: + # Interface management config left empty to avoid error in VM spawn + # if nfvbench config has values for management network but use_management_port=false + vm_config['intf_mgmt_cidr'] = '' + vm_config['intf_mgmt_ip_gw'] = '' + vm_config['intf_mac_mgmt'] = '' return content.format(**vm_config) + @staticmethod + def __get_network_mask(network): + return '/' + network.split('/')[1] + def _get_vnic_type(self, port_index): """Get the right vnic type for given port indexself. - If SR-IOV is speficied, middle ports in multi-VNF chains + If SR-IOV is specified, middle ports in multi-VNF chains can use vswitch or SR-IOV based on config.use_sriov_middle_net """ if self.manager.config.sriov: @@ -423,45 +553,164 @@ class ChainVnf(object): return 'direct' return 'normal' + def _get_idle_networks_ports(self): + """Get the idle networks for PVP or PVVP chain (non shared net only) + + For EXT packet path or shared net, returns empty list. + For PVP, PVVP these networks will be created if they do not exist. + chain_id: to which chain the networks belong. + a None value will mean that these networks are shared by all chains + """ + networks = [] + ports = [] + config = self.manager.config + chain_id = self.chain.chain_id + idle_interfaces_per_vm = config.idle_interfaces_per_vm + if config.service_chain == ChainType.EXT or chain_id is None or \ + idle_interfaces_per_vm == 0: + return + + # Make a copy of the idle networks dict as we may have to modify the + # segmentation ID + idle_network_cfg = AttrDict(config.idle_networks) + if idle_network_cfg.segmentation_id: + segmentation_id = idle_network_cfg.segmentation_id + \ + chain_id * idle_interfaces_per_vm + else: + segmentation_id = None + try: + # create as many idle networks and ports as requested + for idle_index in range(idle_interfaces_per_vm): + if config.service_chain == ChainType.PVP: + suffix = '.%d' % (idle_index) + else: + suffix = '.%d.%d' % (self.vnf_id, idle_index) + port_name = self.name + '-idle' + str(idle_index) + # update the segmentation id based on chain id and idle index + if segmentation_id: + idle_network_cfg.segmentation_id = segmentation_id + idle_index + port_name = port_name + "." + str(segmentation_id) + + networks.append(ChainNetwork(self.manager, + idle_network_cfg, + chain_id, + suffix=suffix)) + ports.append(ChainVnfPort(port_name, + self, + networks[idle_index], + 'normal')) + except Exception: + # need to cleanup all successful networks + for net in networks: + net.delete() + for port in ports: + port.delete() + raise + self.idle_networks = networks + self.idle_ports = ports + def _setup(self, networks): flavor_id = self.manager.flavor.flavor.id # Check if we can reuse an instance with same name for instance in self.manager.existing_instances: if instance.name == self.name: + instance_left = LEFT + instance_right = RIGHT + # In case of L3 traffic instance use edge networks + if self.manager.config.l3_router: + instance_left = EDGE_LEFT + instance_right = EDGE_RIGHT # Verify that other instance characteristics match if instance.flavor['id'] != flavor_id: self._reuse_exception('Flavor mismatch') if instance.status != "ACTIVE": self._reuse_exception('Matching instance is not in ACTIVE state') # The 2 networks for this instance must also be reused - if not networks[LEFT].reuse: - self._reuse_exception('network %s is new' % networks[LEFT].name) - if not networks[RIGHT].reuse: - self._reuse_exception('network %s is new' % networks[RIGHT].name) + if not networks[instance_left].reuse: + self._reuse_exception('network %s is new' % networks[instance_left].name) + if not networks[instance_right].reuse: + self._reuse_exception('network %s is new' % networks[instance_right].name) # instance.networks have the network names as keys: # {'nfvbench-rnet0': ['192.168.2.10'], 'nfvbench-lnet0': ['192.168.1.8']} - if networks[LEFT].name not in instance.networks: + if networks[instance_left].name not in instance.networks: self._reuse_exception('Left network mismatch') - if networks[RIGHT].name not in instance.networks: + if networks[instance_right].name not in instance.networks: self._reuse_exception('Right network mismatch') self.reuse = True self.instance = instance LOG.info('Reusing existing instance %s on %s', self.name, self.get_hypervisor_name()) + # create management port if needed + if self.manager.config.use_management_port: + self.management_port = ChainVnfPort(self.name + '-mgmt', self, + self.manager.management_network, 'normal') + ip = self.management_port.port['fixed_ips'][0]['ip_address'] + if self.manager.config.use_floating_ip: + ip = self.management_port.set_floating_ip(self.manager.floating_ip_network) + LOG.info("Management interface will be active using IP: %s, " + "and you can connect over SSH with login: nfvbench and password: nfvbench", ip) # create or reuse/discover 2 ports per instance - self.ports = [ChainVnfPort(self.name + '-' + str(index), - self, - networks[index], - self._get_vnic_type(index)) for index in [0, 1]] + if self.manager.config.l3_router: + for index in [0, 1]: + self.ports.append(ChainVnfPort(self.name + '-' + str(index), + self, + networks[index + 2], + self._get_vnic_type(index))) + else: + for index in [0, 1]: + self.ports.append(ChainVnfPort(self.name + '-' + str(index), + self, + networks[index], + self._get_vnic_type(index))) + + # create idle networks and ports only if instance is not reused + # if reused, we do not care about idle networks/ports + if not self.reuse: + self._get_idle_networks_ports() + + # Create neutron routers for L3 traffic use case + if self.manager.config.l3_router and self.manager.openstack: + internal_nets = networks[:2] + if self.manager.config.service_chain == ChainType.PVP: + edge_nets = networks[2:] + else: + edge_nets = networks[3:] + subnets_left = [internal_nets[0], edge_nets[0]] + routes_left = [{'destination': self.manager.config.traffic_generator.ip_addrs[0], + 'nexthop': self.manager.config.traffic_generator.tg_gateway_ip_addrs[ + 0]}, + {'destination': self.manager.config.traffic_generator.ip_addrs[1], + 'nexthop': self.ports[0].get_ip()}] + self.routers.append( + ChainRouter(self.manager, edge_nets[0].router_name, subnets_left, routes_left)) + subnets_right = [internal_nets[1], edge_nets[1]] + routes_right = [{'destination': self.manager.config.traffic_generator.ip_addrs[0], + 'nexthop': self.ports[1].get_ip()}, + {'destination': self.manager.config.traffic_generator.ip_addrs[1], + 'nexthop': self.manager.config.traffic_generator.tg_gateway_ip_addrs[ + 1]}] + self.routers.append( + ChainRouter(self.manager, edge_nets[1].router_name, subnets_right, routes_right)) + # Overload gateway_ips property with router ip address for ARP and traffic calls + self.manager.generator_config.devices[LEFT].set_gw_ip( + self.routers[LEFT].ports[0]['fixed_ips'][0]['ip_address']) # router edge ip left) + self.manager.generator_config.devices[RIGHT].set_gw_ip( + self.routers[RIGHT].ports[0]['fixed_ips'][0]['ip_address']) # router edge ip right) + # if no reuse, actual vm creation is deferred after all ports in the chain are created # since we need to know the next mac in a multi-vnf chain def create_vnf(self, remote_mac_pair): """Create the VNF instance if it does not already exist.""" if self.instance is None: - port_ids = [{'port-id': vnf_port.port['id']} - for vnf_port in self.ports] + port_ids = [] + if self.manager.config.use_management_port: + port_ids.append({'port-id': self.management_port.port['id']}) + port_ids.extend([{'port-id': vnf_port.port['id']} for vnf_port in self.ports]) + # add idle ports + for idle_port in self.idle_ports: + port_ids.append({'port-id': idle_port.port['id']}) vm_config = self._get_vm_config(remote_mac_pair) az = self.manager.placer.get_required_az() server = self.manager.comp.create_server(self.name, @@ -485,8 +734,8 @@ class ChainVnf(object): # here we MUST wait until this instance is resolved otherwise subsequent # VNF creation can be placed in other hypervisors! config = self.manager.config - max_retries = (config.check_traffic_time_sec + - config.generic_poll_sec - 1) / config.generic_poll_sec + max_retries = int((config.check_traffic_time_sec + + config.generic_poll_sec - 1) / config.generic_poll_sec) retry = 0 for retry in range(max_retries): status = self.get_status() @@ -522,7 +771,13 @@ class ChainVnf(object): def get_hostname(self): """Get the hypervisor host name running this VNF instance.""" - return getattr(self.instance, 'OS-EXT-SRV-ATTR:hypervisor_hostname') + if self.manager.is_admin: + hypervisor_hostname = getattr(self.instance, 'OS-EXT-SRV-ATTR:hypervisor_hostname') + else: + hypervisor_hostname = self.manager.config.hypervisor_hostname + if not hypervisor_hostname: + raise ChainException('Hypervisor hostname parameter is mandatory') + return hypervisor_hostname def get_host_ip(self): """Get the IP address of the host where this instance runs. @@ -536,7 +791,12 @@ class ChainVnf(object): def get_hypervisor_name(self): """Get hypervisor name (az:hostname) for this VNF instance.""" if self.instance: - az = getattr(self.instance, 'OS-EXT-AZ:availability_zone') + if self.manager.is_admin: + az = getattr(self.instance, 'OS-EXT-AZ:availability_zone') + else: + az = self.manager.config.availability_zone + if not az: + raise ChainException('Availability zone parameter is mandatory') hostname = self.get_hostname() if az: return az + ':' + hostname @@ -555,8 +815,15 @@ class ChainVnf(object): if self.instance: self.manager.comp.delete_server(self.instance) LOG.info("Deleted instance %s", self.name) + if self.manager.config.use_management_port: + self.management_port.delete() for port in self.ports: port.delete() + for port in self.idle_ports: + port.delete() + for network in self.idle_networks: + network.delete() + class Chain(object): """A class to manage a single chain. @@ -619,7 +886,8 @@ class Chain(object): def get_length(self): """Get the number of VNF in the chain.""" - return len(self.networks) - 1 + # Take into account 2 edge networks for routers + return len(self.networks) - 3 if self.manager.config.l3_router else len(self.networks) - 1 def _get_remote_mac_pairs(self): """Get the list of remote mac pairs for every VNF in the chain. @@ -674,7 +942,10 @@ class Chain(object): if port_index: # this will pick the last item in array port_index = -1 - return self.networks[port_index].get_vlan() + # This string filters networks connected to TG, in case of + # l3-router feature we have 4 networks instead of 2 + networks = [x for x in self.networks if not x.router_name] + return networks[port_index].get_vlan() def get_vxlan(self, port_index): """Get the VXLAN id on a given port. @@ -690,6 +961,20 @@ class Chain(object): port_index = -1 return self.networks[port_index].get_vxlan() + def get_mpls_inner_label(self, port_index): + """Get the MPLS VPN Label on a given port. + + port_index: left port is 0, right port is 1 + return: the mpls_label_id or None if there is no mpls + """ + # for port 1 we need to return the MPLS Label of the last network in the chain + # The networks array contains 2 networks for PVP [left, right] + # and 3 networks in the case of PVVP [left.middle,right] + if port_index: + # this will pick the last item in array + port_index = -1 + return self.networks[port_index].get_mpls_inner_label() + def get_dest_mac(self, port_index): """Get the dest MAC on a given port. @@ -851,6 +1136,7 @@ class ChainManager(object): if self.openstack: # openstack only session = chain_runner.cred.get_session() + self.is_admin = chain_runner.cred.is_admin self.nova_client = Client(2, session=session) self.neutron_client = neutronclient.Client('2.0', session=session) self.glance_client = glanceclient.Client('2', session=session) @@ -864,6 +1150,22 @@ class ChainManager(object): self.flavor = ChainFlavor(config.flavor_type, config.flavor, self.comp) # Get list of all existing instances to check if some instances can be reused self.existing_instances = self.comp.get_server_list() + # If management port is requested for VMs, create management network (shared) + if self.config.use_management_port: + self.management_network = ChainNetwork(self, self.config.management_network, + None, False) + # If floating IP is used for management, create and share + # across chains the floating network + if self.config.use_floating_ip: + self.floating_ip_network = ChainNetwork(self, + self.config.floating_network, + None, False) + else: + # For EXT chains, the external_networks left and right fields in the config + # must be either a prefix string or a list of at least chain-count strings + self._check_extnet('left', config.external_networks.left) + self._check_extnet('right', config.external_networks.right) + # If networks are shared across chains, get the list of networks if config.service_chain_shared_net: self.networks = self.get_networks() @@ -871,18 +1173,20 @@ class ChainManager(object): for chain_id in range(self.chain_count): self.chains.append(Chain(chain_id, self)) if config.service_chain == ChainType.EXT: - # if EXT and no ARP we need to read dest MACs from config - if config.no_arp: + # if EXT and no ARP or VxLAN we need to read dest MACs from config + if config.no_arp or config.vxlan: self._get_dest_macs_from_config() else: # Make sure all instances are active before proceeding self._ensure_instances_active() + # network API call do not show VLANS ID if not admin read from config + if not self.is_admin and config.vlan_tagging: + self._get_config_vlans() except Exception: self.delete() raise else: # no openstack, no need to create chains - if not config.l2_loopback and config.no_arp: self._get_dest_macs_from_config() if config.vlan_tagging: @@ -890,12 +1194,27 @@ class ChainManager(object): if len(config.vlans) != 2: raise ChainException('The config vlans property must be a list ' 'with 2 lists of VLAN IDs') - re_vlan = "[0-9]*$" - self.vlans = [self._check_list('vlans[0]', config.vlans[0], re_vlan), - self._check_list('vlans[1]', config.vlans[1], re_vlan)] + self._get_config_vlans() if config.vxlan: raise ChainException('VxLAN is only supported with OpenStack') + def _check_extnet(self, side, name): + if not name: + raise ChainException('external_networks.%s must contain a valid network' + ' name prefix or a list of network names' % side) + if isinstance(name, tuple) and len(name) < self.chain_count: + raise ChainException('external_networks.%s %s' + ' must have at least %d names' % (side, name, self.chain_count)) + + def _get_config_vlans(self): + re_vlan = "[0-9]*$" + try: + self.vlans = [self._check_list('vlans[0]', self.config.vlans[0], re_vlan), + self._check_list('vlans[1]', self.config.vlans[1], re_vlan)] + except IndexError: + raise ChainException( + 'vlans parameter is mandatory. Set valid value in config file') from IndexError + def _get_dest_macs_from_config(self): re_mac = "[0-9a-fA-F]{2}([-:])[0-9a-fA-F]{2}(\\1[0-9a-fA-F]{2}){4}$" tg_config = self.config.traffic_generator @@ -908,6 +1227,8 @@ class ChainManager(object): # if it is a single int or mac, make it a list of 1 int if isinstance(ll, (int, str)): ll = [ll] + else: + ll = list(ll) for item in ll: if not re.match(pattern, str(item)): raise ChainException("Invalid format '{item}' specified in {fname}" @@ -965,12 +1286,16 @@ class ChainManager(object): LOG.info('Image %s successfully uploaded.', self.image_name) self.image_instance = self.comp.find_image(self.image_name) + # image multiqueue property must be set according to the vif_multiqueue_size + # config value (defaults to 1 or disabled) + self.comp.image_set_multiqueue(self.image_instance, self.config.vif_multiqueue_size > 1) + def _ensure_instances_active(self): instances = [] for chain in self.chains: instances.extend(chain.get_instances()) initial_instance_count = len(instances) - max_retries = (self.config.check_traffic_time_sec + + max_retries = (self.config.check_traffic_time_sec + (initial_instance_count - 1) * 10 + self.config.generic_poll_sec - 1) / self.config.generic_poll_sec retry = 0 while instances: @@ -1016,6 +1341,7 @@ class ChainManager(object): lookup_only = True ext_net = self.config.external_networks net_cfg = [AttrDict({'name': name, + 'subnet': None, 'segmentation_id': None, 'physical_network': None}) for name in [ext_net.left, ext_net.right]] @@ -1028,6 +1354,10 @@ class ChainManager(object): net_cfg = [int_nets.left, int_nets.right] else: net_cfg = [int_nets.left, int_nets.middle, int_nets.right] + if self.config.l3_router: + edge_nets = self.config.edge_networks + net_cfg.append(edge_nets.left) + net_cfg.append(edge_nets.right) networks = [] try: for cfg in net_cfg: @@ -1091,30 +1421,41 @@ class ChainManager(object): """ return self.get_existing_ports().get(chain_network.get_uuid(), None) - def get_host_ip_from_mac(self, mac): - """Get the host IP address matching a MAC. + def get_hypervisor_from_mac(self, mac): + """Get the hypervisor that hosts a VM MAC. mac: MAC address to look for - return: the IP address of the host where the matching port runs or None if not found + return: the hypervisor where the matching port runs or None if not found """ # _existing_ports is a dict of list of ports indexed by network id - for port_list in self.get_existing_ports().values(): + for port_list in list(self.get_existing_ports().values()): for port in port_list: try: if port['mac_address'] == mac: host_id = port['binding:host_id'] - return self.comp.get_hypervisor(host_id).host_ip + return self.comp.get_hypervisor(host_id) except KeyError: pass return None + def get_host_ip_from_mac(self, mac): + """Get the host IP address matching a MAC. + + mac: MAC address to look for + return: the IP address of the host where the matching port runs or None if not found + """ + hypervisor = self.get_hypervisor_from_mac(mac) + if hypervisor: + return hypervisor.host_ip + return None + def get_chain_vlans(self, port_index): """Get the list of per chain VLAN id on a given port. port_index: left port is 0, right port is 1 return: a VLAN ID list indexed by the chain index or None if no vlan tagging """ - if self.chains: + if self.chains and self.is_admin: return [self.chains[chain_index].get_vlan(port_index) for chain_index in range(self.chain_count)] # no openstack @@ -1126,11 +1467,23 @@ class ChainManager(object): port_index: left port is 0, right port is 1 return: a VNIs ID list indexed by the chain index or None if no vlan tagging """ - if self.chains: + if self.chains and self.is_admin: return [self.chains[chain_index].get_vxlan(port_index) for chain_index in range(self.chain_count)] # no openstack - raise ChainException('VxLAN is only supported with OpenStack') + raise ChainException('VxLAN is only supported with OpenStack and with admin user') + + def get_chain_mpls_inner_labels(self, port_index): + """Get the list of per chain MPLS VPN Labels on a given port. + + port_index: left port is 0, right port is 1 + return: a MPLSs ID list indexed by the chain index or None if no mpls + """ + if self.chains and self.is_admin: + return [self.chains[chain_index].get_mpls_inner_label(port_index) + for chain_index in range(self.chain_count)] + # no openstack + raise ChainException('MPLS is only supported with OpenStack and with admin user') def get_dest_macs(self, port_index): """Get the list of per chain dest MACs on a given port. @@ -1178,7 +1531,17 @@ class ChainManager(object): if self.chains: # in the case of EXT, the compute node must be retrieved from the port # associated to any of the dest MACs - return self.chains[0].get_compute_nodes() + if self.config.service_chain != ChainType.EXT: + return self.chains[0].get_compute_nodes() + # in the case of EXT, the compute node must be retrieved from the port + # associated to any of the dest MACs + dst_macs = self.generator_config.get_dest_macs() + # dest MAC on port 0, chain 0 + dst_mac = dst_macs[0][0] + hypervisor = self.get_hypervisor_from_mac(dst_mac) + if hypervisor: + LOG.info('Found hypervisor for EXT chain: %s', hypervisor.hypervisor_hostname) + return [':' + hypervisor.hypervisor_hostname] # no openstack = no chains return [] @@ -1188,5 +1551,9 @@ class ChainManager(object): chain.delete() for network in self.networks: network.delete() + if self.config.use_management_port and hasattr(self, 'management_network'): + self.management_network.delete() + if self.config.use_floating_ip and hasattr(self, 'floating_ip_network'): + self.floating_ip_network.delete() if self.flavor: self.flavor.delete() diff --git a/nfvbench/cleanup.py b/nfvbench/cleanup.py index 819514a..cefdcfa 100644 --- a/nfvbench/cleanup.py +++ b/nfvbench/cleanup.py @@ -15,15 +15,16 @@ # import sys -import time from neutronclient.neutron import client as nclient from novaclient.client import Client from novaclient.exceptions import NotFound from tabulate import tabulate -import credentials as credentials -from log import LOG +from . import credentials +from .log import LOG +from . import utils + class ComputeCleaner(object): """A cleaner for compute resources.""" @@ -35,40 +36,24 @@ class ComputeCleaner(object): self.servers = [server for server in all_servers if server.name.startswith(instance_prefix)] - def instance_exists(self, server): - try: - self.nova_client.servers.get(server.id) - except NotFound: - return False - return True - def get_resource_list(self): return [["Instance", server.name, server.id] for server in self.servers] - def clean(self): - if self.servers: - for server in self.servers: - try: - LOG.info('Deleting instance %s...', server.name) - self.nova_client.servers.delete(server.id) - except Exception: - LOG.exception("Instance %s deletion failed", server.name) - LOG.info(' Waiting for %d instances to be fully deleted...', len(self.servers)) - retry_count = 15 + len(self.servers) * 5 - while True: - retry_count -= 1 - self.servers = [server for server in self.servers if self.instance_exists(server)] - if not self.servers: - break + def get_cleaner_code(self): + return "instances" - if retry_count: - LOG.info(' %d yet to be deleted by Nova, retries left=%d...', - len(self.servers), retry_count) - time.sleep(2) - else: - LOG.warning(' instance deletion verification time-out: %d still not deleted', - len(self.servers)) - break + def clean_needed(self, clean_options): + if clean_options is None: + return True + code = self.get_cleaner_code() + return code[0] in clean_options + + def clean(self, clean_options): + if self.clean_needed(clean_options): + if self.servers: + for server in self.servers: + utils.delete_server(self.nova_client, server) + utils.waiting_servers_deletion(self.nova_client, self.servers) class NetworkCleaner(object): @@ -83,7 +68,7 @@ class NetworkCleaner(object): for net in all_networks: netname = net['name'] for prefix in network_name_prefixes: - if netname.startswith(prefix): + if prefix and netname.startswith(prefix): self.networks.append(net) net_ids.append(net['id']) break @@ -91,28 +76,125 @@ class NetworkCleaner(object): LOG.info('Discovering ports...') all_ports = self.neutron_client.list_ports()['ports'] self.ports = [port for port in all_ports if port['network_id'] in net_ids] + LOG.info('Discovering floating ips...') + all_floating_ips = self.neutron_client.list_floatingips()['floatingips'] + self.floating_ips = [floating_ip for floating_ip in all_floating_ips if + floating_ip['floating_network_id'] in net_ids and "nfvbench" in + floating_ip['description']] else: self.ports = [] + self.floating_ips = [] def get_resource_list(self): res_list = [["Network", net['name'], net['id']] for net in self.networks] res_list.extend([["Port", port['name'], port['id']] for port in self.ports]) + res_list.extend( + [["Floating IP", floating_ip['description'], floating_ip['id']] for floating_ip in + self.floating_ips]) return res_list - def clean(self): - for port in self.ports: - LOG.info("Deleting port %s...", port['id']) - try: - self.neutron_client.delete_port(port['id']) - except Exception: - LOG.exception("Port deletion failed") - - for net in self.networks: - LOG.info("Deleting network %s...", net['name']) - try: - self.neutron_client.delete_network(net['id']) - except Exception: - LOG.exception("Network deletion failed") + def get_cleaner_code(self): + return "networks, ports and floating ips" + + def clean_needed(self, clean_options): + if clean_options is None: + return True + code = self.get_cleaner_code() + return code[0] in clean_options + + def clean(self, clean_options): + if self.clean_needed(clean_options): + for port in self.ports: + LOG.info("Deleting port %s...", port['id']) + try: + self.neutron_client.delete_port(port['id']) + except Exception: + LOG.exception("Port deletion failed") + for floating_ip in self.floating_ips: + LOG.info("Deleting floating ip %s...", floating_ip['id']) + try: + self.neutron_client.delete_floatingip(floating_ip['id']) + except Exception: + LOG.exception("Floating IP deletion failed") + # associated subnets are automatically deleted by neutron + for net in self.networks: + LOG.info("Deleting network %s...", net['name']) + try: + self.neutron_client.delete_network(net['id']) + except Exception: + LOG.exception("Network deletion failed") + + +class RouterCleaner(object): + """A cleaner for router resources.""" + + def __init__(self, neutron_client, router_names): + self.neutron_client = neutron_client + LOG.info('Discovering routers...') + all_routers = self.neutron_client.list_routers()['routers'] + self.routers = [] + self.ports = [] + self.routes = [] + rtr_ids = [] + for rtr in all_routers: + rtrname = rtr['name'] + for name in router_names: + if rtrname == name: + self.routers.append(rtr) + rtr_ids.append(rtr['id']) + + LOG.info('Discovering router routes for router %s...', rtr['name']) + all_routes = rtr['routes'] + for route in all_routes: + LOG.info("destination: %s, nexthop: %s", route['destination'], + route['nexthop']) + + LOG.info('Discovering router ports for router %s...', rtr['name']) + self.ports.extend(self.neutron_client.list_ports(device_id=rtr['id'])['ports']) + break + + def get_resource_list(self): + res_list = [["Router", rtr['name'], rtr['id']] for rtr in self.routers] + return res_list + + def get_cleaner_code(self): + return "router" + + def clean_needed(self, clean_options): + if clean_options is None: + return True + code = self.get_cleaner_code() + return code[0] in clean_options + + def clean(self, clean_options): + if self.clean_needed(clean_options): + # associated routes needs to be deleted before deleting routers + for rtr in self.routers: + LOG.info("Deleting routes for %s...", rtr['name']) + try: + body = { + 'router': { + 'routes': [] + } + } + self.neutron_client.update_router(rtr['id'], body) + except Exception: + LOG.exception("Router routes deletion failed") + LOG.info("Deleting ports for %s...", rtr['name']) + try: + for port in self.ports: + body = { + 'port_id': port['id'] + } + self.neutron_client.remove_interface_router(rtr['id'], body) + except Exception: + LOG.exception("Router ports deletion failed") + LOG.info("Deleting router %s...", rtr['name']) + try: + self.neutron_client.delete_router(rtr['id']) + except Exception: + LOG.exception("Router deletion failed") + class FlavorCleaner(object): """Cleaner for NFVbench flavor.""" @@ -130,26 +212,45 @@ class FlavorCleaner(object): return [['Flavor', self.name, self.flavor.id]] return None - def clean(self): - if self.flavor: - LOG.info("Deleting flavor %s...", self.flavor.name) - try: - self.flavor.delete() - except Exception: - LOG.exception("Flavor deletion failed") + def get_cleaner_code(self): + return "flavor" + + def clean_needed(self, clean_options): + if clean_options is None: + return True + code = self.get_cleaner_code() + return code[0] in clean_options + + def clean(self, clean_options): + if self.clean_needed(clean_options): + if self.flavor: + LOG.info("Deleting flavor %s...", self.flavor.name) + try: + self.flavor.delete() + except Exception: + LOG.exception("Flavor deletion failed") + class Cleaner(object): """Cleaner for all NFVbench resources.""" def __init__(self, config): - cred = credentials.Credentials(config.openrc_file, None, False) + cred = credentials.Credentials(config.openrc_file, config.clouds_detail, None, False) session = cred.get_session() self.neutron_client = nclient.Client('2.0', session=session) self.nova_client = Client(2, session=session) network_names = [inet['name'] for inet in config.internal_networks.values()] + network_names.extend([inet['name'] for inet in config.edge_networks.values()]) + network_names.append(config.management_network['name']) + network_names.append(config.floating_network['name']) + router_names = [rtr['router_name'] for rtr in config.edge_networks.values()] + # add idle networks as well + if config.idle_networks.name: + network_names.append(config.idle_networks.name) self.cleaners = [ComputeCleaner(self.nova_client, config.loop_vm_name), FlavorCleaner(self.nova_client, config.flavor_type), - NetworkCleaner(self.neutron_client, network_names)] + NetworkCleaner(self.neutron_client, network_names), + RouterCleaner(self.neutron_client, router_names)] def show_resources(self): """Show all NFVbench resources.""" @@ -168,11 +269,37 @@ class Cleaner(object): def clean(self, prompt): """Clean all resources.""" - LOG.info("NFVbench will delete all resources shown...") + LOG.info("NFVbench will delete resources shown...") + clean_options = None if prompt: - answer = raw_input("Are you sure? (y/n) ") + answer = input("Do you want to delete all ressources? (y/n) ") if answer.lower() != 'y': - LOG.info("Exiting without deleting any resource") - sys.exit(0) + print("What kind of resources do you want to delete?") + all_option = "" + all_option_codes = [] + for cleaner in self.cleaners: + code = cleaner.get_cleaner_code() + print(("%s: %s" % (code[0], code))) + all_option += code[0] + all_option_codes.append(code) + print(("a: all resources - a shortcut for '%s'" % all_option)) + all_option_codes.append("all resources") + print("q: quit") + answer_res = input(":").lower() + # Check only first character because answer_res can be "flavor" and it is != all + if answer_res[0] == "a": + clean_options = all_option + elif answer_res[0] != 'q': + # if user write complete code instead of shortcuts + # Get only first character of clean code to avoid false clean request + # i.e "networks and ports" and "router" have 1 letter in common and router clean + # will be called even if user ask for networks and ports + if answer_res in all_option_codes: + clean_options = answer_res[0] + else: + clean_options = answer_res + else: + LOG.info("Exiting without deleting any resource") + sys.exit(0) for cleaner in self.cleaners: - cleaner.clean() + cleaner.clean(clean_options) diff --git a/nfvbench/compute.py b/nfvbench/compute.py index 556ade4..883dc28 100644 --- a/nfvbench/compute.py +++ b/nfvbench/compute.py @@ -23,8 +23,10 @@ except ImportError: from glanceclient.v1.apiclient.exceptions import NotFound as GlanceImageNotFound import keystoneauth1 import novaclient +from novaclient.exceptions import NotFound -from log import LOG +from .log import LOG +from . import utils class Compute(object): @@ -50,7 +52,7 @@ class Compute(object): retry = 0 try: # check image is file/url based. - with open(image_file) as f_image: + with open(image_file, 'rb') as f_image: img = self.glance_client.images.create(name=str(final_image_name), disk_format="qcow2", container_format="bare", @@ -95,6 +97,24 @@ class Compute(object): return True + def image_multiqueue_enabled(self, img): + """Check if multiqueue property is enabled on given image.""" + try: + return img['hw_vif_multiqueue_enabled'] == 'true' + except KeyError: + return False + + def image_set_multiqueue(self, img, enabled): + """Set multiqueue property as enabled or disabled on given image.""" + cur_mqe = self.image_multiqueue_enabled(img) + LOG.info('Image %s hw_vif_multiqueue_enabled property is "%s"', + img.name, str(cur_mqe).lower()) + if cur_mqe != enabled: + mqe = str(enabled).lower() + self.glance_client.images.update(img.id, hw_vif_multiqueue_enabled=mqe) + img['hw_vif_multiqueue_enabled'] = mqe + LOG.info('Image %s hw_vif_multiqueue_enabled property changed to "%s"', img.name, mqe) + # Create a server instance with name vmname # and check that it gets into the ACTIVE state def create_server(self, vmname, image, flavor, key_name, @@ -129,9 +149,17 @@ class Compute(object): servers_list = self.novaclient.servers.list() return servers_list + def instance_exists(self, server): + try: + self.novaclient.servers.get(server) + except NotFound: + return False + return True + def delete_server(self, server): """Delete a server from its object reference.""" - self.novaclient.servers.delete(server) + utils.delete_server(self.novaclient, server) + utils.waiting_servers_deletion(self.novaclient, [server]) def find_flavor(self, flavor_type): """Find a flavor by name.""" diff --git a/nfvbench/config.py b/nfvbench/config.py index 5feeda5..8e77127 100644 --- a/nfvbench/config.py +++ b/nfvbench/config.py @@ -16,19 +16,19 @@ from attrdict import AttrDict import yaml -from log import LOG +from .log import LOG def config_load(file_name, from_cfg=None, whitelist_keys=None): """Load a yaml file into a config dict, merge with from_cfg if not None The config file content taking precedence in case of duplicate """ try: - with open(file_name) as fileobj: + with open(file_name, encoding="utf-8") as fileobj: cfg = AttrDict(yaml.safe_load(fileobj)) except IOError: raise Exception("Configuration file at '{}' was not found. Please use correct path " "and verify it is visible to container if you run nfvbench in container." - .format(file_name)) + .format(file_name)) from IOError if from_cfg: if not whitelist_keys: @@ -43,10 +43,16 @@ def config_loads(cfg_text, from_cfg=None, whitelist_keys=None): """Same as config_load but load from a string """ try: - cfg = AttrDict(yaml.load(cfg_text)) + cfg = AttrDict(yaml.safe_load(cfg_text)) except TypeError: # empty string cfg = AttrDict() + except ValueError as e: + # In case of wrong path or file not readable or string not well formatted + LOG.error("String %s is not well formatted. Please verify your yaml/json string. " + "If string is a file path, file was not found. Please use correct path and " + "verify it is visible to container if you run nfvbench in container.", cfg_text) + raise Exception(e) from e if from_cfg: if not whitelist_keys: whitelist_keys = [] @@ -58,7 +64,7 @@ def config_loads(cfg_text, from_cfg=None, whitelist_keys=None): def _validate_config(subset, superset, whitelist_keys): def get_err_config(subset, superset): result = {} - for k, v in subset.items(): + for k, v in list(subset.items()): if k not in whitelist_keys: if k not in superset: result.update({k: v}) diff --git a/nfvbench/config_plugin.py b/nfvbench/config_plugin.py index a6759cd..86e5505 100644 --- a/nfvbench/config_plugin.py +++ b/nfvbench/config_plugin.py @@ -18,19 +18,15 @@ This module is used to override the configuration with platform specific constraints and extensions """ import abc -import specs +from . import specs -class ConfigPluginBase(object): +class ConfigPluginBase(object, metaclass=abc.ABCMeta): """Base class for config plugins.""" - __metaclass__ = abc.ABCMeta - class InitializationFailure(Exception): """Used in case of any init failure.""" - pass - def __init__(self, config): """Save configuration.""" if not config: @@ -95,9 +91,8 @@ class ConfigPlugin(ConfigPluginBase): """Return RunSpec for given platform.""" return specs.RunSpec(config.no_vswitch_access, openstack_spec) - def validate_config(self, config, openstack_spec): + def validate_config(self, cfg, openstack_spec): """Nothing to validate by default.""" - pass def prepare_results_config(self, cfg): """Nothing to add the results by default.""" diff --git a/nfvbench/credentials.py b/nfvbench/credentials.py index 530ad69..7c48879 100644 --- a/nfvbench/credentials.py +++ b/nfvbench/credentials.py @@ -21,32 +21,40 @@ import getpass from keystoneauth1.identity import v2 from keystoneauth1.identity import v3 from keystoneauth1 import session -from log import LOG +import openstack +from keystoneclient.exceptions import HTTPClientError + +from .log import LOG class Credentials(object): def get_session(self): - dct = { - 'username': self.rc_username, - 'password': self.rc_password, - 'auth_url': self.rc_auth_url - } - auth = None - - if self.rc_identity_api_version == 3: - dct.update({ - 'project_name': self.rc_project_name, - 'project_domain_name': self.rc_project_domain_name, - 'user_domain_name': self.rc_user_domain_name - }) - auth = v3.Password(**dct) + + if self.clouds_detail: + connection = openstack.connect(cloud=self.clouds_detail) + cred_session = connection.session else: - dct.update({ - 'tenant_name': self.rc_tenant_name - }) - auth = v2.Password(**dct) - return session.Session(auth=auth, verify=self.rc_cacert) + dct = { + 'username': self.rc_username, + 'password': self.rc_password, + 'auth_url': self.rc_auth_url + } + + if self.rc_identity_api_version == 3: + dct.update({ + 'project_name': self.rc_project_name, + 'project_domain_name': self.rc_project_domain_name, + 'user_domain_name': self.rc_user_domain_name + }) + auth = v3.Password(**dct) + else: + dct.update({ + 'tenant_name': self.rc_tenant_name + }) + auth = v2.Password(**dct) + cred_session = session.Session(auth=auth, verify=self.rc_cacert) + return cred_session def __parse_openrc(self, file): export_re = re.compile('export OS_([A-Z_]*)="?(.*)') @@ -91,11 +99,28 @@ class Credentials(object): elif name == "PROJECT_DOMAIN_NAME": self.rc_project_domain_name = value + # /users URL returns exception (HTTP 403) if user is not admin. + # try first without the version in case session already has it in + # Return HTTP 200 if user is admin + def __user_is_admin(self, url): + is_admin = False + try: + # check if user has admin role in OpenStack project + filter = {'service_type': 'identity', + 'interface': 'public'} + self.get_session().get(url, endpoint_filter=filter) + is_admin = True + except HTTPClientError as exc: + if exc.http_status == 403: + LOG.warning( + "User is not admin, no permission to list user roles. Exception: %s", exc) + return is_admin + # # Read a openrc file and take care of the password # The 2 args are passed from the command line and can be None # - def __init__(self, openrc_file, pwd=None, no_env=False): + def __init__(self, openrc_file, clouds_detail, pwd=None, no_env=False): self.rc_password = None self.rc_username = None self.rc_tenant_name = None @@ -105,19 +130,22 @@ class Credentials(object): self.rc_user_domain_name = None self.rc_project_domain_name = None self.rc_project_name = None - self.rc_identity_api_version = 2 + self.rc_identity_api_version = 3 + self.is_admin = False + self.clouds_detail = clouds_detail success = True if openrc_file: if isinstance(openrc_file, str): if os.path.exists(openrc_file): - self.__parse_openrc(open(openrc_file)) + with open(openrc_file, encoding="utf-8") as rc_file: + self.__parse_openrc(rc_file) else: LOG.error('Error: rc file does not exist %s', openrc_file) success = False else: self.__parse_openrc(openrc_file) - elif not no_env: + elif not clouds_detail and not no_env: # no openrc file passed - we assume the variables have been # sourced by the calling shell # just check that they are present @@ -152,15 +180,27 @@ class Credentials(object): # always override with CLI argument if provided - if pwd: - self.rc_password = pwd - # if password not know, check from env variable - elif self.rc_auth_url and not self.rc_password and success: - if 'OS_PASSWORD' in os.environ and not no_env: - self.rc_password = os.environ['OS_PASSWORD'] - else: - # interactively ask for password - self.rc_password = getpass.getpass( - 'Please enter your OpenStack Password: ') - if not self.rc_password: - self.rc_password = "" + if not clouds_detail: + if pwd: + self.rc_password = pwd + # if password not know, check from env variable + elif self.rc_auth_url and not self.rc_password and success: + if 'OS_PASSWORD' in os.environ and not no_env: + self.rc_password = os.environ['OS_PASSWORD'] + else: + # interactively ask for password + self.rc_password = getpass.getpass( + 'Please enter your OpenStack Password: ') + if not self.rc_password: + self.rc_password = "" + + + try: + # /users URL returns exception (HTTP 403) if user is not admin. + # try first without the version in case session already has it in + # Return HTTP 200 if user is admin + self.is_admin = self.__user_is_admin('/users') or self.__user_is_admin( + '/v2/users') or self.__user_is_admin('/v3/users') + except Exception as e: + LOG.warning("Error occurred during Openstack API access. " + "Unable to check user is admin. Exception: %s", e) diff --git a/nfvbench/factory.py b/nfvbench/factory.py index cad5a43..0d4b042 100644 --- a/nfvbench/factory.py +++ b/nfvbench/factory.py @@ -15,8 +15,8 @@ # """Factory for creating worker and config plugin instances.""" -import chain_workers as workers -from config_plugin import ConfigPlugin +from . import chain_workers as workers +from .config_plugin import ConfigPlugin class BasicFactory(object): diff --git a/nfvbench/fluentd.py b/nfvbench/fluentd.py index ad0ea34..535d640 100644 --- a/nfvbench/fluentd.py +++ b/nfvbench/fluentd.py @@ -114,7 +114,7 @@ class FluentLogHandler(logging.Handler): def __get_highest_level(self): if self.__error_counter > 0: return logging.ERROR - elif self.__warning_counter > 0: + if self.__warning_counter > 0: return logging.WARNING return logging.INFO @@ -122,7 +122,7 @@ class FluentLogHandler(logging.Handler): highest_level = self.__get_highest_level() if highest_level == logging.INFO: return "GOOD RUN" - elif highest_level == logging.WARNING: + if highest_level == logging.WARNING: return "RUN WITH WARNINGS" return "RUN WITH ERRORS" diff --git a/nfvbench/nfvbench.py b/nfvbench/nfvbench.py index cdb99c8..891b2bb 100644 --- a/nfvbench/nfvbench.py +++ b/nfvbench/nfvbench.py @@ -24,23 +24,24 @@ import sys import traceback from attrdict import AttrDict +from logging import FileHandler import pbr.version from pkg_resources import resource_string -from __init__ import __version__ -from chain_runner import ChainRunner -from cleanup import Cleaner -from config import config_load -from config import config_loads -import credentials as credentials -from fluentd import FluentLogHandler -import log -from log import LOG -from nfvbenchd import WebSocketIoServer -from specs import ChainType -from specs import Specs -from summarizer import NFVBenchSummarizer -import utils +from .__init__ import __version__ +from .chain_runner import ChainRunner +from .cleanup import Cleaner +from .config import config_load +from .config import config_loads +from . import credentials +from .fluentd import FluentLogHandler +from . import log +from .log import LOG +from .nfvbenchd import WebServer +from .specs import ChainType +from .specs import Specs +from .summarizer import NFVBenchSummarizer +from . import utils fluent_logger = None @@ -59,8 +60,8 @@ class NFVBench(object): self.config_plugin = config_plugin self.factory = factory self.notifier = notifier - self.cred = credentials.Credentials(config.openrc_file, None, False) \ - if config.openrc_file else None + self.cred = credentials.Credentials(config.openrc_file, config.clouds_detail, None, False) \ + if config.openrc_file or config.clouds_detail else None self.chain_runner = None self.specs = Specs() self.specs.set_openstack_spec(openstack_spec) @@ -70,7 +71,13 @@ class NFVBench(object): def set_notifier(self, notifier): self.notifier = notifier - def run(self, opts, args): + def run(self, opts, args, dry_run=False): + """This run() method is called for every NFVbench benchmark request. + + In CLI mode, this method is called only once per invocation. + In REST server mode, this is called once per REST POST request + On dry_run, show the running config in json format then exit + """ status = NFVBench.STATUS_OK result = None message = '' @@ -82,6 +89,18 @@ class NFVBench(object): try: # recalc the running config based on the base config and options for this run self._update_config(opts) + + if dry_run: + print((json.dumps(self.config, sort_keys=True, indent=4))) + sys.exit(0) + + # check that an empty openrc file (no OpenStack) is only allowed + # with EXT chain + if (not self.config.openrc_file and not self.config.clouds_detail) and \ + self.config.service_chain != ChainType.EXT: + raise Exception("openrc_file or clouds_detail in the configuration is required" + " for PVP/PVVP chains") + self.specs.set_run_spec(self.config_plugin.get_run_spec(self.config, self.specs.openstack)) self.chain_runner = ChainRunner(self.config, @@ -153,7 +172,9 @@ class NFVBench(object): self.config.service_chain, self.config.service_chain_count, self.config.flow_count, - self.config.frame_sizes) + self.config.frame_sizes, + self.config.user_id, + self.config.group_id) def _update_config(self, opts): """Recalculate the running config based on the base config and opts. @@ -161,17 +182,56 @@ class NFVBench(object): Sanity check on the config is done here as well. """ self.config = AttrDict(dict(self.base_config)) + # Update log file handler if needed after a config update (REST mode) + if 'log_file' in opts: + if opts['log_file']: + (path, _filename) = os.path.split(opts['log_file']) + if not os.path.exists(path): + LOG.warning( + 'Path %s does not exist. Please verify root path is shared with host. Path ' + 'will be created.', path) + os.makedirs(path) + LOG.info('%s is created.', path) + if not any(isinstance(h, FileHandler) for h in log.getLogger().handlers): + log.add_file_logger(opts['log_file']) + else: + for h in log.getLogger().handlers: + if isinstance(h, FileHandler) and h.baseFilename != opts['log_file']: + # clean log file handler + log.getLogger().removeHandler(h) + log.add_file_logger(opts['log_file']) + self.config.update(opts) config = self.config config.service_chain = config.service_chain.upper() config.service_chain_count = int(config.service_chain_count) if config.l2_loopback: - # force the number of chains to be 1 in case of l2 loopback - config.service_chain_count = 1 + # force the number of chains to be 1 in case of untagged l2 loopback + # (on the other hand, multiple L2 vlan tagged service chains are allowed) + if not config.vlan_tagging: + config.service_chain_count = 1 config.service_chain = ChainType.EXT config.no_arp = True LOG.info('Running L2 loopback: using EXT chain/no ARP') + + # allow oversized vlan lists, just clip them + try: + vlans = [list(v) for v in config.vlans] + for v in vlans: + del v[config.service_chain_count:] + config.vlans = vlans + except Exception: + pass + + # traffic profile override options + if 'frame_sizes' in opts: + unidir = False + if 'unidir' in opts: + unidir = opts['unidir'] + override_custom_traffic(config, opts['frame_sizes'], unidir) + LOG.info("Frame size has been set to %s for current configuration", opts['frame_sizes']) + config.flow_count = utils.parse_flow_count(config.flow_count) required_flow_count = config.service_chain_count * 2 if config.flow_count < required_flow_count: @@ -183,6 +243,13 @@ class NFVBench(object): if config.flow_count % 2: config.flow_count += 1 + # Possibly adjust the cache size + if config.cache_size < 0: + config.cache_size = config.flow_count + + # The size must be capped to 10000 (where does this limit come from?) + config.cache_size = min(config.cache_size, 10000) + config.duration_sec = float(config.duration_sec) config.interval_sec = float(config.interval_sec) config.pause_sec = float(config.pause_sec) @@ -192,6 +259,8 @@ class NFVBench(object): if config.openrc_file: config.openrc_file = os.path.expanduser(config.openrc_file) + if config.flavor.vcpus < 2: + raise Exception("Flavor vcpus must be >= 2") config.ndr_run = (not config.no_traffic and 'ndr' in config.rate.strip().lower().split('_')) @@ -213,16 +282,41 @@ class NFVBench(object): raise Exception('Please provide existing path for storing results in JSON file. ' 'Path used: {path}'.format(path=config.std_json_path)) - # VxLAN sanity checks - if config.vxlan: + # Check that multiqueue is between 1 and 8 (8 is the max allowed by libvirt/qemu) + if config.vif_multiqueue_size < 1 or config.vif_multiqueue_size > 8: + raise Exception('vif_multiqueue_size (%d) must be in [1..8]' % + config.vif_multiqueue_size) + + # VxLAN and MPLS sanity checks + if config.vxlan or config.mpls: if config.vlan_tagging: config.vlan_tagging = False - LOG.info('VxLAN: vlan_tagging forced to False ' + config.no_latency_streams = True + config.no_latency_stats = True + config.no_flow_stats = True + LOG.info('VxLAN or MPLS: vlan_tagging forced to False ' '(inner VLAN tagging must be disabled)') self.config_plugin.validate_config(config, self.specs.openstack) +def bool_arg(x): + """Argument type to be used in parser.add_argument() + When a boolean like value is expected to be given + """ + return (str(x).lower() != 'false') \ + and (str(x).lower() != 'no') \ + and (str(x).lower() != '0') + + +def int_arg(x): + """Argument type to be used in parser.add_argument() + When an integer type value is expected to be given + (returns 0 if argument is invalid, hexa accepted) + """ + return int(x, 0) + + def _parse_opts_from_cli(): parser = argparse.ArgumentParser() @@ -239,10 +333,8 @@ def _parse_opts_from_cli(): parser.add_argument('--server', dest='server', default=None, - action='store', - metavar='<http_root_pathname>', - help='Run nfvbench in server mode and pass' - ' the HTTP root folder full pathname') + action='store_true', + help='Run nfvbench in server mode') parser.add_argument('--host', dest='host', action='store', @@ -309,6 +401,17 @@ def _parse_opts_from_cli(): action='store', help='Traffic generator profile to use') + parser.add_argument('-l3', '--l3-router', dest='l3_router', + default=None, + action='store_true', + help='Use L3 neutron routers to handle traffic') + + parser.add_argument('-garp', '--gratuitous-arp', dest='periodic_gratuitous_arp', + default=None, + action='store_true', + help='Use gratuitous ARP to maintain session between TG ' + 'and L3 routers to handle traffic') + parser.add_argument('-0', '--no-traffic', dest='no_traffic', default=None, action='store_true', @@ -320,6 +423,12 @@ def _parse_opts_from_cli(): help='Do not use ARP to find MAC addresses, ' 'instead use values in config file') + parser.add_argument('--loop-vm-arp', dest='loop_vm_arp', + default=None, + action='store_true', + help='Use ARP to find MAC addresses ' + 'instead of using values from TRex ports (VPP forwarder only)') + parser.add_argument('--no-vswitch-access', dest='no_vswitch_access', default=None, action='store_true', @@ -330,6 +439,11 @@ def _parse_opts_from_cli(): action='store_true', help='Enable VxLan encapsulation') + parser.add_argument('--mpls', dest='mpls', + default=None, + action='store_true', + help='Enable MPLS encapsulation') + parser.add_argument('--no-cleanup', dest='no_cleanup', default=None, action='store_true', @@ -345,6 +459,11 @@ def _parse_opts_from_cli(): action='store_true', help='Cleanup NFVbench resources (do not prompt)') + parser.add_argument('--restart', dest='restart', + default=None, + action='store_true', + help='Restart TRex server') + parser.add_argument('--json', dest='json', action='store', help='store results in json format file', @@ -362,10 +481,15 @@ def _parse_opts_from_cli(): action='store_true', help='print the default config in yaml format (unedited)') + parser.add_argument('--show-pre-config', dest='show_pre_config', + default=None, + action='store_true', + help='print the config in json format (cfg file applied)') + parser.add_argument('--show-config', dest='show_config', default=None, action='store_true', - help='print the running config in json format') + help='print the running config in json format (final)') parser.add_argument('-ss', '--show-summary', dest='summary', action='store', @@ -403,8 +527,109 @@ def _parse_opts_from_cli(): parser.add_argument('--l2-loopback', '--l2loopback', dest='l2_loopback', action='store', - metavar='<vlan>', - help='Port to port or port to switch to port L2 loopback with VLAN id') + metavar='<vlan(s)|no-tag|true|false>', + help='Port to port or port to switch to port L2 loopback ' + 'tagged with given VLAN id(s) or not (given \'no-tag\') ' + '\'true\': use current vlans; \'false\': disable this mode.') + + parser.add_argument('--i40e-mixed', dest='i40e_mixed', + action='store', + default=None, + metavar='<ignore,check,unbind>', + help='TRex behavior when dealing with a i40e network card driver' + ' [ https://trex-tgn.cisco.com/youtrack/issue/trex-528 ]') + + parser.add_argument('--user-info', dest='user_info', + action='append', + metavar='<data>', + help='Custom data to be included as is ' + 'in the json report config branch - ' + ' example, pay attention! no space: ' + '--user-info=\'{"status":"explore","description":' + '{"target":"lab","ok":true,"version":2020}}\' - ' + 'this option may be repeated; given data will be merged.') + + parser.add_argument('--vlan-tagging', dest='vlan_tagging', + type=bool_arg, + metavar='<boolean>', + action='store', + default=None, + help='Override the NFVbench \'vlan_tagging\' parameter') + + parser.add_argument('--intf-speed', dest='intf_speed', + metavar='<speed>', + action='store', + default=None, + help='Override the NFVbench \'intf_speed\' ' + 'parameter (e.g. 10Gbps, auto, 16.72Gbps)') + + parser.add_argument('--cores', dest='cores', + type=int_arg, + metavar='<number>', + action='store', + default=None, + help='Override the T-Rex \'cores\' parameter') + + parser.add_argument('--cache-size', dest='cache_size', + type=int_arg, + metavar='<size>', + action='store', + default=None, + help='Specify the FE cache size (default: 0, flow-count if < 0)') + + parser.add_argument('--service-mode', dest='service_mode', + action='store_true', + default=None, + help='Enable T-Rex service mode (for debugging purpose)') + + parser.add_argument('--no-e2e-check', dest='no_e2e_check', + action='store_true', + default=None, + help='Skip "end to end" connectivity check (on test purpose)') + + parser.add_argument('--no-flow-stats', dest='no_flow_stats', + action='store_true', + default=None, + help='Disable additional flow stats (on high load traffic)') + + parser.add_argument('--no-latency-stats', dest='no_latency_stats', + action='store_true', + default=None, + help='Disable flow stats for latency traffic') + + parser.add_argument('--no-latency-streams', dest='no_latency_streams', + action='store_true', + default=None, + help='Disable latency measurements (no streams)') + + parser.add_argument('--user-id', dest='user_id', + type=int_arg, + metavar='<uid>', + action='store', + default=None, + help='Change json/log files ownership with this user (int)') + + parser.add_argument('--group-id', dest='group_id', + type=int_arg, + metavar='<gid>', + action='store', + default=None, + help='Change json/log files ownership with this group (int)') + + parser.add_argument('--show-trex-log', dest='show_trex_log', + default=None, + action='store_true', + help='Show the current TRex local server log file contents' + ' => diagnostic/help in case of configuration problems') + + parser.add_argument('--debug-mask', dest='debug_mask', + type=int_arg, + metavar='<mask>', + action='store', + default=None, + help='General purpose register (debugging flags), ' + 'the hexadecimal notation (0x...) is accepted.' + 'Designed for development needs (default: 0).') opts, unknown_opts = parser.parse_known_args() return opts, unknown_opts @@ -470,13 +695,20 @@ def main(): log.setup() # load default config file config, default_cfg = load_default_config() + # possibly override the default user_id & group_id values + if 'USER_ID' in os.environ: + config.user_id = int(os.environ['USER_ID']) + if 'GROUP_ID' in os.environ: + config.group_id = int(os.environ['GROUP_ID']) + # create factory for platform specific classes try: factory_module = importlib.import_module(config['factory_module']) factory = getattr(factory_module, config['factory_class'])() except AttributeError: raise Exception("Requested factory module '{m}' or class '{c}' was not found." - .format(m=config['factory_module'], c=config['factory_class'])) + .format(m=config['factory_module'], + c=config['factory_class'])) from AttributeError # create config plugin for this platform config_plugin = factory.get_config_plugin_class()(config) config = config_plugin.get_config() @@ -485,26 +717,40 @@ def main(): log.set_level(debug=opts.debug) if opts.version: - print pbr.version.VersionInfo('nfvbench').version_string_with_vcs() + print((pbr.version.VersionInfo('nfvbench').version_string_with_vcs())) sys.exit(0) if opts.summary: - with open(opts.summary) as json_data: + with open(opts.summary, encoding="utf-8") as json_data: result = json.load(json_data) if opts.user_label: result['config']['user_label'] = opts.user_label - print NFVBenchSummarizer(result, fluent_logger) + print((NFVBenchSummarizer(result, fluent_logger))) sys.exit(0) # show default config in text/yaml format if opts.show_default_config: - print default_cfg + print((default_cfg.decode("utf-8"))) sys.exit(0) + # dump the contents of the trex log file + if opts.show_trex_log: + try: + with open('/tmp/trex.log', encoding="utf-8") as trex_log_file: + print(trex_log_file.read(), end="") + except FileNotFoundError: + print("No TRex log file found!") + sys.exit(0) + + # mask info logging in case of further config dump + if opts.show_config or opts.show_pre_config: + LOG.setLevel(log.logging.WARNING) + config.name = '' if opts.config: # do not check extra_specs in flavor as it can contain any key/value pairs - whitelist_keys = ['extra_specs'] + # the same principle applies also to the optional user_info open property + whitelist_keys = ['extra_specs', 'user_info'] # override default config options with start config at path parsed from CLI # check if it is an inline yaml/json config or a file name if os.path.isfile(opts.config): @@ -515,6 +761,11 @@ def main(): LOG.info('Loading configuration string: %s', opts.config) config = config_loads(opts.config, config, whitelist_keys) + # show current config in json format (before CLI overriding) + if opts.show_pre_config: + print((json.dumps(config, sort_keys=True, indent=4))) + sys.exit(0) + # setup the fluent logger as soon as possible right after the config plugin is called, # if there is any logging or result tag is set then initialize the fluent logger for fluentd in config.fluentd: @@ -526,40 +777,124 @@ def main(): # traffic profile override options override_custom_traffic(config, opts.frame_sizes, opts.unidir) - # copy over cli options that are used in config + # Copy over some of the cli options that are used in config. + # This explicit copy is sometimes necessary + # because some early evaluation depends on them + # and cannot wait for _update_config() coming further. + # It is good practice then to set them to None (<=> done) + # and even required if a specific conversion is performed here + # that would be corrupted by a default update (simple copy). + # On the other hand, some excessive assignments have been removed + # from here, since the _update_config() procedure does them well. + config.generator_profile = opts.generator_profile - if opts.sriov: + if opts.sriov is not None: config.sriov = True - if opts.log_file: + opts.sriov = None + if opts.log_file is not None: config.log_file = opts.log_file - if opts.service_chain: + opts.log_file = None + if opts.user_id is not None: + config.user_id = opts.user_id + opts.user_id = None + if opts.group_id is not None: + config.group_id = opts.group_id + opts.group_id = None + if opts.service_chain is not None: config.service_chain = opts.service_chain - if opts.service_chain_count: - config.service_chain_count = opts.service_chain_count - if opts.no_vswitch_access: - config.no_vswitch_access = opts.no_vswitch_access - if opts.hypervisor: + opts.service_chain = None + if opts.hypervisor is not None: # can be any of 'comp1', 'nova:', 'nova:comp1' config.compute_nodes = opts.hypervisor - if opts.vxlan: - config.vxlan = True + opts.hypervisor = None + if opts.debug_mask is not None: + config.debug_mask = opts.debug_mask + opts.debug_mask = None + + # convert 'user_info' opt from json string to dictionnary + # and merge the result with the current config dictionnary + if opts.user_info is not None: + for user_info_json in opts.user_info: + user_info_dict = json.loads(user_info_json) + if config.user_info: + config.user_info = config.user_info + user_info_dict + else: + config.user_info = user_info_dict + opts.user_info = None # port to port loopback (direct or through switch) - if opts.l2_loopback: - config.l2_loopback = True - if config.service_chain != ChainType.EXT: - LOG.info('Changing service chain type to EXT') - config.service_chain = ChainType.EXT - if not config.no_arp: - LOG.info('Disabling ARP') - config.no_arp = True - config.vlans = [int(opts.l2_loopback), int(opts.l2_loopback)] - LOG.info('Running L2 loopback: using EXT chain/no ARP') - - if opts.use_sriov_middle_net: - if (not config.sriov) or (config.service_chain != ChainType.PVVP): - raise Exception("--use-sriov-middle-net is only valid for PVVP with SRIOV") - config.use_sriov_middle_net = True + # we accept the following syntaxes for the CLI argument + # 'false' : mode not enabled + # 'true' : mode enabled with currently defined vlan IDs + # 'no-tag' : mode enabled with no vlan tagging + # <vlan IDs>: mode enabled using the given (pair of) vlan ID lists + # - If present, a '_' char will separate left an right ports lists + # e.g. 'a_x' => vlans: [[a],[x]] + # 'a,b,c_x,y,z' => [[a,b,c],[x,y,z]] + # - Otherwise the given vlan ID list applies to both sides + # e.g. 'a' => vlans: [[a],[a]] + # 'a,b' => [[a,b],[a,b]] + # - Vlan lists size needs to be at least the actual SCC value + # - Unless overriden in CLI opts, config.service_chain_count + # is adjusted to the size of the VLAN ID lists given here. + + if opts.l2_loopback is not None: + arg_pair = opts.l2_loopback.lower().split('_') + if arg_pair[0] == 'false': + config.l2_loopback = False + else: + config.l2_loopback = True + if config.service_chain != ChainType.EXT: + LOG.info('Changing service chain type to EXT') + config.service_chain = ChainType.EXT + if not config.no_arp: + LOG.info('Disabling ARP') + config.no_arp = True + if arg_pair[0] == 'true': + pass + else: + # here explicit (not)tagging is not CLI overridable + opts.vlan_tagging = None + if arg_pair[0] == 'no-tag': + config.vlan_tagging = False + else: + config.vlan_tagging = True + if len(arg_pair) == 1 or not arg_pair[1]: + arg_pair = [arg_pair[0], arg_pair[0]] + vlans = [[], []] + + def append_vlan(port, vlan_id): + # a vlan tag value must be in [0..4095] + if vlan_id not in range(0, 4096): + raise ValueError + vlans[port].append(vlan_id) + try: + for port in [0, 1]: + vlan_ids = arg_pair[port].split(',') + for vlan_id in vlan_ids: + append_vlan(port, int(vlan_id)) + if len(vlans[0]) != len(vlans[1]): + raise ValueError + except ValueError: + # at least one invalid tag => no tagging + config.vlan_tagging = False + if config.vlan_tagging: + config.vlans = vlans + # force service chain count if not CLI overriden + if opts.service_chain_count is None: + config.service_chain_count = len(vlans[0]) + opts.l2_loopback = None + + if config.i40e_mixed is None: + config.i40e_mixed = 'ignore' + if config.use_sriov_middle_net is None: + config.use_sriov_middle_net = False + if opts.use_sriov_middle_net is not None: + config.use_sriov_middle_net = opts.use_sriov_middle_net + opts.use_sriov_middle_net = None + if (config.use_sriov_middle_net and ( + (not config.sriov) or (config.service_chain != ChainType.PVVP))): + raise Exception("--use-sriov-middle-net is only valid for PVVP with SRIOV") if config.sriov and config.service_chain != ChainType.EXT: # if sriov is requested (does not apply to ext chains) @@ -569,19 +904,6 @@ def main(): if config.service_chain == ChainType.PVVP and config.use_sriov_middle_net: check_physnet("middle", config.internal_networks.middle) - # show running config in json format - if opts.show_config: - print json.dumps(config, sort_keys=True, indent=4) - sys.exit(0) - - # check that an empty openrc file (no OpenStack) is only allowed - # with EXT chain - if not config.openrc_file: - if config.service_chain == ChainType.EXT: - LOG.info('EXT chain with OpenStack mode disabled') - else: - raise Exception("openrc_file is empty in the configuration and is required") - # update the config in the config plugin as it might have changed # in a copy of the dict (config plugin still holds the original dict) config_plugin.set_config(config) @@ -592,6 +914,13 @@ def main(): # add file log if requested if config.log_file: log.add_file_logger(config.log_file) + # possibly change file ownership + uid = config.user_id + gid = config.group_id + if gid is None: + gid = uid + if uid is not None: + os.chown(config.log_file, uid, gid) openstack_spec = config_plugin.get_openstack_spec() if config.openrc_file \ else None @@ -599,19 +928,16 @@ def main(): nfvbench_instance = NFVBench(config, openstack_spec, config_plugin, factory) if opts.server: - if os.path.isdir(opts.server): - server = WebSocketIoServer(opts.server, nfvbench_instance, fluent_logger) - nfvbench_instance.set_notifier(server) - try: - port = int(opts.port) - except ValueError: - server.run(host=opts.host) - else: - server.run(host=opts.host, port=port) + server = WebServer(nfvbench_instance, fluent_logger) + try: + port = int(opts.port) + except ValueError: + server.run(host=opts.host) else: - print 'Invalid HTTP root directory: ' + opts.server - sys.exit(1) + server.run(host=opts.host, port=port) + # server.run() should never return else: + dry_run = opts.show_config with utils.RunLock(): run_summary_required = True if unknown_opts: @@ -620,10 +946,10 @@ def main(): raise Exception(err_msg) # remove unfilled values - opts = {k: v for k, v in vars(opts).iteritems() if v is not None} + opts = {k: v for k, v in list(vars(opts).items()) if v is not None} # get CLI args params = ' '.join(str(e) for e in sys.argv[1:]) - result = nfvbench_instance.run(opts, params) + result = nfvbench_instance.run(opts, params, dry_run=dry_run) if 'error_message' in result: raise Exception(result['error_message']) @@ -636,7 +962,7 @@ def main(): 'status': NFVBench.STATUS_ERROR, 'error_message': traceback.format_exc() }) - print str(exc) + print((str(exc))) finally: if fluent_logger: # only send a summary record if there was an actual nfvbench run or diff --git a/nfvbench/nfvbenchd.py b/nfvbench/nfvbenchd.py index fa781af..07f1eea 100644 --- a/nfvbench/nfvbenchd.py +++ b/nfvbench/nfvbenchd.py @@ -15,38 +15,31 @@ # import json -import Queue +import queue +from threading import Thread import uuid from flask import Flask from flask import jsonify -from flask import render_template from flask import request -from flask_socketio import emit -from flask_socketio import SocketIO -from summarizer import NFVBenchSummarizer +from .summarizer import NFVBenchSummarizer -from log import LOG -from utils import byteify -from utils import RunLock +from .log import LOG +from .utils import RunLock -# this global cannot reside in Ctx because of the @app and @socketio decorators -app = None -socketio = None +from .__init__ import __version__ STATUS_OK = 'OK' STATUS_ERROR = 'ERROR' STATUS_PENDING = 'PENDING' STATUS_NOT_FOUND = 'NOT_FOUND' - def result_json(status, message, request_id=None): body = { 'status': status, 'error_message': message } - if request_id is not None: body['request_id'] = request_id @@ -54,7 +47,7 @@ def result_json(status, message, request_id=None): def load_json(data): - return json.loads(json.dumps(data), object_hook=byteify) + return json.loads(json.dumps(data)) def get_uuid(): @@ -63,18 +56,16 @@ def get_uuid(): class Ctx(object): MAXLEN = 5 - run_queue = Queue.Queue() + run_queue = queue.Queue() busy = False result = None - request_from_socketio = False results = {} ids = [] current_id = None @staticmethod - def enqueue(config, request_id, from_socketio=False): + def enqueue(config, request_id): Ctx.busy = True - Ctx.request_from_socketio = from_socketio config['request_id'] = request_id Ctx.run_queue.put(config) @@ -109,16 +100,15 @@ class Ctx(object): res = Ctx.results[request_id] except KeyError: return None - + # pylint: disable=unsubscriptable-object if Ctx.result and request_id == Ctx.result['request_id']: Ctx.result = None - - return res - else: - res = Ctx.result - if res: - Ctx.result = None return res + # pylint: enable=unsubscriptable-object + res = Ctx.result + if res: + Ctx.result = None + return res @staticmethod def is_busy(): @@ -129,40 +119,18 @@ class Ctx(object): return Ctx.current_id -def setup_flask(root_path): - global socketio - global app +def setup_flask(): app = Flask(__name__) - app.root_path = root_path - socketio = SocketIO(app, async_mode='threading') busy_json = result_json(STATUS_ERROR, 'there is already an NFVbench request running') not_busy_json = result_json(STATUS_ERROR, 'no pending NFVbench run') not_found_msg = 'results not found' pending_msg = 'NFVbench run still pending' - # --------- socketio requests ------------ - - @socketio.on('start_run') - def _socketio_start_run(config): - if not Ctx.is_busy(): - Ctx.enqueue(config, get_uuid(), from_socketio=True) - else: - emit('error', {'reason': 'there is already an NFVbench request running'}) - - @socketio.on('echo') - def _socketio_echo(config): - emit('echo', config) - # --------- HTTP requests ------------ - @app.route('/') - def _index(): - return render_template('index.html') - - @app.route('/echo', methods=['GET']) - def _echo(): - config = request.json - return jsonify(config) + @app.route('/version', methods=['GET']) + def _version(): + return __version__ @app.route('/start_run', methods=['POST']) def _start_run(): @@ -189,35 +157,34 @@ def setup_flask(root_path): return jsonify(res) # result for given request_id not found return jsonify(result_json(STATUS_NOT_FOUND, not_found_msg, request_id)) - else: - if Ctx.is_busy(): - # task still pending, return with request_id - return jsonify(result_json(STATUS_PENDING, - pending_msg, - Ctx.get_current_request_id())) - - res = Ctx.get_result() - if res: - return jsonify(res) - return jsonify(not_busy_json) + if Ctx.is_busy(): + # task still pending, return with request_id + return jsonify(result_json(STATUS_PENDING, + pending_msg, + Ctx.get_current_request_id())) + res = Ctx.get_result() + if res: + return jsonify(res) + return jsonify(not_busy_json) -class WebSocketIoServer(object): - """This class takes care of the web socketio server, accepts websocket events, and sends back - notifications using websocket events (send_ methods). Caller should simply create an instance + return app + +class WebServer(object): + """This class takes care of the web server. Caller should simply create an instance of this class and pass a runner object then invoke the run method """ - def __init__(self, http_root, runner, fluent_logger): + def __init__(self, runner, fluent_logger): self.nfvbench_runner = runner - setup_flask(http_root) + self.app = setup_flask() self.fluent_logger = fluent_logger - def run(self, host='127.0.0.1', port=7556): + def run(self, host, port): - # socketio.run will not return so we need to run it in a background thread so that + # app.run will not return so we need to run it in a background thread so that # the calling thread (main thread) can keep doing work - socketio.start_background_task(target=socketio.run, app=app, host=host, port=port) + Thread(target=self.app.run, args=(host, port)).start() # wait for run requests # the runner must be executed from the main thread (Trex client library requirement) @@ -229,7 +196,7 @@ class WebSocketIoServer(object): # print config try: # remove unfilled values as we do not want them to override default values with None - config = {k: v for k, v in config.items() if v is not None} + config = {k: v for k, v in list(config.items()) if v is not None} with RunLock(): if self.fluent_logger: self.fluent_logger.start_new_run() @@ -238,14 +205,13 @@ class WebSocketIoServer(object): results = result_json(STATUS_ERROR, str(exc)) LOG.exception('NFVbench runner exception:') - if Ctx.request_from_socketio: - socketio.emit('run_end', results) - else: - # this might overwrite a previously unfetched result - Ctx.set_result(results) + # this might overwrite a previously unfetched result + Ctx.set_result(results) try: summary = NFVBenchSummarizer(results['result'], self.fluent_logger) LOG.info(str(summary)) + if 'json' in config and 'result' in results and results['status']: + self.nfvbench_runner.save(results['result']) except KeyError: # in case of error, 'result' might be missing if 'error_message' in results: @@ -255,13 +221,3 @@ class WebSocketIoServer(object): Ctx.release() if self.fluent_logger: self.fluent_logger.send_run_summary(True) - - def send_interval_stats(self, time_ms, tx_pps, rx_pps, drop_pct): - stats = {'time_ms': time_ms, 'tx_pps': tx_pps, 'rx_pps': rx_pps, 'drop_pct': drop_pct} - socketio.emit('run_interval_stats', stats) - - def send_ndr_found(self, ndr_pps): - socketio.emit('ndr_found', {'rate_pps': ndr_pps}) - - def send_pdr_found(self, pdr_pps): - socketio.emit('pdr_found', {'rate_pps': pdr_pps}) diff --git a/nfvbench/nfvbenchvm/nfvbenchvm.conf b/nfvbench/nfvbenchvm/nfvbenchvm.conf index 3bc6ace..8f5e7e9 100644 --- a/nfvbench/nfvbenchvm/nfvbenchvm.conf +++ b/nfvbench/nfvbenchvm/nfvbenchvm.conf @@ -9,3 +9,8 @@ TG_NET1={tg_net1} TG_NET2={tg_net2} TG_GATEWAY1_IP={tg_gateway1_ip} TG_GATEWAY2_IP={tg_gateway2_ip} +VIF_MQ_SIZE={vif_mq_size} +NUM_MBUFS={num_mbufs} +INTF_MGMT_CIDR={intf_mgmt_cidr} +INTF_MGMT_IP_GW={intf_mgmt_ip_gw} +INTF_MAC_MGMT={intf_mac_mgmt}
\ No newline at end of file diff --git a/nfvbench/packet_stats.py b/nfvbench/packet_stats.py index 4b9eac5..d3ec78a 100644 --- a/nfvbench/packet_stats.py +++ b/nfvbench/packet_stats.py @@ -21,7 +21,8 @@ PacketPathStatsManager manages all packet path stats for all chains. import copy -from traffic_gen.traffic_base import Latency +from hdrh.histogram import HdrHistogram +from .traffic_gen.traffic_base import Latency class InterfaceStats(object): """A class to hold the RX and TX counters for a virtual or physical interface. @@ -141,7 +142,7 @@ class PacketPathStats(object): chain. """ - def __init__(self, if_stats, aggregate=False): + def __init__(self, config, if_stats, aggregate=False): """Create a packet path stats intance with the list of associated if stats. if_stats: a list of interface stats that compose this packet path stats @@ -150,6 +151,7 @@ class PacketPathStats(object): Aggregate packet path stats are the only one that should show counters for shared interface stats """ + self.config = config self.if_stats = if_stats # latency for packets sent from port 0 and 1 self.latencies = [Latency(), Latency()] @@ -170,7 +172,7 @@ class PacketPathStats(object): ifstats.add_if_stats(pps.if_stats[index]) @staticmethod - def get_agg_packet_path_stats(pps_list): + def get_agg_packet_path_stats(config, pps_list): """Get the aggregated packet path stats from a list of packet path stats. Interface counters are added, latency stats are updated. @@ -179,7 +181,7 @@ class PacketPathStats(object): for pps in pps_list: if agg_pps is None: # Get a clone of the first in the list - agg_pps = PacketPathStats(pps.get_cloned_if_stats(), aggregate=True) + agg_pps = PacketPathStats(config, pps.get_cloned_if_stats(), aggregate=True) else: agg_pps.add_packet_path_stats(pps) # aggregate all latencies @@ -237,6 +239,21 @@ class PacketPathStats(object): results = {'lat_min_usec': latency.min_usec, 'lat_max_usec': latency.max_usec, 'lat_avg_usec': latency.avg_usec} + if latency.hdrh_available(): + results['hdrh'] = latency.hdrh + decoded_histogram = HdrHistogram.decode(latency.hdrh) + results['lat_percentile'] = {} + # override min max and avg from hdrh (only if histogram is valid) + if decoded_histogram.get_total_count() != 0: + results['lat_min_usec'] = decoded_histogram.get_min_value() + results['lat_max_usec'] = decoded_histogram.get_max_value() + results['lat_avg_usec'] = decoded_histogram.get_mean_value() + for percentile in self.config.lat_percentiles: + results['lat_percentile'][percentile] = decoded_histogram.\ + get_value_at_percentile(percentile) + else: + for percentile in self.config.lat_percentiles: + results['lat_percentile'][percentile] = 'n/a' else: results = {} results['packets'] = counters @@ -249,12 +266,13 @@ class PacketPathStatsManager(object): Each run will generate packet path stats for 1 or more chains. """ - def __init__(self, pps_list): + def __init__(self, config, pps_list): """Create a packet path stats intance with the list of associated if stats. pps_list: a list of packet path stats indexed by the chain id. All packet path stats must have the same length. """ + self.config = config self.pps_list = pps_list def insert_pps_list(self, chain_index, if_stats): @@ -286,11 +304,11 @@ class PacketPathStatsManager(object): chains = {} # insert the aggregated row if applicable if len(self.pps_list) > 1: - agg_pps = PacketPathStats.get_agg_packet_path_stats(self.pps_list) + agg_pps = PacketPathStats.get_agg_packet_path_stats(self.config, self.pps_list) chains['total'] = agg_pps.get_stats(reverse) for index, pps in enumerate(self.pps_list): - chains[index] = pps.get_stats(reverse) + chains[str(index)] = pps.get_stats(reverse) return {'interfaces': self._get_if_agg_name(reverse), 'chains': chains} @@ -305,11 +323,11 @@ class PacketPathStatsManager(object): 'Forward': { 'interfaces': ['Port0', 'vhost0', 'Port1'], 'chains': { - 0: {'packets': [2000054, 1999996, 1999996], + '0': {'packets': [2000054, 1999996, 1999996], 'min_usec': 10, 'max_usec': 187, 'avg_usec': 45}, - 1: {...}, + '1': {...}, 'total': {...} } }, diff --git a/nfvbench/specs.py b/nfvbench/specs.py index 75fe703..ec5e24e 100644 --- a/nfvbench/specs.py +++ b/nfvbench/specs.py @@ -17,11 +17,13 @@ class Encaps(object): VLAN = "VLAN" VxLAN = "VxLAN" + MPLS = "MPLS" NO_ENCAPS = "NONE" encaps_mapping = { 'VLAN': VLAN, 'VXLAN': VxLAN, + 'MPLS': MPLS, 'NONE': NO_ENCAPS } diff --git a/nfvbench/stats_collector.py b/nfvbench/stats_collector.py index 964d704..dc750db 100644 --- a/nfvbench/stats_collector.py +++ b/nfvbench/stats_collector.py @@ -56,9 +56,7 @@ class IntervalCollector(StatsCollector): self.notifier = notifier def add(self, stats): - if self.notifier: - current_stats = self.__compute_tx_rx_diff(stats) - self.notifier.send_interval_stats(**current_stats) + pass def reset(self): # don't reset time! @@ -66,52 +64,7 @@ class IntervalCollector(StatsCollector): self.last_tx_pkts = 0 def add_ndr_pdr(self, tag, stats): - if self.notifier: - - current_time = self._get_current_time_diff() - rx_pps = self._get_rx_pps(stats['tx_pps'], stats['drop_percentage']) - - self.last_tx_pkts = stats['tx_pps'] / 1000 * (current_time - self.last_time) - self.last_rx_pkts = rx_pps / 1000 * (current_time - self.last_time) - self.last_time = current_time - - # 'drop_pct' key is an unfortunate name, since in iteration stats it means - # number of the packets. More suitable would be 'drop_percentage'. - # FDS frontend depends on this key - current_stats = { - '{}_pps'.format(tag): stats['tx_pps'], - 'tx_pps': stats['tx_pps'], - 'rx_pps': rx_pps, - 'drop_pct': stats['drop_percentage'], - 'time_ms': current_time - } - - self.notifier.send_interval_stats(time_ms=current_stats['time_ms'], - tx_pps=current_stats['tx_pps'], - rx_pps=current_stats['rx_pps'], - drop_pct=current_stats['drop_pct']) - if tag == 'ndr': - self.notifier.send_ndr_found(stats['tx_pps']) - else: - self.notifier.send_pdr_found(stats['tx_pps']) - - def __compute_tx_rx_diff(self, stats): - current_time = self._get_current_time_diff() - tx_diff = stats['overall']['tx']['total_pkts'] - self.last_tx_pkts - tx_pps = (tx_diff * 1000) / (current_time - self.last_time) - rx_diff = stats['overall']['rx']['total_pkts'] - self.last_rx_pkts - rx_pps = (rx_diff * 1000) / (current_time - self.last_time) - - self.last_rx_pkts = stats['overall']['rx']['total_pkts'] - self.last_tx_pkts = stats['overall']['tx']['total_pkts'] - self.last_time = current_time - - return { - 'tx_pps': tx_pps, - 'rx_pps': rx_pps, - 'drop_pct': max(0.0, (1 - (float(rx_pps) / tx_pps)) * 100), - 'time_ms': current_time - } + pass class IterationCollector(StatsCollector): diff --git a/nfvbench/stats_manager.py b/nfvbench/stats_manager.py index 98ac413..6fa98bd 100644 --- a/nfvbench/stats_manager.py +++ b/nfvbench/stats_manager.py @@ -15,9 +15,9 @@ # import time -from log import LOG -from packet_stats import PacketPathStatsManager -from stats_collector import IntervalCollector +from .log import LOG +from .packet_stats import PacketPathStatsManager +from .stats_collector import IntervalCollector class StatsManager(object): @@ -35,7 +35,7 @@ class StatsManager(object): if self.config.single_run: pps_list = [] self.traffic_client.insert_interface_stats(pps_list) - self.pps_mgr = PacketPathStatsManager(pps_list) + self.pps_mgr = PacketPathStatsManager(self.config, pps_list) else: self.pps_mgr = None self.worker = None diff --git a/nfvbench/summarizer.py b/nfvbench/summarizer.py index 7520076..7c69f52 100644 --- a/nfvbench/summarizer.py +++ b/nfvbench/summarizer.py @@ -47,7 +47,7 @@ def _annotate_chain_stats(chain_stats, nodrop_marker='=>'): In the case of shared net, some columns in packets array can have ''. Some columns cab also be None which means the data is not available. """ - for stats in chain_stats.values(): + for stats in list(chain_stats.values()): packets = stats['packets'] count = len(packets) if count > 1: @@ -97,7 +97,7 @@ class Formatter(object): def standard(data): if isinstance(data, int): return Formatter.int(data) - elif isinstance(data, float): + if isinstance(data, float): return Formatter.float(4)(data) return Formatter.fixed(data) @@ -130,7 +130,7 @@ class Formatter(object): def percentage(data): if data is None: return '' - elif math.isnan(data): + if math.isnan(data): return '-' return Formatter.suffix('%')(Formatter.float(4)(data)) @@ -139,7 +139,7 @@ class Table(object): """ASCII readable table class.""" def __init__(self, header): - header_row, self.formatters = zip(*header) + header_row, self.formatters = list(zip(*header)) self.data = [header_row] self.columns = len(header_row) @@ -195,7 +195,7 @@ class Summarizer(object): def _put_dict(self, data): with self._create_block(False): - for key, value in data.iteritems(): + for key, value in list(data.items()): if isinstance(value, dict): self._put(key + ':') self._put_dict(value) @@ -219,35 +219,6 @@ class Summarizer(object): class NFVBenchSummarizer(Summarizer): """Summarize nfvbench json result.""" - ndr_pdr_header = [ - ('-', Formatter.fixed), - ('L2 Frame Size', Formatter.standard), - ('Rate (fwd+rev)', Formatter.bits), - ('Rate (fwd+rev)', Formatter.suffix(' pps')), - ('Avg Drop Rate', Formatter.suffix('%')), - ('Avg Latency (usec)', Formatter.standard), - ('Min Latency (usec)', Formatter.standard), - ('Max Latency (usec)', Formatter.standard) - ] - - single_run_header = [ - ('L2 Frame Size', Formatter.standard), - ('Drop Rate', Formatter.suffix('%')), - ('Avg Latency (usec)', Formatter.standard), - ('Min Latency (usec)', Formatter.standard), - ('Max Latency (usec)', Formatter.standard) - ] - - config_header = [ - ('Direction', Formatter.standard), - ('Requested TX Rate (bps)', Formatter.bits), - ('Actual TX Rate (bps)', Formatter.bits), - ('RX Rate (bps)', Formatter.bits), - ('Requested TX Rate (pps)', Formatter.suffix(' pps')), - ('Actual TX Rate (pps)', Formatter.suffix(' pps')), - ('RX Rate (pps)', Formatter.suffix(' pps')) - ] - direction_keys = ['direction-forward', 'direction-reverse', 'direction-total'] direction_names = ['Forward', 'Reverse', 'Total'] @@ -259,6 +230,47 @@ class NFVBenchSummarizer(Summarizer): self.record_header = None self.record_data = None self.sender = sender + + self.ndr_pdr_header = [ + ('-', Formatter.fixed), + ('L2 Frame Size', Formatter.standard), + ('Rate (fwd+rev)', Formatter.bits), + ('Rate (fwd+rev)', Formatter.suffix(' pps')), + ('Avg Drop Rate', Formatter.suffix('%')), + ('Avg Latency (usec)', Formatter.standard), + ('Min Latency (usec)', Formatter.standard), + ('Max Latency (usec)', Formatter.standard) + ] + + self.single_run_header = [ + ('L2 Frame Size', Formatter.standard), + ('Drop Rate', Formatter.suffix('%')), + ('Avg Latency (usec)', Formatter.standard), + ('Min Latency (usec)', Formatter.standard), + ('Max Latency (usec)', Formatter.standard) + ] + + self.config_header = [ + ('Direction', Formatter.standard), + ('Requested TX Rate (bps)', Formatter.bits), + ('Actual TX Rate (bps)', Formatter.bits), + ('RX Rate (bps)', Formatter.bits), + ('Requested TX Rate (pps)', Formatter.suffix(' pps')), + ('Actual TX Rate (pps)', Formatter.suffix(' pps')), + ('RX Rate (pps)', Formatter.suffix(' pps')) + ] + + # add percentiles headers if hdrh enabled + if not self.config.disable_hdrh: + for percentile in self.config.lat_percentiles: + # 'append' expects a single parameter => double parentheses + self.ndr_pdr_header.append((str(percentile) + ' %ile lat.', Formatter.standard)) + self.single_run_header.append((str(percentile) + ' %ile lat.', Formatter.standard)) + + if self.config.periodic_gratuitous_arp: + self.direction_keys.insert(2, 'garp-direction-total') + self.direction_names.insert(2, 'Gratuitous ARP') + # if sender is available initialize record if self.sender: self.__record_init() @@ -297,7 +309,7 @@ class NFVBenchSummarizer(Summarizer): if network_benchmark['versions']: self._put('Versions:') with self._create_block(): - for component, version in network_benchmark['versions'].iteritems(): + for component, version in list(network_benchmark['versions'].items()): self._put(component + ':', version) if self.config['ndr_run'] or self.config['pdr_run']: @@ -308,7 +320,7 @@ class NFVBenchSummarizer(Summarizer): if self.config['pdr_run']: self._put('PDR:', self.config['measurement']['PDR']) self._put('Service chain:') - for result in network_benchmark['service_chain'].iteritems(): + for result in list(network_benchmark['service_chain'].items()): with self._create_block(): self.__chain_summarize(*result) @@ -325,13 +337,13 @@ class NFVBenchSummarizer(Summarizer): self._put('Bidirectional:', traffic_benchmark['bidirectional']) self._put('Flow count:', traffic_benchmark['flow_count']) self._put('Service chains count:', traffic_benchmark['service_chain_count']) - self._put('Compute nodes:', traffic_benchmark['compute_nodes'].keys()) + self._put('Compute nodes:', list(traffic_benchmark['compute_nodes'].keys())) self.__record_header_put('profile', traffic_benchmark['profile']) self.__record_header_put('bidirectional', traffic_benchmark['bidirectional']) self.__record_header_put('flow_count', traffic_benchmark['flow_count']) self.__record_header_put('sc_count', traffic_benchmark['service_chain_count']) - self.__record_header_put('compute_nodes', traffic_benchmark['compute_nodes'].keys()) + self.__record_header_put('compute_nodes', list(traffic_benchmark['compute_nodes'].keys())) with self._create_block(False): self._put() if not self.config['no_traffic']: @@ -345,7 +357,7 @@ class NFVBenchSummarizer(Summarizer): except KeyError: pass - for entry in traffic_benchmark['result'].iteritems(): + for entry in list(traffic_benchmark['result'].items()): if 'warning' in entry: continue self.__chain_analysis_summarize(*entry) @@ -391,10 +403,11 @@ class NFVBenchSummarizer(Summarizer): summary_table = Table(self.ndr_pdr_header) if self.config['ndr_run']: - for frame_size, analysis in traffic_result.iteritems(): + for frame_size, analysis in list(traffic_result.items()): if frame_size == 'warning': continue - summary_table.add_row([ + + row_data = [ 'NDR', frame_size, analysis['ndr']['rate_bps'], @@ -403,21 +416,34 @@ class NFVBenchSummarizer(Summarizer): analysis['ndr']['stats']['overall']['avg_delay_usec'], analysis['ndr']['stats']['overall']['min_delay_usec'], analysis['ndr']['stats']['overall']['max_delay_usec'] - ]) - self.__record_data_put(frame_size, {'ndr': { + ] + if not self.config.disable_hdrh: + self.extract_hdrh_percentiles( + analysis['ndr']['stats']['overall']['lat_percentile'], row_data) + summary_table.add_row(row_data) + + ndr_data = { 'type': 'NDR', 'rate_bps': analysis['ndr']['rate_bps'], 'rate_pps': analysis['ndr']['rate_pps'], + 'offered_tx_rate_bps': analysis['ndr']['stats']['offered_tx_rate_bps'], + 'theoretical_tx_rate_pps': analysis['ndr']['stats']['theoretical_tx_rate_pps'], + 'theoretical_tx_rate_bps': analysis['ndr']['stats']['theoretical_tx_rate_bps'], 'drop_percentage': analysis['ndr']['stats']['overall']['drop_percentage'], 'avg_delay_usec': analysis['ndr']['stats']['overall']['avg_delay_usec'], 'min_delay_usec': analysis['ndr']['stats']['overall']['min_delay_usec'], 'max_delay_usec': analysis['ndr']['stats']['overall']['max_delay_usec'] - }}) + } + if not self.config.disable_hdrh: + self.extract_hdrh_percentiles( + analysis['ndr']['stats']['overall']['lat_percentile'], ndr_data, True) + self.__record_data_put(frame_size, {'ndr': ndr_data}) if self.config['pdr_run']: - for frame_size, analysis in traffic_result.iteritems(): + for frame_size, analysis in list(traffic_result.items()): if frame_size == 'warning': continue - summary_table.add_row([ + + row_data = [ 'PDR', frame_size, analysis['pdr']['rate_bps'], @@ -426,34 +452,73 @@ class NFVBenchSummarizer(Summarizer): analysis['pdr']['stats']['overall']['avg_delay_usec'], analysis['pdr']['stats']['overall']['min_delay_usec'], analysis['pdr']['stats']['overall']['max_delay_usec'] - ]) - self.__record_data_put(frame_size, {'pdr': { + ] + if not self.config.disable_hdrh: + self.extract_hdrh_percentiles( + analysis['pdr']['stats']['overall']['lat_percentile'], row_data) + summary_table.add_row(row_data) + + pdr_data = { 'type': 'PDR', 'rate_bps': analysis['pdr']['rate_bps'], 'rate_pps': analysis['pdr']['rate_pps'], + 'offered_tx_rate_bps': analysis['pdr']['stats']['offered_tx_rate_bps'], + 'theoretical_tx_rate_pps': analysis['pdr']['stats']['theoretical_tx_rate_pps'], + 'theoretical_tx_rate_bps': analysis['pdr']['stats']['theoretical_tx_rate_bps'], 'drop_percentage': analysis['pdr']['stats']['overall']['drop_percentage'], 'avg_delay_usec': analysis['pdr']['stats']['overall']['avg_delay_usec'], 'min_delay_usec': analysis['pdr']['stats']['overall']['min_delay_usec'], 'max_delay_usec': analysis['pdr']['stats']['overall']['max_delay_usec'] - }}) + } + if not self.config.disable_hdrh: + self.extract_hdrh_percentiles( + analysis['pdr']['stats']['overall']['lat_percentile'], pdr_data, True) + self.__record_data_put(frame_size, {'pdr': pdr_data}) if self.config['single_run']: - for frame_size, analysis in traffic_result.iteritems(): - summary_table.add_row([ + for frame_size, analysis in list(traffic_result.items()): + row_data = [ frame_size, analysis['stats']['overall']['drop_rate_percent'], analysis['stats']['overall']['rx']['avg_delay_usec'], analysis['stats']['overall']['rx']['min_delay_usec'], analysis['stats']['overall']['rx']['max_delay_usec'] - ]) - self.__record_data_put(frame_size, {'single_run': { + ] + if not self.config.disable_hdrh: + self.extract_hdrh_percentiles( + analysis['stats']['overall']['rx']['lat_percentile'], row_data) + summary_table.add_row(row_data) + + single_run_data = { 'type': 'single_run', + 'offered_tx_rate_bps': analysis['stats']['offered_tx_rate_bps'], + 'theoretical_tx_rate_pps': analysis['stats']['theoretical_tx_rate_pps'], + 'theoretical_tx_rate_bps': analysis['stats']['theoretical_tx_rate_bps'], 'drop_rate_percent': analysis['stats']['overall']['drop_rate_percent'], 'avg_delay_usec': analysis['stats']['overall']['rx']['avg_delay_usec'], 'min_delay_usec': analysis['stats']['overall']['rx']['min_delay_usec'], 'max_delay_usec': analysis['stats']['overall']['rx']['max_delay_usec'] - }}) + } + if not self.config.disable_hdrh: + self.extract_hdrh_percentiles( + analysis['stats']['overall']['rx']['lat_percentile'], single_run_data, True) + self.__record_data_put(frame_size, {'single_run': single_run_data}) return summary_table + def extract_hdrh_percentiles(self, lat_percentile, data, add_key=False): + if add_key: + data['lat_percentile'] = {} + for percentile in self.config.lat_percentiles: + if add_key: + try: + data['lat_percentile_' + str(percentile)] = lat_percentile[percentile] + except TypeError: + data['lat_percentile_' + str(percentile)] = "n/a" + else: + try: + data.append(lat_percentile[percentile]) + except TypeError: + data.append("n/a") + def __get_config_table(self, run_config, frame_size): config_table = Table(self.config_header) for key, name in zip(self.direction_keys, self.direction_names): @@ -485,11 +550,11 @@ class NFVBenchSummarizer(Summarizer): chain_stats: { 'interfaces': ['Port0', 'drop %'', 'vhost0', 'Port1'], 'chains': { - 0: {'packets': [2000054, '-0.023%', 1999996, 1999996], + '0': {'packets': [2000054, '-0.023%', 1999996, 1999996], 'lat_min_usec': 10, 'lat_max_usec': 187, 'lat_avg_usec': 45}, - 1: {...}, + '1': {...}, 'total': {...} } } @@ -498,21 +563,43 @@ class NFVBenchSummarizer(Summarizer): _annotate_chain_stats(chains) header = [('Chain', Formatter.standard)] + \ [(ifname, Formatter.standard) for ifname in chain_stats['interfaces']] - # add latency columns if available Avg, Min, Max + # add latency columns if available Avg, Min, Max and percentiles lat_keys = [] lat_map = {'lat_avg_usec': 'Avg lat.', 'lat_min_usec': 'Min lat.', 'lat_max_usec': 'Max lat.'} - if 'lat_avg_usec' in chains[0]: + if 'lat_avg_usec' in chains['0']: lat_keys = ['lat_avg_usec', 'lat_min_usec', 'lat_max_usec'] - for key in lat_keys: - header.append((lat_map[key], Formatter.standard)) + + if not self.config.disable_hdrh: + lat_keys.append('lat_percentile') + for percentile in self.config.lat_percentiles: + lat_map['lat_' + str(percentile) + '_percentile'] = \ + str(percentile) + ' %ile lat.' + + for lat_value in lat_map.values(): + # 'append' expects a single parameter => double parentheses + header.append((lat_value, Formatter.standard)) table = Table(header) - for chain in sorted(chains.keys()): + for chain in sorted(list(chains.keys()), key=str): row = [chain] + chains[chain]['packets'] for lat_key in lat_keys: - row.append('{:,} usec'.format(chains[chain][lat_key])) + + if lat_key != 'lat_percentile': + if chains[chain].get(lat_key, None): + row.append(Formatter.standard(chains[chain][lat_key])) + else: + row.append('n/a') + else: + if not self.config.disable_hdrh: + if chains[chain].get(lat_key, None): + for percentile in chains[chain][lat_key]: + row.append(Formatter.standard( + chains[chain][lat_key][percentile])) + else: + for _ in self.config.lat_percentiles: + row.append('n/a') table.add_row(row) return table @@ -546,9 +633,9 @@ class NFVBenchSummarizer(Summarizer): run_specific_data['pdr'] = data['pdr'] run_specific_data['pdr']['drop_limit'] = self.config['measurement']['PDR'] del data['pdr'] - for key in run_specific_data: + for data_value in run_specific_data.values(): data_to_send = data.copy() - data_to_send.update(run_specific_data[key]) + data_to_send.update(data_value) self.sender.record_send(data_to_send) self.__record_init() diff --git a/nfvbench/traffic_client.py b/nfvbench/traffic_client.py index dbb8206..47af265 100755 --- a/nfvbench/traffic_client.py +++ b/nfvbench/traffic_client.py @@ -13,43 +13,45 @@ # under the License. """Interface to the traffic generator clients including NDR/PDR binary search.""" - -from datetime import datetime import socket import struct import time +import sys from attrdict import AttrDict import bitmath +from hdrh.histogram import HdrHistogram from netaddr import IPNetwork # pylint: disable=import-error -from trex_stl_lib.api import STLError +from trex.stl.api import Ether +from trex.stl.api import STLError +from trex.stl.api import UDP +# pylint: disable=wrong-import-order +from scapy.contrib.mpls import MPLS # flake8: noqa +# pylint: enable=wrong-import-order # pylint: enable=import-error -from log import LOG -from packet_stats import InterfaceStats -from packet_stats import PacketPathStats -from stats_collector import IntervalCollector -from stats_collector import IterationCollector -import traffic_gen.traffic_utils as utils -from utils import cast_integer - +from .log import LOG +from .packet_stats import InterfaceStats +from .packet_stats import PacketPathStats +from .stats_collector import IntervalCollector +from .stats_collector import IterationCollector +from .traffic_gen import traffic_utils as utils +from .utils import cast_integer, find_max_size, find_tuples_equal_to_lcm_value, get_divisors, lcm class TrafficClientException(Exception): """Generic traffic client exception.""" - pass - - class TrafficRunner(object): """Serialize various steps required to run traffic.""" - def __init__(self, client, duration_sec, interval_sec=0): + def __init__(self, client, duration_sec, interval_sec=0, service_mode=False): """Create a traffic runner.""" self.client = client self.start_time = None self.duration_sec = duration_sec self.interval_sec = interval_sec + self.service_mode = service_mode def run(self): """Clear stats and instruct the traffic generator to start generating traffic.""" @@ -57,6 +59,13 @@ class TrafficRunner(object): return None LOG.info('Running traffic generator') self.client.gen.clear_stats() + # Debug use only: the service_mode flag may have been set in + # the configuration, in order to enable the 'service' mode + # in the trex generator, before starting the traffic (run). + # From this point, a T-rex console (launched in readonly mode) would + # then be able to capture the transmitted and/or received traffic. + self.client.gen.set_service_mode(enabled=self.service_mode) + LOG.info('Service mode is %sabled', 'en' if self.service_mode else 'dis') self.client.gen.start_traffic() self.start_time = time.time() return self.poll_stats() @@ -110,6 +119,8 @@ class IpBlock(object): def __init__(self, base_ip, step_ip, count_ip): """Create an IP block.""" self.base_ip_int = Device.ip_to_int(base_ip) + if step_ip == 'random': + step_ip = '0.0.0.1' self.step = Device.ip_to_int(step_ip) self.max_available = count_ip self.next_free = 0 @@ -120,8 +131,15 @@ class IpBlock(object): raise IndexError('Index out of bounds: %d (max=%d)' % (index, self.max_available)) return Device.int_to_ip(self.base_ip_int + index * self.step) + def get_ip_from_chain_first_ip(self, first_ip, index=0): + """Return the IP address at given index starting from chain first ip.""" + if index < 0 or index >= self.max_available: + raise IndexError('Index out of bounds: %d (max=%d)' % (index, self.max_available)) + return Device.int_to_ip(first_ip + index * self.step) + def reserve_ip_range(self, count): - """Reserve a range of count consecutive IP addresses spaced by step.""" + """Reserve a range of count consecutive IP addresses spaced by step. + """ if self.next_free + count > self.max_available: raise IndexError('No more IP addresses next free=%d max_available=%d requested=%d' % (self.next_free, @@ -137,6 +155,27 @@ class IpBlock(object): self.next_free = 0 +class UdpPorts(object): + + def __init__(self, src_min, src_max, dst_min, dst_max, udp_src_size, udp_dst_size, step): + + self.src_min = int(src_min) + self.src_max = int(src_max) + self.dst_min = int(dst_min) + self.dst_max = int(dst_max) + self.udp_src_size = udp_src_size + self.udp_dst_size = udp_dst_size + self.step = step + + def get_src_max(self, index=0): + """Return the UDP src port at given index.""" + return int(self.src_min) + index * int(self.step) + + def get_dst_max(self, index=0): + """Return the UDP dst port at given index.""" + return int(self.dst_min) + index * int(self.step) + + class Device(object): """Represent a port device and all information associated to it. @@ -148,12 +187,19 @@ class Device(object): """Create a new device for a given port.""" self.generator_config = generator_config self.chain_count = generator_config.service_chain_count - self.flow_count = generator_config.flow_count / 2 + if generator_config.bidirectional: + self.flow_count = generator_config.flow_count / 2 + else: + self.flow_count = generator_config.flow_count + self.port = port self.switch_port = generator_config.interfaces[port].get('switch_port', None) self.vtep_vlan = None self.vtep_src_mac = None self.vxlan = False + self.mpls = False + self.inner_labels = None + self.outer_labels = None self.pci = generator_config.interfaces[port].pci self.mac = None self.dest_macs = None @@ -166,10 +212,50 @@ class Device(object): self.vnis = None self.vlans = None self.ip_addrs = generator_config.ip_addrs[port] - subnet = IPNetwork(self.ip_addrs) - self.ip = subnet.ip.format() + self.ip_src_static = generator_config.ip_src_static self.ip_addrs_step = generator_config.ip_addrs_step - self.ip_block = IpBlock(self.ip, self.ip_addrs_step, self.flow_count) + if self.ip_addrs_step == 'random': + # Set step to 1 to calculate the IP range size (see check_range_size below) + step = '0.0.0.1' + else: + step = self.ip_addrs_step + self.ip_size = self.check_range_size(IPNetwork(self.ip_addrs).size, Device.ip_to_int(step)) + self.ip = str(IPNetwork(self.ip_addrs).network) + ip_addrs_left = generator_config.ip_addrs[0] + ip_addrs_right = generator_config.ip_addrs[1] + self.ip_addrs_size = { + 'left': self.check_range_size(IPNetwork(ip_addrs_left).size, Device.ip_to_int(step)), + 'right': self.check_range_size(IPNetwork(ip_addrs_right).size, Device.ip_to_int(step))} + udp_src_port = generator_config.gen_config.udp_src_port + if udp_src_port is None: + udp_src_port = 53 + udp_dst_port = generator_config.gen_config.udp_dst_port + if udp_dst_port is None: + udp_dst_port = 53 + src_max, src_min = self.define_udp_range(udp_src_port, 'udp_src_port') + dst_max, dst_min = self.define_udp_range(udp_dst_port, 'udp_dst_port') + if generator_config.gen_config.udp_port_step == 'random': + # Set step to 1 to calculate the UDP range size + udp_step = 1 + else: + udp_step = int(generator_config.gen_config.udp_port_step) + udp_src_size = self.check_range_size(int(src_max) - int(src_min) + 1, udp_step) + udp_dst_size = self.check_range_size(int(dst_max) - int(dst_min) + 1, udp_step) + lcm_port = lcm(udp_src_size, udp_dst_size) + if self.ip_src_static is True: + lcm_ip = lcm(1, min(self.ip_addrs_size['left'], self.ip_addrs_size['right'])) + else: + lcm_ip = lcm(self.ip_addrs_size['left'], self.ip_addrs_size['right']) + flow_max = lcm(lcm_port, lcm_ip) + if self.flow_count > flow_max: + raise TrafficClientException('Trying to set unachievable traffic (%d > %d)' % + (self.flow_count, flow_max)) + + self.udp_ports = UdpPorts(src_min, src_max, dst_min, dst_max, udp_src_size, udp_dst_size, + generator_config.gen_config.udp_port_step) + + self.ip_block = IpBlock(self.ip, step, self.ip_size) + self.gw_ip_block = IpBlock(generator_config.gateway_ips[port], generator_config.gateway_ip_addrs_step, self.chain_count) @@ -177,8 +263,146 @@ class Device(object): self.tg_gw_ip_block = IpBlock(self.tg_gateway_ip_addrs, generator_config.tg_gateway_ip_addrs_step, self.chain_count) - self.udp_src_port = generator_config.udp_src_port - self.udp_dst_port = generator_config.udp_dst_port + + def limit_ip_udp_ranges(self, peer_ip_size, cur_chain_flow_count): + # init to min value in case of no matching values found with lcm calculation + new_src_ip_size = 1 + new_peer_ip_size = 1 + new_src_udp_size = 1 + new_dst_udp_size = 1 + + if self.ip_src_static is True: + src_ip_size = 1 + else: + src_ip_size = self.ip_size + ip_src_divisors = list(get_divisors(src_ip_size)) + ip_dst_divisors = list(get_divisors(peer_ip_size)) + udp_src_divisors = list(get_divisors(self.udp_ports.udp_src_size)) + udp_dst_divisors = list(get_divisors(self.udp_ports.udp_dst_size)) + fc = int(cur_chain_flow_count) + tuples_ip = list(find_tuples_equal_to_lcm_value(ip_src_divisors, ip_dst_divisors, fc)) + tuples_udp = list(find_tuples_equal_to_lcm_value(udp_src_divisors, udp_dst_divisors, fc)) + + if tuples_ip: + new_src_ip_size = tuples_ip[-1][0] + new_peer_ip_size = tuples_ip[-1][1] + + if tuples_udp: + new_src_udp_size = tuples_udp[-1][0] + new_dst_udp_size = tuples_udp[-1][1] + + tuples_src = [] + tuples_dst = [] + if not tuples_ip and not tuples_udp: + # in case of not divisors in common matching LCM value (i.e. requested flow count) + # try to find an accurate UDP range to fit requested flow count + udp_src_int = range(self.udp_ports.src_min, self.udp_ports.src_max) + udp_dst_int = range(self.udp_ports.dst_min, self.udp_ports.dst_max) + tuples_src = list(find_tuples_equal_to_lcm_value(ip_src_divisors, udp_src_int, fc)) + tuples_dst = list(find_tuples_equal_to_lcm_value(ip_dst_divisors, udp_dst_int, fc)) + + if not tuples_src and not tuples_dst: + # iterate IP and UDP ranges to find a tuple that match flow count values + src_ip_range = range(1,src_ip_size) + dst_ip_range = range(1, peer_ip_size) + tuples_src = list(find_tuples_equal_to_lcm_value(src_ip_range, udp_src_int, fc)) + tuples_dst = list(find_tuples_equal_to_lcm_value(dst_ip_range, udp_dst_int, fc)) + + if tuples_src or tuples_dst: + if tuples_src: + new_src_ip_size = tuples_src[-1][0] + new_src_udp_size = tuples_src[-1][1] + if tuples_dst: + new_peer_ip_size = tuples_dst[-1][0] + new_dst_udp_size = tuples_dst[-1][1] + else: + if not tuples_ip: + if src_ip_size != 1: + if src_ip_size > fc: + new_src_ip_size = fc + else: + new_src_ip_size = find_max_size(src_ip_size, tuples_udp, fc) + if peer_ip_size != 1: + if peer_ip_size > fc: + new_peer_ip_size = fc + else: + new_peer_ip_size = find_max_size(peer_ip_size, tuples_udp, fc) + + if not tuples_udp: + if self.udp_ports.udp_src_size != 1: + if self.udp_ports.udp_src_size > fc: + new_src_udp_size = fc + else: + new_src_udp_size = find_max_size(self.udp_ports.udp_src_size, + tuples_ip, fc) + if self.udp_ports.udp_dst_size != 1: + if self.udp_ports.udp_dst_size > fc: + new_dst_udp_size = fc + else: + new_dst_udp_size = find_max_size(self.udp_ports.udp_dst_size, + tuples_ip, fc) + max_possible_flows = lcm(lcm(new_src_ip_size, new_peer_ip_size), + lcm(new_src_udp_size, new_dst_udp_size)) + + LOG.debug("IP dst size: %d", new_peer_ip_size) + LOG.debug("LCM IP: %d", lcm(new_src_ip_size, new_peer_ip_size)) + LOG.debug("LCM UDP: %d", lcm(new_src_udp_size, new_dst_udp_size)) + LOG.debug("Global LCM: %d", max_possible_flows) + LOG.debug("IP src size: %d, IP dst size: %d, UDP src size: %d, UDP dst size: %d", + new_src_ip_size, new_peer_ip_size, self.udp_ports.udp_src_size, + self.udp_ports.udp_dst_size) + if not max_possible_flows == cur_chain_flow_count: + if (self.ip_addrs_step != '0.0.0.1' or self.udp_ports.step != '1') and not ( + self.ip_addrs_step == 'random' and self.udp_ports.step == 'random'): + LOG.warning("Current values of ip_addrs_step and/or udp_port_step properties " + "do not allow to control an accurate flow count. " + "Values will be overridden as follows:") + if self.ip_addrs_step != '0.0.0.1': + LOG.info("ip_addrs_step='0.0.0.1' (previous value: ip_addrs_step='%s')", + self.ip_addrs_step) + self.ip_addrs_step = '0.0.0.1' + + if self.udp_ports.step != '1': + LOG.info("udp_port_step='1' (previous value: udp_port_step='%s')", + self.udp_ports.step) + self.udp_ports.step = '1' + # override config for not logging random step warning message in trex_gen.py + self.generator_config.gen_config.udp_port_step = self.udp_ports.step + else: + LOG.error("Current values of ip_addrs_step and udp_port_step properties " + "do not allow to control an accurate flow count.") + else: + src_ip_size = new_src_ip_size + peer_ip_size = new_peer_ip_size + self.udp_ports.udp_src_size = new_src_udp_size + self.udp_ports.udp_dst_size = new_dst_udp_size + return src_ip_size, peer_ip_size + + @staticmethod + def define_udp_range(udp_port, property_name): + if isinstance(udp_port, int): + min = udp_port + max = min + elif isinstance(udp_port, tuple): + min = udp_port[0] + max = udp_port[1] + else: + raise TrafficClientException('Invalid %s property value (53 or [\'53\',\'1024\'])' + % property_name) + return max, min + + + @staticmethod + def check_range_size(range_size, step): + """Check and set the available IPs or UDP ports, considering the step.""" + try: + if range_size % step == 0: + value = range_size // step + else: + value = range_size // step + 1 + return value + except ZeroDivisionError: + raise ZeroDivisionError("step can't be zero !") from ZeroDivisionError def set_mac(self, mac): """Set the local MAC for this port device.""" @@ -197,7 +421,7 @@ class Device(object): - VM macs discovered using openstack API - dest MACs provisioned in config file """ - self.vtep_dst_mac = map(str, dest_macs) + self.vtep_dst_mac = list(map(str, dest_macs)) def set_dest_macs(self, dest_macs): """Set the list of dest MACs indexed by the chain id. @@ -206,7 +430,7 @@ class Device(object): - VM macs discovered using openstack API - dest MACs provisioned in config file """ - self.dest_macs = map(str, dest_macs) + self.dest_macs = list(map(str, dest_macs)) def get_dest_macs(self): """Get the list of dest macs for this device. @@ -237,10 +461,30 @@ class Device(object): LOG.info("Port %d: src_vtep %s, dst_vtep %s", self.port, self.vtep_src_ip, self.vtep_dst_ip) + def set_mpls_peers(self, src_ip, dst_ip): + self.mpls = True + self.vtep_dst_ip = dst_ip + self.vtep_src_ip = src_ip + LOG.info("Port %d: src_mpls_vtep %s, mpls_peer_ip %s", self.port, + self.vtep_src_ip, self.vtep_dst_ip) + def set_vxlans(self, vnis): self.vnis = vnis LOG.info("Port %d: VNIs %s", self.port, self.vnis) + def set_mpls_inner_labels(self, labels): + self.inner_labels = labels + LOG.info("Port %d: MPLS Inner Labels %s", self.port, self.inner_labels) + + def set_mpls_outer_labels(self, labels): + self.outer_labels = labels + LOG.info("Port %d: MPLS Outer Labels %s", self.port, self.outer_labels) + + def set_gw_ip(self, gateway_ip): + self.gw_ip_block = IpBlock(gateway_ip, + self.generator_config.gateway_ip_addrs_step, + self.chain_count) + def get_gw_ip(self, chain_index): """Retrieve the IP address assigned for the gateway of a given chain.""" return self.gw_ip_block.get_ip(chain_index) @@ -257,16 +501,44 @@ class Device(object): # calculated as (total_flows + chain_count - 1) / chain_count # - the first chain will have the remainder # example 11 flows and 3 chains => 3, 4, 4 - flows_per_chain = (self.flow_count + self.chain_count - 1) / self.chain_count - cur_chain_flow_count = self.flow_count - flows_per_chain * (self.chain_count - 1) + flows_per_chain = int((self.flow_count + self.chain_count - 1) / self.chain_count) + cur_chain_flow_count = int(self.flow_count - flows_per_chain * (self.chain_count - 1)) + peer = self.get_peer_device() self.ip_block.reset_reservation() peer.ip_block.reset_reservation() dest_macs = self.get_dest_macs() - for chain_idx in xrange(self.chain_count): - src_ip_first, src_ip_last = self.ip_block.reserve_ip_range(cur_chain_flow_count) - dst_ip_first, dst_ip_last = peer.ip_block.reserve_ip_range(cur_chain_flow_count) + # limit ranges of UDP ports and IP to avoid overflow of the number of flows + peer_size = peer.ip_size // self.chain_count + + for chain_idx in range(self.chain_count): + src_ip_size, peer_ip_size = self.limit_ip_udp_ranges(peer_size, cur_chain_flow_count) + + src_ip_first, src_ip_last = self.ip_block.reserve_ip_range \ + (src_ip_size) + dst_ip_first, dst_ip_last = peer.ip_block.reserve_ip_range \ + (peer_ip_size) + + if self.ip_addrs_step != 'random': + src_ip_last = self.ip_block.get_ip_from_chain_first_ip( + Device.ip_to_int(src_ip_first), src_ip_size - 1) + dst_ip_last = peer.ip_block.get_ip_from_chain_first_ip( + Device.ip_to_int(dst_ip_first), peer_ip_size - 1) + if self.udp_ports.step != 'random': + self.udp_ports.src_max = self.udp_ports.get_src_max(self.udp_ports.udp_src_size - 1) + self.udp_ports.dst_max = self.udp_ports.get_dst_max(self.udp_ports.udp_dst_size - 1) + if self.ip_src_static: + src_ip_last = src_ip_first + + LOG.info("Port %d, chain %d: IP src range [%s,%s]", self.port, chain_idx, + src_ip_first, src_ip_last) + LOG.info("Port %d, chain %d: IP dst range [%s,%s]", self.port, chain_idx, + dst_ip_first, dst_ip_last) + LOG.info("Port %d, chain %d: UDP src range [%s,%s]", self.port, chain_idx, + self.udp_ports.src_min, self.udp_ports.src_max) + LOG.info("Port %d, chain %d: UDP dst range [%s,%s]", self.port, chain_idx, + self.udp_ports.dst_min, self.udp_ports.dst_max) configs.append({ 'count': cur_chain_flow_count, @@ -274,24 +546,34 @@ class Device(object): 'mac_dst': dest_macs[chain_idx], 'ip_src_addr': src_ip_first, 'ip_src_addr_max': src_ip_last, - 'ip_src_count': cur_chain_flow_count, + 'ip_src_count': src_ip_size, 'ip_dst_addr': dst_ip_first, 'ip_dst_addr_max': dst_ip_last, - 'ip_dst_count': cur_chain_flow_count, + 'ip_dst_count': peer_ip_size, 'ip_addrs_step': self.ip_addrs_step, - 'udp_src_port': self.udp_src_port, - 'udp_dst_port': self.udp_dst_port, + 'ip_src_static': self.ip_src_static, + 'udp_src_port': self.udp_ports.src_min, + 'udp_src_port_max': self.udp_ports.src_max, + 'udp_src_count': self.udp_ports.udp_src_size, + 'udp_dst_port': self.udp_ports.dst_min, + 'udp_dst_port_max': self.udp_ports.dst_max, + 'udp_dst_count': self.udp_ports.udp_dst_size, + 'udp_port_step': self.udp_ports.step, 'mac_discovery_gw': self.get_gw_ip(chain_idx), 'ip_src_tg_gw': self.tg_gw_ip_block.get_ip(chain_idx), 'ip_dst_tg_gw': peer.tg_gw_ip_block.get_ip(chain_idx), 'vlan_tag': self.vlans[chain_idx] if self.vlans else None, 'vxlan': self.vxlan, 'vtep_vlan': self.vtep_vlan if self.vtep_vlan else None, - 'vtep_src_mac': self.mac if self.vxlan is True else None, - 'vtep_dst_mac': self.vtep_dst_mac if self.vxlan is True else None, + 'vtep_src_mac': self.mac if (self.vxlan or self.mpls) else None, + 'vtep_dst_mac': self.vtep_dst_mac if (self.vxlan or self.mpls) else None, 'vtep_dst_ip': self.vtep_dst_ip if self.vxlan is True else None, 'vtep_src_ip': self.vtep_src_ip if self.vxlan is True else None, - 'net_vni': self.vnis[chain_idx] if self.vxlan is True else None + 'net_vni': self.vnis[chain_idx] if self.vxlan is True else None, + 'mpls': self.mpls, + 'mpls_outer_label': self.outer_labels[chain_idx] if self.mpls is True else None, + 'mpls_inner_label': self.inner_labels[chain_idx] if self.mpls is True else None + }) # after first chain, fall back to the flow count for all other chains cur_chain_flow_count = flows_per_chain @@ -305,7 +587,7 @@ class Device(object): @staticmethod def int_to_ip(nvalue): """Convert an IP address from numeric to string.""" - return socket.inet_ntoa(struct.pack("!I", nvalue)) + return socket.inet_ntoa(struct.pack("!I", int(nvalue))) class GeneratorConfig(object): @@ -333,23 +615,37 @@ class GeneratorConfig(object): self.cores = config.cores else: self.cores = gen_config.get('cores', 1) + # let's report the value actually used in the end + config.cores_used = self.cores self.mbuf_factor = config.mbuf_factor - if gen_config.intf_speed: - # interface speed is overriden from config - self.intf_speed = bitmath.parse_string(gen_config.intf_speed.replace('ps', '')).bits + self.mbuf_64 = config.mbuf_64 + self.hdrh = not config.disable_hdrh + if config.intf_speed: + # interface speed is overriden from the command line + self.intf_speed = config.intf_speed + elif gen_config.intf_speed: + # interface speed is overriden from the generator config + self.intf_speed = gen_config.intf_speed else: + self.intf_speed = "auto" + if self.intf_speed in ("auto", "0"): # interface speed is discovered/provided by the traffic generator self.intf_speed = 0 + else: + self.intf_speed = bitmath.parse_string(self.intf_speed.replace('ps', '')).bits + self.name = gen_config.name + self.zmq_pub_port = gen_config.get('zmq_pub_port', 4500) + self.zmq_rpc_port = gen_config.get('zmq_rpc_port', 4501) + self.limit_memory = gen_config.get('limit_memory', 1024) self.software_mode = gen_config.get('software_mode', False) self.interfaces = gen_config.interfaces if self.interfaces[0].port != 0 or self.interfaces[1].port != 1: raise TrafficClientException('Invalid port order/id in generator_profile.interfaces') - self.service_chain = config.service_chain self.service_chain_count = config.service_chain_count self.flow_count = config.flow_count self.host_name = gen_config.host_name - + self.bidirectional = config.traffic.bidirectional self.tg_gateway_ip_addrs = gen_config.tg_gateway_ip_addrs self.ip_addrs = gen_config.ip_addrs self.ip_addrs_step = gen_config.ip_addrs_step or self.DEFAULT_SRC_DST_IP_STEP @@ -357,8 +653,7 @@ class GeneratorConfig(object): gen_config.tg_gateway_ip_addrs_step or self.DEFAULT_IP_STEP self.gateway_ip_addrs_step = gen_config.gateway_ip_addrs_step or self.DEFAULT_IP_STEP self.gateway_ips = gen_config.gateway_ip_addrs - self.udp_src_port = gen_config.udp_src_port - self.udp_dst_port = gen_config.udp_dst_port + self.ip_src_static = gen_config.ip_src_static self.vteps = gen_config.get('vteps') self.devices = [Device(port, self) for port in [0, 1]] # This should normally always be [0, 1] @@ -386,10 +681,11 @@ class GeneratorConfig(object): port_index: the port for which dest macs must be set dest_macs: a list of dest MACs indexed by chain id """ - if len(dest_macs) != self.config.service_chain_count: + if len(dest_macs) < self.config.service_chain_count: raise TrafficClientException('Dest MAC list %s must have %d entries' % (dest_macs, self.config.service_chain_count)) - self.devices[port_index].set_dest_macs(dest_macs) + # only pass the first scc dest MACs + self.devices[port_index].set_dest_macs(dest_macs[:self.config.service_chain_count]) LOG.info('Port %d: dst MAC %s', port_index, [str(mac) for mac in dest_macs]) def set_vtep_dest_macs(self, port_index, dest_macs): @@ -402,7 +698,7 @@ class GeneratorConfig(object): raise TrafficClientException('Dest MAC list %s must have %d entries' % (dest_macs, self.config.service_chain_count)) self.devices[port_index].set_vtep_dst_mac(dest_macs) - LOG.info('Port %d: vtep dst MAC %s', port_index, set([str(mac) for mac in dest_macs])) + LOG.info('Port %d: vtep dst MAC %s', port_index, {str(mac) for mac in dest_macs}) def get_dest_macs(self): """Return the list of dest macs indexed by port.""" @@ -430,6 +726,28 @@ class GeneratorConfig(object): (vxlans, self.config.service_chain_count)) self.devices[port_index].set_vxlans(vxlans) + def set_mpls_inner_labels(self, port_index, labels): + """Set the list of MPLS Labels to use indexed by the chain id on given port. + + port_index: the port for which Labels must be set + Labels: a list of Labels lists indexed by chain id + """ + if len(labels) != self.config.service_chain_count: + raise TrafficClientException('Inner MPLS list %s must have %d entries' % + (labels, self.config.service_chain_count)) + self.devices[port_index].set_mpls_inner_labels(labels) + + def set_mpls_outer_labels(self, port_index, labels): + """Set the list of MPLS Labels to use indexed by the chain id on given port. + + port_index: the port for which Labels must be set + Labels: a list of Labels lists indexed by chain id + """ + if len(labels) != self.config.service_chain_count: + raise TrafficClientException('Outer MPLS list %s must have %d entries' % + (labels, self.config.service_chain_count)) + self.devices[port_index].set_mpls_outer_labels(labels) + def set_vtep_vlan(self, port_index, vlan): """Set the vtep vlan to use indexed by the chain id on given port. port_index: the port for which VLAN must be set @@ -439,6 +757,9 @@ class GeneratorConfig(object): def set_vxlan_endpoints(self, port_index, src_ip, dst_ip): self.devices[port_index].set_vxlan_endpoints(src_ip, dst_ip) + def set_mpls_peers(self, port_index, src_ip, dst_ip): + self.devices[port_index].set_mpls_peers(src_ip, dst_ip) + @staticmethod def __match_generator_profile(traffic_generator, generator_profile): gen_config = AttrDict(traffic_generator) @@ -473,7 +794,8 @@ class TrafficClient(object): self.notifier = notifier self.interval_collector = None self.iteration_collector = None - self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec) + self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec, + self.config.service_mode) self.config.frame_sizes = self._get_frame_sizes() self.run_config = { 'l2frame_size': None, @@ -493,10 +815,10 @@ class TrafficClient(object): def _get_generator(self): tool = self.tool.lower() if tool == 'trex': - from traffic_gen import trex - return trex.TRex(self) + from .traffic_gen import trex_gen + return trex_gen.TRex(self) if tool == 'dummy': - from traffic_gen import dummy + from .traffic_gen import dummy return dummy.DummyTG(self) raise TrafficClientException('Unsupported generator tool name:' + self.tool) @@ -514,7 +836,7 @@ class TrafficClient(object): if len(matching_profiles) > 1: raise TrafficClientException('Multiple traffic profiles with name: ' + traffic_profile_name) - elif not matching_profiles: + if not matching_profiles: raise TrafficClientException('Cannot find traffic profile: ' + traffic_profile_name) return matching_profiles[0].l2frame_size @@ -529,13 +851,17 @@ class TrafficClient(object): # interface speed is overriden from config if self.intf_speed != tg_if_speed: # Warn the user if the speed in the config is different - LOG.warning('Interface speed provided is different from actual speed (%d Gbps)', - intf_speeds[0]) + LOG.warning( + 'Interface speed provided (%g Gbps) is different from actual speed (%d Gbps)', + self.intf_speed / 1000000000.0, intf_speeds[0]) else: # interface speed not provisioned by config self.intf_speed = tg_if_speed # also update the speed in the tg config self.generator_config.intf_speed = tg_if_speed + # let's report detected and actually used interface speed + self.config.intf_speed_detected = tg_if_speed + self.config.intf_speed_used = self.intf_speed # Save the traffic generator local MAC for mac, device in zip(self.gen.get_macs(), self.generator_config.devices): @@ -571,11 +897,11 @@ class TrafficClient(object): LOG.info('Starting traffic generator to ensure end-to-end connectivity') # send 2pps on each chain and each direction rate_pps = {'rate_pps': str(self.config.service_chain_count * 2)} - self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False) - + self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False, + e2e=True) # ensures enough traffic is coming back - retry_count = (self.config.check_traffic_time_sec + - self.config.generic_poll_sec - 1) / self.config.generic_poll_sec + retry_count = int((self.config.check_traffic_time_sec + + self.config.generic_poll_sec - 1) / self.config.generic_poll_sec) # we expect to see packets coming from 2 unique MAC per chain # because there can be flooding in the case of shared net @@ -591,9 +917,12 @@ class TrafficClient(object): get_mac_id = lambda packet: packet['binary'][60:66] elif self.config.vxlan: get_mac_id = lambda packet: packet['binary'][56:62] + elif self.config.mpls: + get_mac_id = lambda packet: packet['binary'][24:30] + # mpls_transport_label = lambda packet: packet['binary'][14:18] else: get_mac_id = lambda packet: packet['binary'][6:12] - for it in xrange(retry_count): + for it in range(retry_count): self.gen.clear_stats() self.gen.start_traffic() self.gen.start_capture() @@ -605,28 +934,47 @@ class TrafficClient(object): self.gen.stop_traffic() self.gen.fetch_capture_packets() self.gen.stop_capture() - for packet in self.gen.packet_list: - mac_id = get_mac_id(packet) + mac_id = get_mac_id(packet).decode('latin-1') src_mac = ':'.join(["%02x" % ord(x) for x in mac_id]) - if src_mac in mac_map: - port, chain = mac_map[src_mac] - LOG.info('Received packet from mac: %s (chain=%d, port=%d)', - src_mac, chain, port) - mac_map.pop(src_mac, None) + if self.config.mpls: + if src_mac in mac_map and self.is_mpls(packet): + port, chain = mac_map[src_mac] + LOG.info('Received mpls packet from mac: %s (chain=%d, port=%d)', + src_mac, chain, port) + mac_map.pop(src_mac, None) + else: + if src_mac in mac_map and self.is_udp(packet): + port, chain = mac_map[src_mac] + LOG.info('Received udp packet from mac: %s (chain=%d, port=%d)', + src_mac, chain, port) + mac_map.pop(src_mac, None) if not mac_map: LOG.info('End-to-end connectivity established') return - + if self.config.l3_router and not self.config.no_arp: + # In case of L3 traffic mode, routers are not able to route traffic + # until VM interfaces are up and ARP requests are done + LOG.info('Waiting for loopback service completely started...') + LOG.info('Sending ARP request to assure end-to-end connectivity established') + self.ensure_arp_successful() raise TrafficClientException('End-to-end connectivity cannot be ensured') + def is_udp(self, packet): + pkt = Ether(packet['binary']) + return UDP in pkt + + def is_mpls(self, packet): + pkt = Ether(packet['binary']) + return MPLS in pkt + def ensure_arp_successful(self): """Resolve all IP using ARP and throw an exception in case of failure.""" dest_macs = self.gen.resolve_arp() if dest_macs: # all dest macs are discovered, saved them into the generator config - if self.config.vxlan: + if self.config.vxlan or self.config.mpls: self.generator_config.set_vtep_dest_macs(0, dest_macs[0]) self.generator_config.set_vtep_dest_macs(1, dest_macs[1]) else: @@ -652,7 +1000,13 @@ class TrafficClient(object): self.run_config['rates'][idx] = {'rate_pps': self.__convert_rates(rate)['rate_pps']} self.gen.clear_streamblock() - self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional, latency=True) + + if self.config.no_latency_streams: + LOG.info("Latency streams are disabled") + # in service mode, we must disable flow stats (e2e=True) + self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional, + latency=not self.config.no_latency_streams, + e2e=self.runner.service_mode) def _modify_load(self, load): self.current_total_rate = {'rate_percent': str(load)} @@ -709,31 +1063,37 @@ class TrafficClient(object): def get_stats(self): """Collect final stats for previous run.""" - stats = self.gen.get_stats() - retDict = {'total_tx_rate': stats['total_tx_rate']} - for port in self.PORTS: - retDict[port] = {'tx': {}, 'rx': {}} + stats = self.gen.get_stats(self.ifstats) + retDict = {'total_tx_rate': stats['total_tx_rate'], + 'offered_tx_rate_bps': stats['offered_tx_rate_bps'], + 'theoretical_tx_rate_bps': stats['theoretical_tx_rate_bps'], + 'theoretical_tx_rate_pps': stats['theoretical_tx_rate_pps']} + + if self.config.periodic_gratuitous_arp: + retDict['garp_total_tx_rate'] = stats['garp_total_tx_rate'] tx_keys = ['total_pkts', 'total_pkt_bytes', 'pkt_rate', 'pkt_bit_rate'] rx_keys = tx_keys + ['dropped_pkts'] for port in self.PORTS: + port_stats = {'tx': {}, 'rx': {}} for key in tx_keys: - retDict[port]['tx'][key] = int(stats[port]['tx'][key]) + port_stats['tx'][key] = int(stats[port]['tx'][key]) for key in rx_keys: try: - retDict[port]['rx'][key] = int(stats[port]['rx'][key]) + port_stats['rx'][key] = int(stats[port]['rx'][key]) except ValueError: - retDict[port]['rx'][key] = 0 - retDict[port]['rx']['avg_delay_usec'] = cast_integer( + port_stats['rx'][key] = 0 + port_stats['rx']['avg_delay_usec'] = cast_integer( stats[port]['rx']['avg_delay_usec']) - retDict[port]['rx']['min_delay_usec'] = cast_integer( + port_stats['rx']['min_delay_usec'] = cast_integer( stats[port]['rx']['min_delay_usec']) - retDict[port]['rx']['max_delay_usec'] = cast_integer( + port_stats['rx']['max_delay_usec'] = cast_integer( stats[port]['rx']['max_delay_usec']) - retDict[port]['drop_rate_percent'] = self.__get_dropped_rate(retDict[port]) + port_stats['drop_rate_percent'] = self.__get_dropped_rate(port_stats) + retDict[str(port)] = port_stats - ports = sorted(retDict.keys()) + ports = sorted(list(retDict.keys()), key=str) if self.run_config['bidirectional']: retDict['overall'] = {'tx': {}, 'rx': {}} for key in tx_keys: @@ -759,6 +1119,22 @@ class TrafficClient(object): else: retDict['overall'] = retDict[ports[0]] retDict['overall']['drop_rate_percent'] = self.__get_dropped_rate(retDict['overall']) + + if 'overall_hdrh' in stats: + retDict['overall']['hdrh'] = stats.get('overall_hdrh', None) + decoded_histogram = HdrHistogram.decode(retDict['overall']['hdrh']) + retDict['overall']['rx']['lat_percentile'] = {} + # override min max and avg from hdrh (only if histogram is valid) + if decoded_histogram.get_total_count() != 0: + retDict['overall']['rx']['min_delay_usec'] = decoded_histogram.get_min_value() + retDict['overall']['rx']['max_delay_usec'] = decoded_histogram.get_max_value() + retDict['overall']['rx']['avg_delay_usec'] = decoded_histogram.get_mean_value() + for percentile in self.config.lat_percentiles: + retDict['overall']['rx']['lat_percentile'][percentile] = \ + decoded_histogram.get_value_at_percentile(percentile) + else: + for percentile in self.config.lat_percentiles: + retDict['overall']['rx']['lat_percentile'][percentile] = 'n/a' return retDict def __convert_rates(self, rate): @@ -774,6 +1150,7 @@ class TrafficClient(object): def __format_output_stats(self, stats): for key in self.PORTS + ['overall']: + key = str(key) interface = stats[key] stats[key] = { 'tx_pkts': interface['tx']['total_pkts'], @@ -785,10 +1162,26 @@ class TrafficClient(object): 'min_delay_usec': interface['rx']['min_delay_usec'], } + if key == 'overall': + if 'hdrh' in interface: + stats[key]['hdrh'] = interface.get('hdrh', None) + decoded_histogram = HdrHistogram.decode(stats[key]['hdrh']) + stats[key]['lat_percentile'] = {} + # override min max and avg from hdrh (only if histogram is valid) + if decoded_histogram.get_total_count() != 0: + stats[key]['min_delay_usec'] = decoded_histogram.get_min_value() + stats[key]['max_delay_usec'] = decoded_histogram.get_max_value() + stats[key]['avg_delay_usec'] = decoded_histogram.get_mean_value() + for percentile in self.config.lat_percentiles: + stats[key]['lat_percentile'][percentile] = decoded_histogram.\ + get_value_at_percentile(percentile) + else: + for percentile in self.config.lat_percentiles: + stats[key]['lat_percentile'][percentile] = 'n/a' return stats def __targets_found(self, rate, targets, results): - for tag, target in targets.iteritems(): + for tag, target in list(targets.items()): LOG.info('Found %s (%s) load: %s', tag, target, rate) self.__ndr_pdr_found(tag, rate) results[tag]['timestamp_sec'] = time.time() @@ -824,7 +1217,7 @@ class TrafficClient(object): # Split target dicts based on the avg drop rate left_targets = {} right_targets = {} - for tag, target in targets.iteritems(): + for tag, target in list(targets.items()): if stats['overall']['drop_rate_percent'] <= target: # record the best possible rate found for this target results[tag] = rates @@ -870,6 +1263,17 @@ class TrafficClient(object): """ self._modify_load(rate) + # There used to be a inconsistency in case of interface speed override. + # The emulated 'intf_speed' value is unknown to the T-Rex generator which + # refers to the detected line rate for converting relative traffic loads. + # Therefore, we need to convert actual rates here, in terms of packets/s. + + for idx, str_rate in enumerate(self.gen.rates): + if str_rate.endswith('%'): + float_rate = float(str_rate.replace('%', '').strip()) + pps_rate = self.__convert_rates({'rate_percent': float_rate})['rate_pps'] + self.gen.rates[idx] = str(pps_rate) + 'pps' + # poll interval stats and collect them for stats in self.run_traffic(): self.interval_collector.add(stats) @@ -893,25 +1297,32 @@ class TrafficClient(object): LOG.info('Average drop rate: %f', stats['overall']['drop_rate_percent']) return stats, current_traffic_config['direction-total'] - @staticmethod - def log_stats(stats): + def log_stats(self, stats): """Log estimated stats during run.""" - report = { - 'datetime': str(datetime.now()), - 'tx_packets': stats['overall']['tx']['total_pkts'], - 'rx_packets': stats['overall']['rx']['total_pkts'], - 'drop_packets': stats['overall']['rx']['dropped_pkts'], - 'drop_rate_percent': stats['overall']['drop_rate_percent'] - } - LOG.info('TX: %(tx_packets)d; ' - 'RX: %(rx_packets)d; ' - 'Est. Dropped: %(drop_packets)d; ' - 'Est. Drop rate: %(drop_rate_percent).4f%%', - report) + # Calculate a rolling drop rate based on differential to + # the previous reading + cur_tx = stats['overall']['tx']['total_pkts'] + cur_rx = stats['overall']['rx']['total_pkts'] + delta_tx = cur_tx - self.prev_tx + delta_rx = cur_rx - self.prev_rx + drops = delta_tx - delta_rx + if delta_tx == 0: + LOG.info("\x1b[1mConfiguration issue!\x1b[0m (no transmission)") + sys.exit(0) + drop_rate_pct = 100 * (delta_tx - delta_rx)/delta_tx + self.prev_tx = cur_tx + self.prev_rx = cur_rx + LOG.info('TX: %15s; RX: %15s; (Est.) Dropped: %12s; Drop rate: %8.4f%%', + format(cur_tx, ',d'), + format(cur_rx, ',d'), + format(drops, ',d'), + drop_rate_pct) def run_traffic(self): """Start traffic and return intermediate stats for each interval.""" stats = self.runner.run() + self.prev_tx = 0 + self.prev_rx = 0 while self.runner.is_running: self.log_stats(stats) yield stats @@ -957,21 +1368,35 @@ class TrafficClient(object): # because we want each direction to have the far end RX rates, # use the far end index (1-idx) to retrieve the RX rates for idx, key in enumerate(["direction-forward", "direction-reverse"]): - tx_rate = results["stats"][idx]["tx"]["total_pkts"] / self.config.duration_sec - rx_rate = results["stats"][1 - idx]["rx"]["total_pkts"] / self.config.duration_sec + tx_rate = results["stats"][str(idx)]["tx"]["total_pkts"] / self.config.duration_sec + rx_rate = results["stats"][str(1 - idx)]["rx"]["total_pkts"] / self.config.duration_sec + + orig_rate = self.run_config['rates'][idx] + if self.config.periodic_gratuitous_arp: + orig_rate['rate_pps'] = float( + orig_rate['rate_pps']) - self.config.gratuitous_arp_pps + r[key] = { - "orig": self.__convert_rates(self.run_config['rates'][idx]), + "orig": self.__convert_rates(orig_rate), "tx": self.__convert_rates({'rate_pps': tx_rate}), "rx": self.__convert_rates({'rate_pps': rx_rate}) } + if self.config.periodic_gratuitous_arp: + r['garp-direction-total'] = { + "orig": self.__convert_rates({'rate_pps': self.config.gratuitous_arp_pps * 2}), + "tx": self.__convert_rates({'rate_pps': results["stats"]["garp_total_tx_rate"]}), + "rx": self.__convert_rates({'rate_pps': 0}) + } + total = {} for direction in ['orig', 'tx', 'rx']: total[direction] = {} for unit in ['rate_percent', 'rate_bps', 'rate_pps']: - total[direction][unit] = sum([float(x[direction][unit]) for x in r.values()]) + total[direction][unit] = sum([float(x[direction][unit]) for x in list(r.values())]) r['direction-total'] = total + return r def insert_interface_stats(self, pps_list): @@ -998,7 +1423,7 @@ class TrafficClient(object): for chain_idx in range(self.config.service_chain_count)] # note that we need to make a copy of the ifs list so that any modification in the # list from pps will not change the list saved in self.ifstats - self.pps_list = [PacketPathStats(list(ifs)) for ifs in self.ifstats] + self.pps_list = [PacketPathStats(self.config, list(ifs)) for ifs in self.ifstats] # insert the corresponding pps in the passed list pps_list.extend(self.pps_list) @@ -1017,7 +1442,7 @@ class TrafficClient(object): ] """ if diff: - stats = self.gen.get_stats() + stats = self.gen.get_stats(self.ifstats) for chain_idx, ifs in enumerate(self.ifstats): # each ifs has exactly 2 InterfaceStats and 2 Latency instances # corresponding to the diff --git a/nfvbench/traffic_gen/dummy.py b/nfvbench/traffic_gen/dummy.py index 9beea28..95147ab 100644 --- a/nfvbench/traffic_gen/dummy.py +++ b/nfvbench/traffic_gen/dummy.py @@ -13,8 +13,8 @@ # under the License. from nfvbench.log import LOG -from traffic_base import AbstractTrafficGenerator -import traffic_utils as utils +from .traffic_base import AbstractTrafficGenerator +from . import traffic_utils as utils class DummyTG(AbstractTrafficGenerator): @@ -95,14 +95,14 @@ class DummyTG(AbstractTrafficGenerator): ports = list(self.traffic_client.generator_config.ports) self.port_handle = ports - def create_traffic(self, l2frame_size, rates, bidirectional, latency=True): + def create_traffic(self, l2frame_size, rates, bidirectional, latency=True, e2e=False): self.rates = [utils.to_rate_str(rate) for rate in rates] self.l2_frame_size = l2frame_size def clear_streamblock(self): pass - def get_stats(self): + def get_stats(self, ifstats=None): """Get stats from current run. The binary search mainly looks at 2 results to make the decision: @@ -147,6 +147,12 @@ class DummyTG(AbstractTrafficGenerator): total_tx_pps += tx_pps # actual total tx rate in pps result['total_tx_rate'] = total_tx_pps + # actual offered tx rate in bps + avg_packet_size = utils.get_average_packet_size(self.l2_frame_size) + total_tx_bps = utils.pps_to_bps(total_tx_pps, avg_packet_size) + result['offered_tx_rate_bps'] = total_tx_bps + + result.update(self.get_theoretical_rates(avg_packet_size)) return result def get_stream_stats(self, tg_stats, if_stats, latencies, chain_idx): @@ -176,8 +182,8 @@ class DummyTG(AbstractTrafficGenerator): def fetch_capture_packets(self): def _get_packet_capture(mac): # convert text to binary - src_mac = mac.replace(':', '').decode('hex') - return {'binary': 'SSSSSS' + src_mac} + src_mac = bytearray.fromhex(mac.replace(':', '')).decode() + return {'binary': bytes('SSSSSS' + src_mac, 'ascii')} # for packet capture, generate 2*scc random packets # normally we should generate packets coming from the right dest macs @@ -201,6 +207,9 @@ class DummyTG(AbstractTrafficGenerator): def set_mode(self): pass + def set_service_mode(self, enabled=True): + pass + def resolve_arp(self): """Resolve ARP sucessfully.""" def get_macs(port, scc): diff --git a/nfvbench/traffic_gen/traffic_base.py b/nfvbench/traffic_gen/traffic_base.py index 0360591..30aec6e 100644 --- a/nfvbench/traffic_gen/traffic_base.py +++ b/nfvbench/traffic_gen/traffic_base.py @@ -16,7 +16,9 @@ import abc import sys from nfvbench.log import LOG -import traffic_utils +from . import traffic_utils +from hdrh.histogram import HdrHistogram +from functools import reduce class Latency(object): @@ -27,29 +29,42 @@ class Latency(object): latency_list: aggregate all latency values from list if not None """ - self.min_usec = sys.maxint + self.min_usec = sys.maxsize self.max_usec = 0 self.avg_usec = 0 + self.hdrh = None if latency_list: + hdrh_list = [] for lat in latency_list: if lat.available(): self.min_usec = min(self.min_usec, lat.min_usec) self.max_usec = max(self.max_usec, lat.max_usec) self.avg_usec += lat.avg_usec + if lat.hdrh_available(): + hdrh_list.append(HdrHistogram.decode(lat.hdrh)) + + # aggregate histograms if any + if hdrh_list: + def add_hdrh(x, y): + x.add(y) + return x + decoded_hdrh = reduce(add_hdrh, hdrh_list) + self.hdrh = HdrHistogram.encode(decoded_hdrh).decode('utf-8') + # round to nearest usec self.avg_usec = int(round(float(self.avg_usec) / len(latency_list))) def available(self): """Return True if latency information is available.""" - return self.min_usec != sys.maxint + return self.min_usec != sys.maxsize + def hdrh_available(self): + """Return True if latency histogram information is available.""" + return self.hdrh is not None class TrafficGeneratorException(Exception): """Exception for traffic generator.""" - pass - - class AbstractTrafficGenerator(object): def __init__(self, traffic_client): @@ -68,7 +83,7 @@ class AbstractTrafficGenerator(object): return None @abc.abstractmethod - def create_traffic(self, l2frame_size, rates, bidirectional, latency=True): + def create_traffic(self, l2frame_size, rates, bidirectional, latency=True, e2e=False): # Must be implemented by sub classes return None @@ -84,7 +99,7 @@ class AbstractTrafficGenerator(object): LOG.info('Modified traffic stream for port %s, new rate=%s.', port, self.rates[port_index]) @abc.abstractmethod - def get_stats(self): + def get_stats(self, ifstats): # Must be implemented by sub classes return None @@ -105,7 +120,6 @@ class AbstractTrafficGenerator(object): def clear_streamblock(self): """Clear all streams from the traffic generator.""" - pass @abc.abstractmethod def resolve_arp(self): @@ -115,7 +129,6 @@ class AbstractTrafficGenerator(object): else a dict of list of dest macs indexed by port# the dest macs in the list are indexed by the chain id """ - pass @abc.abstractmethod def get_macs(self): @@ -123,7 +136,6 @@ class AbstractTrafficGenerator(object): return: a list of MAC addresses indexed by the port# """ - pass @abc.abstractmethod def get_port_speed_gbps(self): @@ -131,4 +143,25 @@ class AbstractTrafficGenerator(object): return: a list of speed in Gbps indexed by the port# """ - pass + + def get_theoretical_rates(self, avg_packet_size): + + result = {} + + # actual interface speed? (may be a virtual override) + intf_speed = self.config.intf_speed_used + + if hasattr(self.config, 'user_info') and self.config.user_info is not None: + if "extra_encapsulation_bytes" in self.config.user_info: + frame_size_full_encapsulation = avg_packet_size + self.config.user_info[ + "extra_encapsulation_bytes"] + result['theoretical_tx_rate_pps'] = traffic_utils.bps_to_pps( + intf_speed, frame_size_full_encapsulation) * 2 + result['theoretical_tx_rate_bps'] = traffic_utils.pps_to_bps( + result['theoretical_tx_rate_pps'], avg_packet_size) + else: + result['theoretical_tx_rate_pps'] = traffic_utils.bps_to_pps(intf_speed, + avg_packet_size) * 2 + result['theoretical_tx_rate_bps'] = traffic_utils.pps_to_bps( + result['theoretical_tx_rate_pps'], avg_packet_size) + return result diff --git a/nfvbench/traffic_gen/traffic_utils.py b/nfvbench/traffic_gen/traffic_utils.py index f856267..4366a6c 100644 --- a/nfvbench/traffic_gen/traffic_utils.py +++ b/nfvbench/traffic_gen/traffic_utils.py @@ -14,7 +14,6 @@ import bitmath -from nfvbench.utils import multiplier_map # IMIX frame size including the 4-byte FCS field IMIX_L2_SIZES = [64, 594, 1518] @@ -23,6 +22,11 @@ IMIX_RATIOS = [7, 4, 1] IMIX_AVG_L2_FRAME_SIZE = sum( [1.0 * imix[0] * imix[1] for imix in zip(IMIX_L2_SIZES, IMIX_RATIOS)]) / sum(IMIX_RATIOS) +multiplier_map = { + 'K': 1000, + 'M': 1000000, + 'G': 1000000000 +} def convert_rates(l2frame_size, rate, intf_speed): """Convert a given rate unit into the other rate units. @@ -54,12 +58,11 @@ def convert_rates(l2frame_size, rate, intf_speed): pps = bps_to_pps(bps, avg_packet_size) else: raise Exception('Traffic config needs to have a rate type key') - return { 'initial_rate_type': initial_rate_type, - 'rate_pps': int(pps), + 'rate_pps': int(float(pps)), 'rate_percent': load, - 'rate_bps': int(bps) + 'rate_bps': int(float(bps)) } @@ -113,23 +116,22 @@ def parse_rate_str(rate_str): rate_pps = rate_pps[:-1] except KeyError: multiplier = 1 - rate_pps = int(rate_pps.strip()) * multiplier + rate_pps = int(float(rate_pps.strip()) * multiplier) if rate_pps <= 0: raise Exception('%s is out of valid range' % rate_str) return {'rate_pps': str(rate_pps)} - elif rate_str.endswith('ps'): + if rate_str.endswith('ps'): rate = rate_str.replace('ps', '').strip() bit_rate = bitmath.parse_string(rate).bits if bit_rate <= 0: raise Exception('%s is out of valid range' % rate_str) return {'rate_bps': str(int(bit_rate))} - elif rate_str.endswith('%'): + if rate_str.endswith('%'): rate_percent = float(rate_str.replace('%', '').strip()) if rate_percent <= 0 or rate_percent > 100.0: raise Exception('%s is out of valid range (must be 1-100%%)' % rate_str) return {'rate_percent': str(rate_percent)} - else: - raise Exception('Unknown rate string format %s' % rate_str) + raise Exception('Unknown rate string format %s' % rate_str) def get_load_from_rate(rate_str, avg_frame_size=64, line_rate='10Gbps'): '''From any rate string (with unit) return the corresponding load (in % unit) @@ -172,10 +174,10 @@ def to_rate_str(rate): if 'rate_pps' in rate: pps = rate['rate_pps'] return '{}pps'.format(pps) - elif 'rate_bps' in rate: + if 'rate_bps' in rate: bps = rate['rate_bps'] return '{}bps'.format(bps) - elif 'rate_percent' in rate: + if 'rate_percent' in rate: load = rate['rate_percent'] return '{}%'.format(load) assert False @@ -185,7 +187,7 @@ def to_rate_str(rate): def nan_replace(d): """Replaces every occurence of 'N/A' with float nan.""" - for k, v in d.iteritems(): + for k, v in d.items(): if isinstance(v, dict): nan_replace(v) elif v == 'N/A': @@ -200,5 +202,5 @@ def mac_to_int(mac): def int_to_mac(i): """Converts integer representation of MAC address to hex string.""" mac = format(i, 'x').zfill(12) - blocks = [mac[x:x + 2] for x in xrange(0, len(mac), 2)] + blocks = [mac[x:x + 2] for x in range(0, len(mac), 2)] return ':'.join(blocks) diff --git a/nfvbench/traffic_gen/trex.py b/nfvbench/traffic_gen/trex.py deleted file mode 100644 index 1f460f6..0000000 --- a/nfvbench/traffic_gen/trex.py +++ /dev/null @@ -1,700 +0,0 @@ -# Copyright 2016 Cisco Systems, Inc. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -"""Driver module for TRex traffic generator.""" - -import math -import os -import random -import time -import traceback - -from itertools import count -from nfvbench.log import LOG -from nfvbench.traffic_server import TRexTrafficServer -from nfvbench.utils import cast_integer -from nfvbench.utils import timeout -from nfvbench.utils import TimeoutError -from traffic_base import AbstractTrafficGenerator -from traffic_base import TrafficGeneratorException -import traffic_utils as utils -from traffic_utils import IMIX_AVG_L2_FRAME_SIZE -from traffic_utils import IMIX_L2_SIZES -from traffic_utils import IMIX_RATIOS - -# pylint: disable=import-error -from trex_stl_lib.api import bind_layers -from trex_stl_lib.api import CTRexVmInsFixHwCs -from trex_stl_lib.api import Dot1Q -from trex_stl_lib.api import Ether -from trex_stl_lib.api import FlagsField -from trex_stl_lib.api import IP -from trex_stl_lib.api import Packet -from trex_stl_lib.api import STLClient -from trex_stl_lib.api import STLError -from trex_stl_lib.api import STLFlowLatencyStats -from trex_stl_lib.api import STLFlowStats -from trex_stl_lib.api import STLPktBuilder -from trex_stl_lib.api import STLScVmRaw -from trex_stl_lib.api import STLStream -from trex_stl_lib.api import STLTXCont -from trex_stl_lib.api import STLVmFixChecksumHw -from trex_stl_lib.api import STLVmFlowVar -from trex_stl_lib.api import STLVmFlowVarRepetableRandom -from trex_stl_lib.api import STLVmWrFlowVar -from trex_stl_lib.api import ThreeBytesField -from trex_stl_lib.api import UDP -from trex_stl_lib.api import XByteField -from trex_stl_lib.services.trex_stl_service_arp import STLServiceARP - - -# pylint: enable=import-error - -class VXLAN(Packet): - """VxLAN class.""" - - _VXLAN_FLAGS = ['R' * 27] + ['I'] + ['R' * 5] - name = "VXLAN" - fields_desc = [FlagsField("flags", 0x08000000, 32, _VXLAN_FLAGS), - ThreeBytesField("vni", 0), - XByteField("reserved", 0x00)] - - def mysummary(self): - """Summary.""" - return self.sprintf("VXLAN (vni=%VXLAN.vni%)") - -class TRex(AbstractTrafficGenerator): - """TRex traffic generator driver.""" - - LATENCY_PPS = 1000 - CHAIN_PG_ID_MASK = 0x007F - PORT_PG_ID_MASK = 0x0080 - LATENCY_PG_ID_MASK = 0x0100 - - def __init__(self, traffic_client): - """Trex driver.""" - AbstractTrafficGenerator.__init__(self, traffic_client) - self.client = None - self.id = count() - self.port_handle = [] - self.chain_count = self.generator_config.service_chain_count - self.rates = [] - self.capture_id = None - self.packet_list = [] - - def get_version(self): - """Get the Trex version.""" - return self.client.get_server_version() if self.client else '' - - def get_pg_id(self, port, chain_id): - """Calculate the packet group IDs to use for a given port/stream type/chain_id. - - port: 0 or 1 - chain_id: identifies to which chain the pg_id is associated (0 to 255) - return: pg_id, lat_pg_id - - We use a bit mask to set up the 3 fields: - 0x007F: chain ID (8 bits for a max of 128 chains) - 0x0080: port bit - 0x0100: latency bit - """ - pg_id = port * TRex.PORT_PG_ID_MASK | chain_id - return pg_id, pg_id | TRex.LATENCY_PG_ID_MASK - - def extract_stats(self, in_stats): - """Extract stats from dict returned by Trex API. - - :param in_stats: dict as returned by TRex api - """ - utils.nan_replace(in_stats) - # LOG.debug(in_stats) - - result = {} - # port_handles should have only 2 elements: [0, 1] - # so (1 - ph) will be the index for the far end port - for ph in self.port_handle: - stats = in_stats[ph] - far_end_stats = in_stats[1 - ph] - result[ph] = { - 'tx': { - 'total_pkts': cast_integer(stats['opackets']), - 'total_pkt_bytes': cast_integer(stats['obytes']), - 'pkt_rate': cast_integer(stats['tx_pps']), - 'pkt_bit_rate': cast_integer(stats['tx_bps']) - }, - 'rx': { - 'total_pkts': cast_integer(stats['ipackets']), - 'total_pkt_bytes': cast_integer(stats['ibytes']), - 'pkt_rate': cast_integer(stats['rx_pps']), - 'pkt_bit_rate': cast_integer(stats['rx_bps']), - # how many pkts were dropped in RX direction - # need to take the tx counter on the far end port - 'dropped_pkts': cast_integer( - far_end_stats['opackets'] - stats['ipackets']) - } - } - self.__combine_latencies(in_stats, result[ph]['rx'], ph) - - total_tx_pkts = result[0]['tx']['total_pkts'] + result[1]['tx']['total_pkts'] - result["total_tx_rate"] = cast_integer(total_tx_pkts / self.config.duration_sec) - result["flow_stats"] = in_stats["flow_stats"] - result["latency"] = in_stats["latency"] - return result - - def get_stream_stats(self, trex_stats, if_stats, latencies, chain_idx): - """Extract the aggregated stats for a given chain. - - trex_stats: stats as returned by get_stats() - if_stats: a list of 2 interface stats to update (port 0 and 1) - latencies: a list of 2 Latency instances to update for this chain (port 0 and 1) - latencies[p] is the latency for packets sent on port p - if there are no latency streams, the Latency instances are not modified - chain_idx: chain index of the interface stats - - The packet counts include normal and latency streams. - - Trex returns flows stats as follows: - - 'flow_stats': {0: {'rx_bps': {0: 0, 1: 0, 'total': 0}, - 'rx_bps_l1': {0: 0.0, 1: 0.0, 'total': 0.0}, - 'rx_bytes': {0: nan, 1: nan, 'total': nan}, - 'rx_pkts': {0: 0, 1: 15001, 'total': 15001}, - 'rx_pps': {0: 0, 1: 0, 'total': 0}, - 'tx_bps': {0: 0, 1: 0, 'total': 0}, - 'tx_bps_l1': {0: 0.0, 1: 0.0, 'total': 0.0}, - 'tx_bytes': {0: 1020068, 1: 0, 'total': 1020068}, - 'tx_pkts': {0: 15001, 1: 0, 'total': 15001}, - 'tx_pps': {0: 0, 1: 0, 'total': 0}}, - 1: {'rx_bps': {0: 0, 1: 0, 'total': 0}, - 'rx_bps_l1': {0: 0.0, 1: 0.0, 'total': 0.0}, - 'rx_bytes': {0: nan, 1: nan, 'total': nan}, - 'rx_pkts': {0: 0, 1: 15001, 'total': 15001}, - 'rx_pps': {0: 0, 1: 0, 'total': 0}, - 'tx_bps': {0: 0, 1: 0, 'total': 0}, - 'tx_bps_l1': {0: 0.0, 1: 0.0, 'total': 0.0}, - 'tx_bytes': {0: 1020068, 1: 0, 'total': 1020068}, - 'tx_pkts': {0: 15001, 1: 0, 'total': 15001}, - 'tx_pps': {0: 0, 1: 0, 'total': 0}}, - 128: {'rx_bps': {0: 0, 1: 0, 'total': 0}, - 'rx_bps_l1': {0: 0.0, 1: 0.0, 'total': 0.0}, - 'rx_bytes': {0: nan, 1: nan, 'total': nan}, - 'rx_pkts': {0: 15001, 1: 0, 'total': 15001}, - 'rx_pps': {0: 0, 1: 0, 'total': 0}, - 'tx_bps': {0: 0, 1: 0, 'total': 0}, - 'tx_bps_l1': {0: 0.0, 1: 0.0, 'total': 0.0}, - 'tx_bytes': {0: 0, 1: 1020068, 'total': 1020068}, - 'tx_pkts': {0: 0, 1: 15001, 'total': 15001}, - 'tx_pps': {0: 0, 1: 0, 'total': 0}},etc... - - the pg_id (0, 1, 128,...) is the key of the dict and is obtained using the - get_pg_id() method. - packet counters for a given stream sent on port p are reported as: - - tx_pkts[p] on port p - - rx_pkts[1-p] on the far end port - - This is a tricky/critical counter transposition operation because - the results are grouped by port (not by stream): - tx_pkts_port(p=0) comes from pg_id(port=0, chain_idx)['tx_pkts'][0] - rx_pkts_port(p=0) comes from pg_id(port=1, chain_idx)['rx_pkts'][0] - tx_pkts_port(p=1) comes from pg_id(port=1, chain_idx)['tx_pkts'][1] - rx_pkts_port(p=1) comes from pg_id(port=0, chain_idx)['rx_pkts'][1] - - or using a more generic formula: - tx_pkts_port(p) comes from pg_id(port=p, chain_idx)['tx_pkts'][p] - rx_pkts_port(p) comes from pg_id(port=1-p, chain_idx)['rx_pkts'][p] - - the second formula is equivalent to - rx_pkts_port(1-p) comes from pg_id(port=p, chain_idx)['rx_pkts'][1-p] - - If there are latency streams, those same counters need to be added in the same way - """ - def get_latency(lval): - try: - return int(round(lval)) - except ValueError: - return 0 - - for ifs in if_stats: - ifs.tx = ifs.rx = 0 - for port in range(2): - pg_id, lat_pg_id = self.get_pg_id(port, chain_idx) - for pid in [pg_id, lat_pg_id]: - try: - pg_stats = trex_stats['flow_stats'][pid] - if_stats[port].tx += pg_stats['tx_pkts'][port] - if_stats[1 - port].rx += pg_stats['rx_pkts'][1 - port] - except KeyError: - pass - try: - lat = trex_stats['latency'][lat_pg_id]['latency'] - # dropped_pkts += lat['err_cntrs']['dropped'] - latencies[port].max_usec = get_latency(lat['total_max']) - if math.isnan(lat['total_min']): - latencies[port].min_usec = 0 - latencies[port].avg_usec = 0 - else: - latencies[port].min_usec = get_latency(lat['total_min']) - latencies[port].avg_usec = get_latency(lat['average']) - except KeyError: - pass - - def __combine_latencies(self, in_stats, results, port_handle): - """Traverse TRex result dictionary and combines chosen latency stats.""" - total_max = 0 - average = 0 - total_min = float("inf") - for chain_id in range(self.chain_count): - try: - _, lat_pg_id = self.get_pg_id(port_handle, chain_id) - lat = in_stats['latency'][lat_pg_id]['latency'] - # dropped_pkts += lat['err_cntrs']['dropped'] - total_max = max(lat['total_max'], total_max) - total_min = min(lat['total_min'], total_min) - average += lat['average'] - except KeyError: - pass - if total_min == float("inf"): - total_min = 0 - results['min_delay_usec'] = total_min - results['max_delay_usec'] = total_max - results['avg_delay_usec'] = int(average / self.chain_count) - - def _bind_vxlan(self): - bind_layers(UDP, VXLAN, dport=4789) - bind_layers(VXLAN, Ether) - - def _create_pkt(self, stream_cfg, l2frame_size): - """Create a packet of given size. - - l2frame_size: size of the L2 frame in bytes (including the 32-bit FCS) - """ - # Trex will add the FCS field, so we need to remove 4 bytes from the l2 frame size - frame_size = int(l2frame_size) - 4 - - if stream_cfg['vxlan'] is True: - self._bind_vxlan() - encap_level = '1' - pkt_base = Ether(src=stream_cfg['vtep_src_mac'], dst=stream_cfg['vtep_dst_mac']) - if stream_cfg['vtep_vlan'] is not None: - pkt_base /= Dot1Q(vlan=stream_cfg['vtep_vlan']) - pkt_base /= IP(src=stream_cfg['vtep_src_ip'], dst=stream_cfg['vtep_dst_ip']) - pkt_base /= UDP(sport=random.randint(1337, 32767), dport=4789) - pkt_base /= VXLAN(vni=stream_cfg['net_vni']) - pkt_base /= Ether(src=stream_cfg['mac_src'], dst=stream_cfg['mac_dst']) - else: - encap_level = '0' - pkt_base = Ether(src=stream_cfg['mac_src'], dst=stream_cfg['mac_dst']) - - if stream_cfg['vlan_tag'] is not None: - pkt_base /= Dot1Q(vlan=stream_cfg['vlan_tag']) - - udp_args = {} - if stream_cfg['udp_src_port']: - udp_args['sport'] = int(stream_cfg['udp_src_port']) - if stream_cfg['udp_dst_port']: - udp_args['dport'] = int(stream_cfg['udp_dst_port']) - pkt_base /= IP() / UDP(**udp_args) - - if stream_cfg['ip_addrs_step'] == 'random': - src_fv = STLVmFlowVarRepetableRandom( - name="ip_src", - min_value=stream_cfg['ip_src_addr'], - max_value=stream_cfg['ip_src_addr_max'], - size=4, - seed=random.randint(0, 32767), - limit=stream_cfg['ip_src_count']) - dst_fv = STLVmFlowVarRepetableRandom( - name="ip_dst", - min_value=stream_cfg['ip_dst_addr'], - max_value=stream_cfg['ip_dst_addr_max'], - size=4, - seed=random.randint(0, 32767), - limit=stream_cfg['ip_dst_count']) - else: - src_fv = STLVmFlowVar( - name="ip_src", - min_value=stream_cfg['ip_src_addr'], - max_value=stream_cfg['ip_src_addr'], - size=4, - op="inc", - step=stream_cfg['ip_addrs_step']) - dst_fv = STLVmFlowVar( - name="ip_dst", - min_value=stream_cfg['ip_dst_addr'], - max_value=stream_cfg['ip_dst_addr_max'], - size=4, - op="inc", - step=stream_cfg['ip_addrs_step']) - - vm_param = [ - src_fv, - STLVmWrFlowVar(fv_name="ip_src", pkt_offset="IP:{}.src".format(encap_level)), - dst_fv, - STLVmWrFlowVar(fv_name="ip_dst", pkt_offset="IP:{}.dst".format(encap_level)), - STLVmFixChecksumHw(l3_offset="IP:{}".format(encap_level), - l4_offset="UDP:{}".format(encap_level), - l4_type=CTRexVmInsFixHwCs.L4_TYPE_UDP) - ] - pad = max(0, frame_size - len(pkt_base)) * 'x' - - return STLPktBuilder(pkt=pkt_base / pad, vm=STLScVmRaw(vm_param)) - - def generate_streams(self, port, chain_id, stream_cfg, l2frame, latency=True): - """Create a list of streams corresponding to a given chain and stream config. - - port: port where the streams originate (0 or 1) - chain_id: the chain to which the streams are associated to - stream_cfg: stream configuration - l2frame: L2 frame size (including 4-byte FCS) or 'IMIX' - latency: if True also create a latency stream - """ - streams = [] - pg_id, lat_pg_id = self.get_pg_id(port, chain_id) - if l2frame == 'IMIX': - for ratio, l2_frame_size in zip(IMIX_RATIOS, IMIX_L2_SIZES): - pkt = self._create_pkt(stream_cfg, l2_frame_size) - streams.append(STLStream(packet=pkt, - flow_stats=STLFlowStats(pg_id=pg_id), - mode=STLTXCont(pps=ratio))) - - if latency: - # for IMIX, the latency packets have the average IMIX packet size - pkt = self._create_pkt(stream_cfg, IMIX_AVG_L2_FRAME_SIZE) - - else: - l2frame_size = int(l2frame) - pkt = self._create_pkt(stream_cfg, l2frame_size) - streams.append(STLStream(packet=pkt, - flow_stats=STLFlowStats(pg_id=pg_id), - mode=STLTXCont())) - # for the latency stream, the minimum payload is 16 bytes even in case of vlan tagging - # without vlan, the min l2 frame size is 64 - # with vlan it is 68 - # This only applies to the latency stream - if latency and stream_cfg['vlan_tag'] and l2frame_size < 68: - pkt = self._create_pkt(stream_cfg, 68) - - if latency: - streams.append(STLStream(packet=pkt, - flow_stats=STLFlowLatencyStats(pg_id=lat_pg_id), - mode=STLTXCont(pps=self.LATENCY_PPS))) - return streams - - @timeout(5) - def __connect(self, client): - client.connect() - - def __connect_after_start(self): - # after start, Trex may take a bit of time to initialize - # so we need to retry a few times - for it in xrange(self.config.generic_retry_count): - try: - time.sleep(1) - self.client.connect() - break - except Exception as ex: - if it == (self.config.generic_retry_count - 1): - raise - LOG.info("Retrying connection to TRex (%s)...", ex.message) - - def connect(self): - """Connect to the TRex server.""" - server_ip = self.generator_config.ip - LOG.info("Connecting to TRex (%s)...", server_ip) - - # Connect to TRex server - self.client = STLClient(server=server_ip) - try: - self.__connect(self.client) - except (TimeoutError, STLError) as e: - if server_ip == '127.0.0.1': - try: - self.__start_server() - self.__connect_after_start() - except (TimeoutError, STLError) as e: - LOG.error('Cannot connect to TRex') - LOG.error(traceback.format_exc()) - logpath = '/tmp/trex.log' - if os.path.isfile(logpath): - # Wait for TRex to finish writing error message - last_size = 0 - for _ in xrange(self.config.generic_retry_count): - size = os.path.getsize(logpath) - if size == last_size: - # probably not writing anymore - break - last_size = size - time.sleep(1) - with open(logpath, 'r') as f: - message = f.read() - else: - message = e.message - raise TrafficGeneratorException(message) - else: - raise TrafficGeneratorException(e.message) - - ports = list(self.generator_config.ports) - self.port_handle = ports - # Prepare the ports - self.client.reset(ports) - # Read HW information from each port - # this returns an array of dict (1 per port) - """ - Example of output for Intel XL710 - [{'arp': '-', 'src_ipv4': '-', u'supp_speeds': [40000], u'is_link_supported': True, - 'grat_arp': 'off', 'speed': 40, u'index': 0, 'link_change_supported': 'yes', - u'rx': {u'counters': 127, u'caps': [u'flow_stats', u'latency']}, - u'is_virtual': 'no', 'prom': 'off', 'src_mac': u'3c:fd:fe:a8:24:48', 'status': 'IDLE', - u'description': u'Ethernet Controller XL710 for 40GbE QSFP+', - 'dest': u'fa:16:3e:3c:63:04', u'is_fc_supported': False, 'vlan': '-', - u'driver': u'net_i40e', 'led_change_supported': 'yes', 'rx_filter_mode': 'hardware match', - 'fc': 'none', 'link': 'UP', u'hw_mac': u'3c:fd:fe:a8:24:48', u'pci_addr': u'0000:5e:00.0', - 'mult': 'off', 'fc_supported': 'no', u'is_led_supported': True, 'rx_queue': 'off', - 'layer_mode': 'Ethernet', u'numa': 0}, ...] - """ - self.port_info = self.client.get_port_info(ports) - LOG.info('Connected to TRex') - for id, port in enumerate(self.port_info): - LOG.info(' Port %d: %s speed=%dGbps mac=%s pci=%s driver=%s', - id, port['description'], port['speed'], port['src_mac'], - port['pci_addr'], port['driver']) - # Make sure the 2 ports have the same speed - if self.port_info[0]['speed'] != self.port_info[1]['speed']: - raise TrafficGeneratorException('Traffic generator ports speed mismatch: %d/%d Gbps' % - (self.port_info[0]['speed'], - self.port_info[1]['speed'])) - - def __start_server(self): - server = TRexTrafficServer() - server.run_server(self.generator_config) - - def resolve_arp(self): - """Resolve all configured remote IP addresses. - - return: None if ARP failed to resolve for all IP addresses - else a dict of list of dest macs indexed by port# - the dest macs in the list are indexed by the chain id - """ - self.client.set_service_mode(ports=self.port_handle) - LOG.info('Polling ARP until successful...') - arp_dest_macs = {} - for port, device in zip(self.port_handle, self.generator_config.devices): - # there should be 1 stream config per chain - stream_configs = device.get_stream_configs() - chain_count = len(stream_configs) - ctx = self.client.create_service_ctx(port=port) - # all dest macs on this port indexed by chain ID - dst_macs = [None] * chain_count - dst_macs_count = 0 - # the index in the list is the chain id - if self.config.vxlan: - arps = [ - STLServiceARP(ctx, - src_ip=device.vtep_src_ip, - dst_ip=device.vtep_dst_ip, - vlan=device.vtep_vlan) - for cfg in stream_configs - ] - else: - arps = [ - STLServiceARP(ctx, - src_ip=cfg['ip_src_tg_gw'], - dst_ip=cfg['mac_discovery_gw'], - # will be None if no vlan tagging - vlan=cfg['vlan_tag']) - for cfg in stream_configs - ] - - for attempt in range(self.config.generic_retry_count): - try: - ctx.run(arps) - except STLError: - LOG.error(traceback.format_exc()) - continue - - unresolved = [] - for chain_id, mac in enumerate(dst_macs): - if not mac: - arp_record = arps[chain_id].get_record() - if arp_record.dst_mac: - dst_macs[chain_id] = arp_record.dst_mac - dst_macs_count += 1 - LOG.info(' ARP: port=%d chain=%d src IP=%s dst IP=%s -> MAC=%s', - port, chain_id, - arp_record.src_ip, - arp_record.dst_ip, arp_record.dst_mac) - else: - unresolved.append(arp_record.dst_ip) - if dst_macs_count == chain_count: - arp_dest_macs[port] = dst_macs - LOG.info('ARP resolved successfully for port %s', port) - break - else: - retry = attempt + 1 - LOG.info('Retrying ARP for: %s (retry %d/%d)', - unresolved, retry, self.config.generic_retry_count) - if retry < self.config.generic_retry_count: - time.sleep(self.config.generic_poll_sec) - else: - LOG.error('ARP timed out for port %s (resolved %d out of %d)', - port, - dst_macs_count, - chain_count) - break - - self.client.set_service_mode(ports=self.port_handle, enabled=False) - if len(arp_dest_macs) == len(self.port_handle): - return arp_dest_macs - return None - - def __is_rate_enough(self, l2frame_size, rates, bidirectional, latency): - """Check if rate provided by user is above requirements. Applies only if latency is True.""" - intf_speed = self.generator_config.intf_speed - if latency: - if bidirectional: - mult = 2 - total_rate = 0 - for rate in rates: - r = utils.convert_rates(l2frame_size, rate, intf_speed) - total_rate += int(r['rate_pps']) - else: - mult = 1 - total_rate = utils.convert_rates(l2frame_size, rates[0], intf_speed) - # rate must be enough for latency stream and at least 1 pps for base stream per chain - required_rate = (self.LATENCY_PPS + 1) * self.config.service_chain_count * mult - result = utils.convert_rates(l2frame_size, - {'rate_pps': required_rate}, - intf_speed * mult) - result['result'] = total_rate >= required_rate - return result - - return {'result': True} - - def create_traffic(self, l2frame_size, rates, bidirectional, latency=True): - """Program all the streams in Trex server. - - l2frame_size: L2 frame size or IMIX - rates: a list of 2 rates to run each direction - each rate is a dict like {'rate_pps': '10kpps'} - bidirectional: True if bidirectional - latency: True if latency measurement is needed - """ - r = self.__is_rate_enough(l2frame_size, rates, bidirectional, latency) - if not r['result']: - raise TrafficGeneratorException( - 'Required rate in total is at least one of: \n{pps}pps \n{bps}bps \n{load}%.' - .format(pps=r['rate_pps'], - bps=r['rate_bps'], - load=r['rate_percent'])) - # a dict of list of streams indexed by port# - # in case of fixed size, has self.chain_count * 2 * 2 streams - # (1 normal + 1 latency stream per direction per chain) - # for IMIX, has self.chain_count * 2 * 4 streams - # (3 normal + 1 latency stream per direction per chain) - streamblock = {} - for port in self.port_handle: - streamblock[port] = [] - stream_cfgs = [d.get_stream_configs() for d in self.generator_config.devices] - self.rates = [utils.to_rate_str(rate) for rate in rates] - for chain_id, (fwd_stream_cfg, rev_stream_cfg) in enumerate(zip(*stream_cfgs)): - streamblock[0].extend(self.generate_streams(self.port_handle[0], - chain_id, - fwd_stream_cfg, - l2frame_size, - latency=latency)) - if len(self.rates) > 1: - streamblock[1].extend(self.generate_streams(self.port_handle[1], - chain_id, - rev_stream_cfg, - l2frame_size, - latency=bidirectional and latency)) - - for port in self.port_handle: - self.client.add_streams(streamblock[port], ports=port) - LOG.info('Created %d traffic streams for port %s.', len(streamblock[port]), port) - - def clear_streamblock(self): - """Clear all streams from TRex.""" - self.rates = [] - self.client.reset(self.port_handle) - LOG.info('Cleared all existing streams') - - def get_stats(self): - """Get stats from Trex.""" - stats = self.client.get_stats() - return self.extract_stats(stats) - - def get_macs(self): - """Return the Trex local port MAC addresses. - - return: a list of MAC addresses indexed by the port# - """ - return [port['src_mac'] for port in self.port_info] - - def get_port_speed_gbps(self): - """Return the Trex local port MAC addresses. - - return: a list of speed in Gbps indexed by the port# - """ - return [port['speed'] for port in self.port_info] - - def clear_stats(self): - """Clear all stats in the traffic gneerator.""" - if self.port_handle: - self.client.clear_stats() - - def start_traffic(self): - """Start generating traffic in all ports.""" - for port, rate in zip(self.port_handle, self.rates): - self.client.start(ports=port, mult=rate, duration=self.config.duration_sec, force=True) - - def stop_traffic(self): - """Stop generating traffic.""" - self.client.stop(ports=self.port_handle) - - def start_capture(self): - """Capture all packets on both ports that are unicast to us.""" - if self.capture_id: - self.stop_capture() - # Need to filter out unwanted packets so we do not end up counting - # src MACs of frames that are not unicast to us - src_mac_list = self.get_macs() - bpf_filter = "ether dst %s or ether dst %s" % (src_mac_list[0], src_mac_list[1]) - # ports must be set in service in order to enable capture - self.client.set_service_mode(ports=self.port_handle) - self.capture_id = self.client.start_capture(rx_ports=self.port_handle, - bpf_filter=bpf_filter) - - def fetch_capture_packets(self): - """Fetch capture packets in capture mode.""" - if self.capture_id: - self.packet_list = [] - self.client.fetch_capture_packets(capture_id=self.capture_id['id'], - output=self.packet_list) - - def stop_capture(self): - """Stop capturing packets.""" - if self.capture_id: - self.client.stop_capture(capture_id=self.capture_id['id']) - self.capture_id = None - self.client.set_service_mode(ports=self.port_handle, enabled=False) - - def cleanup(self): - """Cleanup Trex driver.""" - if self.client: - try: - self.client.reset(self.port_handle) - self.client.disconnect() - except STLError: - # TRex does not like a reset while in disconnected state - pass diff --git a/nfvbench/traffic_gen/trex_gen.py b/nfvbench/traffic_gen/trex_gen.py new file mode 100644 index 0000000..dff72ac --- /dev/null +++ b/nfvbench/traffic_gen/trex_gen.py @@ -0,0 +1,1208 @@ +# Copyright 2016 Cisco Systems, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Driver module for TRex traffic generator.""" + +import math +import os +import sys +import random +import time +import traceback +from functools import reduce + +from itertools import count +# pylint: disable=import-error +from scapy.contrib.mpls import MPLS # flake8: noqa +# pylint: enable=import-error +from nfvbench.log import LOG +from nfvbench.specs import ChainType +from nfvbench.traffic_server import TRexTrafficServer +from nfvbench.utils import cast_integer +from nfvbench.utils import timeout +from nfvbench.utils import TimeoutError + +from hdrh.histogram import HdrHistogram + +# pylint: disable=import-error +from trex.common.services.trex_service_arp import ServiceARP +from trex.stl.api import ARP +from trex.stl.api import bind_layers +from trex.stl.api import CTRexVmInsFixHwCs +from trex.stl.api import Dot1Q +from trex.stl.api import Ether +from trex.stl.api import FlagsField +from trex.stl.api import IP +from trex.stl.api import Packet +from trex.stl.api import STLClient +from trex.stl.api import STLError +from trex.stl.api import STLFlowLatencyStats +from trex.stl.api import STLFlowStats +from trex.stl.api import STLPktBuilder +from trex.stl.api import STLScVmRaw +from trex.stl.api import STLStream +from trex.stl.api import STLTXCont +from trex.stl.api import STLTXMultiBurst +from trex.stl.api import STLVmFixChecksumHw +from trex.stl.api import STLVmFixIpv4 +from trex.stl.api import STLVmFlowVar +from trex.stl.api import STLVmFlowVarRepeatableRandom +from trex.stl.api import STLVmTupleGen +from trex.stl.api import STLVmWrFlowVar +from trex.stl.api import ThreeBytesField +from trex.stl.api import UDP +from trex.stl.api import XByteField + +# pylint: enable=import-error + +from .traffic_base import AbstractTrafficGenerator +from .traffic_base import TrafficGeneratorException +from . import traffic_utils as utils +from .traffic_utils import IMIX_AVG_L2_FRAME_SIZE +from .traffic_utils import IMIX_L2_SIZES +from .traffic_utils import IMIX_RATIOS + +class VXLAN(Packet): + """VxLAN class.""" + + _VXLAN_FLAGS = ['R' * 27] + ['I'] + ['R' * 5] + name = "VXLAN" + fields_desc = [FlagsField("flags", 0x08000000, 32, _VXLAN_FLAGS), + ThreeBytesField("vni", 0), + XByteField("reserved", 0x00)] + + def mysummary(self): + """Summary.""" + return self.sprintf("VXLAN (vni=%VXLAN.vni%)") + +class TRex(AbstractTrafficGenerator): + """TRex traffic generator driver.""" + + LATENCY_PPS = 1000 + CHAIN_PG_ID_MASK = 0x007F + PORT_PG_ID_MASK = 0x0080 + LATENCY_PG_ID_MASK = 0x0100 + + def __init__(self, traffic_client): + """Trex driver.""" + AbstractTrafficGenerator.__init__(self, traffic_client) + self.client = None + self.id = count() + self.port_handle = [] + self.chain_count = self.generator_config.service_chain_count + self.rates = [] + self.capture_id = None + self.packet_list = [] + self.l2_frame_size = 0 + + def get_version(self): + """Get the Trex version.""" + return self.client.get_server_version() if self.client else '' + + def get_pg_id(self, port, chain_id): + """Calculate the packet group IDs to use for a given port/stream type/chain_id. + + port: 0 or 1 + chain_id: identifies to which chain the pg_id is associated (0 to 255) + return: pg_id, lat_pg_id + + We use a bit mask to set up the 3 fields: + 0x007F: chain ID (8 bits for a max of 128 chains) + 0x0080: port bit + 0x0100: latency bit + """ + pg_id = port * TRex.PORT_PG_ID_MASK | chain_id + return pg_id, pg_id | TRex.LATENCY_PG_ID_MASK + + def extract_stats(self, in_stats, ifstats): + """Extract stats from dict returned by Trex API. + + :param in_stats: dict as returned by TRex api + """ + utils.nan_replace(in_stats) + # LOG.debug(in_stats) + + result = {} + # port_handles should have only 2 elements: [0, 1] + # so (1 - ph) will be the index for the far end port + for ph in self.port_handle: + stats = in_stats[ph] + far_end_stats = in_stats[1 - ph] + result[ph] = { + 'tx': { + 'total_pkts': cast_integer(stats['opackets']), + 'total_pkt_bytes': cast_integer(stats['obytes']), + 'pkt_rate': cast_integer(stats['tx_pps']), + 'pkt_bit_rate': cast_integer(stats['tx_bps']) + }, + 'rx': { + 'total_pkts': cast_integer(stats['ipackets']), + 'total_pkt_bytes': cast_integer(stats['ibytes']), + 'pkt_rate': cast_integer(stats['rx_pps']), + 'pkt_bit_rate': cast_integer(stats['rx_bps']), + # how many pkts were dropped in RX direction + # need to take the tx counter on the far end port + 'dropped_pkts': cast_integer( + far_end_stats['opackets'] - stats['ipackets']) + } + } + self.__combine_latencies(in_stats, result[ph]['rx'], ph) + + total_tx_pkts = result[0]['tx']['total_pkts'] + result[1]['tx']['total_pkts'] + + # in case of GARP packets we need to base total_tx_pkts value using flow_stats + # as no GARP packets have no flow stats and will not be received on the other port + if self.config.periodic_gratuitous_arp: + if not self.config.no_flow_stats and not self.config.no_latency_stats: + global_total_tx_pkts = total_tx_pkts + total_tx_pkts = 0 + if ifstats: + for chain_id, _ in enumerate(ifstats): + for ph in self.port_handle: + pg_id, lat_pg_id = self.get_pg_id(ph, chain_id) + flows_tx_pkts = in_stats['flow_stats'][pg_id]['tx_pkts']['total'] + \ + in_stats['flow_stats'][lat_pg_id]['tx_pkts']['total'] + result[ph]['tx']['total_pkts'] = flows_tx_pkts + total_tx_pkts += flows_tx_pkts + else: + for pg_id in in_stats['flow_stats']: + if pg_id != 'global': + total_tx_pkts += in_stats['flow_stats'][pg_id]['tx_pkts']['total'] + result["garp_total_tx_rate"] = cast_integer( + (global_total_tx_pkts - total_tx_pkts) / self.config.duration_sec) + else: + LOG.warning("Gratuitous ARP are not received by the other port so TRex and NFVbench" + " see these packets as dropped. Please do not activate no_flow_stats" + " and no_latency_stats properties to have a better drop rate.") + + result["total_tx_rate"] = cast_integer(total_tx_pkts / self.config.duration_sec) + # actual offered tx rate in bps + avg_packet_size = utils.get_average_packet_size(self.l2_frame_size) + total_tx_bps = utils.pps_to_bps(result["total_tx_rate"], avg_packet_size) + result['offered_tx_rate_bps'] = total_tx_bps + + result.update(self.get_theoretical_rates(avg_packet_size)) + + result["flow_stats"] = in_stats["flow_stats"] + result["latency"] = in_stats["latency"] + + # Merge HDRHistogram to have an overall value for all chains and ports + # (provided that the histogram exists in the stats returned by T-Rex) + # Of course, empty histograms will produce an empty (invalid) histogram. + try: + hdrh_list = [] + if ifstats: + for chain_id, _ in enumerate(ifstats): + for ph in self.port_handle: + _, lat_pg_id = self.get_pg_id(ph, chain_id) + hdrh_list.append( + HdrHistogram.decode(in_stats['latency'][lat_pg_id]['latency']['hdrh'])) + else: + for pg_id in in_stats['latency']: + if pg_id != 'global': + hdrh_list.append( + HdrHistogram.decode(in_stats['latency'][pg_id]['latency']['hdrh'])) + + def add_hdrh(x, y): + x.add(y) + return x + decoded_hdrh = reduce(add_hdrh, hdrh_list) + result["overall_hdrh"] = HdrHistogram.encode(decoded_hdrh).decode('utf-8') + except KeyError: + pass + + return result + + def get_stream_stats(self, trex_stats, if_stats, latencies, chain_idx): + """Extract the aggregated stats for a given chain. + + trex_stats: stats as returned by get_stats() + if_stats: a list of 2 interface stats to update (port 0 and 1) + latencies: a list of 2 Latency instances to update for this chain (port 0 and 1) + latencies[p] is the latency for packets sent on port p + if there are no latency streams, the Latency instances are not modified + chain_idx: chain index of the interface stats + + The packet counts include normal and latency streams. + + Trex returns flows stats as follows: + + 'flow_stats': {0: {'rx_bps': {0: 0, 1: 0, 'total': 0}, + 'rx_bps_l1': {0: 0.0, 1: 0.0, 'total': 0.0}, + 'rx_bytes': {0: nan, 1: nan, 'total': nan}, + 'rx_pkts': {0: 0, 1: 15001, 'total': 15001}, + 'rx_pps': {0: 0, 1: 0, 'total': 0}, + 'tx_bps': {0: 0, 1: 0, 'total': 0}, + 'tx_bps_l1': {0: 0.0, 1: 0.0, 'total': 0.0}, + 'tx_bytes': {0: 1020068, 1: 0, 'total': 1020068}, + 'tx_pkts': {0: 15001, 1: 0, 'total': 15001}, + 'tx_pps': {0: 0, 1: 0, 'total': 0}}, + 1: {'rx_bps': {0: 0, 1: 0, 'total': 0}, + 'rx_bps_l1': {0: 0.0, 1: 0.0, 'total': 0.0}, + 'rx_bytes': {0: nan, 1: nan, 'total': nan}, + 'rx_pkts': {0: 0, 1: 15001, 'total': 15001}, + 'rx_pps': {0: 0, 1: 0, 'total': 0}, + 'tx_bps': {0: 0, 1: 0, 'total': 0}, + 'tx_bps_l1': {0: 0.0, 1: 0.0, 'total': 0.0}, + 'tx_bytes': {0: 1020068, 1: 0, 'total': 1020068}, + 'tx_pkts': {0: 15001, 1: 0, 'total': 15001}, + 'tx_pps': {0: 0, 1: 0, 'total': 0}}, + 128: {'rx_bps': {0: 0, 1: 0, 'total': 0}, + 'rx_bps_l1': {0: 0.0, 1: 0.0, 'total': 0.0}, + 'rx_bytes': {0: nan, 1: nan, 'total': nan}, + 'rx_pkts': {0: 15001, 1: 0, 'total': 15001}, + 'rx_pps': {0: 0, 1: 0, 'total': 0}, + 'tx_bps': {0: 0, 1: 0, 'total': 0}, + 'tx_bps_l1': {0: 0.0, 1: 0.0, 'total': 0.0}, + 'tx_bytes': {0: 0, 1: 1020068, 'total': 1020068}, + 'tx_pkts': {0: 0, 1: 15001, 'total': 15001}, + 'tx_pps': {0: 0, 1: 0, 'total': 0}},etc... + + the pg_id (0, 1, 128,...) is the key of the dict and is obtained using the + get_pg_id() method. + packet counters for a given stream sent on port p are reported as: + - tx_pkts[p] on port p + - rx_pkts[1-p] on the far end port + + This is a tricky/critical counter transposition operation because + the results are grouped by port (not by stream): + tx_pkts_port(p=0) comes from pg_id(port=0, chain_idx)['tx_pkts'][0] + rx_pkts_port(p=0) comes from pg_id(port=1, chain_idx)['rx_pkts'][0] + tx_pkts_port(p=1) comes from pg_id(port=1, chain_idx)['tx_pkts'][1] + rx_pkts_port(p=1) comes from pg_id(port=0, chain_idx)['rx_pkts'][1] + + or using a more generic formula: + tx_pkts_port(p) comes from pg_id(port=p, chain_idx)['tx_pkts'][p] + rx_pkts_port(p) comes from pg_id(port=1-p, chain_idx)['rx_pkts'][p] + + the second formula is equivalent to + rx_pkts_port(1-p) comes from pg_id(port=p, chain_idx)['rx_pkts'][1-p] + + If there are latency streams, those same counters need to be added in the same way + """ + def get_latency(lval): + try: + return int(round(lval)) + except ValueError: + return 0 + + for ifs in if_stats: + ifs.tx = ifs.rx = 0 + for port in range(2): + pg_id, lat_pg_id = self.get_pg_id(port, chain_idx) + for pid in [pg_id, lat_pg_id]: + try: + pg_stats = trex_stats['flow_stats'][pid] + if_stats[port].tx += pg_stats['tx_pkts'][port] + if_stats[1 - port].rx += pg_stats['rx_pkts'][1 - port] + except KeyError: + pass + try: + lat = trex_stats['latency'][lat_pg_id]['latency'] + # dropped_pkts += lat['err_cntrs']['dropped'] + latencies[port].max_usec = get_latency(lat['total_max']) + if math.isnan(lat['total_min']): + latencies[port].min_usec = 0 + latencies[port].avg_usec = 0 + else: + latencies[port].min_usec = get_latency(lat['total_min']) + latencies[port].avg_usec = get_latency(lat['average']) + # pick up the HDR histogram if present (otherwise will raise KeyError) + latencies[port].hdrh = lat['hdrh'] + except KeyError: + pass + + def __combine_latencies(self, in_stats, results, port_handle): + """Traverse TRex result dictionary and combines chosen latency stats. + + example of latency dict returned by trex (2 chains): + 'latency': {256: {'err_cntrs': {'dropped': 0, + 'dup': 0, + 'out_of_order': 0, + 'seq_too_high': 0, + 'seq_too_low': 0}, + 'latency': {'average': 26.5, + 'hdrh': u'HISTFAAAAEx4nJNpmSgz...bFRgxi', + 'histogram': {20: 303, + 30: 320, + 40: 300, + 50: 73, + 60: 4, + 70: 1}, + 'jitter': 14, + 'last_max': 63, + 'total_max': 63, + 'total_min': 20}}, + 257: {'err_cntrs': {'dropped': 0, + 'dup': 0, + 'out_of_order': 0, + 'seq_too_high': 0, + 'seq_too_low': 0}, + 'latency': {'average': 29.75, + 'hdrh': u'HISTFAAAAEV4nJN...CALilDG0=', + 'histogram': {20: 261, + 30: 431, + 40: 3, + 50: 80, + 60: 225}, + 'jitter': 23, + 'last_max': 67, + 'total_max': 67, + 'total_min': 20}}, + 384: {'err_cntrs': {'dropped': 0, + 'dup': 0, + 'out_of_order': 0, + 'seq_too_high': 0, + 'seq_too_low': 0}, + 'latency': {'average': 18.0, + 'hdrh': u'HISTFAAAADR4nJNpm...MjCwDDxAZG', + 'histogram': {20: 987, 30: 14}, + 'jitter': 0, + 'last_max': 34, + 'total_max': 34, + 'total_min': 20}}, + 385: {'err_cntrs': {'dropped': 0, + 'dup': 0, + 'out_of_order': 0, + 'seq_too_high': 0, + 'seq_too_low': 0}, + 'latency': {'average': 19.0, + 'hdrh': u'HISTFAAAADR4nJNpm...NkYmJgDdagfK', + 'histogram': {20: 989, 30: 11}, + 'jitter': 0, + 'last_max': 38, + 'total_max': 38, + 'total_min': 20}}, + 'global': {'bad_hdr': 0, 'old_flow': 0}}, + """ + total_max = 0 + average = 0 + total_min = float("inf") + for chain_id in range(self.chain_count): + try: + _, lat_pg_id = self.get_pg_id(port_handle, chain_id) + lat = in_stats['latency'][lat_pg_id]['latency'] + # dropped_pkts += lat['err_cntrs']['dropped'] + total_max = max(lat['total_max'], total_max) + total_min = min(lat['total_min'], total_min) + average += lat['average'] + except KeyError: + pass + if total_min == float("inf"): + total_min = 0 + results['min_delay_usec'] = total_min + results['max_delay_usec'] = total_max + results['avg_delay_usec'] = int(average / self.chain_count) + + def _bind_vxlan(self): + bind_layers(UDP, VXLAN, dport=4789) + bind_layers(VXLAN, Ether) + + def _create_pkt(self, stream_cfg, l2frame_size, disable_random_latency_flow=False): + """Create a packet of given size. + + l2frame_size: size of the L2 frame in bytes (including the 32-bit FCS) + """ + # Trex will add the FCS field, so we need to remove 4 bytes from the l2 frame size + frame_size = int(l2frame_size) - 4 + vm_param = [] + if stream_cfg['vxlan'] is True: + self._bind_vxlan() + encap_level = '1' + pkt_base = Ether(src=stream_cfg['vtep_src_mac'], dst=stream_cfg['vtep_dst_mac']) + if stream_cfg['vtep_vlan'] is not None: + pkt_base /= Dot1Q(vlan=stream_cfg['vtep_vlan']) + pkt_base /= IP(src=stream_cfg['vtep_src_ip'], dst=stream_cfg['vtep_dst_ip']) + pkt_base /= UDP(sport=random.randint(1337, 32767), dport=4789) + pkt_base /= VXLAN(vni=stream_cfg['net_vni']) + pkt_base /= Ether(src=stream_cfg['mac_src'], dst=stream_cfg['mac_dst']) + # need to randomize the outer header UDP src port based on flow + vxlan_udp_src_fv = STLVmFlowVar( + name="vxlan_udp_src", + min_value=1337, + max_value=32767, + size=2, + op="random") + vm_param = [vxlan_udp_src_fv, + STLVmWrFlowVar(fv_name="vxlan_udp_src", pkt_offset="UDP.sport")] + elif stream_cfg['mpls'] is True: + encap_level = '0' + pkt_base = Ether(src=stream_cfg['vtep_src_mac'], dst=stream_cfg['vtep_dst_mac']) + if stream_cfg['vtep_vlan'] is not None: + pkt_base /= Dot1Q(vlan=stream_cfg['vtep_vlan']) + if stream_cfg['mpls_outer_label'] is not None: + pkt_base /= MPLS(label=stream_cfg['mpls_outer_label'], cos=1, s=0, ttl=255) + if stream_cfg['mpls_inner_label'] is not None: + pkt_base /= MPLS(label=stream_cfg['mpls_inner_label'], cos=1, s=1, ttl=255) + # Flow stats and MPLS labels randomization TBD + pkt_base /= Ether(src=stream_cfg['mac_src'], dst=stream_cfg['mac_dst']) + else: + encap_level = '0' + pkt_base = Ether(src=stream_cfg['mac_src'], dst=stream_cfg['mac_dst']) + + if stream_cfg['vlan_tag'] is not None: + pkt_base /= Dot1Q(vlan=stream_cfg['vlan_tag']) + + udp_args = {} + if stream_cfg['udp_src_port']: + udp_args['sport'] = int(stream_cfg['udp_src_port']) + if stream_cfg['udp_port_step'] == 'random': + step = 1 + else: + step = stream_cfg['udp_port_step'] + udp_args['sport_step'] = int(step) + udp_args['sport_max'] = int(stream_cfg['udp_src_port_max']) + if stream_cfg['udp_dst_port']: + udp_args['dport'] = int(stream_cfg['udp_dst_port']) + if stream_cfg['udp_port_step'] == 'random': + step = 1 + else: + step = stream_cfg['udp_port_step'] + udp_args['dport_step'] = int(step) + udp_args['dport_max'] = int(stream_cfg['udp_dst_port_max']) + + pkt_base /= IP(src=stream_cfg['ip_src_addr'], dst=stream_cfg['ip_dst_addr']) / \ + UDP(dport=udp_args['dport'], sport=udp_args['sport']) + + # STLVmTupleGen need flow count >= cores used by TRex, if FC < cores we used STLVmFlowVar + if stream_cfg['ip_addrs_step'] == '0.0.0.1' and stream_cfg['udp_port_step'] == '1' and \ + stream_cfg['count'] >= self.generator_config.cores: + src_fv = STLVmTupleGen(ip_min=stream_cfg['ip_src_addr'], + ip_max=stream_cfg['ip_src_addr_max'], + port_min=udp_args['sport'], + port_max=udp_args['sport_max'], + name="tuple_src", + limit_flows=stream_cfg['count']) + dst_fv = STLVmTupleGen(ip_min=stream_cfg['ip_dst_addr'], + ip_max=stream_cfg['ip_dst_addr_max'], + port_min=udp_args['dport'], + port_max=udp_args['dport_max'], + name="tuple_dst", + limit_flows=stream_cfg['count']) + vm_param = [ + src_fv, + STLVmWrFlowVar(fv_name="tuple_src.ip", + pkt_offset="IP:{}.src".format(encap_level)), + STLVmWrFlowVar(fv_name="tuple_src.port", + pkt_offset="UDP:{}.sport".format(encap_level)), + dst_fv, + STLVmWrFlowVar(fv_name="tuple_dst.ip", + pkt_offset="IP:{}.dst".format(encap_level)), + STLVmWrFlowVar(fv_name="tuple_dst.port", + pkt_offset="UDP:{}.dport".format(encap_level)), + ] + else: + if disable_random_latency_flow: + src_fv_ip = STLVmFlowVar( + name="ip_src", + min_value=stream_cfg['ip_src_addr'], + max_value=stream_cfg['ip_src_addr'], + size=4) + dst_fv_ip = STLVmFlowVar( + name="ip_dst", + min_value=stream_cfg['ip_dst_addr'], + max_value=stream_cfg['ip_dst_addr'], + size=4) + elif stream_cfg['ip_addrs_step'] == 'random': + src_fv_ip = STLVmFlowVarRepeatableRandom( + name="ip_src", + min_value=stream_cfg['ip_src_addr'], + max_value=stream_cfg['ip_src_addr_max'], + size=4, + seed=random.randint(0, 32767), + limit=stream_cfg['ip_src_count']) + dst_fv_ip = STLVmFlowVarRepeatableRandom( + name="ip_dst", + min_value=stream_cfg['ip_dst_addr'], + max_value=stream_cfg['ip_dst_addr_max'], + size=4, + seed=random.randint(0, 32767), + limit=stream_cfg['ip_dst_count']) + else: + src_fv_ip = STLVmFlowVar( + name="ip_src", + min_value=stream_cfg['ip_src_addr'], + max_value=stream_cfg['ip_src_addr_max'], + size=4, + op="inc", + step=stream_cfg['ip_addrs_step']) + dst_fv_ip = STLVmFlowVar( + name="ip_dst", + min_value=stream_cfg['ip_dst_addr'], + max_value=stream_cfg['ip_dst_addr_max'], + size=4, + op="inc", + step=stream_cfg['ip_addrs_step']) + + if disable_random_latency_flow: + src_fv_port = STLVmFlowVar( + name="p_src", + min_value=udp_args['sport'], + max_value=udp_args['sport'], + size=2) + dst_fv_port = STLVmFlowVar( + name="p_dst", + min_value=udp_args['dport'], + max_value=udp_args['dport'], + size=2) + elif stream_cfg['udp_port_step'] == 'random': + src_fv_port = STLVmFlowVarRepeatableRandom( + name="p_src", + min_value=udp_args['sport'], + max_value=udp_args['sport_max'], + size=2, + seed=random.randint(0, 32767), + limit=stream_cfg['udp_src_count']) + dst_fv_port = STLVmFlowVarRepeatableRandom( + name="p_dst", + min_value=udp_args['dport'], + max_value=udp_args['dport_max'], + size=2, + seed=random.randint(0, 32767), + limit=stream_cfg['udp_dst_count']) + else: + src_fv_port = STLVmFlowVar( + name="p_src", + min_value=udp_args['sport'], + max_value=udp_args['sport_max'], + size=2, + op="inc", + step=udp_args['sport_step']) + dst_fv_port = STLVmFlowVar( + name="p_dst", + min_value=udp_args['dport'], + max_value=udp_args['dport_max'], + size=2, + op="inc", + step=udp_args['dport_step']) + vm_param = [ + src_fv_ip, + STLVmWrFlowVar(fv_name="ip_src", pkt_offset="IP:{}.src".format(encap_level)), + src_fv_port, + STLVmWrFlowVar(fv_name="p_src", pkt_offset="UDP:{}.sport".format(encap_level)), + dst_fv_ip, + STLVmWrFlowVar(fv_name="ip_dst", pkt_offset="IP:{}.dst".format(encap_level)), + dst_fv_port, + STLVmWrFlowVar(fv_name="p_dst", pkt_offset="UDP:{}.dport".format(encap_level)), + ] + # Use HW Offload to calculate the outter IP/UDP packet + vm_param.append(STLVmFixChecksumHw(l3_offset="IP:0", + l4_offset="UDP:0", + l4_type=CTRexVmInsFixHwCs.L4_TYPE_UDP)) + # Use software to fix the inner IP/UDP payload for VxLAN packets + if int(encap_level): + vm_param.append(STLVmFixIpv4(offset="IP:1")) + pad = max(0, frame_size - len(pkt_base)) * 'x' + + return STLPktBuilder(pkt=pkt_base / pad, + vm=STLScVmRaw(vm_param, cache_size=int(self.config.cache_size))) + + def _create_gratuitous_arp_pkt(self, stream_cfg): + """Create a GARP packet. + + """ + pkt_base = Ether(src=stream_cfg['mac_src'], dst="ff:ff:ff:ff:ff:ff") + + if self.config.vxlan or self.config.mpls: + pkt_base /= Dot1Q(vlan=stream_cfg['vtep_vlan']) + elif stream_cfg['vlan_tag'] is not None: + pkt_base /= Dot1Q(vlan=stream_cfg['vlan_tag']) + + pkt_base /= ARP(psrc=stream_cfg['ip_src_tg_gw'], hwsrc=stream_cfg['mac_src'], + hwdst=stream_cfg['mac_src'], pdst=stream_cfg['ip_src_tg_gw']) + + return STLPktBuilder(pkt=pkt_base) + + def generate_streams(self, port, chain_id, stream_cfg, l2frame, latency=True, + e2e=False): + """Create a list of streams corresponding to a given chain and stream config. + + port: port where the streams originate (0 or 1) + chain_id: the chain to which the streams are associated to + stream_cfg: stream configuration + l2frame: L2 frame size (including 4-byte FCS) or 'IMIX' + latency: if True also create a latency stream + e2e: True if performing "end to end" connectivity check + """ + streams = [] + pg_id, lat_pg_id = self.get_pg_id(port, chain_id) + if l2frame == 'IMIX': + for ratio, l2_frame_size in zip(IMIX_RATIOS, IMIX_L2_SIZES): + pkt = self._create_pkt(stream_cfg, l2_frame_size) + if e2e or stream_cfg['mpls']: + streams.append(STLStream(packet=pkt, + mode=STLTXCont(pps=ratio))) + else: + if stream_cfg['vxlan'] is True: + streams.append(STLStream(packet=pkt, + flow_stats=STLFlowStats(pg_id=pg_id, + vxlan=True) + if not self.config.no_flow_stats else None, + mode=STLTXCont(pps=ratio))) + else: + streams.append(STLStream(packet=pkt, + flow_stats=STLFlowStats(pg_id=pg_id) + if not self.config.no_flow_stats else None, + mode=STLTXCont(pps=ratio))) + + if latency: + # for IMIX, the latency packets have the average IMIX packet size + if stream_cfg['ip_addrs_step'] == 'random' or \ + stream_cfg['udp_port_step'] == 'random': + # Force latency flow to only one flow to avoid creating flows + # over requested flow count + pkt = self._create_pkt(stream_cfg, IMIX_AVG_L2_FRAME_SIZE, True) + else: + pkt = self._create_pkt(stream_cfg, IMIX_AVG_L2_FRAME_SIZE) + + else: + l2frame_size = int(l2frame) + pkt = self._create_pkt(stream_cfg, l2frame_size) + if self.config.periodic_gratuitous_arp: + requested_pps = int(utils.parse_rate_str(self.rates[0])[ + 'rate_pps']) - self.config.gratuitous_arp_pps + if latency: + requested_pps -= self.LATENCY_PPS + stltx_cont = STLTXCont(pps=requested_pps) + else: + stltx_cont = STLTXCont() + if e2e or stream_cfg['mpls']: + streams.append(STLStream(packet=pkt, + # Flow stats is disabled for MPLS now + # flow_stats=STLFlowStats(pg_id=pg_id), + mode=stltx_cont)) + else: + if stream_cfg['vxlan'] is True: + streams.append(STLStream(packet=pkt, + flow_stats=STLFlowStats(pg_id=pg_id, + vxlan=True) + if not self.config.no_flow_stats else None, + mode=stltx_cont)) + else: + streams.append(STLStream(packet=pkt, + flow_stats=STLFlowStats(pg_id=pg_id) + if not self.config.no_flow_stats else None, + mode=stltx_cont)) + # for the latency stream, the minimum payload is 16 bytes even in case of vlan tagging + # without vlan, the min l2 frame size is 64 + # with vlan it is 68 + # This only applies to the latency stream + if latency: + if stream_cfg['vlan_tag'] and l2frame_size < 68: + l2frame_size = 68 + if stream_cfg['ip_addrs_step'] == 'random' or \ + stream_cfg['udp_port_step'] == 'random': + # Force latency flow to only one flow to avoid creating flows + # over requested flow count + pkt = self._create_pkt(stream_cfg, l2frame_size, True) + else: + pkt = self._create_pkt(stream_cfg, l2frame_size) + + if latency: + if self.config.no_latency_stats: + LOG.info("Latency flow statistics are disabled.") + if stream_cfg['vxlan'] is True: + streams.append(STLStream(packet=pkt, + flow_stats=STLFlowLatencyStats(pg_id=lat_pg_id, + vxlan=True) + if not self.config.no_latency_stats else None, + mode=STLTXCont(pps=self.LATENCY_PPS))) + else: + streams.append(STLStream(packet=pkt, + flow_stats=STLFlowLatencyStats(pg_id=lat_pg_id) + if not self.config.no_latency_stats else None, + mode=STLTXCont(pps=self.LATENCY_PPS))) + + if self.config.periodic_gratuitous_arp and ( + self.config.l3_router or self.config.service_chain == ChainType.EXT): + # In case of L3 router feature or EXT chain with router + # and depending on ARP stale time SUT configuration + # Gratuitous ARP from TG port to the router is needed to keep traffic up + garp_pkt = self._create_gratuitous_arp_pkt(stream_cfg) + ibg = self.config.gratuitous_arp_pps * 1000000.0 + packets_count = int(self.config.duration_sec / self.config.gratuitous_arp_pps) + streams.append( + STLStream(packet=garp_pkt, + mode=STLTXMultiBurst(pkts_per_burst=1, count=packets_count, ibg=ibg))) + return streams + + @timeout(5) + def __connect(self, client): + client.connect() + + def __local_server_status(self): + """ The TRex server may have started but failed initializing... and stopped. + This piece of code is especially designed to address + the case when a fatal failure occurs on a DPDK init call. + The TRex algorihm should be revised to include some missing timeouts (?) + status returned: + 0: no error detected + 1: fatal error detected - should lead to exiting the run + 2: error detected that could be solved by starting again + The diagnostic is based on parsing the local trex log file (improvable) + """ + status = 0 + message = None + failure = None + exited = None + cause = None + error = None + before = None + after = None + last = None + try: + with open('/tmp/trex.log', 'r', encoding="utf-8") as trex_log: + for _line in trex_log: + line = _line.strip() + if line.startswith('Usage:'): + break + if 'ports are bound' in line: + continue + if 'please wait' in line: + continue + if 'exit' in line.lower(): + exited = line + elif 'cause' in line.lower(): + cause = line + elif 'fail' in line.lower(): + failure = line + elif 'msg' in line.lower(): + message = line + elif (error is not None) and line: + after = line + elif line.startswith('Error:') or line.startswith('ERROR'): + error = line + before = last + last = line + except FileNotFoundError: + pass + if exited is not None: + status = 1 + LOG.info("\x1b[1m%s\x1b[0m %s", 'TRex failed initializing:', exited) + if cause is not None: + LOG.info("TRex [cont'd] %s", cause) + if failure is not None: + LOG.info("TRex [cont'd] %s", failure) + if message is not None: + LOG.info("TRex [cont'd] %s", message) + if 'not supported yet' in message.lower(): + LOG.info("TRex [cont'd] Try starting again!") + status = 2 + elif error is not None: + status = 1 + LOG.info("\x1b[1m%s\x1b[0m %s", 'TRex failed initializing:', error) + if after is not None: + LOG.info("TRex [cont'd] %s", after) + elif before is not None: + LOG.info("TRex [cont'd] %s", before) + return status + + def __connect_after_start(self): + # after start, Trex may take a bit of time to initialize + # so we need to retry a few times + # we try to capture recoverable error cases (checking status) + status = 0 + for it in range(self.config.generic_retry_count): + try: + time.sleep(1) + self.client.connect() + break + except Exception as ex: + if it == (self.config.generic_retry_count - 1): + raise + status = self.__local_server_status() + if status > 0: + # No need to wait anymore, something went wrong and TRex exited + if status == 1: + LOG.info("\x1b[1m%s\x1b[0m", 'TRex failed starting!') + print("More information? Try the command: " + + "\x1b[1mnfvbench --show-trex-log\x1b[0m") + sys.exit(0) + if status == 2: + # a new start will follow + return status + LOG.info("Retrying connection to TRex (%s)...", ex.msg) + return status + + def connect(self): + """Connect to the TRex server.""" + status = 0 + server_ip = self.generator_config.ip + LOG.info("Connecting to TRex (%s)...", server_ip) + + # Connect to TRex server + self.client = STLClient(server=server_ip, sync_port=self.generator_config.zmq_rpc_port, + async_port=self.generator_config.zmq_pub_port) + try: + self.__connect(self.client) + if server_ip == '127.0.0.1': + config_updated = self.__check_config() + if config_updated or self.config.restart: + status = self.__restart() + except (TimeoutError, STLError) as e: + if server_ip == '127.0.0.1': + status = self.__start_local_server() + else: + raise TrafficGeneratorException(e.message) from e + + if status == 2: + # Workaround in case of a failed TRex server initialization + # we try to start it again (twice maximum) + # which may allow low level initialization to complete. + if self.__start_local_server() == 2: + self.__start_local_server() + + ports = list(self.generator_config.ports) + self.port_handle = ports + # Prepare the ports + self.client.reset(ports) + # Read HW information from each port + # this returns an array of dict (1 per port) + """ + Example of output for Intel XL710 + [{'arp': '-', 'src_ipv4': '-', u'supp_speeds': [40000], u'is_link_supported': True, + 'grat_arp': 'off', 'speed': 40, u'index': 0, 'link_change_supported': 'yes', + u'rx': {u'counters': 127, u'caps': [u'flow_stats', u'latency']}, + u'is_virtual': 'no', 'prom': 'off', 'src_mac': u'3c:fd:fe:a8:24:48', 'status': 'IDLE', + u'description': u'Ethernet Controller XL710 for 40GbE QSFP+', + 'dest': u'fa:16:3e:3c:63:04', u'is_fc_supported': False, 'vlan': '-', + u'driver': u'net_i40e', 'led_change_supported': 'yes', 'rx_filter_mode': 'hardware match', + 'fc': 'none', 'link': 'UP', u'hw_mac': u'3c:fd:fe:a8:24:48', u'pci_addr': u'0000:5e:00.0', + 'mult': 'off', 'fc_supported': 'no', u'is_led_supported': True, 'rx_queue': 'off', + 'layer_mode': 'Ethernet', u'numa': 0}, ...] + """ + self.port_info = self.client.get_port_info(ports) + LOG.info('Connected to TRex') + for id, port in enumerate(self.port_info): + LOG.info(' Port %d: %s speed=%dGbps mac=%s pci=%s driver=%s', + id, port['description'], port['speed'], port['src_mac'], + port['pci_addr'], port['driver']) + # Make sure the 2 ports have the same speed + if self.port_info[0]['speed'] != self.port_info[1]['speed']: + raise TrafficGeneratorException('Traffic generator ports speed mismatch: %d/%d Gbps' % + (self.port_info[0]['speed'], + self.port_info[1]['speed'])) + + def __start_local_server(self): + try: + LOG.info("Starting TRex ...") + self.__start_server() + status = self.__connect_after_start() + except (TimeoutError, STLError) as e: + LOG.error('Cannot connect to TRex') + LOG.error(traceback.format_exc()) + logpath = '/tmp/trex.log' + if os.path.isfile(logpath): + # Wait for TRex to finish writing error message + last_size = 0 + for _ in range(self.config.generic_retry_count): + size = os.path.getsize(logpath) + if size == last_size: + # probably not writing anymore + break + last_size = size + time.sleep(1) + with open(logpath, 'r', encoding="utf-8") as f: + message = f.read() + else: + message = e.message + raise TrafficGeneratorException(message) from e + return status + + def __start_server(self): + server = TRexTrafficServer() + server.run_server(self.generator_config) + + def __check_config(self): + server = TRexTrafficServer() + return server.check_config_updated(self.generator_config) + + def __restart(self): + LOG.info("Restarting TRex ...") + self.__stop_server() + # Wait for server stopped + for _ in range(self.config.generic_retry_count): + time.sleep(1) + if not self.client.is_connected(): + LOG.info("TRex is stopped...") + break + # Start and report a possible failure + return self.__start_local_server() + + def __stop_server(self): + if self.generator_config.ip == '127.0.0.1': + ports = self.client.get_acquired_ports() + LOG.info('Release ports %s and stopping TRex...', ports) + try: + if ports: + self.client.release(ports=ports) + self.client.server_shutdown() + except STLError as e: + LOG.warning('Unable to stop TRex. Error: %s', e) + else: + LOG.info('Using remote TRex. Unable to stop TRex') + + def resolve_arp(self): + """Resolve all configured remote IP addresses. + + return: None if ARP failed to resolve for all IP addresses + else a dict of list of dest macs indexed by port# + the dest macs in the list are indexed by the chain id + """ + self.client.set_service_mode(ports=self.port_handle) + LOG.info('Polling ARP until successful...') + arp_dest_macs = {} + for port, device in zip(self.port_handle, self.generator_config.devices): + # there should be 1 stream config per chain + stream_configs = device.get_stream_configs() + chain_count = len(stream_configs) + ctx = self.client.create_service_ctx(port=port) + # all dest macs on this port indexed by chain ID + dst_macs = [None] * chain_count + dst_macs_count = 0 + # the index in the list is the chain id + if self.config.vxlan or self.config.mpls: + arps = [ + ServiceARP(ctx, + src_ip=device.vtep_src_ip, + dst_ip=device.vtep_dst_ip, + vlan=device.vtep_vlan) + for cfg in stream_configs + ] + else: + arps = [ + ServiceARP(ctx, + src_ip=cfg['ip_src_tg_gw'], + dst_ip=cfg['mac_discovery_gw'], + # will be None if no vlan tagging + vlan=cfg['vlan_tag']) + for cfg in stream_configs + ] + + for attempt in range(self.config.generic_retry_count): + try: + ctx.run(arps) + except STLError: + LOG.error(traceback.format_exc()) + continue + + unresolved = [] + for chain_id, mac in enumerate(dst_macs): + if not mac: + arp_record = arps[chain_id].get_record() + if arp_record.dst_mac: + dst_macs[chain_id] = arp_record.dst_mac + dst_macs_count += 1 + LOG.info(' ARP: port=%d chain=%d src IP=%s dst IP=%s -> MAC=%s', + port, chain_id, + arp_record.src_ip, + arp_record.dst_ip, arp_record.dst_mac) + else: + unresolved.append(arp_record.dst_ip) + if dst_macs_count == chain_count: + arp_dest_macs[port] = dst_macs + LOG.info('ARP resolved successfully for port %s', port) + break + + retry = attempt + 1 + LOG.info('Retrying ARP for: %s (retry %d/%d)', + unresolved, retry, self.config.generic_retry_count) + if retry < self.config.generic_retry_count: + time.sleep(self.config.generic_poll_sec) + else: + LOG.error('ARP timed out for port %s (resolved %d out of %d)', + port, + dst_macs_count, + chain_count) + break + + # A traffic capture may have been started (from a T-Rex console) at this time. + # If asked so, we keep the service mode enabled here, and disable it otherwise. + # | Disabling the service mode while a capture is in progress + # | would cause the application to stop/crash with an error. + if not self.config.service_mode: + self.client.set_service_mode(ports=self.port_handle, enabled=False) + if len(arp_dest_macs) == len(self.port_handle): + return arp_dest_macs + return None + + def __is_rate_enough(self, l2frame_size, rates, bidirectional, latency): + """Check if rate provided by user is above requirements. Applies only if latency is True.""" + intf_speed = self.generator_config.intf_speed + if latency: + if bidirectional: + mult = 2 + total_rate = 0 + for rate in rates: + r = utils.convert_rates(l2frame_size, rate, intf_speed) + total_rate += int(r['rate_pps']) + else: + mult = 1 + r = utils.convert_rates(l2frame_size, rates[0], intf_speed) + total_rate = int(r['rate_pps']) + # rate must be enough for latency stream and at least 1 pps for base stream per chain + if self.config.periodic_gratuitous_arp: + required_rate = (self.LATENCY_PPS + 1 + self.config.gratuitous_arp_pps) \ + * self.config.service_chain_count * mult + else: + required_rate = (self.LATENCY_PPS + 1) * self.config.service_chain_count * mult + result = utils.convert_rates(l2frame_size, + {'rate_pps': required_rate}, + intf_speed * mult) + result['result'] = total_rate >= required_rate + return result + + return {'result': True} + + def create_traffic(self, l2frame_size, rates, bidirectional, latency=True, e2e=False): + """Program all the streams in Trex server. + + l2frame_size: L2 frame size or IMIX + rates: a list of 2 rates to run each direction + each rate is a dict like {'rate_pps': '10kpps'} + bidirectional: True if bidirectional + latency: True if latency measurement is needed + e2e: True if performing "end to end" connectivity check + """ + if self.config.no_flow_stats: + LOG.info("Traffic flow statistics are disabled.") + r = self.__is_rate_enough(l2frame_size, rates, bidirectional, latency) + if not r['result']: + raise TrafficGeneratorException( + 'Required rate in total is at least one of: \n{pps}pps \n{bps}bps \n{load}%.' + .format(pps=r['rate_pps'], + bps=r['rate_bps'], + load=r['rate_percent'])) + self.l2_frame_size = l2frame_size + # a dict of list of streams indexed by port# + # in case of fixed size, has self.chain_count * 2 * 2 streams + # (1 normal + 1 latency stream per direction per chain) + # for IMIX, has self.chain_count * 2 * 4 streams + # (3 normal + 1 latency stream per direction per chain) + streamblock = {} + for port in self.port_handle: + streamblock[port] = [] + stream_cfgs = [d.get_stream_configs() for d in self.generator_config.devices] + if self.generator_config.ip_addrs_step == 'random' \ + or self.generator_config.gen_config.udp_port_step == 'random': + LOG.warning("Using random step, the number of flows can be less than " + "the requested number of flows due to repeatable multivariate random " + "generation which can reproduce the same pattern of values") + self.rates = [utils.to_rate_str(rate) for rate in rates] + for chain_id, (fwd_stream_cfg, rev_stream_cfg) in enumerate(zip(*stream_cfgs)): + streamblock[0].extend(self.generate_streams(self.port_handle[0], + chain_id, + fwd_stream_cfg, + l2frame_size, + latency=latency, + e2e=e2e)) + if len(self.rates) > 1: + streamblock[1].extend(self.generate_streams(self.port_handle[1], + chain_id, + rev_stream_cfg, + l2frame_size, + latency=bidirectional and latency, + e2e=e2e)) + + for port in self.port_handle: + if self.config.vxlan: + self.client.set_port_attr(ports=port, vxlan_fs=[4789]) + else: + self.client.set_port_attr(ports=port, vxlan_fs=None) + self.client.add_streams(streamblock[port], ports=port) + LOG.info('Created %d traffic streams for port %s.', len(streamblock[port]), port) + + def clear_streamblock(self): + """Clear all streams from TRex.""" + self.rates = [] + self.client.reset(self.port_handle) + LOG.info('Cleared all existing streams') + + def get_stats(self, ifstats=None): + """Get stats from Trex.""" + stats = self.client.get_stats() + return self.extract_stats(stats, ifstats) + + def get_macs(self): + """Return the Trex local port MAC addresses. + + return: a list of MAC addresses indexed by the port# + """ + return [port['src_mac'] for port in self.port_info] + + def get_port_speed_gbps(self): + """Return the Trex local port MAC addresses. + + return: a list of speed in Gbps indexed by the port# + """ + return [port['speed'] for port in self.port_info] + + def clear_stats(self): + """Clear all stats in the traffic gneerator.""" + if self.port_handle: + self.client.clear_stats() + + def start_traffic(self): + """Start generating traffic in all ports.""" + for port, rate in zip(self.port_handle, self.rates): + self.client.start(ports=port, mult=rate, duration=self.config.duration_sec, force=True) + + def stop_traffic(self): + """Stop generating traffic.""" + self.client.stop(ports=self.port_handle) + + def start_capture(self): + """Capture all packets on both ports that are unicast to us.""" + if self.capture_id: + self.stop_capture() + # Need to filter out unwanted packets so we do not end up counting + # src MACs of frames that are not unicast to us + src_mac_list = self.get_macs() + bpf_filter = "ether dst %s or ether dst %s" % (src_mac_list[0], src_mac_list[1]) + # ports must be set in service in order to enable capture + self.client.set_service_mode(ports=self.port_handle) + self.capture_id = self.client.start_capture \ + (rx_ports=self.port_handle, bpf_filter=bpf_filter) + + def fetch_capture_packets(self): + """Fetch capture packets in capture mode.""" + if self.capture_id: + self.packet_list = [] + self.client.fetch_capture_packets(capture_id=self.capture_id['id'], + output=self.packet_list) + + def stop_capture(self): + """Stop capturing packets.""" + if self.capture_id: + self.client.stop_capture(capture_id=self.capture_id['id']) + self.capture_id = None + # A traffic capture may have been started (from a T-Rex console) at this time. + # If asked so, we keep the service mode enabled here, and disable it otherwise. + # | Disabling the service mode while a capture is in progress + # | would cause the application to stop/crash with an error. + if not self.config.service_mode: + self.client.set_service_mode(ports=self.port_handle, enabled=False) + + def cleanup(self): + """Cleanup Trex driver.""" + if self.client: + try: + self.client.reset(self.port_handle) + self.client.disconnect() + except STLError: + # TRex does not like a reset while in disconnected state + pass + + def set_service_mode(self, enabled=True): + """Enable/disable the 'service' mode.""" + self.client.set_service_mode(ports=self.port_handle, enabled=enabled) diff --git a/nfvbench/traffic_server.py b/nfvbench/traffic_server.py index c3d4d14..5111b32 100644 --- a/nfvbench/traffic_server.py +++ b/nfvbench/traffic_server.py @@ -16,7 +16,7 @@ import os import subprocess import yaml -from log import LOG +from .log import LOG class TrafficServerException(Exception): @@ -49,26 +49,134 @@ class TRexTrafficServer(TrafficServer): mbuf_opt = "--mbuf-factor " + str(generator_config.mbuf_factor) else: mbuf_opt = "" - subprocess.Popen(['nohup', '/bin/bash', '-c', - './t-rex-64 -i -c {} --iom 0 --no-scapy-server --close-at-end {} ' - '{} {} --cfg {} &> /tmp/trex.log & disown'.format(cores, sw_mode, - vlan_opt, - mbuf_opt, cfg)], - cwd=self.trex_dir) - LOG.info('TRex server is running...') + hdrh_opt = "--hdrh" if generator_config.hdrh else "" + # --unbind-unused-ports: for NIC that have more than 2 ports such as Intel X710 + # this will instruct trex to unbind all ports that are unused instead of + # erroring out with an exception (i40e only) + # Try: --ignore-528-issue -> neither unbind nor exit with error, + # just proceed cause it might work! + # Note that force unbinding is probably a bad choice: + # we can't assume for sure that other ports are "unused". + # The default TRex behaviour - exit - is indeed a safer option; + # a message informs about the ports that should be unbound. + i40e_opt = ("--ignore-528-issue" if + generator_config.config.i40e_mixed == 'ignore' else + "--unbind-unused-ports" if + generator_config.config.i40e_mixed == 'unbind' else "") + cmd = ['nohup', '/bin/bash', '-c', + './t-rex-64 -i -c {} --iom 0 --no-scapy-server ' + '--close-at-end {} {} {} ' + '{} {} --cfg {} &> /tmp/trex.log & disown'.format(cores, sw_mode, + i40e_opt, + vlan_opt, + hdrh_opt, + mbuf_opt, cfg)] + LOG.info(' '.join(cmd)) + with subprocess.Popen(cmd, cwd=self.trex_dir) as trex_process: + LOG.info('TRex server is running (PID: %s)...', trex_process.pid) - def __save_config(self, generator_config, filename): - ifs = ",".join([repr(pci) for pci in generator_config.pcis]) - - result = """# Config generated by NFVbench - - port_limit : 2 - version : 2 - interfaces : [{ifs}]""".format(ifs=ifs) + def __load_config(self, filename): + result = {} + if os.path.exists(filename): + with open(filename, 'r', encoding="utf-8") as stream: + try: + result = yaml.safe_load(stream) + except yaml.YAMLError as exc: + print(exc) + return result + def __save_config(self, generator_config, filename): + result = self.__prepare_config(generator_config) yaml.safe_load(result) if os.path.exists(filename): os.remove(filename) - with open(filename, 'w') as f: + with open(filename, 'w', encoding="utf-8") as f: f.write(result) - return filename + + def __prepare_config(self, generator_config): + ifs = ",".join([repr(pci) for pci in generator_config.pcis]) + + # For consistency and stability reasons, the T-Rex server + # should be forciby restarted each time the value of a + # parameter, specified as one of the starting command line + # arguments, has been modified since the last launch. + # Hence we add some extra fields to the config file + # (nb_cores, use_vlan, mbuf_factor, i40e_mixed, hdrh) + # which will serve as a memory between runs - + # while being actually ignored by the T-Rex server. + + result = """# Config generated by NFVbench + - port_limit : 2 + version : 2 + zmq_pub_port : {zmq_pub_port} + zmq_rpc_port : {zmq_rpc_port} + prefix : {prefix} + limit_memory : {limit_memory} + command_line : + sw_mode : {sw_mode} + mbuf_factor: {mbuf_factor} + hdrh : {hdrh} + nb_cores : {nb_cores} + use_vlan : {use_vlan} + i40e_mixed : {i40e_mixed} + interfaces : [{ifs}]""".format( + zmq_pub_port=generator_config.zmq_pub_port, + zmq_rpc_port=generator_config.zmq_rpc_port, + prefix=generator_config.name, + limit_memory=generator_config.limit_memory, + sw_mode=generator_config.software_mode, + mbuf_factor=generator_config.mbuf_factor, + hdrh=generator_config.hdrh, + nb_cores=generator_config.cores, + use_vlan=generator_config.gen_config.get('vtep_vlan') or + generator_config.vlan_tagging, + i40e_mixed=generator_config.config.i40e_mixed, + ifs=ifs) + + if hasattr(generator_config, 'mbuf_64') and generator_config.mbuf_64: + result += """ + memory : + mbuf_64 : {mbuf_64}""".format(mbuf_64=generator_config.mbuf_64) + + if self.__check_platform_config(generator_config): + try: + platform = """ + platform : + master_thread_id : {master_thread_id} + latency_thread_id : {latency_thread_id} + dual_if:""".format(master_thread_id=generator_config.gen_config.platform. + master_thread_id, + latency_thread_id=generator_config.gen_config.platform. + latency_thread_id) + result += platform + + for core in generator_config.gen_config.platform.dual_if: + threads = "" + try: + threads = ",".join([repr(thread) for thread in core.threads]) + except TypeError: + LOG.warning("No threads defined for socket %s", core.socket) + core_result = """ + - socket : {socket} + threads : [{threads}]""".format(socket=core.socket, threads=threads) + result += core_result + except (KeyError, AttributeError): + pass + return result + "\n" + + def __check_platform_config(self, generator_config): + return hasattr(generator_config.gen_config, 'platform') \ + and hasattr(generator_config.gen_config.platform, "master_thread_id") \ + and generator_config.gen_config.platform.master_thread_id is not None \ + and hasattr(generator_config.gen_config.platform, "latency_thread_id") \ + and generator_config.gen_config.platform.latency_thread_id is not None + + def check_config_updated(self, generator_config): + existing_config = self.__load_config(filename='/etc/trex_cfg.yaml') + new_config = yaml.safe_load(self.__prepare_config(generator_config)) + LOG.debug("Existing config: %s", existing_config) + LOG.debug("New config: %s", new_config) + if existing_config == new_config: + return False + return True diff --git a/nfvbench/utils.py b/nfvbench/utils.py index ecbb55a..07a38cb 100644 --- a/nfvbench/utils.py +++ b/nfvbench/utils.py @@ -13,6 +13,8 @@ # under the License. import glob +import time +from math import gcd from math import isnan import os import re @@ -23,8 +25,9 @@ import errno import fcntl from functools import wraps import json -from log import LOG - +from .log import LOG +from nfvbench.traffic_gen.traffic_utils import multiplier_map +from novaclient.exceptions import NotFound class TimeoutError(Exception): pass @@ -50,7 +53,7 @@ def timeout(seconds=10, error_message=os.strerror(errno.ETIME)): def save_json_result(result, json_file, std_json_path, service_chain, service_chain_count, - flow_count, frame_sizes): + flow_count, frame_sizes, user_id=None, group_id=None): """Save results in json format file.""" filepaths = [] if json_file: @@ -63,29 +66,18 @@ def save_json_result(result, json_file, std_json_path, service_chain, service_ch if filepaths: for file_path in filepaths: LOG.info('Saving results in json file: %s...', file_path) - with open(file_path, 'w') as jfp: + with open(file_path, 'w', encoding="utf-8") as jfp: json.dump(result, jfp, indent=4, sort_keys=True, separators=(',', ': '), default=lambda obj: obj.to_json()) - - -def byteify(data, ignore_dicts=False): - # if this is a unicode string, return its string representation - if isinstance(data, unicode): - return data.encode('utf-8') - # if this is a list of values, return list of byteified values - if isinstance(data, list): - return [byteify(item, ignore_dicts=ignore_dicts) for item in data] - # if this is a dictionary, return dictionary of byteified keys and values - # but only if we haven't already byteified it - if isinstance(data, dict) and not ignore_dicts: - return {byteify(key, ignore_dicts=ignore_dicts): byteify(value, ignore_dicts=ignore_dicts) - for key, value in data.iteritems()} - # if it's anything else, return it in its original form - return data + # possibly change file ownership + if group_id is None: + group_id = user_id + if user_id is not None: + os.chown(file_path, user_id, group_id) def dict_to_json_dict(record): @@ -113,8 +105,8 @@ def get_intel_pci(nic_slot=None, nic_ports=None): if nic_slot and nic_ports: dmidecode = subprocess.check_output(['dmidecode', '-t', 'slot']) - regex = r"(?<=SlotID:%s).*?(....:..:..\..)" % nic_slot - match = re.search(regex, dmidecode, flags=re.DOTALL) + regex = r"(?<=SlotID:{}).*?(....:..:..\..)".format(nic_slot) + match = re.search(regex, dmidecode.decode('utf-8'), flags=re.DOTALL) if not match: return None @@ -135,47 +127,41 @@ def get_intel_pci(nic_slot=None, nic_ports=None): trex_base_dir = '/opt/trex' contents = os.listdir(trex_base_dir) trex_dir = os.path.join(trex_base_dir, contents[0]) - process = subprocess.Popen(['python', 'dpdk_setup_ports.py', '-s'], - cwd=trex_dir, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - devices, _ = process.communicate() + with subprocess.Popen(['python', 'dpdk_setup_ports.py', '-s'], + cwd=trex_dir, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) as process: + devices, _ = process.communicate() except Exception: devices = '' for driver in ['i40e', 'ixgbe']: - matches = re.findall(regex.format(hx=hx, driver=driver), devices) + matches = re.findall(regex.format(hx=hx, driver=driver), devices.decode("utf-8")) if not matches: continue matches.sort() + device_list = list(x[0].split('.')[0] for x in matches) + device_ports_list = {i: {'ports': device_list.count(i)} for i in device_list} for port in matches: intf_name = glob.glob("/sys/bus/pci/devices/%s/net/*" % port[0]) - if not intf_name: - # Interface is not bind to kernel driver, so take it - pcis.append(port[1]) - else: + if intf_name: intf_name = intf_name[0][intf_name[0].rfind('/') + 1:] - process = subprocess.Popen(['ip', '-o', '-d', 'link', 'show', intf_name], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - intf_info, _ = process.communicate() - if not re.search('team_slave|bond_slave', intf_info): - pcis.append(port[1]) - + with subprocess.Popen(['ip', '-o', '-d', 'link', 'show', intf_name], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) as process: + intf_info, _ = process.communicate() + if re.search('team_slave|bond_slave', intf_info.decode("utf-8")): + device_ports_list[port[0].split('.')[0]]['busy'] = True + for port in matches: + if not device_ports_list[port[0].split('.')[0]].get('busy'): + pcis.append(port[1]) if len(pcis) == 2: break return pcis -multiplier_map = { - 'K': 1000, - 'M': 1000000, - 'G': 1000000000 -} - - def parse_flow_count(flow_count): flow_count = str(flow_count) input_fc = flow_count @@ -187,13 +173,14 @@ def parse_flow_count(flow_count): try: flow_count = int(flow_count) except ValueError: - raise Exception("Unknown flow count format '{}'".format(input_fc)) + raise Exception("Unknown flow count format '{}'".format(input_fc)) from ValueError return flow_count * multiplier def cast_integer(value): - return int(value) if not isnan(value) else value + # force 0 value if NaN value from TRex to avoid error in JSON result parsing + return int(value) if not isnan(value) else 0 class RunLock(object): @@ -210,8 +197,8 @@ class RunLock(object): try: self._fd = os.open(self._path, os.O_CREAT) fcntl.flock(self._fd, fcntl.LOCK_EX | fcntl.LOCK_NB) - except (OSError, IOError): - raise Exception('Other NFVbench process is running. Please wait') + except (OSError, IOError) as e: + raise Exception('Other NFVbench process is running. Please wait') from e def __exit__(self, *args): fcntl.flock(self._fd, fcntl.LOCK_UN) @@ -223,3 +210,82 @@ class RunLock(object): os.unlink(self._path) except (OSError, IOError): pass + + +def get_divisors(n): + for i in range(1, int(n / 2) + 1): + if n % i == 0: + yield i + yield n + + +def lcm(a, b): + """ + Calculate the maximum possible value for both IP and ports, + eventually for maximum possible flow. + """ + if a != 0 and b != 0: + lcm_value = a * b // gcd(a, b) + return lcm_value + raise TypeError(" IP size or port range can't be zero !") + + +def find_tuples_equal_to_lcm_value(a, b, lcm_value): + """ + Find numbers from two list matching a LCM value. + """ + for x in a: + for y in b: + if lcm(x, y) == lcm_value: + yield (x, y) + + +def find_max_size(max_size, tuples, flow): + if tuples: + if max_size > tuples[-1][0]: + max_size = tuples[-1][0] + return int(max_size) + if max_size > tuples[-1][1]: + max_size = tuples[-1][1] + return int(max_size) + + for i in range(max_size, 1, -1): + if flow % i == 0: + return int(i) + return 1 + + +def delete_server(nova_client, server): + try: + LOG.info('Deleting instance %s...', server.name) + nova_client.servers.delete(server.id) + except Exception: + LOG.exception("Instance %s deletion failed", server.name) + + +def instance_exists(nova_client, server): + try: + nova_client.servers.get(server.id) + except NotFound: + return False + return True + + +def waiting_servers_deletion(nova_client, servers): + LOG.info(' Waiting for %d instances to be fully deleted...', len(servers)) + retry_count = 15 + len(servers) * 5 + while True: + retry_count -= 1 + servers = [server for server in servers if instance_exists(nova_client, server)] + if not servers: + break + + if retry_count: + LOG.info(' %d yet to be deleted by Nova, retries left=%d...', + len(servers), retry_count) + time.sleep(2) + else: + LOG.warning( + ' instance deletion verification time-out: %d still not deleted', + len(servers)) + break |