From 391dcf76fefb747888a3411ae3b8df7b1ad26685 Mon Sep 17 00:00:00 2001 From: ahothan Date: Sun, 7 Oct 2018 15:55:25 -0700 Subject: 2.0 beta NFVBENCH-91 Allow multi-chaining with separate edge networks Includes complete refactoring of code Beta for NFVbench 2.0 Change-Id: I2997f0fb7722d5ac626cd11a68692ae458c7676e Signed-off-by: ahothan --- nfvbench/cfg.default.yaml | 194 ++++--- nfvbench/chain_clients.py | 633 ---------------------- nfvbench/chain_managers.py | 256 --------- nfvbench/chain_runner.py | 178 ++++-- nfvbench/chain_workers.py | 46 +- nfvbench/chaining.py | 988 ++++++++++++++++++++++++++++++++++ nfvbench/cleanup.py | 25 +- nfvbench/compute.py | 245 +-------- nfvbench/config_plugin.py | 35 +- nfvbench/factory.py | 46 +- nfvbench/network.py | 91 ---- nfvbench/nfvbench.py | 180 +++---- nfvbench/packet_analyzer.py | 64 --- nfvbench/packet_stats.py | 309 +++++++++++ nfvbench/service_chain.py | 148 ----- nfvbench/specs.py | 18 +- nfvbench/stats_manager.py | 101 ++++ nfvbench/summarizer.py | 152 ++++-- nfvbench/tor_client.py | 52 -- nfvbench/traffic_client.py | 574 +++++++++++--------- nfvbench/traffic_gen/dummy.py | 66 ++- nfvbench/traffic_gen/traffic_base.py | 89 ++- nfvbench/traffic_gen/traffic_utils.py | 11 + nfvbench/traffic_gen/trex.py | 428 +++++++++++---- nfvbench/traffic_server.py | 16 +- 25 files changed, 2640 insertions(+), 2305 deletions(-) delete mode 100644 nfvbench/chain_clients.py delete mode 100644 nfvbench/chain_managers.py create mode 100644 nfvbench/chaining.py delete mode 100644 nfvbench/network.py delete mode 100644 nfvbench/packet_analyzer.py create mode 100644 nfvbench/packet_stats.py delete mode 100644 nfvbench/service_chain.py create mode 100644 nfvbench/stats_manager.py delete mode 100644 nfvbench/tor_client.py (limited to 'nfvbench') diff --git a/nfvbench/cfg.default.yaml b/nfvbench/cfg.default.yaml index 2af6d63..fdf40c6 100755 --- a/nfvbench/cfg.default.yaml +++ b/nfvbench/cfg.default.yaml @@ -18,7 +18,8 @@ # Fields that can be 
over-ridden at the command line are marked with the corresponding # option, e.g. "--interval" -# The OpenStack openrc file to use (must be a valid full pathname). If running + +# The OpenStack openrc file to use - must be a valid full pathname. If running # in a container, this path must be valid in the container. # # The only case where this field can be empty is when measuring a system that does not run @@ -29,7 +30,7 @@ openrc_file: # Forwarder to use in nfvbenchvm image. Available options: ['vpp', 'testpmd'] vm_forwarder: testpmd -# By default (empty) NFVBench will try to locate a VM image file +# By default (empty) NFVbench will try to locate a VM image file # from the package root directory named "nfvbench-.qcow2" and # upload that file. The image name will be "nfvbench-" # This can be overridden by specifying here a pathname of a file @@ -67,39 +68,10 @@ flavor: # If the selected zone contains only 1 compute node and PVVP inter-node flow is selected, # application will use intra-node PVVP flow. # List of compute nodes can be specified, must be in given availability zone if not empty -#availability_zone: 'nova' +# availability_zone: 'nova' availability_zone: compute_nodes: - -# Credentials for SSH connection to TOR switches. -tor: - # Leave type empty or switch list empty to skip TOR switches configuration. - # Preferably use 'no_tor_access' to achieve the same behavior. - # (skipping TOR config will require the user to pre-stitch the traffic generator interfaces - # to the service chain under test, needed only if configured in access mode) - type: - # Switches are only needed if type is not empty. 
- # You can configure 0, 1 or 2 switches - # no switch: in this case NFVbench will not attempt to ssh to the switch - # and stitching of traffic must be done externally - # 1 switch: this assumes that both traffic generator interfaces are wired to the same switch - # 2 switches: this is the recommended setting wuth redundant switches, in this case each - # traffic generator interface must be wired to a different switch - switches: - - host: - username: - password: - port: - -# Skip TOR switch configuration and retrieving of stats -# Can be overriden by --no-tor-access -no_tor_access: false - -# Skip vswitch configuration and retrieving of stats -# Can be overriden by --no-vswitch-access -no_vswitch_access: false - # Type of service chain to run, possible options are PVP, PVVP and EXT # PVP - port to VM to port # PVVP - port to VM to VM to port @@ -112,6 +84,9 @@ service_chain: 'PVP' # Can be overriden by --service-chain-count service_chain_count: 1 +# Specifies if all chains share the same right/left/middle networks +service_chain_shared_net: false + # Total number of traffic flows for all chains and directions generated by the traffic generator. # Minimum is '2 * service_chain_count', it is automatically adjusted if too small # value was configured. Must be even. @@ -119,20 +94,13 @@ service_chain_count: 1 # Can be overriden by --flow-count flow_count: 10000 -# Used by PVVP chain to spawn VMs on different compute nodes -# Can be overriden by --inter-node -inter_node: false - # set to true if service chains should use SRIOV # This requires SRIOV to be available on compute nodes sriov: false -# Skip interfaces config on EXT service chain -# Can be overriden by --no-int-config -no_int_config: false - # Perform port to port loopback (direct or through switch) # Should be used with EXT service chain and no ARP (no_arp: true) +# When enabled, the vlans property must contain the same VLAN id for all chains. 
# Can be overriden by --l2-loopback l2_loopback: false @@ -151,54 +119,94 @@ traffic_generator: default_profile: trex-local # IP addresses for L3 traffic. + # This section describes the addresses to use to fill in the UDP packets sent by the + # traffic generator. If you VNFs are L2 forwarders, these fields below do not need to change. + # If your VNFs are L3 routers, the fields below must match the static routes in your VNFs + # so that UDP packets can be routed back to the peer port of the traffic generator. + # All of the IPs are used as base for IP sequence computed based on chain or flow count. + # (sim-devices-left)---(tg-gateway-left)---(vnf-left)- ... + # -(vnf-right)---(tg-gateway-right)---(sim-devices-right) # # `ip_addrs` base IPs used as src and dst in packet header, quantity depends on flow count + # these are used for addressing virtual devices simulated by the traffic generator + # and be a different subnet than tg_gateway_ip_addrs and gateway_ip_addrs # `ip_addrs_step`: step for generating IP sequence. Use "random" for random patterns, default is 0.0.0.1. - # `tg_gateway_ip_addrs` base IPs for traffic generator ports, quantity depends on chain count + ip_addrs: ['10.0.0.0/8', '20.0.0.0/8'] + ip_addrs_step: 0.0.0.1 + # `tg_gateway_ip_addrs` base IP for traffic generator ports in the left and right networks to the VNFs + # chain count consecutive IP addresses spaced by tg_gateway_ip_addrs_step will be used # `tg_gateway_ip_addrs__step`: step for generating traffic generator gateway sequences. default is 0.0.0.1 - # `gateway_ip_addrs`: base IPs of router gateways on both networks, quantity depends on chain count + tg_gateway_ip_addrs: ['1.1.0.100', '2.2.0.100'] + tg_gateway_ip_addrs_step: 0.0.0.1 + # `gateway_ip_addrs`: base IPs of VNF router gateways (left and right), quantity used depends on chain count + # must correspond to the public IP on the left and right networks + # for each left-most and right-most VNF of every chain. 
+ # must be the same subnet but not same IP as tg_gateway_ip_addrs. + # chain count consecutive IP addresses spaced by gateway_ip_addrs_step will be used # `gateway_ip_addrs_step`: step for generating router gateway sequences. default is 0.0.0.1 + gateway_ip_addrs: ['1.1.0.2', '2.2.0.2'] + gateway_ip_addrs_step: 0.0.0.1 # `udp_src_port`: the source port for sending UDP traffic, default is picked by TRex (53) # `udp_dst_port`: the destination port for sending UDP traffic, default is picked by TRex (53) - # `mac_addrs_left` & `mac_addrs_right`: Lists of MAC addresses corresponding to the number of chains - # specified for `service_chain_count`. + udp_src_port: + udp_dst_port: + + # L2 ADDRESSING OF UDP PACKETS + # Lists of dest MAC addresses to use on each traffic generator port (one dest MAC per chain) + # Leave empty for PVP, PVVP, EXT with ARP + # Only used when `service_chain` is EXT and `no_arp` is true. # - If both lists are empty the far end MAC of the traffic generator will be used for left and right - # - The MAC addresses will only be used when `service_chain` is EXT and `no_arp` is true. - # - The length of each list must match the number of chains being used. + # (this is typicaly used to loop back on the first hop switch or using a loopback cable) + # - The length of each list must match the number of chains being used! # - The index of each list must correspond to the chain index to ensure proper pairing. 
# - Below is an example of using two chains: # - mac_addrs_left: ['00:00:00:00:01:00', '00:00:00:00:02:00'] # - mac_addrs_right: ['00:00:00:00:01:01', '00:00:00:00:02:01'] - ip_addrs: ['10.0.0.0/8', '20.0.0.0/8'] - ip_addrs_step: 0.0.0.1 - tg_gateway_ip_addrs: ['1.1.0.100', '2.2.0.100'] - tg_gateway_ip_addrs_step: 0.0.0.1 - gateway_ip_addrs: ['1.1.0.2', '2.2.0.2'] - gateway_ip_addrs_step: 0.0.0.1 - udp_src_port: - udp_dst_port: + # UDP packets sent on port 0 will use dest MAC '00:00:00:00:01:00' for chain #0 and + # dest MAC '00:00:00:00:02:00' for chain #1 + # UDP packets sent on port 1 will use dest MAC '00:00:00:00:01:01' for chain #0 and + # dest MAC '00:00:00:00:02:01' for chain #1 + # It is expected that the looping device (L2 forwarder) will rewrite the src and dst MAC + # of the looping UDP packet so that it can reach back to the peer port of the traffic + # generator. + # mac_addrs_left: mac_addrs_right: # Traffic Generator Profiles # In case you have multiple testbeds or traffic generators, # you can define one traffic generator profile per testbed/traffic generator. + # In most cases you only need to fill in the pci address for the 2 ports used by the + # traffic generator and leave all other fields unchanged # # Generator profiles are listed in the following format: # `name`: Traffic generator profile name (use a unique name, no space or special character) + # DFo not change this field # `tool`: Traffic generator tool to be used (currently supported is `TRex`). + # Do not change this field # `ip`: IP address of the traffic generator. - # `cores`: Specify the number of cores for TRex traffic generator. ONLY applies to trex-local. + # The default loopback address is used when the traffic generator runs on the same host + # as NFVbench. + # `cores`: Specify the number of cores for running the TRex traffic generator. + # ONLY applies to trex-local. # `software_mode`: Advice TRex to use software mode which provides the best compability. 
But # note that TRex will not use any hardware acceleration technology under # software mode, therefore the performance of TRex will be significantly # lower. ONLY applies to trex-local. + # Recommended to leave the default value (false) # `interfaces`: Configuration of traffic generator interfaces. # `interfaces.port`: The port of the traffic generator to be used (leave as 0 and 1 resp.) - # `interfaces.switch_port`: Leave empty (reserved for advanced use cases) + # `interfaces.switch_port`: Leave empty (deprecated) # `interfaces.pci`: The PCI address of the intel NIC interface associated to this port + # This field is required and cannot be empty + # Use lspci to list the PCI address of all devices + # Example of value: "0000:5e:00.0" # `intf_speed`: The speed of the interfaces used by the traffic generator (per direction). + # Empty value (default) to use the speed discovered by the traffic generator. + # Recommended to leave this field empty. + # Do not use unless you want to override the speed discovered by the + # traffic generator. 
Expected format: 10Gbps # generator_profile: - name: trex-local @@ -208,12 +216,12 @@ traffic_generator: software_mode: false interfaces: - port: 0 - switch_port: pci: - - port: 1 switch_port: + - port: 1 pci: - intf_speed: 10Gbps + switch_port: + intf_speed: # ----------------------------------------------------------------------------- # These variables are not likely to be changed @@ -257,22 +265,22 @@ loop_vm_name: 'nfvbench-loop-vm' internal_networks: left: - name: 'nfvbench-net0' - subnet: 'nfvbench-subnet0' + name: 'nfvbench-lnet' + subnet: 'nfvbench-lsubnet' cidr: '192.168.1.0/24' network_type: 'vlan' segmentation_id: physical_network: right: - name: 'nfvbench-net1' - subnet: 'nfvbench-subnet1' + name: 'nfvbench-rnet' + subnet: 'nfvbench-rsubnet' cidr: '192.168.2.0/24' network_type: 'vlan' segmentation_id: physical_network: middle: - name: 'nfvbench-net2' - subnet: 'nfvbench-subnet2' + name: 'nfvbench-mnet' + subnet: 'nfvbench-msubnet' cidr: '192.168.3.0/24' network_type: 'vlan' segmentation_id: @@ -283,25 +291,45 @@ internal_networks: # SRIOV can be used by toggling below setting. use_sriov_middle_net: false -# EXT chain only. Names of edge networks which will be used to send traffic via traffic generator. +# EXT chain only. Prefix names of edge networks which will be used to send traffic via traffic generator. +# +# If service_chain_shared_net is true, the left and right networks must pre-exist and match exactly by name. +# +# If service_chain_shared_net is false, each chain must have its own pre-existing left and right networks. +# An index will be appended to each network name to form the final name: +# ext-lnet0 ext-rnet0 for chain #0 +# ext-lnet1 ext-rnet1 for chain #1 +# etc... external_networks: - left: 'nfvbench-net0' - right: 'nfvbench-net1' + left: 'ext-lnet' + right: 'ext-rnet' # Use 'true' to enable VLAN tagging of packets generated and sent by the traffic generator -# Leave empty you do not want the traffic generator to insert the VLAN tag. 
This is -# needed for example if VLAN tagging is enabled on switch (trunk mode) or if you want to hook directly to a NIC -# By default is set to true (which is the nominal use case with TOR and trunk mode to Trex) +# Leave empty or set to false if you do not want the traffic generator to insert the VLAN tag (this is +# needed for example if VLAN tagging is enabled on switch (access mode) or if you want to hook +# directly to a NIC). +# By default is set to true (which is the nominal use case with TOR and trunk mode to Trex ports) vlan_tagging: true -# Specify only when you want to override VLAN IDs used for tagging with own values (exactly 2). -# Default behavior (empty list) is to retrieve VLAN IDs from OpenStack networks described in external_networks. -# This property is ignored in the case of l2-loopback -# Example: [1998, 1999] +# Used only in the case of EXT chain and no openstack to specify the VLAN IDs to use. +# This property is ignored when OpenStakc is used or in the case of l2-loopback. +# If OpenStack is used leave the list empty, VLAN IDs are retrieved from OpenStack networks using Neutron API. +# If networks are shared across all chains (service_chain_shared_net=true), the list should have exactly 2 values +# If networks are not shared across chains (service_chain_shared_net=false), the list should have +# 2 list of vlan IDs +# In the special case of l2-loopback the list should have the same VLAN id for all chains +# Examples: +# [1998, 1999] left network uses vlan 1998 right network uses vlan 1999 +# [[1,2],[3,4]] chain 0 left vlan 1, right vlan 2 - chain 1 left vlan 3 right vlan 4 +# [1010, 1010] same VLAN id with l2-loopback enabled +# vlans: [] -# Used only with EXT chain. MAC addresses of traffic generator ports are used as destination -# if 'no_arp' is set to 'true'. Otherwise ARP requests are sent to find out destination MAC addresses. +# ARP is used to discover the MAC address of VNFs that run L3 routing. +# Used only with EXT chain. 
+# False (default): ARP requests are sent to find out dest MAC addresses. +# True: do not send ARP but use provisioned dest macs instead +# (see mac_addrs_left and mac_addrs_right) no_arp: false # Traffic Profiles @@ -330,10 +358,6 @@ traffic: # Can be overriden by --no-traffic no_traffic: false -# Do not reset tx/rx counters prior to running -# Can be overriden by --no-reset -no_reset: false - # Test configuration # The rate pps for traffic going in reverse direction in case of unidirectional flow. Default to 1. @@ -434,3 +458,11 @@ factory_class: 'BasicFactory' # Custom label added for every perf record generated during this run. # Can be overriden by --user-label user_label: + + +# THESE FIELDS SHOULD BE USED VERY RARELY + +# Skip vswitch configuration and retrieving of stats +# Can be overriden by --no-vswitch-access +# Should be left to the default value (false) +no_vswitch_access: false diff --git a/nfvbench/chain_clients.py b/nfvbench/chain_clients.py deleted file mode 100644 index 71c6c97..0000000 --- a/nfvbench/chain_clients.py +++ /dev/null @@ -1,633 +0,0 @@ -#!/usr/bin/env python -# Copyright 2016 Cisco Systems, Inc. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. 
-# - -import os -import re -import time - -from glanceclient.v2 import client as glanceclient -from neutronclient.neutron import client as neutronclient -from novaclient.client import Client - -import compute -from log import LOG - -class StageClientException(Exception): - pass - - -class BasicStageClient(object): - """Client for spawning and accessing the VM setup""" - - nfvbenchvm_config_name = 'nfvbenchvm.conf' - - def __init__(self, config, cred): - self.comp = None - self.image_instance = None - self.image_name = None - self.config = config - self.cred = cred - self.nets = [] - self.vms = [] - self.created_ports = [] - self.ports = {} - self.compute_nodes = set([]) - self.comp = None - self.neutron = None - self.flavor_type = {'is_reuse': True, 'flavor': None} - self.host_ips = None - - def _ensure_vms_active(self): - retry_count = (self.config.check_traffic_time_sec + - self.config.generic_poll_sec - 1) / self.config.generic_poll_sec - for _ in range(retry_count): - for i, instance in enumerate(self.vms): - if instance.status == 'ACTIVE': - continue - is_reuse = getattr(instance, 'is_reuse', True) - instance = self.comp.poll_server(instance) - if instance.status == 'ERROR': - raise StageClientException('Instance creation error: %s' % - instance.fault['message']) - if instance.status == 'ACTIVE': - LOG.info('Created instance: %s', instance.name) - self.vms[i] = instance - setattr(self.vms[i], 'is_reuse', is_reuse) - - if all([(vm.status == 'ACTIVE') for vm in self.vms]): - return - time.sleep(self.config.generic_poll_sec) - raise StageClientException('Timed out waiting for VMs to spawn') - - def _setup_openstack_clients(self): - self.session = self.cred.get_session() - nova_client = Client(2, session=self.session) - self.neutron = neutronclient.Client('2.0', session=self.session) - self.glance_client = glanceclient.Client('2', - session=self.session) - self.comp = compute.Compute(nova_client, self.glance_client, self.neutron, self.config) - - def 
_lookup_network(self, network_name): - networks = self.neutron.list_networks(name=network_name) - return networks['networks'][0] if networks['networks'] else None - - def _create_net(self, name, subnet, cidr, network_type=None, - segmentation_id=None, physical_network=None): - network = self._lookup_network(name) - if network: - # a network of same name already exists, we need to verify it has the same - # characteristics - if segmentation_id: - if network['provider:segmentation_id'] != segmentation_id: - raise StageClientException("Mismatch of 'segmentation_id' for reused " - "network '{net}'. Network has id '{seg_id1}', " - "configuration requires '{seg_id2}'." - .format(net=name, - seg_id1=network['provider:segmentation_id'], - seg_id2=segmentation_id)) - - if physical_network: - if network['provider:physical_network'] != physical_network: - raise StageClientException("Mismatch of 'physical_network' for reused " - "network '{net}'. Network has '{phys1}', " - "configuration requires '{phys2}'." 
- .format(net=name, - phys1=network['provider:physical_network'], - phys2=physical_network)) - - LOG.info('Reusing existing network: %s', name) - network['is_reuse'] = True - return network - - body = { - 'network': { - 'name': name, - 'admin_state_up': True - } - } - - if network_type: - body['network']['provider:network_type'] = network_type - if segmentation_id: - body['network']['provider:segmentation_id'] = segmentation_id - if physical_network: - body['network']['provider:physical_network'] = physical_network - - network = self.neutron.create_network(body)['network'] - body = { - 'subnet': { - 'name': subnet, - 'cidr': cidr, - 'network_id': network['id'], - 'enable_dhcp': False, - 'ip_version': 4, - 'dns_nameservers': [] - } - } - subnet = self.neutron.create_subnet(body)['subnet'] - # add subnet id to the network dict since it has just been added - network['subnets'] = [subnet['id']] - network['is_reuse'] = False - LOG.info('Created network: %s.', name) - return network - - def _create_port(self, net, vnic_type='normal'): - body = { - "port": { - 'network_id': net['id'], - 'binding:vnic_type': vnic_type - } - } - port = self.neutron.create_port(body) - return port['port'] - - def __delete_port(self, port): - retry = 0 - while retry < self.config.generic_retry_count: - try: - self.neutron.delete_port(port['id']) - return - except Exception: - retry += 1 - time.sleep(self.config.generic_poll_sec) - LOG.error('Unable to delete port: %s', port['id']) - - def __delete_net(self, network): - retry = 0 - while retry < self.config.generic_retry_count: - try: - self.neutron.delete_network(network['id']) - return - except Exception: - retry += 1 - time.sleep(self.config.generic_poll_sec) - LOG.error('Unable to delete network: %s', network['name']) - - def __get_server_az(self, server): - availability_zone = getattr(server, 'OS-EXT-AZ:availability_zone', None) - host = getattr(server, 'OS-EXT-SRV-ATTR:host', None) - if availability_zone is None: - return None - if host 
is None: - return None - return availability_zone + ':' + host - - def _lookup_servers(self, name=None, nets=None, flavor_id=None): - error_msg = 'VM with the same name, but non-matching {} found. Aborting.' - networks = set([net['name'] for net in nets]) if nets else None - server_list = self.comp.get_server_list() - matching_servers = [] - - for server in server_list: - if name and server.name != name: - continue - - if flavor_id and server.flavor['id'] != flavor_id: - raise StageClientException(error_msg.format('flavors')) - - if networks and not set(server.networks.keys()).issuperset(networks): - raise StageClientException(error_msg.format('networks')) - - if server.status != "ACTIVE": - raise StageClientException(error_msg.format('state')) - - # everything matches - matching_servers.append(server) - - return matching_servers - - def _create_server(self, name, ports, az, nfvbenchvm_config): - port_ids = [{'port-id': port['id']} for port in ports] - nfvbenchvm_config_location = os.path.join('/etc/', self.nfvbenchvm_config_name) - server = self.comp.create_server(name, - self.image_instance, - self.flavor_type['flavor'], - None, - port_ids, - None, - avail_zone=az, - user_data=None, - config_drive=True, - files={nfvbenchvm_config_location: nfvbenchvm_config}) - if server: - setattr(server, 'is_reuse', False) - msg = 'Creating instance: %s' % name - if az: - msg += ' on %s' % az - LOG.info(msg) - else: - raise StageClientException('Unable to create instance: %s.' 
% (name)) - return server - - def _setup_resources(self): - # To avoid reuploading image in server mode, check whether image_name is set or not - if self.image_name: - self.image_instance = self.comp.find_image(self.image_name) - if self.image_instance: - LOG.info("Reusing image %s", self.image_name) - else: - image_name_search_pattern = r'(nfvbenchvm-\d+(\.\d+)*).qcow2' - if self.config.vm_image_file: - match = re.search(image_name_search_pattern, self.config.vm_image_file) - if match: - self.image_name = match.group(1) - LOG.info('Using provided VM image file %s', self.config.vm_image_file) - else: - raise StageClientException('Provided VM image file name %s must start with ' - '"nfvbenchvm-"' % self.config.vm_image_file) - else: - pkg_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) - for f in os.listdir(pkg_root): - if re.search(image_name_search_pattern, f): - self.config.vm_image_file = pkg_root + '/' + f - self.image_name = f.replace('.qcow2', '') - LOG.info('Found built-in VM image file %s', f) - break - else: - raise StageClientException('Cannot find any built-in VM image file.') - if self.image_name: - self.image_instance = self.comp.find_image(self.image_name) - if not self.image_instance: - LOG.info('Uploading %s', self.image_name) - res = self.comp.upload_image_via_url(self.image_name, - self.config.vm_image_file) - - if not res: - raise StageClientException('Error uploading image %s from %s. ABORTING.' 
- % (self.image_name, - self.config.vm_image_file)) - LOG.info('Image %s successfully uploaded.', self.image_name) - self.image_instance = self.comp.find_image(self.image_name) - - self.__setup_flavor() - - def __setup_flavor(self): - if self.flavor_type.get('flavor', False): - return - - self.flavor_type['flavor'] = self.comp.find_flavor(self.config.flavor_type) - if self.flavor_type['flavor']: - self.flavor_type['is_reuse'] = True - else: - flavor_dict = self.config.flavor - extra_specs = flavor_dict.pop('extra_specs', None) - - self.flavor_type['flavor'] = self.comp.create_flavor(self.config.flavor_type, - override=True, - **flavor_dict) - - LOG.info("Flavor '%s' was created.", self.config.flavor_type) - - if extra_specs: - self.flavor_type['flavor'].set_keys(extra_specs) - - self.flavor_type['is_reuse'] = False - - if self.flavor_type['flavor'] is None: - raise StageClientException('%s: flavor to launch VM not found. ABORTING.' - % self.config.flavor_type) - - def __delete_flavor(self, flavor): - if self.comp.delete_flavor(flavor=flavor): - LOG.info("Flavor '%s' deleted", self.config.flavor_type) - self.flavor_type = {'is_reuse': False, 'flavor': None} - else: - LOG.error('Unable to delete flavor: %s', self.config.flavor_type) - - def get_config_file(self, chain_index, src_mac, dst_mac, intf_mac1, intf_mac2): - boot_script_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), - 'nfvbenchvm/', self.nfvbenchvm_config_name) - - with open(boot_script_file, 'r') as boot_script: - content = boot_script.read() - - g1cidr = self.config.generator_config.src_device.get_gw_ip(chain_index) + '/8' - g2cidr = self.config.generator_config.dst_device.get_gw_ip(chain_index) + '/8' - - vm_config = { - 'forwarder': self.config.vm_forwarder, - 'intf_mac1': intf_mac1, - 'intf_mac2': intf_mac2, - 'tg_gateway1_ip': self.config.traffic_generator.tg_gateway_ip_addrs[0], - 'tg_gateway2_ip': self.config.traffic_generator.tg_gateway_ip_addrs[1], - 'tg_net1': 
self.config.traffic_generator.ip_addrs[0], - 'tg_net2': self.config.traffic_generator.ip_addrs[1], - 'vnf_gateway1_cidr': g1cidr, - 'vnf_gateway2_cidr': g2cidr, - 'tg_mac1': src_mac, - 'tg_mac2': dst_mac - } - - return content.format(**vm_config) - - def set_ports(self): - """Stores all ports of NFVbench networks.""" - nets = self.get_networks_uuids() - for port in self.neutron.list_ports()['ports']: - if port['network_id'] in nets: - ports = self.ports.setdefault(port['network_id'], []) - ports.append(port) - - def disable_port_security(self): - """ - Disable security at port level. - """ - vm_ids = [vm.id for vm in self.vms] - for net in self.nets: - for port in self.ports[net['id']]: - if port['device_id'] in vm_ids: - try: - self.neutron.update_port(port['id'], { - 'port': { - 'security_groups': [], - 'port_security_enabled': False, - } - }) - LOG.info('Security disabled on port %s', port['id']) - except Exception: - LOG.warning('Failed to disable port security on port %s, ignoring...', - port['id']) - - - def get_loop_vm_hostnames(self): - return [getattr(vm, 'OS-EXT-SRV-ATTR:hypervisor_hostname') for vm in self.vms] - - def get_host_ips(self): - '''Return the IP adresss(es) of the host compute nodes for this VMclient instance. 
- Returns a list of 1 IP adress or 2 IP addresses (PVVP inter-node) - ''' - if not self.host_ips: - # get the hypervisor object from the host name - self.host_ips = [self.comp.get_hypervisor( - getattr(vm, 'OS-EXT-SRV-ATTR:hypervisor_hostname')).host_ip for vm in self.vms] - return self.host_ips - - def get_loop_vm_compute_nodes(self): - compute_nodes = [] - for vm in self.vms: - az = getattr(vm, 'OS-EXT-AZ:availability_zone') - hostname = getattr(vm, 'OS-EXT-SRV-ATTR:hypervisor_hostname') - compute_nodes.append(az + ':' + hostname) - return compute_nodes - - def get_reusable_vm(self, name, nets): - servers = self._lookup_servers(name=name, nets=nets, - flavor_id=self.flavor_type['flavor'].id) - if servers: - server = servers[0] - LOG.info('Reusing existing server: %s', name) - setattr(server, 'is_reuse', True) - return server - return None - - def get_networks_uuids(self): - """ - Extract UUID of used networks. Order is important. - - :return: list of UUIDs of created networks - """ - return [net['id'] for net in self.nets] - - def get_vlans(self): - """ - Extract vlans of used networks. Order is important. - - :return: list of UUIDs of created networks - """ - vlans = [] - for net in self.nets: - assert net['provider:network_type'] == 'vlan' - vlans.append(net['provider:segmentation_id']) - - return vlans - - def setup(self): - """ - Creates two networks and spawn a VM which act as a loop VM connected - with the two networks. - """ - if self.cred: - self._setup_openstack_clients() - - def dispose(self, only_vm=False): - """ - Deletes the created two networks and the VM. 
- """ - for vm in self.vms: - if vm: - if not getattr(vm, 'is_reuse', True): - self.comp.delete_server(vm) - else: - LOG.info('Server %s not removed since it is reused', vm.name) - - for port in self.created_ports: - self.__delete_port(port) - - if not only_vm: - for net in self.nets: - if 'is_reuse' in net and not net['is_reuse']: - self.__delete_net(net) - else: - LOG.info('Network %s not removed since it is reused', net['name']) - - if not self.flavor_type['is_reuse']: - self.__delete_flavor(self.flavor_type['flavor']) - - -class EXTStageClient(BasicStageClient): - def setup(self): - super(EXTStageClient, self).setup() - - # Lookup two existing networks - if self.cred: - for net_name in [self.config.external_networks.left, - self.config.external_networks.right]: - net = self._lookup_network(net_name) - if net: - self.nets.append(net) - else: - raise StageClientException('Existing network {} cannot be found.'. - format(net_name)) - - -class PVPStageClient(BasicStageClient): - def get_end_port_macs(self): - vm_ids = [vm.id for vm in self.vms] - port_macs = [] - for _index, net in enumerate(self.nets): - vm_mac_map = {port['device_id']: port['mac_address'] for port in self.ports[net['id']]} - port_macs.append([vm_mac_map[vm_id] for vm_id in vm_ids]) - return port_macs - - def setup(self): - super(PVPStageClient, self).setup() - self._setup_resources() - - # Create two networks - nets = self.config.internal_networks - self.nets.extend([self._create_net(**n) for n in [nets.left, nets.right]]) - - if self.comp.config.compute_nodes: - az_list = self.comp.get_enabled_az_host_list(required_count=1) - if not az_list: - raise Exception('Not enough hosts found.') - az = az_list[0] - else: - az = None - - for chain_index in xrange(self.config.service_chain_count): - name = self.config.loop_vm_name + str(chain_index) - server = self.get_reusable_vm(name, self.nets) - if server: - self.vms.append(server) - else: - vnic_type = 'direct' if self.config.sriov else 'normal' - ports 
= [self._create_port(net, vnic_type) for net in self.nets] - config_file = self.get_config_file(chain_index, - self.config.generator_config.src_device.mac, - self.config.generator_config.dst_device.mac, - ports[0]['mac_address'], - ports[1]['mac_address']) - self.created_ports.extend(ports) - server = self._create_server(name, ports, az, config_file) - self.vms.append(server) - - if chain_index == 0: - # First VM, save the hypervisor name. Used in future for - # maintain affinity. - self._ensure_vms_active() - server = self.comp.poll_server(server) - az = "%s:%s" % (getattr(server, 'OS-EXT-AZ:availability_zone'), - getattr(server, 'OS-EXT-SRV-ATTR:hypervisor_hostname')) - - self._ensure_vms_active() - self.compute_nodes = set(self.get_loop_vm_compute_nodes()) - self.set_ports() - - -class PVVPStageClient(BasicStageClient): - def get_end_port_macs(self): - port_macs = [] - for index, net in enumerate(self.nets[:2]): - vm_ids = [vm.id for vm in self.vms[index::2]] - vm_mac_map = {port['device_id']: port['mac_address'] for port in self.ports[net['id']]} - port_macs.append([vm_mac_map[vm_id] for vm_id in vm_ids]) - return port_macs - - def setup(self): - super(PVVPStageClient, self).setup() - self._setup_resources() - - # Create two networks - nets = self.config.internal_networks - self.nets.extend([self._create_net(**n) for n in [nets.left, nets.right, nets.middle]]) - - if self.comp.config.compute_nodes: - required_count = 2 if self.config.inter_node else 1 - az_list = self.comp.get_enabled_az_host_list(required_count=required_count) - if not az_list: - raise Exception('Not enough hosts found.') - - az1 = az2 = az_list[0] - if self.config.inter_node: - if len(az_list) > 1: - az1 = az_list[0] - az2 = az_list[1] - else: - # fallback to intra-node - az1 = az2 = az_list[0] - self.config.inter_node = False - LOG.info('Using intra-node instead of inter-node.') - else: - az1 = az2 = None - - # Create loop VMs - for chain_index in xrange(self.config.service_chain_count): - 
name0 = self.config.loop_vm_name + str(chain_index) + 'a' - # Attach first VM to net0 and net2 - vm0_nets = self.nets[0::2] - reusable_vm0 = self.get_reusable_vm(name0, vm0_nets) - - name1 = self.config.loop_vm_name + str(chain_index) + 'b' - # Attach second VM to net1 and net2 - vm1_nets = self.nets[1:] - reusable_vm1 = self.get_reusable_vm(name1, vm1_nets) - - if reusable_vm0 and reusable_vm1: - self.vms.extend([reusable_vm0, reusable_vm1]) - else: - edge_vnic_type = 'direct' if self.config.sriov else 'normal' - middle_vnic_type = 'direct' \ - if self.config.sriov and self.config.use_sriov_middle_net \ - else 'normal' - vm0_port_net0 = self._create_port(vm0_nets[0], edge_vnic_type) - vm0_port_net2 = self._create_port(vm0_nets[1], middle_vnic_type) - - vm1_port_net2 = self._create_port(vm1_nets[1], middle_vnic_type) - vm1_port_net1 = self._create_port(vm1_nets[0], edge_vnic_type) - - self.created_ports.extend([vm0_port_net0, - vm0_port_net2, - vm1_port_net2, - vm1_port_net1]) - - # order of ports is important for sections below - # order of MAC addresses needs to follow order of interfaces - # TG0 (net0) -> VM0 (net2) -> VM1 (net2) -> TG1 (net1) - config_file0 = self.get_config_file(chain_index, - self.config.generator_config.src_device.mac, - vm1_port_net2['mac_address'], - vm0_port_net0['mac_address'], - vm0_port_net2['mac_address']) - config_file1 = self.get_config_file(chain_index, - vm0_port_net2['mac_address'], - self.config.generator_config.dst_device.mac, - vm1_port_net2['mac_address'], - vm1_port_net1['mac_address']) - - vm1 = self._create_server(name0, [vm0_port_net0, vm0_port_net2], az1, config_file0) - self.vms.append(vm1) - if chain_index == 0: - # First VM on first chain, save the hypervisor name. Used - # in future for maintain affinity. 
- self._ensure_vms_active() - vm1 = self.comp.poll_server(vm1) - az1 = "%s:%s" % (getattr(vm1, 'OS-EXT-AZ:availability_zone'), - getattr(vm1, 'OS-EXT-SRV-ATTR:hypervisor_hostname')) - if not self.config.inter_node: - # By default, NOVA scheduler will try first with - # different hypervisor for workload balance, but when - # inter-node is not configured, use the same AZ to run - # intra-node test case. - az2 = az1 - - vm2 = self._create_server(name1, [vm1_port_net2, vm1_port_net1], az2, config_file1) - self.vms.append(vm2) - if chain_index == 0 and self.config.inter_node: - # Second VM on first chain, save the hypervisor name. Used - # in future for maintain affinity. - self._ensure_vms_active() - vm2 = self.comp.poll_server(vm2) - az2 = "%s:%s" % (getattr(vm2, 'OS-EXT-AZ:availability_zone'), - getattr(vm2, 'OS-EXT-SRV-ATTR:hypervisor_hostname')) - if az1 == az2: - # Configure to run inter-node, but not enough node to run - self.config.inter_node = False - LOG.info('Using intra-node instead of inter-node.') - - self._ensure_vms_active() - self.compute_nodes = set(self.get_loop_vm_compute_nodes()) - self.set_ports() diff --git a/nfvbench/chain_managers.py b/nfvbench/chain_managers.py deleted file mode 100644 index 5882913..0000000 --- a/nfvbench/chain_managers.py +++ /dev/null @@ -1,256 +0,0 @@ -#!/usr/bin/env python -# Copyright 2016 Cisco Systems, Inc. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. 
-# -import time - -from log import LOG -from network import Network -from packet_analyzer import PacketAnalyzer -from specs import ChainType -from stats_collector import IntervalCollector - - -class StageManager(object): - """A class to stage resources in the systenm under test.""" - - def __init__(self, config, cred, factory): - self.config = config - self.client = None - # conditions due to EXT chain special cases - if (config.vlan_tagging and not config.vlans) or not config.no_int_config: - VM_CLASS = factory.get_stage_class(config.service_chain) - self.client = VM_CLASS(config, cred) - self.client.setup() - - def get_vlans(self): - return self.client.get_vlans() if self.client else [] - - def get_host_ips(self): - return self.client.get_host_ips() - - def get_networks_uuids(self): - return self.client.get_networks_uuids() - - def disable_port_security(self): - self.client.disable_port_security() - - def get_vms(self): - return self.client.vms - - def get_nets(self): - return self.client.nets - - def get_ports(self): - return self.client.ports - - def get_compute_nodes(self): - return self.client.compute_nodes if self.client else {} - - def set_vm_macs(self): - if self.client and self.config.service_chain != ChainType.EXT: - self.config.generator_config.set_vm_mac_list(self.client.get_end_port_macs()) - - def close(self): - if not self.config.no_cleanup and self.client: - self.client.dispose() - - -class PVPStatsManager(object): - """A class to generate traffic and extract results for PVP chains.""" - - def __init__(self, config, clients, specs, factory, vlans, notifier=None): - self.config = config - self.clients = clients - self.specs = specs - self.notifier = notifier - self.interval_collector = None - self.vlans = vlans - self.factory = factory - self._setup() - - def set_vlan_tag(self, device, vlan): - if self.worker: - self.worker.set_vlan_tag(device, vlan) - else: - device.set_vlan_tag(vlan) - - def _setup(self): - WORKER_CLASS = 
self.factory.get_chain_worker(self.specs.openstack.encaps, - self.config.service_chain) - self.worker = WORKER_CLASS(self.config, self.clients, self.specs) - try: - self.worker.set_vlans(self.vlans) - self._config_interfaces() - except Exception: - # since the wrorker is up and running, we need to close it - # in case of exception - self.close() - raise - - def _get_data(self): - return self.worker.get_data() if self.worker else {} - - def _get_network(self, traffic_port, stats, reverse=False): - """Get the Network object corresponding to a given TG port. - - :param traffic_port: must be either 0 or 1 - :param stats: TG stats for given traffic port - :param reverse: specifies if the interface list for this network - should go from TG to loopback point (reverse=false) or - from loopback point to TG (reverse=true) - """ - # build the interface list in fwd direction (TG To loopback point) - interfaces = [self.clients['traffic'].get_interface(traffic_port, stats)] - if self.worker: - # if available, - # interfaces for workers must be aligned on the TG port number - interfaces.extend(self.worker.get_network_interfaces(traffic_port)) - # let Network reverse the interface order if needed - return Network(interfaces, reverse) - - def _config_interfaces(self): - if self.config.service_chain != ChainType.EXT: - self.clients['vm'].disable_port_security() - - self.worker.config_interfaces() - - def _generate_traffic(self): - if self.config.no_traffic: - return {} - - self.interval_collector = IntervalCollector(time.time()) - self.interval_collector.attach_notifier(self.notifier) - LOG.info('Starting to generate traffic...') - stats = {} - for stats in self.clients['traffic'].run_traffic(): - self.interval_collector.add(stats) - - LOG.info('...traffic generating ended.') - return stats - - def get_stats(self): - return self.interval_collector.get() if self.interval_collector else [] - - def get_version(self): - return self.worker.get_version() if self.worker else {} - - def 
run(self): - """Run analysis in both direction and return the analysis.""" - if self.worker: - self.worker.run() - - stats = self._generate_traffic() - result = { - 'raw_data': self._get_data(), - 'packet_analysis': {}, - 'stats': stats - } - - # fetch latest stats from traffic gen - stats = self.clients['traffic'].get_stats() - LOG.info('Requesting packet analysis on the forward direction...') - result['packet_analysis']['direction-forward'] = \ - self.get_analysis([self._get_network(0, stats), - self._get_network(1, stats, reverse=True)]) - LOG.info('Packet analysis on the forward direction completed') - - LOG.info('Requesting packet analysis on the reverse direction...') - result['packet_analysis']['direction-reverse'] = \ - self.get_analysis([self._get_network(1, stats), - self._get_network(0, stats, reverse=True)]) - - LOG.info('Packet analysis on the reverse direction completed') - return result - - def get_compute_nodes_bios(self): - return self.worker.get_compute_nodes_bios() if self.worker else {} - - @staticmethod - def get_analysis(nets): - LOG.info('Starting traffic analysis...') - - packet_analyzer = PacketAnalyzer() - # Traffic types are assumed to always alternate in every chain. Add a no stats interface in - # between if that is not the case. 
- tx = True - for network in nets: - for interface in network.get_interfaces(): - packet_analyzer.record(interface, 'tx' if tx else 'rx') - tx = not tx - - LOG.info('...traffic analysis completed') - return packet_analyzer.get_analysis() - - def close(self): - if self.worker: - self.worker.close() - - -class PVVPStatsManager(PVPStatsManager): - """A Class to generate traffic and extract results for PVVP chains.""" - - def __init__(self, config, clients, specs, factory, vlans, notifier=None): - PVPStatsManager.__init__(self, config, clients, specs, factory, vlans, notifier) - - def run(self): - """Run analysis in both direction and return the analysis.""" - fwd_v2v_net, rev_v2v_net = self.worker.run() - - stats = self._generate_traffic() - result = { - 'raw_data': self._get_data(), - 'packet_analysis': {}, - 'stats': stats - } - # fetch latest stats from traffic gen - stats = self.clients['traffic'].get_stats() - fwd_nets = [self._get_network(0, stats)] - if fwd_v2v_net: - fwd_nets.append(fwd_v2v_net) - fwd_nets.append(self._get_network(1, stats, reverse=True)) - - rev_nets = [self._get_network(1, stats)] - if rev_v2v_net: - rev_nets.append(rev_v2v_net) - rev_nets.append(self._get_network(0, stats, reverse=True)) - - LOG.info('Requesting packet analysis on the forward direction...') - result['packet_analysis']['direction-forward'] = self.get_analysis(fwd_nets) - LOG.info('Packet analysis on the forward direction completed') - - LOG.info('Requesting packet analysis on the reverse direction...') - result['packet_analysis']['direction-reverse'] = self.get_analysis(rev_nets) - - LOG.info('Packet analysis on the reverse direction completed') - return result - - -class EXTStatsManager(PVPStatsManager): - """A Class to generate traffic and extract results for EXT chains.""" - - def __init__(self, config, clients, specs, factory, vlans, notifier=None): - PVPStatsManager.__init__(self, config, clients, specs, factory, vlans, notifier) - - def _setup(self): - if 
self.specs.openstack: - WORKER_CLASS = self.factory.get_chain_worker(self.specs.openstack.encaps, - self.config.service_chain) - self.worker = WORKER_CLASS(self.config, self.clients, self.specs) - self.worker.set_vlans(self.vlans) - - if not self.config.no_int_config: - self._config_interfaces() - else: - self.worker = None diff --git a/nfvbench/chain_runner.py b/nfvbench/chain_runner.py index 63cc48f..0a2665d 100644 --- a/nfvbench/chain_runner.py +++ b/nfvbench/chain_runner.py @@ -13,71 +13,175 @@ # License for the specific language governing permissions and limitations # under the License. # +"""This module takes care of coordinating a benchmark run between various modules. -import traceback +The ChainRunner class is in charge of coordinating: +- the chain manager which takes care of staging resources +- traffic generator client which drives the traffic generator +- the stats manager which collects and aggregates stats +""" +from collections import OrderedDict + +from chaining import ChainManager from log import LOG -from service_chain import ServiceChain +from specs import ChainType +from stats_manager import StatsManager from traffic_client import TrafficClient class ChainRunner(object): """Run selected chain, collect results and analyse them.""" - def __init__(self, config, clients, cred, specs, factory, notifier=None): + def __init__(self, config, cred, specs, factory, notifier=None): + """Create a new instance of chain runner. + + Create dependent components + A new instance is created everytime the nfvbench config may have changed. 
+ + config: the new nfvbench config to use for this run + cred: openstack credentials (or None if no openstack) + specs: TBD + factory: + notifier: + """ self.config = config - self.clients = clients + self.cred = cred self.specs = specs self.factory = factory + self.notifier = notifier self.chain_name = self.config.service_chain - try: - TORClass = factory.get_tor_class(self.config.tor.type, self.config.no_tor_access) - except AttributeError: - raise Exception("Requested TOR class '{}' was not found.".format(self.config.tor.type)) - - self.clients['tor'] = TORClass(self.config.tor.switches) - self.clients['traffic'] = TrafficClient(config, notifier) - self.chain = ServiceChain(config, clients, cred, specs, factory, notifier) + # get an instance of traffic client + self.traffic_client = TrafficClient(config, notifier) + + if self.config.no_traffic: + LOG.info('Dry run: traffic generation is disabled') + else: + # Start the traffic generator server + self.traffic_client.start_traffic_generator() + + # get an instance of a chain manager + self.chain_manager = ChainManager(self) + + # at this point all resources are setup/discovered + # we need to program the traffic dest MAC and VLANs + gen_config = self.traffic_client.generator_config + if config.vlan_tagging: + # VLAN is discovered from the networks + gen_config.set_vlans(0, self.chain_manager.get_chain_vlans(0)) + gen_config.set_vlans(1, self.chain_manager.get_chain_vlans(1)) + + # the only case we do not need to set the dest MAC is in the case of + # l2-loopback (because the traffic gen will default to use the peer MAC) + # or EXT+ARP (because dest MAC will be discovered by TRex ARP) + if not config.l2_loopback and (config.service_chain != ChainType.EXT or config.no_arp): + gen_config.set_dest_macs(0, self.chain_manager.get_dest_macs(0)) + gen_config.set_dest_macs(1, self.chain_manager.get_dest_macs(1)) + + # get an instance of the stats manager + self.stats_manager = StatsManager(self) + LOG.info('ChainRunner 
initialized') + + def __setup_traffic(self): + self.traffic_client.setup() + if not self.config.no_traffic: + if self.config.service_chain == ChainType.EXT and not self.config.no_arp: + self.traffic_client.ensure_arp_successful() + self.traffic_client.ensure_end_to_end() + + def __get_result_per_frame_size(self, frame_size, actual_frame_size, bidirectional): + traffic_result = { + frame_size: {} + } + result = {} + if not self.config.no_traffic: + self.traffic_client.set_traffic(actual_frame_size, bidirectional) - LOG.info('ChainRunner initialized.') + if self.config.single_run: + result = self.stats_manager.run_fixed_rate() + else: + results = self.traffic_client.get_ndr_and_pdr() + + for dr in ['pdr', 'ndr']: + if dr in results: + if frame_size != actual_frame_size: + results[dr]['l2frame_size'] = frame_size + results[dr]['actual_l2frame_size'] = actual_frame_size + traffic_result[frame_size][dr] = results[dr] + if 'warning' in results[dr]['stats'] and results[dr]['stats']['warning']: + traffic_result['warning'] = results[dr]['stats']['warning'] + traffic_result[frame_size]['iteration_stats'] = results['iteration_stats'] + + if self.config.single_run: + result['run_config'] = self.traffic_client.get_run_config(result) + required = result['run_config']['direction-total']['orig']['rate_pps'] + actual = result['stats']['total_tx_rate'] + if frame_size != actual_frame_size: + result['actual_l2frame_size'] = actual_frame_size + warning = self.traffic_client.compare_tx_rates(required, actual) + if warning is not None: + result['run_config']['warning'] = warning + + traffic_result[frame_size].update(result) + return traffic_result + + def __get_chain_result(self): + result = OrderedDict() + for fs, actual_fs in zip(self.config.frame_sizes, self.config.actual_frame_sizes): + result.update(self.__get_result_per_frame_size(fs, + actual_fs, + self.config.traffic.bidirectional)) + chain_result = { + 'flow_count': self.config.flow_count, + 'service_chain_count': 
self.config.service_chain_count, + 'bidirectional': self.config.traffic.bidirectional, + 'profile': self.config.traffic.profile, + 'compute_nodes': self.stats_manager.get_compute_nodes_bios(), + 'result': result + } + return chain_result def run(self): - """ - Run a chain, collect and analyse results. + """Run the requested benchmark. - :return: dictionary + return: the results of the benchmark as a dict """ - self.clients['traffic'].start_traffic_generator() - self.clients['traffic'].set_macs() + LOG.info('Starting %s chain...', self.chain_name) + + results = {} + self.__setup_traffic() + # now that the dest MAC for all VNFs is known in all cases, it is time to create + # workers as they might be needed to extract stats prior to sending traffic + self.stats_manager.create_worker() - return self.chain.run() + results[self.chain_name] = {'result': self.__get_chain_result()} + + LOG.info("Service chain '%s' run completed.", self.chain_name) + return results def close(self): + """Close this instance of chain runner and delete resources if applicable.""" try: if not self.config.no_cleanup: LOG.info('Cleaning up...') + if self.chain_manager: + self.chain_manager.delete() else: LOG.info('Clean up skipped.') - - for client in ['traffic', 'tor']: - try: - self.clients[client].close() - except Exception as e: - traceback.print_exc() - LOG.error(e) - - self.chain.close() + try: + self.traffic_client.close() + except Exception: + LOG.exception() + if self.stats_manager: + self.stats_manager.close() except Exception: - traceback.print_exc() - LOG.error('Cleanup not finished.') + LOG.exception('Cleanup not finished') def get_version(self): - versions = { - 'Traffic Generator': self.clients['traffic'].get_version(), - 'TOR': self.clients['tor'].get_version(), - } - - versions.update(self.chain.get_version()) - + """Retrieve the version of dependent components.""" + versions = {} + if self.traffic_client: + versions['Traffic_Generator'] = self.traffic_client.get_version() + 
versions.update(self.stats_manager.get_version()) return versions diff --git a/nfvbench/chain_workers.py b/nfvbench/chain_workers.py index 2e36fb1..7c669d1 100644 --- a/nfvbench/chain_workers.py +++ b/nfvbench/chain_workers.py @@ -17,37 +17,37 @@ class BasicWorker(object): - def __init__(self, config, clients, specs): - self.config = config - self.clients = clients - self.specs = specs + def __init__(self, stats_manager): + self.stats_manager = stats_manager + self.chain_manager = stats_manager.chain_runner.chain_manager + self.config = stats_manager.config + self.specs = stats_manager.specs - def set_vlan_tag(self, device, vlan): - device.set_vlan_tag(vlan) + def get_compute_nodes_bios(self): + return {} - def set_vlans(self, vlans): - pass + def get_version(self): + return {} - def config_interfaces(self): + def close(self): pass - def get_data(self): - return {} + def insert_interface_stats(self, pps_list): + """Insert interface stats to a list of packet path stats. - def get_network_interfaces(self, index): - return [] + pps_list: a list of packet path stats instances indexed by chain index - def clear_interfaces(self): + Specialized workers can insert their own interface stats inside each existing packet path + stats for every chain. + """ pass - def run(self): - return None, None + def update_interface_stats(self, diff=False): + """Update all interface stats. - def get_compute_nodes_bios(self): - return {} - - def get_version(self): - return {} - - def close(self): + diff: if False, simply refresh the interface stats values with latest values + if True, diff the interface stats with the latest values + Make sure that the interface stats inserted in insert_interface_stats() are updated + with proper values + """ pass diff --git a/nfvbench/chaining.py b/nfvbench/chaining.py new file mode 100644 index 0000000..e5a9f0a --- /dev/null +++ b/nfvbench/chaining.py @@ -0,0 +1,988 @@ +#!/usr/bin/env python +# Copyright 2018 Cisco Systems, Inc. All rights reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +# This module takes care of chaining networks, ports and vms +# +"""NFVBENCH CHAIN DISCOVERY/STAGING. + +This module takes care of staging/discovering all resources that are participating in a +benchmarking session: flavors, networks, ports, VNF instances. +If a resource is discovered with the same name, it will be reused. +Otherwise it will be created. + +ChainManager: manages VM image, flavor, the staging discovery of all chains + has 1 or more chains +Chain: manages one chain, has 2 or more networks and 1 or more instances +ChainNetwork: manages 1 network in a chain +ChainVnf: manages 1 VNF instance in a chain, has 2 ports +ChainVnfPort: manages 1 instance port + +ChainManager-->Chain(*) +Chain-->ChainNetwork(*),ChainVnf(*) +ChainVnf-->ChainVnfPort(2) + +Once created/discovered, instances are checked to be in the active state (ready to pass traffic) +Configuration parameters that will influence how these resources are staged/related: +- openstack or no openstack +- chain type +- number of chains +- number of VNF in each chain (PVP, PVVP) +- SRIOV and middle port SRIOV for port types +- whether networks are shared across chains or not + +There is no traffic generation involved in this module.
+""" +import os +import re +import time + +from glanceclient.v2 import client as glanceclient +from neutronclient.neutron import client as neutronclient +from novaclient.client import Client + +from attrdict import AttrDict +import compute +from log import LOG +from specs import ChainType + +# Left and right index for network and port lists +LEFT = 0 +RIGHT = 1 +# Name of the VM config file +NFVBENCH_CFG_FILENAME = 'nfvbenchvm.conf' +# full pathame of the VM config in the VM +NFVBENCH_CFG_VM_PATHNAME = os.path.join('/etc/', NFVBENCH_CFG_FILENAME) +# full path of the boot shell script template file on the server where nfvbench runs +BOOT_SCRIPT_PATHNAME = os.path.join(os.path.dirname(os.path.abspath(__file__)), + 'nfvbenchvm', + NFVBENCH_CFG_FILENAME) + + +class ChainException(Exception): + """Exception while operating the chains.""" + + pass + +class NetworkEncaps(object): + """Network encapsulation.""" + + +class ChainFlavor(object): + """Class to manage the chain flavor.""" + + def __init__(self, flavor_name, flavor_dict, comp): + """Create a flavor.""" + self.name = flavor_name + self.comp = comp + self.flavor = self.comp.find_flavor(flavor_name) + self.reuse = False + if self.flavor: + self.reuse = True + LOG.info("Reused flavor '%s'", flavor_name) + else: + extra_specs = flavor_dict.pop('extra_specs', None) + + self.flavor = comp.create_flavor(flavor_name, + **flavor_dict) + + LOG.info("Created flavor '%s'", flavor_name) + if extra_specs: + self.flavor.set_keys(extra_specs) + + def delete(self): + """Delete this flavor.""" + if not self.reuse and self.flavor: + self.flavor.delete() + LOG.info("Flavor '%s' deleted", self.name) + + +class ChainVnfPort(object): + """A port associated to one VNF in the chain.""" + + def __init__(self, name, vnf, chain_network, vnic_type): + """Create or reuse a port on a given network. + + if vnf.instance is None the VNF instance is not reused and this ChainVnfPort instance must + create a new port. 
+ Otherwise vnf.instance is a reused VNF instance and this ChainVnfPort instance must + find an existing port to reuse that matches the port requirements: same attached network, + instance, name, vnic type + + name: name for this port + vnf: ChainVNf instance that owns this port + chain_network: ChainNetwork instance where this port should attach + vnic_type: required vnic type for this port + """ + self.name = name + self.vnf = vnf + self.manager = vnf.manager + self.reuse = False + self.port = None + if vnf.instance: + # VNF instance is reused, we need to find an existing port that matches this instance + # and network + # discover ports attached to this instance + port_list = self.manager.get_ports_from_network(chain_network) + for port in port_list: + if port['name'] != name: + continue + if port['binding:vnic_type'] != vnic_type: + continue + if port['device_id'] == vnf.get_uuid(): + self.port = port + LOG.info('Reusing existing port %s mac=%s', name, port['mac_address']) + break + else: + raise ChainException('Cannot find matching port') + else: + # VNF instance is not created yet, we need to create a new port + body = { + "port": { + 'name': name, + 'network_id': chain_network.get_uuid(), + 'binding:vnic_type': vnic_type + } + } + port = self.manager.neutron_client.create_port(body) + self.port = port['port'] + LOG.info('Created port %s', name) + try: + self.manager.neutron_client.update_port(self.port['id'], { + 'port': { + 'security_groups': [], + 'port_security_enabled': False, + } + }) + LOG.info('Security disabled on port %s', name) + except Exception: + LOG.info('Failed to disable security on port %s (ignored)', name) + + def get_mac(self): + """Get the MAC address for this port.""" + return self.port['mac_address'] + + def delete(self): + """Delete this port instance.""" + if self.reuse or not self.port: + return + retry = 0 + while retry < self.manager.config.generic_retry_count: + try: + self.manager.neutron_client.delete_port(self.port['id']) + 
LOG.info("Deleted port %s", self.name) + return + except Exception: + retry += 1 + time.sleep(self.manager.config.generic_poll_sec) + LOG.error('Unable to delete port: %s', self.name) + + +class ChainNetwork(object): + """Could be a shared network across all chains or a chain private network.""" + + def __init__(self, manager, network_config, chain_id=None, lookup_only=False): + """Create a network for given chain.""" + self.manager = manager + self.name = network_config.name + if chain_id is not None: + self.name += str(chain_id) + self.reuse = False + self.network = None + self.vlan = None + try: + self._setup(network_config, lookup_only) + except Exception: + if lookup_only: + LOG.error("Cannot find network %s", self.name) + else: + LOG.error("Error creating network %s", self.name) + self.delete() + raise + + def _setup(self, network_config, lookup_only): + # Lookup if there is a matching network with same name + networks = self.manager.neutron_client.list_networks(name=self.name) + if networks['networks']: + network = networks['networks'][0] + # a network of same name already exists, we need to verify it has the same + # characteristics + if network_config.segmentation_id: + if network['provider:segmentation_id'] != network_config.segmentation_id: + raise ChainException("Mismatch of 'segmentation_id' for reused " + "network '{net}'. Network has id '{seg_id1}', " + "configuration requires '{seg_id2}'." + .format(net=self.name, + seg_id1=network['provider:segmentation_id'], + seg_id2=network_config.segmentation_id)) + + if network_config.physical_network: + if network['provider:physical_network'] != network_config.physical_network: + raise ChainException("Mismatch of 'physical_network' for reused " + "network '{net}'. Network has '{phys1}', " + "configuration requires '{phys2}'." 
+ .format(net=self.name, + phys1=network['provider:physical_network'], + phys2=network_config.physical_network)) + + LOG.info('Reusing existing network %s', self.name) + self.reuse = True + self.network = network + else: + if lookup_only: + raise ChainException('Network %s not found' % self.name) + body = { + 'network': { + 'name': self.name, + 'admin_state_up': True + } + } + if network_config.network_type: + body['network']['provider:network_type'] = network_config.network_type + if network_config.segmentation_id: + body['network']['provider:segmentation_id'] = network_config.segmentation_id + if network_config.physical_network: + body['network']['provider:physical_network'] = network_config.physical_network + + self.network = self.manager.neutron_client.create_network(body)['network'] + body = { + 'subnet': {'name': network_config.subnet, + 'cidr': network_config.cidr, + 'network_id': self.network['id'], + 'enable_dhcp': False, + 'ip_version': 4, + 'dns_nameservers': []} + } + subnet = self.manager.neutron_client.create_subnet(body)['subnet'] + # add subnet id to the network dict since it has just been added + self.network['subnets'] = [subnet['id']] + LOG.info('Created network: %s.', self.name) + + def get_uuid(self): + """ + Extract UUID of this network. + + :return: UUID of this network + """ + return self.network['id'] + + def get_vlan(self): + """ + Extract vlan for this network. 
+ + :return: vlan ID for this network + """ + if self.network['provider:network_type'] != 'vlan': + raise ChainException('Trying to retrieve VLAN id for non VLAN network') + return self.network['provider:segmentation_id'] + + def delete(self): + """Delete this network.""" + if not self.reuse and self.network: + retry = 0 + while retry < self.manager.config.generic_retry_count: + try: + self.manager.neutron_client.delete_network(self.network['id']) + LOG.info("Deleted network: %s", self.name) + return + except Exception: + retry += 1 + LOG.info('Error deleting network %s (retry %d/%d)...', + self.name, + retry, + self.manager.config.generic_retry_count) + time.sleep(self.manager.config.generic_poll_sec) + LOG.error('Unable to delete network: %s', self.name) + + +class ChainVnf(object): + """A class to represent a VNF in a chain.""" + + def __init__(self, chain, vnf_id, networks): + """Reuse a VNF instance with same characteristics or create a new VNF instance. + + chain: the chain where this vnf belongs + vnf_id: indicates the index of this vnf in its chain (first vnf=0) + networks: the list of all networks (ChainNetwork) of the current chain + """ + self.manager = chain.manager + self.chain = chain + self.vnf_id = vnf_id + self.name = self.manager.config.loop_vm_name + str(chain.chain_id) + if len(networks) > 2: + # we will have more than 1 VM in each chain + self.name += '-' + str(vnf_id) + self.ports = [] + self.status = None + self.instance = None + self.reuse = False + self.host_ip = None + try: + # the vnf_id is conveniently also the starting index in networks + # for the left and right networks associated to this VNF + self._setup(networks[vnf_id:vnf_id + 2]) + except Exception: + LOG.error("Error creating VNF %s", self.name) + self.delete() + raise + + def _get_vm_config(self, remote_mac_pair): + config = self.manager.config + devices = self.manager.generator_config.devices + with open(BOOT_SCRIPT_PATHNAME, 'r') as boot_script: + content = boot_script.read() 
+ g1cidr = devices[LEFT].get_gw_ip(self.chain.chain_id) + '/8'
+ g2cidr = devices[RIGHT].get_gw_ip(self.chain.chain_id) + '/8'
+ vm_config = {
+ 'forwarder': config.vm_forwarder,
+ 'intf_mac1': self.ports[LEFT].get_mac(),
+ 'intf_mac2': self.ports[RIGHT].get_mac(),
+ 'tg_gateway1_ip': devices[LEFT].tg_gateway_ip_addrs,
+ 'tg_gateway2_ip': devices[RIGHT].tg_gateway_ip_addrs,
+ 'tg_net1': devices[LEFT].ip_addrs,
+ 'tg_net2': devices[RIGHT].ip_addrs,
+ 'vnf_gateway1_cidr': g1cidr,
+ 'vnf_gateway2_cidr': g2cidr,
+ 'tg_mac1': remote_mac_pair[0],
+ 'tg_mac2': remote_mac_pair[1]
+ }
+ return content.format(**vm_config)
+
+ def _get_vnic_type(self, port_index):
+ """Get the right vnic type for given port index.
+
+ If SR-IOV is specified, middle ports in multi-VNF chains
+ can use vswitch or SR-IOV based on config.use_sriov_middle_net
+ """
+ if self.manager.config.sriov:
+ if self.manager.config.use_sriov_middle_net:
+ return 'direct'
+ if self.vnf_id == 0:
+ # first VNF in chain must use sriov for left port
+ if port_index == 0:
+ return 'direct'
+ elif (self.vnf_id == self.chain.get_length() - 1) and (port_index == 1):
+ # last VNF in chain must use sriov for right port
+ return 'direct'
+ return 'normal'
+
+ def _setup(self, networks):
+ flavor_id = self.manager.flavor.flavor.id
+ # Check if we can reuse an instance with same name
+ for instance in self.manager.existing_instances:
+ if instance.name == self.name:
+ # Verify that other instance characteristics match
+ if instance.flavor['id'] != flavor_id:
+ self._reuse_exception('Flavor mismatch')
+ if instance.status != "ACTIVE":
+ self._reuse_exception('Matching instance is not in ACTIVE state')
+ # The 2 networks for this instance must also be reused
+ if not networks[LEFT].reuse:
+ self._reuse_exception('network %s is new' % networks[LEFT].name)
+ if not networks[RIGHT].reuse:
+ self._reuse_exception('network %s is new' % networks[RIGHT].name)
+ # instance.networks have the network names as keys:
+ #
{'nfvbench-rnet0': ['192.168.2.10'], 'nfvbench-lnet0': ['192.168.1.8']}
+ if networks[LEFT].name not in instance.networks:
+ self._reuse_exception('Left network mismatch')
+ if networks[RIGHT].name not in instance.networks:
+ self._reuse_exception('Right network mismatch')
+ # Other checks not performed (yet)
+ # check if az and compute node match
+ self.reuse = True
+ self.instance = instance
+ LOG.info('Reusing existing instance %s on %s',
+ self.name, self.get_hypervisor_name())
+ # create or reuse/discover 2 ports per instance
+ self.ports = [ChainVnfPort(self.name + '-' + str(index),
+ self,
+ networks[index],
+ self._get_vnic_type(index)) for index in [0, 1]]
+ # if no reuse, actual vm creation is deferred after all ports in the chain are created
+ # since we need to know the next mac in a multi-vnf chain
+
+ def get_az(self):
+ """Get the AZ associated to this VNF."""
+ return self.manager.az[0]
+
+ def create_vnf(self, remote_mac_pair):
+ """Create the VNF instance if it does not already exist."""
+ if self.instance is None:
+ port_ids = [{'port-id': vnf_port.port['id']}
+ for vnf_port in self.ports]
+ vm_config = self._get_vm_config(remote_mac_pair)
+ az = self.get_az()
+ server = self.manager.comp.create_server(self.name,
+ self.manager.image_instance,
+ self.manager.flavor.flavor,
+ None,
+ port_ids,
+ None,
+ avail_zone=az,
+ user_data=None,
+ config_drive=True,
+ files={NFVBENCH_CFG_VM_PATHNAME: vm_config})
+ if server:
+ LOG.info('Created instance %s on %s', self.name, az)
+ self.instance = server
+ self.reuse = False
+ else:
+ raise ChainException('Unable to create instance: %s' % (self.name))
+
+ def _reuse_exception(self, reason):
+ raise ChainException('Instance %s cannot be reused (%s)' % (self.name, reason))
+
+ def get_status(self):
+ """Get the status of this instance."""
+ if self.instance.status != 'ACTIVE':
+ self.instance = self.manager.comp.poll_server(self.instance)
+ return self.instance.status
+
+ def get_hostname(self):
+ """Get the
hypervisor host name running this VNF instance.""" + return getattr(self.instance, 'OS-EXT-SRV-ATTR:hypervisor_hostname') + + def get_host_ip(self): + """Get the IP address of the host where this instance runs. + + return: the IP address + """ + if not self.host_ip: + self.host_ip = self.manager.comp.get_hypervisor(self.get_hostname()).host_ip + return self.host_ip + + def get_hypervisor_name(self): + """Get hypervisor name (az:hostname) for this VNF instance.""" + if self.instance: + az = getattr(self.instance, 'OS-EXT-AZ:availability_zone') + hostname = self.get_hostname() + if az: + return az + ':' + hostname + return hostname + return None + + def get_uuid(self): + """Get the uuid for this instance.""" + return self.instance.id + + def delete(self, forced=False): + """Delete this VNF instance.""" + if self.reuse: + LOG.info("Instance %s not deleted (reused)", self.name) + else: + if self.instance: + self.manager.comp.delete_server(self.instance) + LOG.info("Deleted instance %s", self.name) + for port in self.ports: + port.delete() + +class Chain(object): + """A class to manage a single chain. + + Can handle any type of chain (EXT, PVP, PVVP) + """ + + def __init__(self, chain_id, manager): + """Create a new chain. 
+
+ chain_id: chain index (first chain is 0)
+ manager: the chain manager that owns all chains
+ """
+ self.chain_id = chain_id
+ self.manager = manager
+ self.encaps = manager.encaps
+ self.networks = []
+ self.instances = []
+ try:
+ self.networks = manager.get_networks(chain_id)
+ # For external chain VNFs can only be discovered from their MAC addresses
+ # either from config or from ARP
+ if manager.config.service_chain != ChainType.EXT:
+ for chain_instance_index in range(self.get_length()):
+ self.instances.append(ChainVnf(self,
+ chain_instance_index,
+ self.networks))
+ # now that all VNF ports are created we need to calculate the
+ # left/right remote MAC for each VNF in the chain
+ # before actually creating the VNF itself
+ rem_mac_pairs = self._get_remote_mac_pairs()
+ for instance in self.instances:
+ rem_mac_pair = rem_mac_pairs.pop(0)
+ instance.create_vnf(rem_mac_pair)
+ except Exception:
+ self.delete()
+ raise
+
+ def get_length(self):
+ """Get the number of VNFs in the chain."""
+ return len(self.networks) - 1
+
+ def _get_remote_mac_pairs(self):
+ """Get the list of remote mac pairs for every VNF in the chain.
+
+ Traverse the chain from left to right and establish the
+ left/right remote MAC for each VNF in the chain.
+ + PVP case is simpler: + mac sequence: tg_src_mac, vm0-mac0, vm0-mac1, tg_dst_mac + must produce [[tg_src_mac, tg_dst_mac]] or looking at index in mac sequence: [[0, 3]] + the mac pair is what the VNF at that position (index 0) sees as next hop mac left and right + + PVVP: + tg_src_mac, vm0-mac0, vm0-mac1, vm1-mac0, vm1-mac1, tg_dst_mac + Must produce the following list: + [[tg_src_mac, vm1-mac0], [vm0-mac1, tg_dst_mac]] or index: [[0, 3], [2, 5]] + + General case with 3 VMs in chain, the list of consecutive macs (left to right): + tg_src_mac, vm0-mac0, vm0-mac1, vm1-mac0, vm1-mac1, vm2-mac0, vm2-mac1, tg_dst_mac + Must produce the following list: + [[tg_src_mac, vm1-mac0], [vm0-mac1, vm2-mac0], [vm1-mac1, tg_dst_mac]] + or index: [[0, 3], [2, 5], [4, 7]] + + The series pattern is pretty clear: [[n, n+3],... ] where n is multiple of 2 + """ + # line up all mac from left to right + mac_seq = [self.manager.generator_config.devices[LEFT].mac] + for instance in self.instances: + mac_seq.append(instance.ports[0].get_mac()) + mac_seq.append(instance.ports[1].get_mac()) + mac_seq.append(self.manager.generator_config.devices[RIGHT].mac) + base = 0 + rem_mac_pairs = [] + for _ in self.instances: + rem_mac_pairs.append([mac_seq[base], mac_seq[base + 3]]) + base += 2 + return rem_mac_pairs + + def get_instances(self): + """Return all instances for this chain.""" + return self.instances + + def get_vlan(self, port_index): + """Get the VLAN id on a given port. + + port_index: left port is 0, right port is 1 + return: the vlan_id or None if there is no vlan tagging + """ + # for port 1 we need to return the VLAN of the last network in the chain + # The networks array contains 2 networks for PVP [left, right] + # and 3 networks in the case of PVVP [left.middle,right] + if port_index: + # this will pick the last item in array + port_index = -1 + return self.networks[port_index].get_vlan() + + def get_dest_mac(self, port_index): + """Get the dest MAC on a given port. 
+
+ port_index: left port is 0, right port is 1
+ return: the dest MAC
+ """
+ if port_index:
+ # for right port, use the right port MAC of the last (right most) VNF in chain
+ return self.instances[-1].ports[1].get_mac()
+ # for left port use the left port MAC of the first (left most) VNF in chain
+ return self.instances[0].ports[0].get_mac()
+
+ def get_network_uuids(self):
+ """Get UUID of networks in this chain from left to right (order is important).
+
+ :return: list of UUIDs of networks (2 or 3 elements)
+ """
+ return [net['id'] for net in self.networks]
+
+ def get_host_ips(self):
+ """Return the IP address(es) of the host compute nodes used for this chain.
+
+ :return: a list of 1 or 2 IP addresses
+ """
+ return [vnf.get_host_ip() for vnf in self.instances]
+
+ def get_compute_nodes(self):
+ """Return the name of the host compute nodes used for this chain.
+
+ :return: a list of 1 host name in the az:host format
+ """
+ # Since all chains go through the same compute node(s) we can just retrieve the
+ # compute node name(s) for the first chain
+ return [vnf.get_hypervisor_name() for vnf in self.instances]
+
+ def delete(self):
+ """Delete this chain."""
+ for instance in self.instances:
+ instance.delete()
+ # only delete if these are chain private networks (not shared)
+ if not self.manager.config.service_chain_shared_net:
+ for network in self.networks:
+ network.delete()
+
+
+class ChainManager(object):
+ """A class for managing all chains for a given run.
+
+ Supports openstack or no openstack.
+ Supports EXT, PVP and PVVP chains.
+ """
+
+ def __init__(self, chain_runner):
+ """Create a chain manager to take care of discovering or bringing up the requested chains.
+
+ A new instance must be created every time a new config is used.
+ config: the nfvbench config to use + cred: openstack credentials to use of None if there is no openstack + """ + self.chain_runner = chain_runner + self.config = chain_runner.config + self.generator_config = chain_runner.traffic_client.generator_config + self.chains = [] + self.image_instance = None + self.image_name = None + # Left and right networks shared across all chains (only if shared) + self.networks = [] + self.encaps = None + self.flavor = None + self.comp = None + self.nova_client = None + self.neutron_client = None + self.glance_client = None + self.existing_instances = [] + # existing ports keyed by the network uuid they belong to + self._existing_ports = {} + config = self.config + self.openstack = (chain_runner.cred is not None) and not config.l2_loopback + self.chain_count = config.service_chain_count + if self.openstack: + # openstack only + session = chain_runner.cred.get_session() + self.nova_client = Client(2, session=session) + self.neutron_client = neutronclient.Client('2.0', session=session) + self.glance_client = glanceclient.Client('2', session=session) + self.comp = compute.Compute(self.nova_client, + self.glance_client, + config) + self.az = None + try: + if config.service_chain != ChainType.EXT: + # we need to find 1 hypervisor + az_list = self.comp.get_enabled_az_host_list(1) + if not az_list: + raise ChainException('No matching hypervisor found') + self.az = az_list + self._setup_image() + self.flavor = ChainFlavor(config.flavor_type, config.flavor, self.comp) + # Get list of all existing instances to check if some instances can be reused + self.existing_instances = self.comp.get_server_list() + # If networks are shared across chains, get the list of networks + if config.service_chain_shared_net: + self.networks = self.get_networks() + # Reuse/create chains + for chain_id in range(self.chain_count): + self.chains.append(Chain(chain_id, self)) + if config.service_chain == ChainType.EXT: + # if EXT and no ARP we need to read dest MACs 
from config + if config.no_arp: + self._get_dest_macs_from_config() + else: + # Make sure all instances are active before proceeding + self._ensure_instances_active() + except Exception: + self.delete() + raise + else: + # no openstack, no need to create chains + # make sure there at least as many entries as chains in each left/right list + if len(config.vlans) != 2: + raise ChainException('The config vlans property must be a list ' + 'with 2 lists of VLAN IDs') + if not config.l2_loopback: + self._get_dest_macs_from_config() + + re_vlan = "[0-9]*$" + self.vlans = [self._check_list('vlans[0]', config.vlans[0], re_vlan), + self._check_list('vlans[1]', config.vlans[1], re_vlan)] + + def _get_dest_macs_from_config(self): + re_mac = "[0-9a-fA-F]{2}([-:])[0-9a-fA-F]{2}(\\1[0-9a-fA-F]{2}){4}$" + tg_config = self.config.traffic_generator + self.dest_macs = [self._check_list("mac_addrs_left", + tg_config.mac_addrs_left, re_mac), + self._check_list("mac_addrs_right", + tg_config.mac_addrs_right, re_mac)] + + def _check_list(self, list_name, ll, pattern): + # if it is a single int or mac, make it a list of 1 int + if isinstance(ll, (int, str)): + ll = [ll] + if not ll or len(ll) < self.chain_count: + raise ChainException('%s=%s must be a list with 1 element per chain' % (list_name, ll)) + for item in ll: + if not re.match(pattern, str(item)): + raise ChainException("Invalid format '{item}' specified in {fname}" + .format(item=item, fname=list_name)) + return ll + + def _setup_image(self): + # To avoid reuploading image in server mode, check whether image_name is set or not + if self.image_name: + self.image_instance = self.comp.find_image(self.image_name) + if self.image_instance: + LOG.info("Reusing image %s", self.image_name) + else: + image_name_search_pattern = r'(nfvbenchvm-\d+(\.\d+)*).qcow2' + if self.config.vm_image_file: + match = re.search(image_name_search_pattern, self.config.vm_image_file) + if match: + self.image_name = match.group(1) + LOG.info('Using provided 
VM image file %s', self.config.vm_image_file) + else: + raise ChainException('Provided VM image file name %s must start with ' + '"nfvbenchvm-"' % self.config.vm_image_file) + else: + pkg_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + for f in os.listdir(pkg_root): + if re.search(image_name_search_pattern, f): + self.config.vm_image_file = pkg_root + '/' + f + self.image_name = f.replace('.qcow2', '') + LOG.info('Found built-in VM image file %s', f) + break + else: + raise ChainException('Cannot find any built-in VM image file.') + if self.image_name: + self.image_instance = self.comp.find_image(self.image_name) + if not self.image_instance: + LOG.info('Uploading %s', self.image_name) + res = self.comp.upload_image_via_url(self.image_name, + self.config.vm_image_file) + + if not res: + raise ChainException('Error uploading image %s from %s. ABORTING.' % + (self.image_name, self.config.vm_image_file)) + LOG.info('Image %s successfully uploaded.', self.image_name) + self.image_instance = self.comp.find_image(self.image_name) + + def _ensure_instances_active(self): + instances = [] + for chain in self.chains: + instances.extend(chain.get_instances()) + initial_instance_count = len(instances) + max_retries = (self.config.check_traffic_time_sec + + self.config.generic_poll_sec - 1) / self.config.generic_poll_sec + retry = 0 + while instances: + remaining_instances = [] + for instance in instances: + status = instance.get_status() + if status == 'ACTIVE': + continue + if status == 'ERROR': + raise ChainException('Instance %s creation error: %s' % + (instance.name, + instance.instance.fault['message'])) + remaining_instances.append(instance) + if not remaining_instances: + break + retry += 1 + if retry >= max_retries: + raise ChainException('Time-out: %d/%d instances still not active' % + (len(remaining_instances), initial_instance_count)) + LOG.info('Waiting for %d/%d instance to become active (retry %d/%d)...', + len(remaining_instances), 
initial_instance_count, + retry, max_retries) + instances = remaining_instances + time.sleep(self.config.generic_poll_sec) + if initial_instance_count: + LOG.info('All instances are active') + + def get_networks(self, chain_id=None): + """Get the networks for given EXT, PVP or PVVP chain. + + For EXT packet path, these networks must pre-exist. + For PVP, PVVP these networks will be created if they do not exist. + chain_id: to which chain the networks belong. + a None value will mean that these networks are shared by all chains + """ + if self.networks: + # the only case where self.networks exists is when the networks are shared + # across all chains + return self.networks + if self.config.service_chain == ChainType.EXT: + lookup_only = True + ext_net = self.config.external_networks + net_cfg = [AttrDict({'name': name, + 'segmentation_id': None, + 'physical_network': None}) + for name in [ext_net.left, ext_net.right]] + else: + lookup_only = False + int_nets = self.config.internal_networks + if self.config.service_chain == ChainType.PVP: + net_cfg = [int_nets.left, int_nets.right] + else: + net_cfg = [int_nets.left, int_nets.middle, int_nets.right] + networks = [] + try: + for cfg in net_cfg: + networks.append(ChainNetwork(self, cfg, chain_id, lookup_only=lookup_only)) + except Exception: + # need to cleanup all successful networks prior to bailing out + for net in networks: + net.delete() + raise + return networks + + def get_existing_ports(self): + """Get the list of existing ports. + + Lazy retrieval of ports as this can be costly if there are lots of ports and + is only needed when VM and network are being reused. 
+ + return: a dict of list of neutron ports indexed by the network uuid they are attached to + + Each port is a dict with fields such as below: + {'allowed_address_pairs': [], 'extra_dhcp_opts': [], + 'updated_at': '2018-10-06T07:15:35Z', 'device_owner': 'compute:nova', + 'revision_number': 10, 'port_security_enabled': False, 'binding:profile': {}, + 'fixed_ips': [{'subnet_id': '6903a3b3-49a1-4ba4-8259-4a90e7a44b21', + 'ip_address': '192.168.1.4'}], 'id': '3dcb9cfa-d82a-4dd1-85a1-fd8284b52d72', + 'security_groups': [], + 'binding:vif_details': {'vhostuser_socket': '/tmp/3dcb9cfa-d82a-4dd1-85a1-fd8284b52d72', + 'vhostuser_mode': 'server'}, + 'binding:vif_type': 'vhostuser', + 'mac_address': 'fa:16:3e:3c:63:04', + 'project_id': '977ac76a63d7492f927fa80e86baff4c', + 'status': 'ACTIVE', + 'binding:host_id': 'a20-champagne-compute-1', + 'description': '', + 'device_id': 'a98e2ad2-5371-4aa5-a356-8264a970ce4b', + 'name': 'nfvbench-loop-vm0-0', 'admin_state_up': True, + 'network_id': '3ea5fd88-278f-4d9d-b24d-1e443791a055', + 'tenant_id': '977ac76a63d7492f927fa80e86baff4c', + 'created_at': '2018-10-06T07:15:10Z', + 'binding:vnic_type': 'normal'} + """ + if not self._existing_ports: + LOG.info('Loading list of all ports...') + existing_ports = self.neutron_client.list_ports()['ports'] + # place all ports in the dict keyed by the port network uuid + for port in existing_ports: + port_list = self._existing_ports.setdefault(port['network_id'], []) + port_list.append(port) + LOG.info("Loaded %d ports attached to %d networks", + len(existing_ports), len(self._existing_ports)) + return self._existing_ports + + def get_ports_from_network(self, chain_network): + """Get the list of existing ports that belong to a network. + + Lazy retrieval of ports as this can be costly if there are lots of ports and + is only needed when VM and network are being reused. 
+
+ chain_network: a ChainNetwork instance for which attached ports need to be retrieved
+ return: list of neutron ports attached to requested network
+ """
+ return self.get_existing_ports().get(chain_network.get_uuid(), None)
+
+ def get_host_ip_from_mac(self, mac):
+ """Get the host IP address matching a MAC.
+
+ mac: MAC address to look for
+ return: the IP address of the host where the matching port runs or None if not found
+ """
+ # _existing_ports is a dict of list of ports indexed by network id
+ for port_list in self.get_existing_ports().values():
+ for port in port_list:
+ try:
+ if port['mac_address'] == mac:
+ host_id = port['binding:host_id']
+ return self.comp.get_hypervisor(host_id).host_ip
+ except KeyError:
+ pass
+ return None
+
+ def get_chain_vlans(self, port_index):
+ """Get the list of per chain VLAN id on a given port.
+
+ port_index: left port is 0, right port is 1
+ return: a VLAN ID list indexed by the chain index or None if no vlan tagging
+ """
+ if self.chains:
+ return [self.chains[chain_index].get_vlan(port_index)
+ for chain_index in range(self.chain_count)]
+ # no openstack
+ return self.vlans[port_index]
+
+ def get_dest_macs(self, port_index):
+ """Get the list of per chain dest MACs on a given port.
+
+ Should not be called if EXT+ARP is used (in that case the traffic gen will
+ have the ARP responses back from VNFs with the dest MAC to use).
+
+ port_index: left port is 0, right port is 1
+ return: a list of dest MACs indexed by the chain index
+ """
+ if self.chains and self.config.service_chain != ChainType.EXT:
+ return [self.chains[chain_index].get_dest_mac(port_index)
+ for chain_index in range(self.chain_count)]
+ # no openstack or EXT+no-arp
+ return self.dest_macs[port_index]
+
+ def get_host_ips(self):
+ """Return the IP address(es) of the host compute nodes used for this run.
+ + :return: a list of 1 IP address + """ + # Since all chains go through the same compute node(s) we can just retrieve the + # compute node(s) for the first chain + if self.chains: + if self.config.service_chain != ChainType.EXT: + return self.chains[0].get_host_ips() + # in the case of EXT, the compute node must be retrieved from the port + # associated to any of the dest MACs + dst_macs = self.chain_runner.traffic_client.gen.get_dest_macs() + # dest MAC on port 0, chain 0 + dst_mac = dst_macs[0][0] + host_ip = self.get_host_ip_from_mac(dst_mac) + if host_ip: + LOG.info('Found compute node IP for EXT chain: %s', host_ip) + return [host_ip] + return [] + + def get_compute_nodes(self): + """Return the name of the host compute nodes used for this run. + + :return: a list of 0 or 1 host name in the az:host format + """ + # Since all chains go through the same compute node(s) we can just retrieve the + # compute node name(s) for the first chain + if self.chains: + # in the case of EXT, the compute node must be retrieved from the port + # associated to any of the dest MACs + return self.chains[0].get_compute_nodes() + # no openstack = no chains + return [] + + def delete(self): + """Delete resources for all chains. + + Will not delete any resource if no-cleanup has been requested. 
+ """ + if self.config.no_cleanup: + return + for chain in self.chains: + chain.delete() + for network in self.networks: + network.delete() + if self.flavor: + self.flavor.delete() diff --git a/nfvbench/cleanup.py b/nfvbench/cleanup.py index 246be3f..819514a 100644 --- a/nfvbench/cleanup.py +++ b/nfvbench/cleanup.py @@ -54,7 +54,7 @@ class ComputeCleaner(object): except Exception: LOG.exception("Instance %s deletion failed", server.name) LOG.info(' Waiting for %d instances to be fully deleted...', len(self.servers)) - retry_count = 5 + len(self.servers) * 2 + retry_count = 15 + len(self.servers) * 5 while True: retry_count -= 1 self.servers = [server for server in self.servers if self.instance_exists(server)] @@ -66,7 +66,7 @@ class ComputeCleaner(object): len(self.servers), retry_count) time.sleep(2) else: - LOG.warning(' instance deletion verification timed out: %d not removed', + LOG.warning(' instance deletion verification time-out: %d still not deleted', len(self.servers)) break @@ -74,20 +74,19 @@ class ComputeCleaner(object): class NetworkCleaner(object): """A cleaner for network resources.""" - def __init__(self, neutron_client, network_names): + def __init__(self, neutron_client, network_name_prefixes): self.neutron_client = neutron_client LOG.info('Discovering networks...') all_networks = self.neutron_client.list_networks()['networks'] self.networks = [] + net_ids = [] for net in all_networks: - try: - network_names.remove(net['name']) - self.networks.append(net) - except ValueError: - pass - if not network_names: - break - net_ids = [net['id'] for net in self.networks] + netname = net['name'] + for prefix in network_name_prefixes: + if netname.startswith(prefix): + self.networks.append(net) + net_ids.append(net['id']) + break if net_ids: LOG.info('Discovering ports...') all_ports = self.neutron_client.list_ports()['ports'] @@ -161,8 +160,8 @@ class Cleaner(object): table.extend(res_list) count = len(table) - 1 if count: - LOG.info('Discovered %d NFVbench 
resources:', count) - print tabulate(table, headers="firstrow", tablefmt="psql") + LOG.info('Discovered %d NFVbench resources:\n%s', count, + tabulate(table, headers="firstrow", tablefmt="psql")) else: LOG.info('No matching NFVbench resources found') return count diff --git a/nfvbench/compute.py b/nfvbench/compute.py index af1a0d6..3f97166 100644 --- a/nfvbench/compute.py +++ b/nfvbench/compute.py @@ -11,8 +11,6 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -"""Module for Openstack compute operations""" -import os import time import traceback @@ -28,10 +26,9 @@ from log import LOG class Compute(object): - def __init__(self, nova_client, glance_client, neutron_client, config): + def __init__(self, nova_client, glance_client, config): self.novaclient = nova_client self.glance_client = glance_client - self.neutronclient = neutron_client self.config = config def find_image(self, image_name): @@ -43,9 +40,7 @@ class Compute(object): return None def upload_image_via_url(self, final_image_name, image_file, retry_count=60): - ''' - Directly uploads image to Nova via URL if image is not present - ''' + """Directly upload image to Nova via URL if image is not present.""" retry = 0 try: # check image is file/url based. 
@@ -93,56 +88,6 @@ class Compute(object): return True - # Remove keypair name from openstack if exists - def remove_public_key(self, name): - keypair_list = self.novaclient.keypairs.list() - for key in keypair_list: - if key.name == name: - self.novaclient.keypairs.delete(name) - LOG.info('Removed public key %s', name) - break - - # Test if keypair file is present if not create it - def create_keypair(self, name, private_key_pair_file): - self.remove_public_key(name) - keypair = self.novaclient.keypairs.create(name) - # Now write the keypair to the file if requested - if private_key_pair_file: - kpf = os.open(private_key_pair_file, - os.O_WRONLY | os.O_CREAT, 0o600) - with os.fdopen(kpf, 'w') as kpf: - kpf.write(keypair.private_key) - return keypair - - # Add an existing public key to openstack - def add_public_key(self, name, public_key_file): - self.remove_public_key(name) - # extract the public key from the file - public_key = None - try: - with open(os.path.expanduser(public_key_file)) as pkf: - public_key = pkf.read() - except IOError as exc: - LOG.error('Cannot open public key file %s: %s', public_key_file, exc) - return None - keypair = self.novaclient.keypairs.create(name, public_key) - return keypair - - def init_key_pair(self, kp_name, ssh_access): - '''Initialize the key pair for all test VMs - if a key pair is specified in access, use that key pair else - create a temporary key pair - ''' - if ssh_access.public_key_file: - return self.add_public_key(kp_name, ssh_access.public_key_file) - keypair = self.create_keypair(kp_name, None) - ssh_access.private_key = keypair.private_key - return keypair - - def find_network(self, label): - net = self.novaclient.networks.find(label=label) - return net - # Create a server instance with name vmname # and check that it gets into the ACTIVE state def create_server(self, vmname, image, flavor, key_name, @@ -174,48 +119,6 @@ class Compute(object): servers_list = self.novaclient.servers.list() return servers_list - def 
find_floating_ips(self): - floating_ip = self.novaclient.floating_ips.list() - return floating_ip - - def create_floating_ips(self, pool): - return self.novaclient.floating_ips.create(pool) - - # Return the server network for a server - def find_server_network(self, vmname): - servers_list = self.get_server_list() - for server in servers_list: - if server.name == vmname and server.status == "ACTIVE": - return server.networks - return None - - # Returns True if server is present false if not. - # Retry for a few seconds since after VM creation sometimes - # it takes a while to show up - def find_server(self, vmname, retry_count): - for retry_attempt in range(retry_count): - servers_list = self.get_server_list() - for server in servers_list: - if server.name == vmname and server.status == "ACTIVE": - return True - # Sleep between retries - LOG.debug("[%s] VM not yet found, retrying %s of %s...", - vmname, (retry_attempt + 1), retry_count) - time.sleep(self.config.generic_poll_sec) - LOG.error("[%s] VM not found, after %s attempts", vmname, retry_count) - return False - - # Returns True if server is found and deleted/False if not, - # retry the delete if there is a delay - def delete_server_by_name(self, vmname): - servers_list = self.get_server_list() - for server in servers_list: - if server.name == vmname: - LOG.info('Deleting server %s', server) - self.novaclient.servers.delete(server) - return True - return False - def delete_server(self, server): self.novaclient.servers.delete(server) @@ -226,32 +129,22 @@ class Compute(object): except Exception: return None - def create_flavor(self, name, ram, vcpus, disk, ephemeral=0, override=False): - if override: - self.delete_flavor(name) + def create_flavor(self, name, ram, vcpus, disk, ephemeral=0): return self.novaclient.flavors.create(name=name, ram=ram, vcpus=vcpus, disk=disk, ephemeral=ephemeral) - def delete_flavor(self, flavor=None, name=None): - try: - if not flavor: - flavor = self.find_flavor(name) - 
flavor.delete() - return True - except Exception: - return False - def normalize_az_host(self, az, host): if not az: az = self.config.availability_zone return az + ':' + host def auto_fill_az(self, host_list, host): - ''' + """Auto fill az:host. + no az provided, if there is a host list we can auto-fill the az else we use the configured az if available else we return an error - ''' + """ if host_list: for hyp in host_list: if hyp.host == host: @@ -265,7 +158,8 @@ class Compute(object): return None def sanitize_az_host(self, host_list, az_host): - ''' + """Sanitize the az:host string. + host_list: list of hosts as retrieved from openstack (can be empty) az_host: either a host or a az:host string if a host, will check host is in the list, find the corresponding az and @@ -273,7 +167,7 @@ class Compute(object): if az:host is passed will check the host is in the list and az matches if host_list is empty, will return the configured az if there is no az passed - ''' + """ if ':' in az_host: # no host_list, return as is (no check) if not host_list: @@ -301,9 +195,6 @@ class Compute(object): # The list of all hosts is retrieved first from openstack # if this fails, checks and az auto-fill are disabled # - # If the user provides a list of hypervisors (--hypervisor) - # that list is checked and returned - # # If the user provides a configured az name (config.availability_zone) # up to the first 2 hosts from the list that match the az are returned # @@ -315,49 +206,10 @@ class Compute(object): # [ az1:hyp1, az2:hyp2 ] # [] if an error occurred (error message printed to console) # - def get_az_host_list(self): - avail_list = [] - host_list = [] - - try: - host_list = self.novaclient.services.list() - except novaclient.exceptions.Forbidden: - LOG.warning('Operation Forbidden: could not retrieve list of hosts' - ' (likely no permission)') - - for host in host_list: - # this host must be a compute node - if host.binary != 'nova-compute' or host.state != 'up': - continue - 
candidate = None - if self.config.availability_zone: - if host.zone == self.config.availability_zone: - candidate = self.normalize_az_host(None, host.host) - else: - candidate = self.normalize_az_host(host.zone, host.host) - if candidate: - avail_list.append(candidate) - # pick first 2 matches at most - if len(avail_list) == 2: - break - - # if empty we insert the configured az - if not avail_list: - - if not self.config.availability_zone: - LOG.error('Availability_zone must be configured') - elif host_list: - LOG.error('No host matching the selection for availability zone: %s', - self.config.availability_zone) - avail_list = [] - else: - avail_list = [self.config.availability_zone] - return avail_list - def get_enabled_az_host_list(self, required_count=1): - """ - Check which hypervisors are enabled and on which compute nodes they are running. - Pick required count of hosts. + """Check which hypervisors are enabled and on which compute nodes they are running. + + Pick up to the required count of hosts (can be less or zero) :param required_count: count of compute-nodes to return :return: list of enabled available compute nodes @@ -398,76 +250,3 @@ class Compute(object): hyper = self.novaclient.hypervisors.search(hyper_name)[0] # get full hypervisor object return self.novaclient.hypervisors.get(hyper.id) - - # Given 2 VMs test if they are running on same Host or not - def check_vm_placement(self, vm_instance1, vm_instance2): - try: - server_instance_1 = self.novaclient.servers.get(vm_instance1) - server_instance_2 = self.novaclient.servers.get(vm_instance2) - return bool(server_instance_1.hostId == server_instance_2.hostId) - except novaclient.exceptions: - LOG.warning("Exception in retrieving the hostId of servers") - - # Create a new security group with appropriate rules - def security_group_create(self): - # check first the security group exists - sec_groups = self.neutronclient.list_security_groups()['security_groups'] - group = [x for x in sec_groups if 
x['name'] == self.config.security_group_name] - if group: - return group[0] - - body = { - 'security_group': { - 'name': self.config.security_group_name, - 'description': 'PNS Security Group' - } - } - group = self.neutronclient.create_security_group(body)['security_group'] - self.security_group_add_rules(group) - - return group - - # Delete a security group - def security_group_delete(self, group): - if group: - LOG.info("Deleting security group") - self.neutronclient.delete_security_group(group['id']) - - # Add rules to the security group - def security_group_add_rules(self, group): - body = { - 'security_group_rule': { - 'direction': 'ingress', - 'security_group_id': group['id'], - 'remote_group_id': None - } - } - if self.config.ipv6_mode: - body['security_group_rule']['ethertype'] = 'IPv6' - body['security_group_rule']['remote_ip_prefix'] = '::/0' - else: - body['security_group_rule']['ethertype'] = 'IPv4' - body['security_group_rule']['remote_ip_prefix'] = '0.0.0.0/0' - - # Allow ping traffic - body['security_group_rule']['protocol'] = 'icmp' - body['security_group_rule']['port_range_min'] = None - body['security_group_rule']['port_range_max'] = None - self.neutronclient.create_security_group_rule(body) - - # Allow SSH traffic - body['security_group_rule']['protocol'] = 'tcp' - body['security_group_rule']['port_range_min'] = 22 - body['security_group_rule']['port_range_max'] = 22 - self.neutronclient.create_security_group_rule(body) - - # Allow TCP/UDP traffic for perf tools like iperf/nuttcp - # 5001: Data traffic (standard iperf data port) - # 5002: Control traffic (non standard) - # note that 5000/tcp is already picked by openstack keystone - body['security_group_rule']['protocol'] = 'tcp' - body['security_group_rule']['port_range_min'] = 5001 - body['security_group_rule']['port_range_max'] = 5002 - self.neutronclient.create_security_group_rule(body) - body['security_group_rule']['protocol'] = 'udp' - self.neutronclient.create_security_group_rule(body) 
diff --git a/nfvbench/config_plugin.py b/nfvbench/config_plugin.py index f6654eb..a6759cd 100644 --- a/nfvbench/config_plugin.py +++ b/nfvbench/config_plugin.py @@ -13,44 +13,51 @@ # License for the specific language governing permissions and limitations # under the License. # +"""Configuration Plugin. +This module is used to override the configuration with platform specific constraints and extensions +""" import abc import specs class ConfigPluginBase(object): - """Base class for config plugins. Need to implement public interfaces.""" + """Base class for config plugins.""" + __metaclass__ = abc.ABCMeta class InitializationFailure(Exception): + """Used in case of any init failure.""" + pass def __init__(self, config): + """Save configuration.""" if not config: raise ConfigPluginBase.InitializationFailure( 'Initialization parameters need to be assigned.') - self.config = config @abc.abstractmethod def get_config(self): - """Returns updated default configuration file.""" + """Return updated default configuration file.""" def set_config(self, config): - """This method is called when the config has changed after this instance was initialized. + """Set a new configuration. - This is needed in teh frequent case where the main config is changed in a copy and to + This method is called when the config has changed after this instance was initialized. 
+ This is needed in the frequent case where the main config is changed in a copy and to prevent this instance to keep pointing to the old copy of the config """ self.config = config @abc.abstractmethod def get_openstack_spec(self): - """Returns OpenStack specs for host.""" + """Return OpenStack specs for host.""" @abc.abstractmethod def get_run_spec(self, config, openstack_spec): - """Returns RunSpec for given platform.""" + """Return RunSpec for given platform.""" @abc.abstractmethod def validate_config(self, cfg, openstack_spec): @@ -58,19 +65,22 @@ class ConfigPluginBase(object): @abc.abstractmethod def prepare_results_config(self, cfg): - """This function is called before running configuration is copied. + """Insert any plugin specific information to the results. + + This function is called before running configuration is copied. Example usage is to remove sensitive information like switch credentials. """ @abc.abstractmethod def get_version(self): - """Returns platform version.""" + """Return platform version.""" class ConfigPlugin(ConfigPluginBase): """No-op config plugin class. 
Does not change anything.""" def __init__(self, config): + """Invoke the base class constructor.""" ConfigPluginBase.__init__(self, config) def get_config(self): @@ -78,18 +88,21 @@ class ConfigPlugin(ConfigPluginBase): return self.config def get_openstack_spec(self): - """Returns OpenStack specs for host.""" + """Return OpenStack specs for host.""" return specs.OpenStackSpec() def get_run_spec(self, config, openstack_spec): - """Returns RunSpec for given platform.""" + """Return RunSpec for given platform.""" return specs.RunSpec(config.no_vswitch_access, openstack_spec) def validate_config(self, config, openstack_spec): + """Nothing to validate by default.""" pass def prepare_results_config(self, cfg): + """Nothing to add the results by default.""" return cfg def get_version(self): + """Return an empty version.""" return {} diff --git a/nfvbench/factory.py b/nfvbench/factory.py index 1461036..cad5a43 100644 --- a/nfvbench/factory.py +++ b/nfvbench/factory.py @@ -13,57 +13,19 @@ # License for the specific language governing permissions and limitations # under the License. 
# +"""Factory for creating worker and config plugin instances.""" -from chain_clients import EXTStageClient -from chain_clients import PVPStageClient -from chain_clients import PVVPStageClient -from chain_managers import EXTStatsManager -from chain_managers import PVPStatsManager -from chain_managers import PVVPStatsManager import chain_workers as workers from config_plugin import ConfigPlugin -from specs import ChainType -import tor_client class BasicFactory(object): - chain_classes = [ChainType.EXT, ChainType.PVP, ChainType.PVVP] - - chain_stats_classes = { - ChainType.EXT: EXTStatsManager, - ChainType.PVP: PVPStatsManager, - ChainType.PVVP: PVVPStatsManager, - } - - stage_clients_classes = { - ChainType.EXT: EXTStageClient, - ChainType.PVP: PVPStageClient, - ChainType.PVVP: PVVPStageClient, - } - - def get_stats_class(self, service_chain): - CLASS = self.chain_stats_classes.get(service_chain, None) - if CLASS is None: - raise Exception("Service chain '{}' not supported.".format(service_chain)) - - return CLASS - - def get_stage_class(self, service_chain): - CLASS = self.stage_clients_classes.get(service_chain, None) - if CLASS is None: - raise Exception("VM Client for chain '{}' not supported.".format(service_chain)) - - return CLASS + """Basic factory class to be overridden for advanced customization.""" def get_chain_worker(self, encaps, service_chain): + """Get a chain worker based on encaps and service chain type.""" return workers.BasicWorker - def get_tor_class(self, tor_type, no_tor_access): - if no_tor_access or not tor_type: - # if no TOR access is required, use basic no-op client - tor_type = 'BasicTORClient' - - return getattr(tor_client, tor_type) - def get_config_plugin_class(self): + """Get a config plugin.""" return ConfigPlugin diff --git a/nfvbench/network.py b/nfvbench/network.py deleted file mode 100644 index 6c02f04..0000000 --- a/nfvbench/network.py +++ /dev/null @@ -1,91 +0,0 @@ -# Copyright 2016 Cisco Systems, Inc. All rights reserved. 
-# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - - -class Interface(object): - """A class to hold the RX and TX counters for a virtual or physical interface.""" - - def __init__(self, name, device, tx_packets, rx_packets): - """Create a new interface instance.""" - self.name = name - self.device = device - self.packets = { - 'tx': tx_packets, - 'rx': rx_packets - } - - def set_packets(self, tx, rx): - """Set tx and rx counters for this interface.""" - self.packets = { - 'tx': tx, - 'rx': rx - } - - def set_packets_diff(self, tx, rx): - """Subtract current counters from new set of counters and update with results.""" - self.packets = { - 'tx': tx - self.packets['tx'], - 'rx': rx - self.packets['rx'], - } - - def is_no_op(self): - """Check if this interface is a no-opn interface.""" - return self.name is None - - def get_packet_count(self, traffic_type): - """Get packet count for given direction.""" - return self.packets.get(traffic_type, 0) - - @staticmethod - def no_op(): - """Return an interface that doe snot pass any traffic.""" - return Interface(None, None, 0, 0) - - -class Network(object): - """This class holds all interfaces that make up a logical neutron network. - - A loopback packet path has exactly 2 networks. - The first interface is always one of the 2 traffic gen interface. - Subsequent interfaces are sorted along the path from the TG to the loopback point - which could be interfaces in a switch, a vswitch or a VM. 
- """ - - def __init__(self, interfaces=None, reverse=False): - """Create a network with initial interface list and direction. - - :param interfaces: initial interface list - :param reverse: specifies the order of interfaces returned by get_interfaces - """ - if interfaces is None: - interfaces = [] - self.interfaces = interfaces - self.reverse = reverse - - def add_interface(self, interface): - """Add one more interface to this network. - - Order if important as interfaces must be added from traffic generator ports towards then - looping back device. - """ - self.interfaces.append(interface) - - def get_interfaces(self): - """Get interfaces associated to this network. - - Returned interface list is ordered from traffic generator port towards looping device if - reverse is false. Else returms the list in the reverse order. - """ - return self.interfaces[::-1] if self.reverse else self.interfaces diff --git a/nfvbench/nfvbench.py b/nfvbench/nfvbench.py index e0b5786..581206e 100644 --- a/nfvbench/nfvbench.py +++ b/nfvbench/nfvbench.py @@ -15,7 +15,6 @@ # import argparse -from collections import defaultdict import copy import datetime import importlib @@ -34,7 +33,6 @@ from cleanup import Cleaner from config import config_load from config import config_loads import credentials as credentials -from factory import BasicFactory from fluentd import FluentLogHandler import log from log import LOG @@ -42,7 +40,6 @@ from nfvbenchd import WebSocketIoServer from specs import ChainType from specs import Specs from summarizer import NFVBenchSummarizer -from traffic_client import TrafficGeneratorFactory import utils fluent_logger = None @@ -55,7 +52,9 @@ class NFVBench(object): STATUS_ERROR = 'ERROR' def __init__(self, config, openstack_spec, config_plugin, factory, notifier=None): + # the base config never changes for a given NFVbench instance self.base_config = config + # this is the running config, updated at every run() self.config = None self.config_plugin = config_plugin 
self.factory = factory @@ -65,19 +64,9 @@ class NFVBench(object): self.chain_runner = None self.specs = Specs() self.specs.set_openstack_spec(openstack_spec) - self.clients = defaultdict(lambda: None) self.vni_ports = [] sys.stdout.flush() - def setup(self): - self.specs.set_run_spec(self.config_plugin.get_run_spec(self.config, self.specs.openstack)) - self.chain_runner = ChainRunner(self.config, - self.clients, - self.cred, - self.specs, - self.factory, - self.notifier) - def set_notifier(self, notifier): self.notifier = notifier @@ -91,8 +80,15 @@ class NFVBench(object): fluent_logger.start_new_run() LOG.info(args) try: - self.update_config(opts) - self.setup() + # recalc the running config based on the base config and options for this run + self._update_config(opts) + self.specs.set_run_spec(self.config_plugin.get_run_spec(self.config, + self.specs.openstack)) + self.chain_runner = ChainRunner(self.config, + self.cred, + self.specs, + self.factory, + self.notifier) new_frame_sizes = [] min_packet_size = "68" if self.config.vlan_tagging else "64" for frame_size in self.config.frame_sizes: @@ -132,7 +128,7 @@ class NFVBench(object): self.chain_runner.close() if status == NFVBench.STATUS_OK: - result = utils.dict_to_json_dict(result) + # result2 = utils.dict_to_json_dict(result) return { 'status': status, 'result': result @@ -158,98 +154,65 @@ class NFVBench(object): self.config.flow_count, self.config.frame_sizes) - def update_config(self, opts): + def _update_config(self, opts): + """Recalculate the running config based on the base config and opts. + + Sanity check on the config is done here as well. 
+ """ self.config = AttrDict(dict(self.base_config)) self.config.update(opts) - - self.config.service_chain = self.config.service_chain.upper() - self.config.service_chain_count = int(self.config.service_chain_count) - self.config.flow_count = utils.parse_flow_count(self.config.flow_count) - required_flow_count = self.config.service_chain_count * 2 - if self.config.flow_count < required_flow_count: + config = self.config + + config.service_chain = config.service_chain.upper() + config.service_chain_count = int(config.service_chain_count) + if config.l2_loopback: + # force the number of chains to be 1 in case of l2 loopback + config.service_chain_count = 1 + config.service_chain = ChainType.EXT + config.no_arp = True + LOG.info('Running L2 loopback: using EXT chain/no ARP') + config.flow_count = utils.parse_flow_count(config.flow_count) + required_flow_count = config.service_chain_count * 2 + if config.flow_count < required_flow_count: LOG.info("Flow count %d has been set to minimum value of '%d' " - "for current configuration", self.config.flow_count, + "for current configuration", config.flow_count, required_flow_count) - self.config.flow_count = required_flow_count - - if self.config.flow_count % 2 != 0: - self.config.flow_count += 1 - - self.config.duration_sec = float(self.config.duration_sec) - self.config.interval_sec = float(self.config.interval_sec) - self.config.pause_sec = float(self.config.pause_sec) - - # Get traffic generator profile config - if not self.config.generator_profile: - self.config.generator_profile = self.config.traffic_generator.default_profile - - generator_factory = TrafficGeneratorFactory(self.config) - self.config.generator_config = \ - generator_factory.get_generator_config(self.config.generator_profile) - - # Check length of mac_addrs_left/right for serivce_chain EXT with no_arp - if self.config.service_chain == ChainType.EXT and self.config.no_arp: - if not (self.config.generator_config.mac_addrs_left is None and - 
self.config.generator_config.mac_addrs_right is None): - if (self.config.generator_config.mac_addrs_left is None or - self.config.generator_config.mac_addrs_right is None): - raise Exception("mac_addrs_left and mac_addrs_right must either " - "both be None or have a number of entries matching " - "service_chain_count") - if not (len(self.config.generator_config.mac_addrs_left) == - self.config.service_chain_count and - len(self.config.generator_config.mac_addrs_right) == - self.config.service_chain_count): - raise Exception("length of mac_addrs_left ({a}) and/or mac_addrs_right ({b}) " - "does not match service_chain_count ({c})" - .format(a=len(self.config.generator_config.mac_addrs_left), - b=len(self.config.generator_config.mac_addrs_right), - c=self.config.service_chain_count)) - - if not any(self.config.generator_config.pcis): - raise Exception("PCI addresses configuration for selected traffic generator profile " - "({tg_profile}) are missing. Please specify them in configuration file." 
- .format(tg_profile=self.config.generator_profile)) - - if self.config.traffic is None or not self.config.traffic: - raise Exception("No traffic profile found in traffic configuration, " - "please fill 'traffic' section in configuration file.") - - if isinstance(self.config.traffic, tuple): - self.config.traffic = self.config.traffic[0] - - self.config.frame_sizes = generator_factory.get_frame_sizes(self.config.traffic.profile) - - self.config.ipv6_mode = False - self.config.no_dhcp = True - self.config.same_network_only = True - if self.config.openrc_file: - self.config.openrc_file = os.path.expanduser(self.config.openrc_file) - - self.config.ndr_run = (not self.config.no_traffic and - 'ndr' in self.config.rate.strip().lower().split('_')) - self.config.pdr_run = (not self.config.no_traffic and - 'pdr' in self.config.rate.strip().lower().split('_')) - self.config.single_run = (not self.config.no_traffic and - not (self.config.ndr_run or self.config.pdr_run)) - - if self.config.vlans and len(self.config.vlans) != 2: - raise Exception('Number of configured VLAN IDs for VLAN tagging must be exactly 2.') - - self.config.json_file = self.config.json if self.config.json else None - if self.config.json_file: - (path, _filename) = os.path.split(self.config.json) + config.flow_count = required_flow_count + + if config.flow_count % 2: + config.flow_count += 1 + + config.duration_sec = float(config.duration_sec) + config.interval_sec = float(config.interval_sec) + config.pause_sec = float(config.pause_sec) + + if config.traffic is None or not config.traffic: + raise Exception("Missing traffic property in configuration") + + if config.openrc_file: + config.openrc_file = os.path.expanduser(config.openrc_file) + + config.ndr_run = (not config.no_traffic and + 'ndr' in config.rate.strip().lower().split('_')) + config.pdr_run = (not config.no_traffic and + 'pdr' in config.rate.strip().lower().split('_')) + config.single_run = (not config.no_traffic and + not (config.ndr_run or 
config.pdr_run)) + + config.json_file = config.json if config.json else None + if config.json_file: + (path, _filename) = os.path.split(config.json) if not os.path.exists(path): raise Exception('Please provide existing path for storing results in JSON file. ' 'Path used: {path}'.format(path=path)) - self.config.std_json_path = self.config.std_json if self.config.std_json else None - if self.config.std_json_path: - if not os.path.exists(self.config.std_json): + config.std_json_path = config.std_json if config.std_json else None + if config.std_json_path: + if not os.path.exists(config.std_json): raise Exception('Please provide existing path for storing results in JSON file. ' - 'Path used: {path}'.format(path=self.config.std_json_path)) + 'Path used: {path}'.format(path=config.std_json_path)) - self.config_plugin.validate_config(self.config, self.specs.openstack) + self.config_plugin.validate_config(config, self.specs.openstack) def parse_opts_from_cli(): @@ -284,7 +247,7 @@ def parse_opts_from_cli(): help='Port on which server will be listening (default 7555)') parser.add_argument('-sc', '--service-chain', dest='service_chain', - choices=BasicFactory.chain_classes, + choices=ChainType.names, action='store', help='Service chain to run') @@ -349,21 +312,6 @@ def parse_opts_from_cli(): help='Do not use ARP to find MAC addresses, ' 'instead use values in config file') - parser.add_argument('--no-reset', dest='no_reset', - default=None, - action='store_true', - help='Do not reset counters prior to running') - - parser.add_argument('--no-int-config', dest='no_int_config', - default=None, - action='store_true', - help='Skip interfaces config on EXT service chain') - - parser.add_argument('--no-tor-access', dest='no_tor_access', - default=None, - action='store_true', - help='Skip TOR switch configuration and retrieving of stats') - parser.add_argument('--no-vswitch-access', dest='no_vswitch_access', default=None, action='store_true', @@ -572,8 +520,6 @@ def main(): 
config.service_chain_count = opts.service_chain_count if opts.no_vswitch_access: config.no_vswitch_access = opts.no_vswitch_access - if opts.no_int_config: - config.no_int_config = opts.no_int_config # port to port loopback (direct or through switch) if opts.l2_loopback: @@ -585,8 +531,6 @@ def main(): LOG.info('Disabling ARP') config.no_arp = True config.vlans = [int(opts.l2_loopback), int(opts.l2_loopback)] - # disable any form of interface config since we loop at the switch level - config.no_int_config = True LOG.info('Running L2 loopback: using EXT chain/no ARP') if opts.use_sriov_middle_net: diff --git a/nfvbench/packet_analyzer.py b/nfvbench/packet_analyzer.py deleted file mode 100644 index 5d72bc9..0000000 --- a/nfvbench/packet_analyzer.py +++ /dev/null @@ -1,64 +0,0 @@ -#!/usr/bin/env python -# Copyright 2016 Cisco Systems, Inc. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. 
-# - -from collections import OrderedDict -from log import LOG - - -class PacketAnalyzer(object): - """Analyze packet drop counter in a chain""" - - def __init__(self): - self.last_packet_count = 0 - self.chain = [] - - def record(self, interface, traffic_type): - """Records the counter of the next interface with the corresponding traffic type""" - if interface.is_no_op(): - return - packet_count = interface.get_packet_count(traffic_type) - packet_drop_count = self.last_packet_count - packet_count - path_data = OrderedDict() - path_data['interface'] = interface.name - path_data['device'] = interface.device - path_data['packet_count'] = packet_count - - if self.chain: - path_data['packet_drop_count'] = packet_drop_count - - self.chain.append(path_data) - self.last_packet_count = packet_count - - def get_analysis(self): - """Gets the analysis of packet drops""" - transmitted_packets = self.chain[0]['packet_count'] - - for (index, path_data) in enumerate(self.chain): - LOG.info('[Packet Analyze] Interface: %s', path_data['interface']) - LOG.info('[Packet Analyze] > Count: %d', path_data['packet_count']) - - if index: - if transmitted_packets: - self.chain[index]['packet_drop_percentage'] = \ - 100.0 * path_data['packet_drop_count'] / transmitted_packets - else: - self.chain[index]['packet_drop_percentage'] = float('nan') - LOG.info('[Packet Analyze] > Packet Drops: %d', - path_data['packet_drop_count']) - LOG.info('[Packet Analyze] > Percentage: %s', - path_data['packet_drop_percentage']) - - return self.chain diff --git a/nfvbench/packet_stats.py b/nfvbench/packet_stats.py new file mode 100644 index 0000000..16dc965 --- /dev/null +++ b/nfvbench/packet_stats.py @@ -0,0 +1,309 @@ +# Copyright 2018 Cisco Systems, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +"""Manage all classes related to counting packet stats. + +InterfaceStats counts RX/TX packet counters for one interface. +PacketPathStats manages all InterfaceStats instances for a given chain. +PacketPathStatsManager manages all packet path stats for all chains. +""" + +import copy + +from traffic_gen.traffic_base import Latency + +class InterfaceStats(object): + """A class to hold the RX and TX counters for a virtual or physical interface. + + An interface stats instance can represent a real interface (e.g. traffic gen port or + vhost interface) or can represent an aggegation of multiple interfaces when packets + are faned out (e.g. one vlan subinterface can fan out to multiple vhost interfaces + in the case of multi-chaining and when the network is shared across chains). + """ + + TX = 0 + RX = 1 + + def __init__(self, name, device, shared=False): + """Create a new interface instance. + + name: interface name specific to each chain (e.g. "trex port 0 chain 0") + device: on which device this interface resides (e.g. "trex server") + fetch_tx_rx: a fetch method that takes name, chain_index and returns a (tx, rx) tuple + shared: if true this interface stats is shared across all chains + """ + self.name = name + self.device = device + self.shared = shared + # RX and TX counters for this interface + self.tx = 0 + self.rx = 0 + + def get_packet_count(self, direction): + """Get packet count for given direction. 
+ + direction: InterfaceStats.TX or InterfaceStats.RX + """ + return self.tx if direction == InterfaceStats.TX else self.rx + + @staticmethod + def get_reverse_direction(direction): + """Get the reverse direction of a given direction. + + direction: InterfaceStats.TX or InterfaceStats.RX + return: RX if TX given, or TX is RX given + """ + return 1 - direction + + @staticmethod + def get_direction_name(direction): + """Get the rdisplay name of a given direction. + + direction: InterfaceStats.TX or InterfaceStats.RX + return: "TX" or "RX" + """ + if direction == InterfaceStats.TX: + return 'TX' + return 'RX' + + def add_if_stats(self, if_stats): + """Add another ifstats to this instance.""" + self.tx += if_stats.tx + self.rx += if_stats.rx + + def update_stats(self, tx, rx, diff): + """Update stats for this interface. + + tx: new TX packet count + rx: new RX packet count + diff: if True, perform a diff of new value with previous baselined value, + otherwise store the new value + """ + if diff: + self.tx = tx - self.tx + self.rx = rx - self.rx + else: + self.tx = tx + self.rx = rx + + def get_display_name(self, dir, name=None, aggregate=False): + """Get the name to use to display stats for this interface stats. + + dir: direction InterfaceStats.TX or InterfaceStats.RX + name: override self.name + aggregate: true if this is for an aggregate of multiple chains + """ + if name is None: + name = self.name + return self.device + '.' + InterfaceStats.get_direction_name(dir) + '.' + name + + +class PacketPathStats(object): + """Manage the packet path stats for 1 chain in both directions. + + A packet path stats instance manages an ordered list of InterfaceStats objects + that can be traversed in the forward and reverse direction to display packet + counters in each direction. + The requirement is that RX and TX counters must always alternate as we travel + along one direction. 
For example with 4 interfaces per chain: + [ifstat0, ifstat1, ifstat2, ifstat3] + Packet counters in the forward direction are: + [ifstat0.TX, ifstat1.RX, ifstat2.TX, ifstat3.RX] + Packet counters in the reverse direction are: + [ifstat3.TX, ifstat2.RX, ifstat1.TX, ifstat0.RX] + + A packet path stats also carries the latency data for each direction of the + chain. + """ + + def __init__(self, if_stats, aggregate=False): + """Create a packet path stats intance with the list of associated if stats. + + if_stats: a list of interface stats that compose this packet path stats + aggregate: True if this is an aggregate packet path stats + + Aggregate packet path stats are the only one that should show counters for shared + interface stats + """ + self.if_stats = if_stats + # latency for packets sent from port 0 and 1 + self.latencies = [Latency(), Latency()] + self.aggregate = aggregate + + + def add_packet_path_stats(self, pps): + """Add another packet path stat to this instance. + + pps: the other packet path stats to add to this instance + + This is used only for aggregating/collapsing multiple pps into 1 + to form a "total" pps + """ + for index, ifstats in enumerate(self.if_stats): + # shared interface stats must not be self added + if not ifstats.shared: + ifstats.add_if_stats(pps.if_stats[index]) + + @staticmethod + def get_agg_packet_path_stats(pps_list): + """Get the aggregated packet path stats from a list of packet path stats. + + Interface counters are added, latency stats are updated. + """ + agg_pps = None + for pps in pps_list: + if agg_pps is None: + # Get a clone of the first in the list + agg_pps = PacketPathStats(pps.get_cloned_if_stats(), aggregate=True) + else: + agg_pps.add_packet_path_stats(pps) + # aggregate all latencies + agg_pps.latencies = [Latency([pps.latencies[port] for pps in pps_list]) + for port in [0, 1]] + return agg_pps + + def get_if_stats(self, reverse=False): + """Get interface stats for given direction. 
+ + reverse: if True, get the list of interface stats in the reverse direction + else (default) gets the ist in the forward direction. + return: the list of interface stats indexed by the chain index + """ + return self.if_stats[::-1] if reverse else self.if_stats + + def get_cloned_if_stats(self): + """Get a clone copy of the interface stats list.""" + return [copy.copy(ifstat) for ifstat in self.if_stats] + + + def get_header_labels(self, reverse=False, aggregate=False): + """Get the list of header labels for this packet path stats.""" + labels = [] + dir = InterfaceStats.TX + for ifstat in self.get_if_stats(reverse): + # starts at TX then RX then TX again etc... + labels.append(ifstat.get_display_name(dir, aggregate=aggregate)) + dir = InterfaceStats.get_reverse_direction(dir) + return labels + + def get_stats(self, reverse=False): + """Get the list of packet counters and latency data for this packet path stats. + + return: a dict of packet counters and latency stats + + {'packets': [2000054, 1999996, 1999996], + 'min_usec': 10, 'max_usec': 187, 'avg_usec': 45}, + """ + counters = [] + dir = InterfaceStats.TX + for ifstat in self.get_if_stats(reverse): + # starts at TX then RX then TX again etc... + if ifstat.shared and not self.aggregate: + # shared if stats countesr are only shown in aggregate pps + counters.append('') + else: + counters.append(ifstat.get_packet_count(dir)) + dir = InterfaceStats.get_reverse_direction(dir) + + # latency: use port 0 latency for forward, port 1 latency for reverse + latency = self.latencies[1] if reverse else self.latencies[0] + + if latency.available(): + results = {'lat_min_usec': latency.min_usec, + 'lat_max_usec': latency.max_usec, + 'lat_avg_usec': latency.avg_usec} + else: + results = {} + results['packets'] = counters + return results + + +class PacketPathStatsManager(object): + """Manages all the packet path stats for all chains. + + Each run will generate packet path stats for 1 or more chains. 
+ """ + + def __init__(self, pps_list): + """Create a packet path stats intance with the list of associated if stats. + + pps_list: a list of packet path stats indexed by the chain id. + All packet path stats must have the same length. + """ + self.pps_list = pps_list + + def insert_pps_list(self, chain_index, if_stats): + """Insert a list of interface stats for given chain right after the first in the list. + + chain_index: index of chain where to insert + if_stats: list of interface stats to insert + """ + # use slicing to insert the list + self.pps_list[chain_index].if_stats[1:1] = if_stats + + def _get_if_agg_name(self, reverse): + """Get the aggegated name for all interface stats across all pps. + + return: a list of aggregated names for each position of the chain for all chains + + The agregated name is the interface stats name if there is only 1 chain. + Otherwise it is the common prefix for all interface stats names at same position in the + chain. + """ + # if there is only one chain, use the if_stats names directly + return self.pps_list[0].get_header_labels(reverse, aggregate=(len(self.pps_list) > 1)) + + def _get_results(self, reverse=False): + """Get the digested stats for the forward or reverse directions. + + return: a dict with all the labels, total and per chain counters + """ + chains = {} + # insert the aggregated row if applicable + if len(self.pps_list) > 1: + agg_pps = PacketPathStats.get_agg_packet_path_stats(self.pps_list) + chains['total'] = agg_pps.get_stats(reverse) + + for index, pps in enumerate(self.pps_list): + chains[index] = pps.get_stats(reverse) + return {'interfaces': self._get_if_agg_name(reverse), + 'chains': chains} + + def get_results(self): + """Get the digested stats for the forward and reverse directions. 
+ + return: a dictionary of results for each direction and each chain + + Example: + + { + 'Forward': { + 'interfaces': ['Port0', 'vhost0', 'Port1'], + 'chains': { + 0: {'packets': [2000054, 1999996, 1999996], + 'min_usec': 10, + 'max_usec': 187, + 'avg_usec': 45}, + 1: {...}, + 'total': {...} + } + }, + 'Reverse': {... + } + } + + """ + results = {'Forward': self._get_results(), + 'Reverse': self._get_results(reverse=True)} + return results diff --git a/nfvbench/service_chain.py b/nfvbench/service_chain.py deleted file mode 100644 index 7ec1511..0000000 --- a/nfvbench/service_chain.py +++ /dev/null @@ -1,148 +0,0 @@ -#!/usr/bin/env python -# Copyright 2016 Cisco Systems, Inc. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. 
-# - -from collections import OrderedDict -import time - -from chain_managers import StageManager -from log import LOG -from specs import ChainType - - -class ServiceChain(object): - - def __init__(self, config, clients, cred, specs, factory, notifier=None): - self.config = config - self.clients = clients - self.cred = cred - self.specs = specs - self.factory = factory - self.notifier = notifier - self.chain_name = self.config.service_chain - self.vlans = None - self.stage_manager = None - self.stats_manager = None - LOG.info('ServiceChain initialized.') - - def __set_helpers(self): - self.stage_manager = StageManager(self.config, self.cred, self.factory) - self.clients['vm'] = self.stage_manager - self.vlans = self.stage_manager.get_vlans() - - STATS_CLASS = self.factory.get_stats_class(self.config.service_chain) - self.stats_manager = STATS_CLASS(self.config, - self.clients, - self.specs, - self.factory, - self.vlans, - self.notifier) - - def __set_vlan_tags(self): - if self.config.vlan_tagging: - # override with user-specified vlans if configured - vlans = self.config.vlans if self.config.vlans else self.vlans[:2] - for vlan, device in zip(vlans, self.config.generator_config.devices): - self.stats_manager.set_vlan_tag(device, vlan) - - def __get_result_per_frame_size(self, frame_size, actual_frame_size, bidirectional): - start_time = time.time() - traffic_result = { - frame_size: {} - } - result = {} - if not self.config.no_traffic: - self.clients['traffic'].set_traffic(actual_frame_size, bidirectional) - - if self.config.single_run: - result = self.stats_manager.run() - else: - results = self.clients['traffic'].get_ndr_and_pdr() - - for dr in ['pdr', 'ndr']: - if dr in results: - if frame_size != actual_frame_size: - results[dr]['l2frame_size'] = frame_size - results[dr]['actual_l2frame_size'] = actual_frame_size - traffic_result[frame_size][dr] = results[dr] - if 'warning' in results[dr]['stats'] and results[dr]['stats']['warning']: - traffic_result['warning'] 
= results[dr]['stats']['warning'] - traffic_result[frame_size]['iteration_stats'] = results['iteration_stats'] - - result['analysis_duration_sec'] = time.time() - start_time - if self.config.single_run: - result['run_config'] = self.clients['traffic'].get_run_config(result) - required = result['run_config']['direction-total']['orig']['rate_pps'] - actual = result['stats']['total_tx_rate'] - if frame_size != actual_frame_size: - result['actual_l2frame_size'] = actual_frame_size - warning = self.clients['traffic'].compare_tx_rates(required, actual) - if warning is not None: - result['run_config']['warning'] = warning - - traffic_result[frame_size].update(result) - return traffic_result - - def __get_chain_result(self): - result = OrderedDict() - for fs, actual_fs in zip(self.config.frame_sizes, self.config.actual_frame_sizes): - result.update(self.__get_result_per_frame_size(fs, - actual_fs, - self.config.traffic.bidirectional)) - - chain_result = { - 'flow_count': self.config.flow_count, - 'service_chain_count': self.config.service_chain_count, - 'bidirectional': self.config.traffic.bidirectional, - 'profile': self.config.traffic.profile, - 'compute_nodes': self.stats_manager.get_compute_nodes_bios(), - 'result': result - } - - return chain_result - - def __setup_traffic(self): - self.clients['traffic'].setup() - if not self.config.no_traffic: - if self.config.service_chain == ChainType.EXT and not self.config.no_arp: - self.clients['traffic'].ensure_arp_successful() - self.clients['traffic'].ensure_end_to_end() - - def run(self): - LOG.info('Starting %s chain...', self.chain_name) - LOG.info('Dry run: %s', self.config.no_traffic) - results = {} - - self.__set_helpers() - self.__set_vlan_tags() - self.stage_manager.set_vm_macs() - self.__setup_traffic() - results[self.chain_name] = {'result': self.__get_chain_result()} - - if self.config.service_chain == ChainType.PVVP: - results[self.chain_name]['mode'] = 'inter-node' \ - if self.config.inter_node else 'intra-node' 
- - LOG.info("Service chain '%s' run completed.", self.chain_name) - return results - - def get_version(self): - return self.stats_manager.get_version() - - def close(self): - if self.stage_manager: - self.stage_manager.close() - if self.stats_manager: - self.stats_manager.close() diff --git a/nfvbench/specs.py b/nfvbench/specs.py index a84a55f..75fe703 100644 --- a/nfvbench/specs.py +++ b/nfvbench/specs.py @@ -17,11 +17,12 @@ class Encaps(object): VLAN = "VLAN" VxLAN = "VxLAN" - BASIC = "BASIC" + NO_ENCAPS = "NONE" encaps_mapping = { 'VLAN': VLAN, - 'VXLAN': VxLAN + 'VXLAN': VxLAN, + 'NONE': NO_ENCAPS } @classmethod @@ -33,22 +34,13 @@ class ChainType(object): PVP = "PVP" PVVP = "PVVP" EXT = "EXT" - - chain_mapping = { - 'PVP': PVP, - 'PVVP': PVVP, - 'EXT': EXT - } - - @classmethod - def get_chain_type(cls, chain): - return cls.chain_mapping.get(chain.upper(), None) + names = [EXT, PVP, PVVP] class OpenStackSpec(object): def __init__(self): self.__vswitch = "BASIC" - self.__encaps = Encaps.BASIC + self.__encaps = Encaps.NO_ENCAPS @property def vswitch(self): diff --git a/nfvbench/stats_manager.py b/nfvbench/stats_manager.py new file mode 100644 index 0000000..a1fb497 --- /dev/null +++ b/nfvbench/stats_manager.py @@ -0,0 +1,101 @@ +#!/usr/bin/env python +# Copyright 2016 Cisco Systems, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+# +import time + +from log import LOG +from packet_stats import PacketPathStatsManager +from stats_collector import IntervalCollector + + +class StatsManager(object): + """A class to collect detailed stats and handle fixed rate runs for all chain types.""" + + def __init__(self, chain_runner): + self.chain_runner = chain_runner + self.config = chain_runner.config + self.traffic_client = chain_runner.traffic_client + self.specs = chain_runner.specs + self.notifier = chain_runner.notifier + self.interval_collector = None + self.factory = chain_runner.factory + # create a packet path stats manager for fixed rate runs only + if self.config.single_run: + pps_list = [] + self.traffic_client.insert_interface_stats(pps_list) + self.pps_mgr = PacketPathStatsManager(pps_list) + else: + self.pps_mgr = None + self.worker = None + + def create_worker(self): + """Create a worker to fetch custom data. + + This is done late as we need to know the dest MAC for all VNFs, which can happen + as late as after ARP discovery. 
+ """ + if not self.worker and self.specs.openstack: + WORKER_CLASS = self.factory.get_chain_worker(self.specs.openstack.encaps, + self.config.service_chain) + self.worker = WORKER_CLASS(self) + + def _generate_traffic(self): + if self.config.no_traffic: + return {} + + self.interval_collector = IntervalCollector(time.time()) + self.interval_collector.attach_notifier(self.notifier) + LOG.info('Starting to generate traffic...') + stats = {} + for stats in self.traffic_client.run_traffic(): + self.interval_collector.add(stats) + + LOG.info('...traffic generating ended.') + return stats + + def get_stats(self): + return self.interval_collector.get() if self.interval_collector else [] + + def get_version(self): + return self.worker.get_version() if self.worker else {} + + def _update_interface_stats(self, diff=False): + """Update interface stats for both the traffic generator and the worker.""" + self.traffic_client.update_interface_stats(diff=True) + if self.worker: + self.worker.update_interface_stats(diff=True) + + def run_fixed_rate(self): + """Run a fixed rate and analyze results.""" + # Baseline the packet path stats + self._update_interface_stats() + + in_flight_stats = self._generate_traffic() + result = { + 'stats': in_flight_stats + } + # New analysis code with packet path stats + # Diff all interface stats and return packet path stats analysis + # Diff the packet path stats + self._update_interface_stats(diff=True) + result['packet_path_stats'] = self.pps_mgr.get_results() + return result + + def get_compute_nodes_bios(self): + return self.worker.get_compute_nodes_bios() if self.worker else {} + + def close(self): + if self.worker: + self.worker.close() diff --git a/nfvbench/summarizer.py b/nfvbench/summarizer.py index b27ed6f..4e70294 100644 --- a/nfvbench/summarizer.py +++ b/nfvbench/summarizer.py @@ -22,11 +22,60 @@ import bitmath import pytz from tabulate import tabulate -from specs import ChainType - +def _annotate_chain_stats(chain_stats, 
nodrop_marker='=>'): + """Transform a plain chain stats into an annotated one. + + Example: + { + 0: {'packets': [2000054, 1999996, 1999996, 1999996], + 'lat_min_usec': 10, + 'lat_max_usec': 187, + 'lat_avg_usec': 45}, + 1: {...}, + 'total': {...} + } + should become: + { + 0: {'packets': [2000054, -58 (-0.034%), '=>', 1999996], + 'lat_min_usec': 10, + 'lat_max_usec': 187, + 'lat_avg_usec': 45}, + 1: {...}, + 'total': {...} + } + + In the case of shared net, some columns in packets array can have '' + """ + for stats in chain_stats.values(): + packets = stats['packets'] + count = len(packets) + if count > 1: + # keep the first counter + annotated_packets = [packets[0]] + # modify all remaining counters + prev_count = packets[0] + for index in range(1, count): + cur_count = packets[index] + if cur_count == '': + # an empty string indicates an unknown counter for a shared interface + # do not annotate those + annotated_value = '' + else: + drop = cur_count - prev_count + if drop: + dr = (drop * 100.0) / prev_count if prev_count else 0 + annotated_value = '{:,} ({:.4f}%)'.format(drop, dr) + else: + # no drop + # if last column we display the value + annotated_value = cur_count if index == count - 1 else nodrop_marker + prev_count = cur_count + annotated_packets.append(annotated_value) + + stats['packets'] = annotated_packets class Formatter(object): - """Collection of string formatter methods""" + """Collection of string formatter methods.""" @staticmethod def fixed(data): @@ -83,7 +132,7 @@ class Formatter(object): class Table(object): - """ASCII readable table class""" + """ASCII readable table class.""" def __init__(self, header): header_row, self.formatters = zip(*header) @@ -108,7 +157,7 @@ class Table(object): class Summarizer(object): - """Generic summarizer class""" + """Generic summarizer class.""" indent_per_level = 2 @@ -164,7 +213,7 @@ class Summarizer(object): class NFVBenchSummarizer(Summarizer): - """Summarize nfvbench json result""" + """Summarize 
nfvbench json result.""" ndr_pdr_header = [ ('-', Formatter.fixed), @@ -195,21 +244,11 @@ class NFVBenchSummarizer(Summarizer): ('RX Rate (pps)', Formatter.suffix(' pps')) ] - chain_analysis_header = [ - ('Interface', Formatter.standard), - ('Device', Formatter.standard), - ('Packets (fwd)', Formatter.standard), - ('Drops (fwd)', Formatter.standard), - ('Drop% (fwd)', Formatter.percentage), - ('Packets (rev)', Formatter.standard), - ('Drops (rev)', Formatter.standard), - ('Drop% (rev)', Formatter.percentage) - ] - direction_keys = ['direction-forward', 'direction-reverse', 'direction-total'] direction_names = ['Forward', 'Reverse', 'Total'] def __init__(self, result, sender): + """Create a summarizer instance.""" Summarizer.__init__(self) self.result = result self.config = self.result['config'] @@ -247,13 +286,10 @@ class NFVBenchSummarizer(Summarizer): self._put('Components:') with self._create_block(): - self._put('TOR:') - with self._create_block(False): - self._put('Type:', self.config['tor']['type']) self._put('Traffic Generator:') with self._create_block(False): - self._put('Profile:', self.config['generator_config']['name']) - self._put('Tool:', self.config['generator_config']['tool']) + self._put('Profile:', self.config['tg-name']) + self._put('Tool:', self.config['tg-tool']) if network_benchmark['versions']: self._put('Versions:') with self._create_block(): @@ -274,9 +310,6 @@ class NFVBenchSummarizer(Summarizer): def __chain_summarize(self, chain_name, chain_benchmark): self._put(chain_name + ':') - if chain_name == ChainType.PVVP: - self._put('Mode:', chain_benchmark.get('mode')) - chain_name += "-" + chain_benchmark.get('mode') self.__record_header_put('service_chain', chain_name) with self._create_block(): self._put('Traffic:') @@ -323,11 +356,6 @@ class NFVBenchSummarizer(Summarizer): self._put('Actual l2 frame size:', analysis['ndr']['actual_l2frame_size']) elif self.config['pdr_run'] and 'actual_l2frame_size' in analysis['pdr']: self._put('Actual l2 
frame size:', analysis['pdr']['actual_l2frame_size']) - if 'analysis_duration_sec' in analysis: - self._put('Chain analysis duration:', - Formatter.float(3)(analysis['analysis_duration_sec']), 'seconds') - self.__record_data_put(frame_size, {'chain_analysis_duration': Formatter.float(3)( - analysis['analysis_duration_sec'])}) if self.config['ndr_run']: self._put('NDR search duration:', Formatter.float(0)(analysis['ndr']['time_taken_sec']), 'seconds') @@ -350,12 +378,13 @@ class NFVBenchSummarizer(Summarizer): self._put(analysis['run_config']['warning']) self._put() - if 'packet_analysis' in analysis: - self._put('Chain Analysis:') - self._put() - with self._create_block(False): - self._put_table(self.__get_chain_analysis_table(analysis['packet_analysis'])) + if 'packet_path_stats' in analysis: + for dir in ['Forward', 'Reverse']: + self._put(dir + ' Chain Packet Counters and Latency:') self._put() + with self._create_block(False): + self._put_table(self._get_chain_table(analysis['packet_path_stats'][dir])) + self._put() def __get_summary_table(self, traffic_result): if self.config['single_run']: @@ -452,23 +481,42 @@ class NFVBenchSummarizer(Summarizer): }) return config_table - def __get_chain_analysis_table(self, packet_analysis): - chain_analysis_table = Table(self.chain_analysis_header) - forward_analysis = packet_analysis['direction-forward'] - reverse_analysis = packet_analysis['direction-reverse'] - reverse_analysis.reverse() - for fwd, rev in zip(forward_analysis, reverse_analysis): - chain_analysis_table.add_row([ - fwd['interface'], - fwd['device'], - fwd['packet_count'], - fwd.get('packet_drop_count', None), - fwd.get('packet_drop_percentage', None), - rev['packet_count'], - rev.get('packet_drop_count', None), - rev.get('packet_drop_percentage', None), - ]) - return chain_analysis_table + def _get_chain_table(self, chain_stats): + """Retrieve the table for a direction. 
+ + chain_stats: { + 'interfaces': ['Port0', 'drop %'', 'vhost0', 'Port1'], + 'chains': { + 0: {'packets': [2000054, '-0.023%', 1999996, 1999996], + 'lat_min_usec': 10, + 'lat_max_usec': 187, + 'lat_avg_usec': 45}, + 1: {...}, + 'total': {...} + } + } + """ + chains = chain_stats['chains'] + _annotate_chain_stats(chains) + header = [('Chain', Formatter.standard)] + \ + [(ifname, Formatter.standard) for ifname in chain_stats['interfaces']] + # add latency columns if available Avg, Min, Max + lat_keys = [] + lat_map = {'lat_avg_usec': 'Avg lat.', + 'lat_min_usec': 'Min lat.', + 'lat_max_usec': 'Max lat.'} + if 'lat_avg_usec' in chains[0]: + lat_keys = ['lat_avg_usec', 'lat_min_usec', 'lat_max_usec'] + for key in lat_keys: + header.append((lat_map[key], Formatter.standard)) + + table = Table(header) + for chain in sorted(chains.keys()): + row = [chain] + chains[chain]['packets'] + for lat_key in lat_keys: + row.append('{:,} usec'.format(chains[chain][lat_key])) + table.add_row(row) + return table def __record_header_put(self, key, value): if self.sender: diff --git a/nfvbench/tor_client.py b/nfvbench/tor_client.py deleted file mode 100644 index c8214c8..0000000 --- a/nfvbench/tor_client.py +++ /dev/null @@ -1,52 +0,0 @@ -#!/usr/bin/env python -# Copyright 2016 Cisco Systems, Inc. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. 
-# - - -class TORClientException(Exception): - pass - - -class BasicTORClient(object): - - def __init__(self, config): - pass - - def get_int_counters(self): - return {} - - def get_vni_counters(self, vni): - return {} - - def get_vni_interface(self, vni, counters): - return None - - def get_vni_for_vlan(self, vlans): - return [] - - def attach_tg_interfaces(self, network_vlans, switch_ports): - pass - - def clear_nve(self): - pass - - def clear_interface(self, vni): - pass - - def close(self): - pass - - def get_version(self): - return {} diff --git a/nfvbench/traffic_client.py b/nfvbench/traffic_client.py index ef68fe5..4414710 100755 --- a/nfvbench/traffic_client.py +++ b/nfvbench/traffic_client.py @@ -12,8 +12,9 @@ # License for the specific language governing permissions and limitations # under the License. +"""Interface to the traffic generator clients including NDR/PDR binary search.""" + from datetime import datetime -import re import socket import struct import time @@ -26,8 +27,8 @@ from trex_stl_lib.api import STLError # pylint: enable=import-error from log import LOG -from network import Interface -from specs import ChainType +from packet_stats import InterfaceStats +from packet_stats import PacketPathStats from stats_collector import IntervalCollector from stats_collector import IterationCollector import traffic_gen.traffic_utils as utils @@ -44,12 +45,16 @@ class TrafficRunner(object): """Serialize various steps required to run traffic.""" def __init__(self, client, duration_sec, interval_sec=0): + """Create a traffic runner.""" self.client = client self.start_time = None self.duration_sec = duration_sec self.interval_sec = interval_sec def run(self): + """Clear stats and instruct the traffic generator to start generating traffic.""" + if self.is_running(): + return None LOG.info('Running traffic generator') self.client.gen.clear_stats() self.client.gen.start_traffic() @@ -57,22 +62,29 @@ class TrafficRunner(object): return self.poll_stats() def 
stop(self): + """Stop the current run and instruct the traffic generator to stop traffic.""" if self.is_running(): self.start_time = None self.client.gen.stop_traffic() def is_running(self): + """Check if a run is still pending.""" return self.start_time is not None def time_elapsed(self): + """Return time elapsed since start of run.""" if self.is_running(): return time.time() - self.start_time return self.duration_sec def poll_stats(self): + """Poll latest stats from the traffic generator at fixed interval - sleeps if necessary. + + return: latest stats or None if traffic is stopped + """ if not self.is_running(): return None - if self.client.skip_sleep: + if self.client.skip_sleep(): self.stop() return self.client.get_stats() time_elapsed = self.time_elapsed() @@ -96,6 +108,7 @@ class IpBlock(object): """Manage a block of IP addresses.""" def __init__(self, base_ip, step_ip, count_ip): + """Create an IP block.""" self.base_ip_int = Device.ip_to_int(base_ip) self.step = Device.ip_to_int(step_ip) self.max_available = count_ip @@ -104,7 +117,7 @@ class IpBlock(object): def get_ip(self, index=0): """Return the IP address at given index.""" if index < 0 or index >= self.max_available: - raise IndexError('Index out of bounds') + raise IndexError('Index out of bounds: %d (max=%d)' % (index, self.max_available)) return Device.int_to_ip(self.base_ip_int + index * self.step) def reserve_ip_range(self, count): @@ -120,69 +133,73 @@ class IpBlock(object): return (first_ip, last_ip) def reset_reservation(self): + """Reset all reservations and restart with a completely unused IP block.""" self.next_free = 0 class Device(object): - """Represent a port device and all information associated to it.""" - - def __init__(self, port, pci, switch_port=None, vtep_vlan=None, ip=None, tg_gateway_ip=None, - gateway_ip=None, ip_addrs_step=None, tg_gateway_ip_addrs_step=None, - gateway_ip_addrs_step=None, udp_src_port=None, udp_dst_port=None, - dst_mac=None, chain_count=1, flow_count=1, 
vlan_tagging=False): - self.chain_count = chain_count - self.flow_count = flow_count - self.dst = None + """Represent a port device and all information associated to it. + + In the curent version we only support 2 port devices for the traffic generator + identified as port 0 or port 1. + """ + + def __init__(self, port, generator_config, vtep_vlan=None): + """Create a new device for a given port.""" + self.generator_config = generator_config + self.chain_count = generator_config.service_chain_count + self.flow_count = generator_config.flow_count / 2 self.port = port - self.switch_port = switch_port + self.switch_port = generator_config.interfaces[port].get('switch_port', None) self.vtep_vlan = vtep_vlan - self.vlan_tag = None - self.vlan_tagging = vlan_tagging - self.pci = pci + self.pci = generator_config.interfaces[port].pci self.mac = None - self.dst_mac = dst_mac - self.vm_mac_list = None - subnet = IPNetwork(ip) + self.dest_macs = None + self.vlans = None + self.ip_addrs = generator_config.ip_addrs[port] + subnet = IPNetwork(self.ip_addrs) self.ip = subnet.ip.format() - self.ip_prefixlen = subnet.prefixlen - self.ip_addrs_step = ip_addrs_step - self.tg_gateway_ip_addrs_step = tg_gateway_ip_addrs_step - self.gateway_ip_addrs_step = gateway_ip_addrs_step - self.gateway_ip = gateway_ip - self.tg_gateway_ip = tg_gateway_ip - self.ip_block = IpBlock(self.ip, ip_addrs_step, flow_count) - self.gw_ip_block = IpBlock(gateway_ip, - gateway_ip_addrs_step, - chain_count) - self.tg_gw_ip_block = IpBlock(tg_gateway_ip, - tg_gateway_ip_addrs_step, - chain_count) - self.udp_src_port = udp_src_port - self.udp_dst_port = udp_dst_port + self.ip_addrs_step = generator_config.ip_addrs_step + self.ip_block = IpBlock(self.ip, self.ip_addrs_step, self.flow_count) + self.gw_ip_block = IpBlock(generator_config.gateway_ips[port], + generator_config.gateway_ip_addrs_step, + self.chain_count) + self.tg_gateway_ip_addrs = generator_config.tg_gateway_ip_addrs[port] + self.tg_gw_ip_block = 
IpBlock(self.tg_gateway_ip_addrs, + generator_config.tg_gateway_ip_addrs_step, + self.chain_count) + self.udp_src_port = generator_config.udp_src_port + self.udp_dst_port = generator_config.udp_dst_port def set_mac(self, mac): + """Set the local MAC for this port device.""" if mac is None: raise TrafficClientException('Trying to set traffic generator MAC address as None') self.mac = mac - LOG.info("Port %d: src MAC %s", self.port, self.mac) - def set_destination(self, dst): - self.dst = dst + def get_peer_device(self): + """Get the peer device (device 0 -> device 1, or device 1 -> device 0).""" + return self.generator_config.devices[1 - self.port] - def set_vm_mac_list(self, vm_mac_list): - self.vm_mac_list = map(str, vm_mac_list) + def set_dest_macs(self, dest_macs): + """Set the list of dest MACs indexed by the chain id.""" + self.dest_macs = map(str, dest_macs) - def set_vlan_tag(self, vlan_tag): - if self.vlan_tagging and vlan_tag is None: - raise TrafficClientException('Trying to set VLAN tag as None') - self.vlan_tag = vlan_tag - LOG.info("Port %d: VLAN %d", self.port, self.vlan_tag) + def set_vlans(self, vlans): + """Set the list of vlans to use indexed by the chain id.""" + self.vlans = vlans + LOG.info("Port %d: VLANs %s", self.port, self.vlans) def get_gw_ip(self, chain_index): """Retrieve the IP address assigned for the gateway of a given chain.""" return self.gw_ip_block.get_ip(chain_index) - def get_stream_configs(self, service_chain): + def get_stream_configs(self): + """Get the stream config for a given chain on this device. 
+ + Called by the traffic generator driver to program the traffic generator properly + before generating traffic + """ configs = [] # exact flow count for each chain is calculated as follows: # - all chains except the first will have the same flow count @@ -191,24 +208,19 @@ class Device(object): # example 11 flows and 3 chains => 3, 4, 4 flows_per_chain = (self.flow_count + self.chain_count - 1) / self.chain_count cur_chain_flow_count = self.flow_count - flows_per_chain * (self.chain_count - 1) - + peer = self.get_peer_device() self.ip_block.reset_reservation() - self.dst.ip_block.reset_reservation() + peer.ip_block.reset_reservation() for chain_idx in xrange(self.chain_count): src_ip_first, src_ip_last = self.ip_block.reserve_ip_range(cur_chain_flow_count) - dst_ip_first, dst_ip_last = self.dst.ip_block.reserve_ip_range(cur_chain_flow_count) - - dst_mac = self.dst_mac[chain_idx] if self.dst_mac is not None else self.dst.mac - if not re.match("[0-9a-f]{2}([-:])[0-9a-f]{2}(\\1[0-9a-f]{2}){4}$", dst_mac.lower()): - raise TrafficClientException("Invalid MAC address '{mac}' specified in " - "mac_addrs_left/right".format(mac=dst_mac)) + dst_ip_first, dst_ip_last = peer.ip_block.reserve_ip_range(cur_chain_flow_count) + dest_mac = self.dest_macs[chain_idx] if self.dest_macs else peer.mac configs.append({ 'count': cur_chain_flow_count, 'mac_src': self.mac, - 'mac_dst': dst_mac if service_chain == ChainType.EXT else self.vm_mac_list[ - chain_idx], + 'mac_dst': dest_mac, 'ip_src_addr': src_ip_first, 'ip_src_addr_max': src_ip_last, 'ip_src_count': cur_chain_flow_count, @@ -220,216 +232,149 @@ class Device(object): 'udp_dst_port': self.udp_dst_port, 'mac_discovery_gw': self.get_gw_ip(chain_idx), 'ip_src_tg_gw': self.tg_gw_ip_block.get_ip(chain_idx), - 'ip_dst_tg_gw': self.dst.tg_gw_ip_block.get_ip(chain_idx), - 'vlan_tag': self.vlan_tag if self.vlan_tagging else None + 'ip_dst_tg_gw': peer.tg_gw_ip_block.get_ip(chain_idx), + 'vlan_tag': self.vlans[chain_idx] if self.vlans 
else None }) # after first chain, fall back to the flow count for all other chains cur_chain_flow_count = flows_per_chain - return configs - def ip_range_overlaps(self): - """Check if this device ip range is overlapping with the dst device ip range.""" - src_base_ip = Device.ip_to_int(self.ip) - dst_base_ip = Device.ip_to_int(self.dst.ip) - src_last_ip = src_base_ip + self.flow_count - 1 - dst_last_ip = dst_base_ip + self.flow_count - 1 - return dst_last_ip >= src_base_ip and src_last_ip >= dst_base_ip - - @staticmethod - def mac_to_int(mac): - return int(mac.translate(None, ":.- "), 16) - - @staticmethod - def int_to_mac(i): - mac = format(i, 'x').zfill(12) - blocks = [mac[x:x + 2] for x in xrange(0, len(mac), 2)] - return ':'.join(blocks) - @staticmethod def ip_to_int(addr): + """Convert an IP address from string to numeric.""" return struct.unpack("!I", socket.inet_aton(addr))[0] @staticmethod def int_to_ip(nvalue): + """Convert an IP address from numeric to string.""" return socket.inet_ntoa(struct.pack("!I", nvalue)) -class RunningTrafficProfile(object): +class GeneratorConfig(object): """Represents traffic configuration for currently running traffic profile.""" DEFAULT_IP_STEP = '0.0.0.1' DEFAULT_SRC_DST_IP_STEP = '0.0.0.1' - def __init__(self, config, generator_profile): - generator_config = self.__match_generator_profile(config.traffic_generator, - generator_profile) - self.generator_config = generator_config + def __init__(self, config): + """Create a generator config.""" + self.config = config + # name of the generator profile (normally trex or dummy) + # pick the default one if not specified explicitly from cli options + if not config.generator_profile: + config.generator_profile = config.traffic_generator.default_profile + # pick up the profile dict based on the name + gen_config = self.__match_generator_profile(config.traffic_generator, + config.generator_profile) + self.gen_config = gen_config + # copy over fields from the dict + self.tool = 
gen_config.tool + self.ip = gen_config.ip + self.cores = gen_config.get('cores', 1) + if gen_config.intf_speed: + # interface speed is overriden from config + self.intf_speed = bitmath.parse_string(gen_config.intf_speed.replace('ps', '')).bits + else: + # interface speed is discovered/provided by the traffic generator + self.intf_speed = 0 + self.software_mode = gen_config.get('software_mode', False) + self.interfaces = gen_config.interfaces + if self.interfaces[0].port != 0 or self.interfaces[1].port != 1: + raise TrafficClientException('Invalid port order/id in generator_profile.interfaces') + self.service_chain = config.service_chain self.service_chain_count = config.service_chain_count self.flow_count = config.flow_count - self.host_name = generator_config.host_name - self.name = generator_config.name - self.tool = generator_config.tool - self.cores = generator_config.get('cores', 1) - self.ip_addrs_step = generator_config.ip_addrs_step or self.DEFAULT_SRC_DST_IP_STEP + self.host_name = gen_config.host_name + + self.tg_gateway_ip_addrs = gen_config.tg_gateway_ip_addrs + self.ip_addrs = gen_config.ip_addrs + self.ip_addrs_step = gen_config.ip_addrs_step or self.DEFAULT_SRC_DST_IP_STEP self.tg_gateway_ip_addrs_step = \ - generator_config.tg_gateway_ip_addrs_step or self.DEFAULT_IP_STEP - self.gateway_ip_addrs_step = generator_config.gateway_ip_addrs_step or self.DEFAULT_IP_STEP - self.gateway_ips = generator_config.gateway_ip_addrs - self.ip = generator_config.ip - self.intf_speed = bitmath.parse_string(generator_config.intf_speed.replace('ps', '')).bits + gen_config.tg_gateway_ip_addrs_step or self.DEFAULT_IP_STEP + self.gateway_ip_addrs_step = gen_config.gateway_ip_addrs_step or self.DEFAULT_IP_STEP + self.gateway_ips = gen_config.gateway_ip_addrs + self.udp_src_port = gen_config.udp_src_port + self.udp_dst_port = gen_config.udp_dst_port + self.devices = [Device(port, self) for port in [0, 1]] + # This should normally always be [0, 1] + self.ports = 
[device.port for device in self.devices] + + # check that pci is not empty + if not gen_config.interfaces[0].get('pci', None) or \ + not gen_config.interfaces[1].get('pci', None): + raise TrafficClientException("configuration interfaces pci fields cannot be empty") + + self.pcis = [tgif['pci'] for tgif in gen_config.interfaces] self.vlan_tagging = config.vlan_tagging - self.no_arp = config.no_arp - self.src_device = None - self.dst_device = None - self.vm_mac_list = None - self.mac_addrs_left = generator_config.mac_addrs_left - self.mac_addrs_right = generator_config.mac_addrs_right - self.__prep_interfaces(generator_config) + + # needed for result/summarizer + config['tg-name'] = gen_config.name + config['tg-tool'] = self.tool def to_json(self): - return dict(self.generator_config) + """Get json form to display the content into the overall result dict.""" + return dict(self.gen_config) + + def set_dest_macs(self, port_index, dest_macs): + """Set the list of dest MACs indexed by the chain id on given port. - def set_vm_mac_list(self, vm_mac_list): - self.src_device.set_vm_mac_list(vm_mac_list[0]) - self.dst_device.set_vm_mac_list(vm_mac_list[1]) + port_index: the port for which dest macs must be set + dest_macs: a list of dest MACs indexed by chain id + """ + if len(dest_macs) != self.config.service_chain_count: + raise TrafficClientException('Dest MAC list %s must have %d entries' % + (dest_macs, self.config.service_chain_count)) + self.devices[port_index].set_dest_macs(dest_macs) + LOG.info('Port %d: dst MAC %s', port_index, [str(mac) for mac in dest_macs]) + + def set_vlans(self, port_index, vlans): + """Set the list of vlans to use indexed by the chain id on given port. 
+ + port_index: the port for which VLANs must be set + vlans: a list of vlan lists indexed by chain id + """ + if len(vlans) != self.config.service_chain_count: + raise TrafficClientException('VLAN list %s must have %d entries' % + (vlans, self.config.service_chain_count)) + self.devices[port_index].set_vlans(vlans) @staticmethod def __match_generator_profile(traffic_generator, generator_profile): - generator_config = AttrDict(traffic_generator) - generator_config.pop('default_profile') - generator_config.pop('generator_profile') + gen_config = AttrDict(traffic_generator) + gen_config.pop('default_profile') + gen_config.pop('generator_profile') matching_profile = [profile for profile in traffic_generator.generator_profile if profile.name == generator_profile] if len(matching_profile) != 1: raise Exception('Traffic generator profile not found: ' + generator_profile) - generator_config.update(matching_profile[0]) - - return generator_config - - def __prep_interfaces(self, generator_config): - src_config = { - 'chain_count': self.service_chain_count, - 'flow_count': self.flow_count / 2, - 'ip': generator_config.ip_addrs[0], - 'ip_addrs_step': self.ip_addrs_step, - 'gateway_ip': self.gateway_ips[0], - 'gateway_ip_addrs_step': self.gateway_ip_addrs_step, - 'tg_gateway_ip': generator_config.tg_gateway_ip_addrs[0], - 'tg_gateway_ip_addrs_step': self.tg_gateway_ip_addrs_step, - 'udp_src_port': generator_config.udp_src_port, - 'udp_dst_port': generator_config.udp_dst_port, - 'vlan_tagging': self.vlan_tagging, - 'dst_mac': generator_config.mac_addrs_left - } - dst_config = { - 'chain_count': self.service_chain_count, - 'flow_count': self.flow_count / 2, - 'ip': generator_config.ip_addrs[1], - 'ip_addrs_step': self.ip_addrs_step, - 'gateway_ip': self.gateway_ips[1], - 'gateway_ip_addrs_step': self.gateway_ip_addrs_step, - 'tg_gateway_ip': generator_config.tg_gateway_ip_addrs[1], - 'tg_gateway_ip_addrs_step': self.tg_gateway_ip_addrs_step, - 'udp_src_port': 
generator_config.udp_src_port, - 'udp_dst_port': generator_config.udp_dst_port, - 'vlan_tagging': self.vlan_tagging, - 'dst_mac': generator_config.mac_addrs_right - } - - self.src_device = Device(**dict(src_config, **generator_config.interfaces[0])) - self.dst_device = Device(**dict(dst_config, **generator_config.interfaces[1])) - self.src_device.set_destination(self.dst_device) - self.dst_device.set_destination(self.src_device) - - if self.service_chain == ChainType.EXT and not self.no_arp \ - and self.src_device.ip_range_overlaps(): - raise Exception('Overlapping IP address ranges src=%s dst=%d flows=%d' % - self.src_device.ip, - self.dst_device.ip, - self.flow_count) - - @property - def devices(self): - return [self.src_device, self.dst_device] - - @property - def vlans(self): - return [self.src_device.vtep_vlan, self.dst_device.vtep_vlan] - - @property - def ports(self): - return [self.src_device.port, self.dst_device.port] - - @property - def switch_ports(self): - return [self.src_device.switch_port, self.dst_device.switch_port] - - @property - def pcis(self): - return [self.src_device.pci, self.dst_device.pci] - - -class TrafficGeneratorFactory(object): - """Factory class to generate a traffic generator.""" - - def __init__(self, config): - self.config = config - - def get_tool(self): - return self.config.generator_config.tool - - def get_generator_client(self): - tool = self.get_tool().lower() - if tool == 'trex': - from traffic_gen import trex - return trex.TRex(self.config) - elif tool == 'dummy': - from traffic_gen import dummy - return dummy.DummyTG(self.config) - return None - - def list_generator_profile(self): - return [profile.name for profile in self.config.traffic_generator.generator_profile] - - def get_generator_config(self, generator_profile): - return RunningTrafficProfile(self.config, generator_profile) - - def get_matching_profile(self, traffic_profile_name): - matching_profile = [profile for profile in self.config.traffic_profile if - 
profile.name == traffic_profile_name] - - if len(matching_profile) > 1: - raise Exception('Multiple traffic profiles with the same name found.') - elif not matching_profile: - raise Exception('No traffic profile found.') - - return matching_profile[0] - - def get_frame_sizes(self, traffic_profile): - matching_profile = self.get_matching_profile(traffic_profile) - return matching_profile.l2frame_size + gen_config.update(matching_profile[0]) + return gen_config class TrafficClient(object): - """Traffic generator client.""" + """Traffic generator client with NDR/PDR binary seearch.""" PORTS = [0, 1] - def __init__(self, config, notifier=None, skip_sleep=False): - generator_factory = TrafficGeneratorFactory(config) - self.gen = generator_factory.get_generator_client() - self.tool = generator_factory.get_tool() + def __init__(self, config, notifier=None): + """Create a new TrafficClient instance. + + config: nfvbench config + notifier: notifier (optional) + + A new instance is created everytime the nfvbench config may have changed. 
+ """ self.config = config + self.generator_config = GeneratorConfig(config) + self.tool = self.generator_config.tool + self.gen = self._get_generator() self.notifier = notifier self.interval_collector = None self.iteration_collector = None self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec) - if self.gen is None: - raise TrafficClientException('%s is not a supported traffic generator' % self.tool) - + self.config.frame_sizes = self._get_frame_sizes() self.run_config = { 'l2frame_size': None, 'duration_sec': self.config.duration_sec, @@ -439,23 +384,70 @@ class TrafficClient(object): self.current_total_rate = {'rate_percent': '10'} if self.config.single_run: self.current_total_rate = utils.parse_rate_str(self.config.rate) - # UT with dummy TG can bypass all sleeps - self.skip_sleep = skip_sleep + self.ifstats = None + # Speed is either discovered when connecting to TG or set from config + # This variable is 0 if not yet discovered from TG or must be the speed of + # each interface in bits per second + self.intf_speed = self.generator_config.intf_speed + + def _get_generator(self): + tool = self.tool.lower() + if tool == 'trex': + from traffic_gen import trex + return trex.TRex(self) + if tool == 'dummy': + from traffic_gen import dummy + return dummy.DummyTG(self) + raise TrafficClientException('Unsupported generator tool name:' + self.tool) - def set_macs(self): - for mac, device in zip(self.gen.get_macs(), self.config.generator_config.devices): - device.set_mac(mac) + def skip_sleep(self): + """Skip all sleeps when doing unit testing with dummy TG. 
+ + Must be overriden using mock.patch + """ + return False + + def _get_frame_sizes(self): + traffic_profile_name = self.config.traffic.profile + matching_profiles = [profile for profile in self.config.traffic_profile if + profile.name == traffic_profile_name] + if len(matching_profiles) > 1: + raise TrafficClientException('Multiple traffic profiles with name: ' + + traffic_profile_name) + elif not matching_profiles: + raise TrafficClientException('Cannot find traffic profile: ' + traffic_profile_name) + return matching_profiles[0].l2frame_size def start_traffic_generator(self): - self.gen.init() + """Start the traffic generator process (traffic not started yet).""" self.gen.connect() + # pick up the interface speed if it is not set from config + intf_speeds = self.gen.get_port_speed_gbps() + # convert Gbps unit into bps + tg_if_speed = bitmath.parse_string(str(intf_speeds[0]) + 'Gb').bits + if self.intf_speed: + # interface speed is overriden from config + if self.intf_speed != tg_if_speed: + # Warn the user if the speed in the config is different + LOG.warning('Interface speed provided is different from actual speed (%d Gbps)', + intf_speeds[0]) + else: + # interface speed not provisioned by config + self.intf_speed = tg_if_speed + # also update the speed in the tg config + self.generator_config.intf_speed = tg_if_speed + + # Save the traffic generator local MAC + for mac, device in zip(self.gen.get_macs(), self.generator_config.devices): + device.set_mac(mac) def setup(self): + """Set up the traffic client.""" self.gen.set_mode() - self.gen.config_interface() self.gen.clear_stats() def get_version(self): + """Get the traffic generator version.""" return self.gen.get_version() def ensure_end_to_end(self): @@ -478,48 +470,50 @@ class TrafficClient(object): all 10 VMs 10 VMs are in operational state. 
""" LOG.info('Starting traffic generator to ensure end-to-end connectivity') - rate_pps = {'rate_pps': str(self.config.service_chain_count * 1)} + # send 2pps on each chain and each direction + rate_pps = {'rate_pps': str(self.config.service_chain_count * 2)} self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False) # ensures enough traffic is coming back retry_count = (self.config.check_traffic_time_sec + self.config.generic_poll_sec - 1) / self.config.generic_poll_sec mac_addresses = set() - ln = 0 - # in case of l2-loopback, we will only have 2 unique src MAC regardless of the - # number of chains configured because there are no VM involved - # otherwise, we expect to see packets coming from 2 unique MAC per chain - unique_src_mac_count = 2 if self.config.l2_loopback else self.config.service_chain_count * 2 + + # we expect to see packets coming from 2 unique MAC per chain + unique_src_mac_count = self.config.service_chain_count * 2 for it in xrange(retry_count): self.gen.clear_stats() self.gen.start_traffic() self.gen.start_capture() - LOG.info('Waiting for packets to be received back... 
(%d / %d)', it + 1, retry_count) - if not self.skip_sleep: + LOG.info('Captured unique src mac %d/%d, capturing return packets (retry %d/%d)...', + len(mac_addresses), unique_src_mac_count, + it + 1, retry_count) + if not self.skip_sleep(): time.sleep(self.config.generic_poll_sec) self.gen.stop_traffic() self.gen.fetch_capture_packets() self.gen.stop_capture() for packet in self.gen.packet_list: - mac_addresses.add(packet['binary'][6:12]) - if ln != len(mac_addresses): - ln = len(mac_addresses) - LOG.info('Received unique source MAC %d / %d', ln, unique_src_mac_count) + src_mac = packet['binary'][6:12] + if src_mac not in mac_addresses: + LOG.info('Received packet from mac: %s', + ':'.join(["%02x" % ord(x) for x in src_mac])) + mac_addresses.add(src_mac) + if len(mac_addresses) == unique_src_mac_count: - LOG.info('End-to-end connectivity ensured') + LOG.info('End-to-end connectivity established') return - if not self.skip_sleep: - time.sleep(self.config.generic_poll_sec) - raise TrafficClientException('End-to-end connectivity cannot be ensured') def ensure_arp_successful(self): + """Resolve all IP using ARP and throw an exception in case of failure.""" if not self.gen.resolve_arp(): raise TrafficClientException('ARP cannot be resolved') def set_traffic(self, frame_size, bidirectional): + """Reconfigure the traffic generator for a new frame size.""" self.run_config['bidirectional'] = bidirectional self.run_config['l2frame_size'] = frame_size self.run_config['rates'] = [self.get_per_direction_rate()] @@ -537,7 +531,7 @@ class TrafficClient(object): self.gen.clear_streamblock() self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional, latency=True) - def modify_load(self, load): + def _modify_load(self, load): self.current_total_rate = {'rate_percent': str(load)} rate_per_direction = self.get_per_direction_rate() @@ -548,6 +542,7 @@ class TrafficClient(object): self.run_config['rates'][1] = rate_per_direction def get_ndr_and_pdr(self): + """Start 
the NDR/PDR iteration and return the results.""" dst = 'Bidirectional' if self.run_config['bidirectional'] else 'Unidirectional' targets = {} if self.config.ndr_run: @@ -590,6 +585,7 @@ class TrafficClient(object): return float(dropped_pkts) / total_pkts * 100 def get_stats(self): + """Collect final stats for previous run.""" stats = self.gen.get_stats() retDict = {'total_tx_rate': stats['total_tx_rate']} for port in self.PORTS: @@ -645,7 +641,7 @@ class TrafficClient(object): def __convert_rates(self, rate): return utils.convert_rates(self.run_config['l2frame_size'], rate, - self.config.generator_config.intf_speed) + self.intf_speed) def __ndr_pdr_found(self, tag, load): rates = self.__convert_rates({'rate_percent': load}) @@ -745,8 +741,11 @@ class TrafficClient(object): self.__range_search(middle, right, right_targets, results) def __run_search_iteration(self, rate): - # set load - self.modify_load(rate) + """Run one iteration at the given rate level. + + rate: the rate to send on each port in percent (0 to 100) + """ + self._modify_load(rate) # poll interval stats and collect them for stats in self.run_traffic(): @@ -754,12 +753,13 @@ class TrafficClient(object): time_elapsed_ratio = self.runner.time_elapsed() / self.run_config['duration_sec'] if time_elapsed_ratio >= 1: self.cancel_traffic() - time.sleep(self.config.pause_sec) + if not self.skip_sleep(): + time.sleep(self.config.pause_sec) self.interval_collector.reset() # get stats from the run stats = self.runner.client.get_stats() - current_traffic_config = self.get_traffic_config() + current_traffic_config = self._get_traffic_config() warning = self.compare_tx_rates(current_traffic_config['direction-total']['rate_pps'], stats['total_tx_rate']) if warning is not None: @@ -768,11 +768,11 @@ class TrafficClient(object): # save reliable stats from whole iteration self.iteration_collector.add(stats, current_traffic_config['direction-total']['rate_pps']) LOG.info('Average drop rate: %f', 
stats['overall']['drop_rate_percent']) - return stats, current_traffic_config['direction-total'] @staticmethod def log_stats(stats): + """Log estimated stats during run.""" report = { 'datetime': str(datetime.now()), 'tx_packets': stats['overall']['tx']['total_pkts'], @@ -782,11 +782,12 @@ class TrafficClient(object): } LOG.info('TX: %(tx_packets)d; ' 'RX: %(rx_packets)d; ' - 'Dropped: %(drop_packets)d; ' - 'Drop rate: %(drop_rate_percent).4f%%', + 'Est. Dropped: %(drop_packets)d; ' + 'Est. Drop rate: %(drop_rate_percent).4f%%', report) def run_traffic(self): + """Start traffic and return intermediate stats for each interval.""" stats = self.runner.run() while self.runner.is_running: self.log_stats(stats) @@ -799,16 +800,10 @@ class TrafficClient(object): yield stats def cancel_traffic(self): + """Stop traffic.""" self.runner.stop() - def get_interface(self, port_index, stats): - port = self.gen.port_handle[port_index] - tx, rx = 0, 0 - if stats and port in stats: - tx, rx = int(stats[port]['tx']['total_pkts']), int(stats[port]['rx']['total_pkts']) - return Interface('traffic-generator', self.tool.lower(), tx, rx) - - def get_traffic_config(self): + def _get_traffic_config(self): config = {} load_total = 0.0 bps_total = 0.0 @@ -856,8 +851,63 @@ class TrafficClient(object): r['direction-total'] = total return r + def insert_interface_stats(self, pps_list): + """Insert interface stats to a list of packet path stats. + + pps_list: a list of packet path stats instances indexed by chain index + + This function will insert the packet path stats for the traffic gen ports 0 and 1 + with itemized per chain tx/rx counters. + There will be as many packet path stats as chains. + Each packet path stats will have exactly 2 InterfaceStats for port 0 and port 1 + self.pps_list: + [ + PacketPathStats(InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)), + PacketPathStats(InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)), + ... 
+ ] + """ + def get_if_stats(chain_idx): + return [InterfaceStats('p' + str(port), self.tool) + for port in range(2)] + # keep the list of list of interface stats indexed by the chain id + self.ifstats = [get_if_stats(chain_idx) + for chain_idx in range(self.config.service_chain_count)] + # note that we need to make a copy of the ifs list so that any modification in the + # list from pps will not change the list saved in self.ifstats + self.pps_list = [PacketPathStats(list(ifs)) for ifs in self.ifstats] + # insert the corresponding pps in the passed list + pps_list.extend(self.pps_list) + + def update_interface_stats(self, diff=False): + """Update all interface stats. + + diff: if False, simply refresh the interface stats values with latest values + if True, diff the interface stats with the latest values + Make sure that the interface stats inserted in insert_interface_stats() are updated + with proper values. + self.ifstats: + [ + [InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)], + [InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)], + ... 
+ ] + """ + if diff: + stats = self.gen.get_stats() + for chain_idx, ifs in enumerate(self.ifstats): + # each ifs has exactly 2 InterfaceStats and 2 Latency instances + # corresponding to the + # port 0 and port 1 for the given chain_idx + # Note that we cannot use self.pps_list[chain_idx].if_stats to pick the + # interface stats for the pps because it could have been modified to contain + # additional interface stats + self.gen.get_stream_stats(stats, ifs, self.pps_list[chain_idx].latencies, chain_idx) + + @staticmethod def compare_tx_rates(required, actual): + """Compare the actual TX rate to the required TX rate.""" threshold = 0.9 are_different = False try: @@ -876,6 +926,7 @@ class TrafficClient(object): return None def get_per_direction_rate(self): + """Get the rate for each direction.""" divisor = 2 if self.run_config['bidirectional'] else 1 if 'rate_percent' in self.current_total_rate: # don't split rate if it's percentage @@ -884,6 +935,7 @@ class TrafficClient(object): return utils.divide_rate(self.current_total_rate, divisor) def close(self): + """Close this instance.""" try: self.gen.stop_traffic() except Exception: diff --git a/nfvbench/traffic_gen/dummy.py b/nfvbench/traffic_gen/dummy.py index 788a53f..2a1064f 100644 --- a/nfvbench/traffic_gen/dummy.py +++ b/nfvbench/traffic_gen/dummy.py @@ -12,6 +12,7 @@ # License for the specific language governing permissions and limitations # under the License. +from nfvbench.log import LOG from traffic_base import AbstractTrafficGenerator import traffic_utils as utils @@ -23,33 +24,32 @@ class DummyTG(AbstractTrafficGenerator): Useful for unit testing without actually generating any traffic. 
""" - def __init__(self, config): - AbstractTrafficGenerator.__init__(self, config) + def __init__(self, traffic_client): + AbstractTrafficGenerator.__init__(self, traffic_client) self.port_handle = [] self.rates = [] self.l2_frame_size = 0 - self.duration_sec = self.config.duration_sec - self.intf_speed = config.generator_config.intf_speed + self.duration_sec = traffic_client.config.duration_sec + self.intf_speed = traffic_client.generator_config.intf_speed self.set_response_curve() - self.packet_list = [{ - "binary": "01234567890123456789" - }, { - "binary": "98765432109876543210" - }] + # for packet capture, generate 2*scc random packets + # normally we should generate packets coming from the right dest macs + scc = traffic_client.config.service_chain_count + self.packet_list = [self._get_packet_capture(mac_id) for mac_id in range(scc * 2)] + + def _get_packet_capture(self, mac_id): + return {'binary': 'SSSSSS01234' + str(mac_id)} def get_version(self): return "0.1" - def init(self): - pass - def get_tx_pps_dropped_pps(self, tx_rate): - '''Get actual tx packets based on requested tx rate + """Get actual tx packets based on requested tx rate. :param tx_rate: requested TX rate with unit ('40%', '1Mbps', '1000pps') :return: the actual TX pps and the dropped pps corresponding to the requested TX rate - ''' + """ dr, tx = self.__get_dr_actual_tx(tx_rate) actual_tx_bps = utils.load_to_bps(tx, self.intf_speed) avg_packet_size = utils.get_average_packet_size(self.l2_frame_size) @@ -61,14 +61,14 @@ class DummyTG(AbstractTrafficGenerator): return int(tx_packets), int(dropped) def set_response_curve(self, lr_dr=0, ndr=100, max_actual_tx=100, max_11_tx=100): - '''Set traffic gen response characteristics + """Set traffic gen response characteristics. 
Specifies the drop rate curve and the actual TX curve :param float lr_dr: The actual drop rate at TX line rate (in %, 0..100) :param float ndr: The true NDR (0 packet drop) in % (0..100) of line rate" :param float max_actual_tx: highest actual TX when requested TX is 100% :param float max_11_tx: highest requested TX that results in same actual TX - ''' + """ self.target_ndr = ndr if ndr < 100: self.dr_slope = float(lr_dr) / (100 - ndr) @@ -82,10 +82,11 @@ class DummyTG(AbstractTrafficGenerator): self.tx_slope = 0 def __get_dr_actual_tx(self, requested_tx_rate): - '''Get drop rate at given requested tx rate + """Get drop rate at given requested tx rate. + :param float requested_tx_rate: requested tx rate in % (0..100) :return: the drop rate and actual tx rate at that requested_tx_rate in % (0..100) - ''' + """ if requested_tx_rate <= self.max_11_tx: actual_tx = requested_tx_rate else: @@ -97,15 +98,9 @@ class DummyTG(AbstractTrafficGenerator): return dr, actual_tx def connect(self): - ports = list(self.config.generator_config.ports) + ports = list(self.traffic_client.generator_config.ports) self.port_handle = ports - def is_arp_successful(self): - return True - - def config_interface(self): - pass - def create_traffic(self, l2frame_size, rates, bidirectional, latency=True): self.rates = [utils.to_rate_str(rate) for rate in rates] self.l2_frame_size = l2frame_size @@ -114,7 +109,7 @@ class DummyTG(AbstractTrafficGenerator): pass def get_stats(self): - '''Get stats from current run. + """Get stats from current run. 
The binary search mainly looks at 2 results to make the decision: actual tx packets @@ -122,7 +117,7 @@ class DummyTG(AbstractTrafficGenerator): From the Requested TX rate - we get the Actual TX rate and the RX drop rate From the Run duration and actual TX rate - we get the actual total tx packets From the Actual tx packets and RX drop rate - we get the RX dropped packets - ''' + """ result = {} total_tx_pps = 0 @@ -160,9 +155,24 @@ class DummyTG(AbstractTrafficGenerator): result['total_tx_rate'] = total_tx_pps return result + def get_stream_stats(self, tg_stats, if_stats, latencies, chain_idx): + for port in range(2): + if_stats[port].tx = 1000 + if_stats[port].rx = 1000 + latencies[port].min_usec = 10 + latencies[port].max_usec = 100 + latencies[port].avg_usec = 50 + def get_macs(self): return ['00.00.00.00.00.01', '00.00.00.00.00.02'] + def get_port_speed_gbps(self): + """Return the local port speeds. + + return: a list of speed in Gbps indexed by the port# + """ + return [10, 10] + def clear_stats(self): pass @@ -188,4 +198,6 @@ class DummyTG(AbstractTrafficGenerator): pass def resolve_arp(self): + """Resolve ARP sucessfully.""" + LOG.info('Dummy TG ARP OK') return True diff --git a/nfvbench/traffic_gen/traffic_base.py b/nfvbench/traffic_gen/traffic_base.py index 81537b3..adb2bd0 100644 --- a/nfvbench/traffic_gen/traffic_base.py +++ b/nfvbench/traffic_gen/traffic_base.py @@ -13,18 +13,47 @@ # under the License. import abc +import sys from nfvbench.log import LOG import traffic_utils +class Latency(object): + """A class to hold latency data.""" + + def __init__(self, latency_list=None): + """Create a latency instance. 
+ + latency_list: aggregate all latency values from list if not None + """ + self.min_usec = sys.maxint + self.max_usec = 0 + self.avg_usec = 0 + if latency_list: + for lat in latency_list: + if lat.available(): + self.min_usec = min(self.min_usec, lat.min_usec) + self.max_usec = max(self.max_usec, lat.max_usec) + self.avg_usec += lat.avg_usec + # round to nearest usec + self.avg_usec = int(round(float(self.avg_usec) / len(latency_list))) + + def available(self): + """Return True if latency information is available.""" + return self.min_usec != sys.maxint + class TrafficGeneratorException(Exception): + """Exception for traffic generator.""" + pass class AbstractTrafficGenerator(object): - def __init__(self, config): - self.config = config + def __init__(self, traffic_client): + self.traffic_client = traffic_client + self.generator_config = traffic_client.generator_config + self.config = traffic_client.config self.imix_l2_sizes = [64, 594, 1518] self.imix_ratios = [7, 4, 1] self.imix_avg_l2_size = 0 @@ -35,46 +64,32 @@ class AbstractTrafficGenerator(object): # Must be implemented by sub classes return None - @abc.abstractmethod - def init(self): - # Must be implemented by sub classes - return None - @abc.abstractmethod def connect(self): # Must be implemented by sub classes return None - @abc.abstractmethod - def config_interface(self): - # Must be implemented by sub classes - return None - @abc.abstractmethod def create_traffic(self, l2frame_size, rates, bidirectional, latency=True): # Must be implemented by sub classes return None def modify_rate(self, rate, reverse): + """Change the rate per port. 
+ + rate: new rate in % (0 to 100) + reverse: 0 for port 0, 1 for port 1 + """ port_index = int(reverse) port = self.port_handle[port_index] self.rates[port_index] = traffic_utils.to_rate_str(rate) - LOG.info('Modified traffic stream for %s, new rate=%s.', port, - traffic_utils.to_rate_str(rate)) - - def modify_traffic(self): - # Must be implemented by sub classes - return None + LOG.info('Modified traffic stream for port %s, new rate=%s.', port, self.rates[port_index]) @abc.abstractmethod def get_stats(self): # Must be implemented by sub classes return None - def clear_traffic(self): - # Must be implemented by sub classes - return None - @abc.abstractmethod def start_traffic(self): # Must be implemented by sub classes @@ -87,9 +102,37 @@ class AbstractTrafficGenerator(object): @abc.abstractmethod def cleanup(self): - # Must be implemented by sub classes + """Cleanup the traffic generator.""" return None + def clear_streamblock(self): + """Clear all streams from the traffic generator.""" + pass + + @abc.abstractmethod + def resolve_arp(self): + """Resolve all configured remote IP addresses. + + return: True if ARP resolved successfully + """ + pass + + @abc.abstractmethod + def get_macs(self): + """Return the local port MAC addresses. + + return: a list of MAC addresses indexed by the port# + """ + pass + + @abc.abstractmethod + def get_port_speed_gbps(self): + """Return the local port speeds. + + return: a list of speed in Gbps indexed by the port# + """ + pass + def adjust_imix_min_size(self, min_size): # assume the min size is always the first entry self.imix_l2_sizes[0] = min_size diff --git a/nfvbench/traffic_gen/traffic_utils.py b/nfvbench/traffic_gen/traffic_utils.py index 4a7f855..c3428a4 100644 --- a/nfvbench/traffic_gen/traffic_utils.py +++ b/nfvbench/traffic_gen/traffic_utils.py @@ -20,18 +20,29 @@ imix_avg_l2_size = None def convert_rates(l2frame_size, rate, intf_speed): + """Convert a given rate unit into the other rate units. 
+ + l2frame_size: size of the L2 frame in bytes or 'IMIX' + rate: a dict that has at least one of the following key: + 'rate_pps', 'rate_bps', 'rate_percent' + with the corresponding input value + intf_speed: the line rate speed in bits per second + """ avg_packet_size = get_average_packet_size(l2frame_size) if 'rate_pps' in rate: + # input = packets/sec initial_rate_type = 'rate_pps' pps = rate['rate_pps'] bps = pps_to_bps(pps, avg_packet_size) load = bps_to_load(bps, intf_speed) elif 'rate_bps' in rate: + # input = bits per second initial_rate_type = 'rate_bps' bps = rate['rate_bps'] load = bps_to_load(bps, intf_speed) pps = bps_to_pps(bps, avg_packet_size) elif 'rate_percent' in rate: + # input = percentage of the line rate (between 0.0 and 100.0) initial_rate_type = 'rate_percent' load = rate['rate_percent'] bps = load_to_bps(load, intf_speed) diff --git a/nfvbench/traffic_gen/trex.py b/nfvbench/traffic_gen/trex.py index cabf1cb..31b0867 100644 --- a/nfvbench/traffic_gen/trex.py +++ b/nfvbench/traffic_gen/trex.py @@ -17,7 +17,6 @@ import random import time import traceback -from collections import defaultdict from itertools import count from nfvbench.log import LOG from nfvbench.specs import ChainType @@ -54,31 +53,52 @@ from trex_stl_lib.services.trex_stl_service_arp import STLServiceARP class TRex(AbstractTrafficGenerator): + """TRex traffic generator driver.""" + LATENCY_PPS = 1000 + CHAIN_PG_ID_MASK = 0x007F + PORT_PG_ID_MASK = 0x0080 + LATENCY_PG_ID_MASK = 0x0100 - def __init__(self, runner): - AbstractTrafficGenerator.__init__(self, runner) + def __init__(self, traffic_client): + AbstractTrafficGenerator.__init__(self, traffic_client) self.client = None self.id = count() - self.latencies = defaultdict(list) - self.stream_ids = defaultdict(list) self.port_handle = [] - self.streamblock = defaultdict(list) + self.chain_count = self.generator_config.service_chain_count self.rates = [] + # A dict of list of dest macs indexed by port# + # the dest macs in the 
list are indexed by the chain id self.arps = {} self.capture_id = None self.packet_list = [] def get_version(self): + """Get the Trex version.""" return self.client.get_server_version() + def get_pg_id(self, port, chain_id): + """Calculate the packet group IDs to use for a given port/stream type/chain_id. + + port: 0 or 1 + chain_id: identifies to which chain the pg_id is associated (0 to 255) + return: pg_id, lat_pg_id + + We use a bit mask to set up the 3 fields: + 0x007F: chain ID (8 bits for a max of 128 chains) + 0x0080: port bit + 0x0100: latency bit + """ + pg_id = port * TRex.PORT_PG_ID_MASK | chain_id + return pg_id, pg_id | TRex.LATENCY_PG_ID_MASK + def extract_stats(self, in_stats): """Extract stats from dict returned by Trex API. :param in_stats: dict as returned by TRex api """ utils.nan_replace(in_stats) - LOG.debug(in_stats) + # LOG.debug(in_stats) result = {} # port_handles should have only 2 elements: [0, 1] @@ -104,38 +124,123 @@ class TRex(AbstractTrafficGenerator): far_end_stats['opackets'] - stats['ipackets']) } } + self.__combine_latencies(in_stats, result[ph]['rx'], ph) - lat = self.__combine_latencies(in_stats, ph) - result[ph]['rx']['max_delay_usec'] = cast_integer( - lat['total_max']) if 'total_max' in lat else float('nan') - result[ph]['rx']['min_delay_usec'] = cast_integer( - lat['total_min']) if 'total_min' in lat else float('nan') - result[ph]['rx']['avg_delay_usec'] = cast_integer( - lat['average']) if 'average' in lat else float('nan') total_tx_pkts = result[0]['tx']['total_pkts'] + result[1]['tx']['total_pkts'] result["total_tx_rate"] = cast_integer(total_tx_pkts / self.config.duration_sec) + result["flow_stats"] = in_stats["flow_stats"] + result["latency"] = in_stats["latency"] return result - def __combine_latencies(self, in_stats, port_handle): - """Traverses TRex result dictionary and combines chosen latency stats.""" - if not self.latencies[port_handle]: - return {} - - result = defaultdict(float) - result['total_min'] = 
float("inf") - for lat_id in self.latencies[port_handle]: - lat = in_stats['latency'][lat_id] - result['dropped_pkts'] += lat['err_cntrs']['dropped'] - result['total_max'] = max(lat['latency']['total_max'], result['total_max']) - result['total_min'] = min(lat['latency']['total_min'], result['total_min']) - result['average'] += lat['latency']['average'] - - result['average'] /= len(self.latencies[port_handle]) - - return result + def get_stream_stats(self, trex_stats, if_stats, latencies, chain_idx): + """Extract the aggregated stats for a given chain. + + trex_stats: stats as returned by get_stats() + if_stats: a list of 2 interface stats to update (port 0 and 1) + latencies: a list of 2 Latency instances to update for this chain (port 0 and 1) + latencies[p] is the latency for packets sent on port p + if there are no latency streams, the Latency instances are not modified + chain_idx: chain index of the interface stats + + The packet counts include normal and latency streams. + + Trex returns flows stats as follows: + + 'flow_stats': {0: {'rx_bps': {0: 0, 1: 0, 'total': 0}, + 'rx_bps_l1': {0: 0.0, 1: 0.0, 'total': 0.0}, + 'rx_bytes': {0: nan, 1: nan, 'total': nan}, + 'rx_pkts': {0: 0, 1: 15001, 'total': 15001}, + 'rx_pps': {0: 0, 1: 0, 'total': 0}, + 'tx_bps': {0: 0, 1: 0, 'total': 0}, + 'tx_bps_l1': {0: 0.0, 1: 0.0, 'total': 0.0}, + 'tx_bytes': {0: 1020068, 1: 0, 'total': 1020068}, + 'tx_pkts': {0: 15001, 1: 0, 'total': 15001}, + 'tx_pps': {0: 0, 1: 0, 'total': 0}}, + 1: {'rx_bps': {0: 0, 1: 0, 'total': 0}, + 'rx_bps_l1': {0: 0.0, 1: 0.0, 'total': 0.0}, + 'rx_bytes': {0: nan, 1: nan, 'total': nan}, + 'rx_pkts': {0: 0, 1: 15001, 'total': 15001}, + 'rx_pps': {0: 0, 1: 0, 'total': 0}, + 'tx_bps': {0: 0, 1: 0, 'total': 0}, + 'tx_bps_l1': {0: 0.0, 1: 0.0, 'total': 0.0}, + 'tx_bytes': {0: 1020068, 1: 0, 'total': 1020068}, + 'tx_pkts': {0: 15001, 1: 0, 'total': 15001}, + 'tx_pps': {0: 0, 1: 0, 'total': 0}}, + 128: {'rx_bps': {0: 0, 1: 0, 'total': 0}, + 'rx_bps_l1': {0: 
0.0, 1: 0.0, 'total': 0.0}, + 'rx_bytes': {0: nan, 1: nan, 'total': nan}, + 'rx_pkts': {0: 15001, 1: 0, 'total': 15001}, + 'rx_pps': {0: 0, 1: 0, 'total': 0}, + 'tx_bps': {0: 0, 1: 0, 'total': 0}, + 'tx_bps_l1': {0: 0.0, 1: 0.0, 'total': 0.0}, + 'tx_bytes': {0: 0, 1: 1020068, 'total': 1020068}, + 'tx_pkts': {0: 0, 1: 15001, 'total': 15001}, + 'tx_pps': {0: 0, 1: 0, 'total': 0}},etc... + + the pg_id (0, 1, 128,...) is the key of the dict and is obtained using the + get_pg_id() method. + packet counters for a given stream sent on port p are reported as: + - tx_pkts[p] on port p + - rx_pkts[1-p] on the far end port + + This is a tricky/critical counter transposition operation because + the results are grouped by port (not by stream): + tx_pkts_port(p=0) comes from pg_id(port=0, chain_idx)['tx_pkts'][0] + rx_pkts_port(p=0) comes from pg_id(port=1, chain_idx)['rx_pkts'][0] + tx_pkts_port(p=1) comes from pg_id(port=1, chain_idx)['tx_pkts'][1] + rx_pkts_port(p=1) comes from pg_id(port=0, chain_idx)['rx_pkts'][1] + + or using a more generic formula: + tx_pkts_port(p) comes from pg_id(port=p, chain_idx)['tx_pkts'][p] + rx_pkts_port(p) comes from pg_id(port=1-p, chain_idx)['rx_pkts'][p] + + the second formula is equivalent to + rx_pkts_port(1-p) comes from pg_id(port=p, chain_idx)['rx_pkts'][1-p] + + If there are latency streams, those same counters need to be added in the same way + """ + for ifs in if_stats: + ifs.tx = ifs.rx = 0 + for port in range(2): + pg_id, lat_pg_id = self.get_pg_id(port, chain_idx) + for pid in [pg_id, lat_pg_id]: + try: + pg_stats = trex_stats['flow_stats'][pid] + if_stats[port].tx += pg_stats['tx_pkts'][port] + if_stats[1 - port].rx += pg_stats['rx_pkts'][1 - port] + except KeyError: + pass + try: + lat = trex_stats['latency'][lat_pg_id]['latency'] + # dropped_pkts += lat['err_cntrs']['dropped'] + latencies[port].max_usec = int(round(lat['total_max'])) + latencies[port].min_usec = int(round(lat['total_min'])) + latencies[port].avg_usec = 
int(round(lat['average'])) + except KeyError: + pass - def create_pkt(self, stream_cfg, l2frame_size): + def __combine_latencies(self, in_stats, results, port_handle): + """Traverse TRex result dictionary and combines chosen latency stats.""" + total_max = 0 + average = 0 + total_min = float("inf") + for chain_id in range(self.chain_count): + try: + _, lat_pg_id = self.get_pg_id(port_handle, chain_id) + lat = in_stats['latency'][lat_pg_id]['latency'] + # dropped_pkts += lat['err_cntrs']['dropped'] + total_max = max(lat['total_max'], total_max) + total_min = min(lat['total_min'], total_min) + average += lat['average'] + except KeyError: + pass + if total_min == float("inf"): + total_min = 0 + results['min_delay_usec'] = total_min + results['max_delay_usec'] = total_max + results['avg_delay_usec'] = int(average / self.chain_count) + def _create_pkt(self, stream_cfg, l2frame_size): pkt_base = Ether(src=stream_cfg['mac_src'], dst=stream_cfg['mac_dst']) if stream_cfg['vlan_tag'] is not None: # 50 = 14 (Ethernet II) + 4 (Vlan tag) + 4 (CRC Checksum) + 20 (IPv4) + 8 (UDP) @@ -195,48 +300,42 @@ class TRex(AbstractTrafficGenerator): return STLPktBuilder(pkt=pkt_base / payload, vm=STLScVmRaw(vm_param)) - def generate_streams(self, port_handle, stream_cfg, l2frame, isg=0.0, latency=True): - idx_lat = None + def generate_streams(self, port, chain_id, stream_cfg, l2frame, latency=True): + """Create a list of streams corresponding to a given chain and stream config. 
+ + port: port where the streams originate (0 or 1) + chain_id: the chain to which the streams are associated to + stream_cfg: stream configuration + l2frame: L2 frame size + latency: if True also create a latency stream + """ streams = [] + pg_id, lat_pg_id = self.get_pg_id(port, chain_id) if l2frame == 'IMIX': min_size = 64 if stream_cfg['vlan_tag'] is None else 68 self.adjust_imix_min_size(min_size) - for t, (ratio, l2_frame_size) in enumerate(zip(self.imix_ratios, self.imix_l2_sizes)): - pkt = self.create_pkt(stream_cfg, l2_frame_size) + for ratio, l2_frame_size in zip(self.imix_ratios, self.imix_l2_sizes): + pkt = self._create_pkt(stream_cfg, l2_frame_size) streams.append(STLStream(packet=pkt, - isg=0.1 * t, - flow_stats=STLFlowStats( - pg_id=self.stream_ids[port_handle]), + flow_stats=STLFlowStats(pg_id=pg_id), mode=STLTXCont(pps=ratio))) if latency: - idx_lat = self.id.next() - pkt = self.create_pkt(stream_cfg, self.imix_avg_l2_size) - sl = STLStream(packet=pkt, - isg=isg, - flow_stats=STLFlowLatencyStats(pg_id=idx_lat), - mode=STLTXCont(pps=self.LATENCY_PPS)) - streams.append(sl) + # for IMIX, the latency packets have the average IMIX packet size + pkt = self._create_pkt(stream_cfg, self.imix_avg_l2_size) + else: - pkt = self.create_pkt(stream_cfg, l2frame) + pkt = self._create_pkt(stream_cfg, l2frame) streams.append(STLStream(packet=pkt, - flow_stats=STLFlowStats(pg_id=self.stream_ids[port_handle]), + flow_stats=STLFlowStats(pg_id=pg_id), mode=STLTXCont())) - if latency: - idx_lat = self.id.next() - streams.append(STLStream(packet=pkt, - flow_stats=STLFlowLatencyStats(pg_id=idx_lat), - mode=STLTXCont(pps=self.LATENCY_PPS))) - if latency: - self.latencies[port_handle].append(idx_lat) - + streams.append(STLStream(packet=pkt, + flow_stats=STLFlowLatencyStats(pg_id=lat_pg_id), + mode=STLTXCont(pps=self.LATENCY_PPS))) return streams - def init(self): - pass - @timeout(5) def __connect(self, client): client.connect() @@ -255,8 +354,9 @@ class 
TRex(AbstractTrafficGenerator): LOG.info("Retrying connection to TRex (%s)...", ex.message) def connect(self): - LOG.info("Connecting to TRex...") - server_ip = self.config.generator_config.ip + """Connect to the TRex server.""" + server_ip = self.generator_config.ip + LOG.info("Connecting to TRex (%s)...", server_ip) # Connect to TRex server self.client = STLClient(server=server_ip) @@ -289,10 +389,36 @@ class TRex(AbstractTrafficGenerator): else: raise TrafficGeneratorException(e.message) - ports = list(self.config.generator_config.ports) + ports = list(self.generator_config.ports) self.port_handle = ports # Prepare the ports self.client.reset(ports) + # Read HW information from each port + # this returns an array of dict (1 per port) + """ + Example of output for Intel XL710 + [{'arp': '-', 'src_ipv4': '-', u'supp_speeds': [40000], u'is_link_supported': True, + 'grat_arp': 'off', 'speed': 40, u'index': 0, 'link_change_supported': 'yes', + u'rx': {u'counters': 127, u'caps': [u'flow_stats', u'latency']}, + u'is_virtual': 'no', 'prom': 'off', 'src_mac': u'3c:fd:fe:a8:24:48', 'status': 'IDLE', + u'description': u'Ethernet Controller XL710 for 40GbE QSFP+', + 'dest': u'fa:16:3e:3c:63:04', u'is_fc_supported': False, 'vlan': '-', + u'driver': u'net_i40e', 'led_change_supported': 'yes', 'rx_filter_mode': 'hardware match', + 'fc': 'none', 'link': 'UP', u'hw_mac': u'3c:fd:fe:a8:24:48', u'pci_addr': u'0000:5e:00.0', + 'mult': 'off', 'fc_supported': 'no', u'is_led_supported': True, 'rx_queue': 'off', + 'layer_mode': 'Ethernet', u'numa': 0}, ...] 
+ """ + self.port_info = self.client.get_port_info(ports) + LOG.info('Connected to TRex') + for id, port in enumerate(self.port_info): + LOG.info(' Port %d: %s speed=%dGbps mac=%s pci=%s driver=%s', + id, port['description'], port['speed'], port['src_mac'], + port['pci_addr'], port['driver']) + # Make sure the 2 ports have the same speed + if self.port_info[0]['speed'] != self.port_info[1]['speed']: + raise TrafficGeneratorException('Traffic generator ports speed mismatch: %d/%d Gbps' % + (self.port_info[0]['speed'], + self.port_info[1]['speed'])) def set_mode(self): if self.config.service_chain == ChainType.EXT and not self.config.no_arp: @@ -302,7 +428,7 @@ class TRex(AbstractTrafficGenerator): def __set_l3_mode(self): self.client.set_service_mode(ports=self.port_handle, enabled=True) - for port, device in zip(self.port_handle, self.config.generator_config.devices): + for port, device in zip(self.port_handle, self.generator_config.devices): try: self.client.set_l3_mode(port=port, src_ipv4=device.tg_gateway_ip, @@ -315,62 +441,85 @@ class TRex(AbstractTrafficGenerator): def __set_l2_mode(self): self.client.set_service_mode(ports=self.port_handle, enabled=True) - for port, device in zip(self.port_handle, self.config.generator_config.devices): - for cfg in device.get_stream_configs(self.config.generator_config.service_chain): + for port, device in zip(self.port_handle, self.generator_config.devices): + for cfg in device.get_stream_configs(): self.client.set_l2_mode(port=port, dst_mac=cfg['mac_dst']) self.client.set_service_mode(ports=self.port_handle, enabled=False) def __start_server(self): server = TRexTrafficServer() - server.run_server(self.config.generator_config, self.config.vlan_tagging) + server.run_server(self.generator_config) def resolve_arp(self): + """Resolve all configured remote IP addresses. 
+ + return: True if ARP resolved successfully + """ self.client.set_service_mode(ports=self.port_handle) - LOG.info('Polling ARP until successful') - resolved = 0 - attempt = 0 - for port, device in zip(self.port_handle, self.config.generator_config.devices): + LOG.info('Polling ARP until successful...') + arps = {} + for port, device in zip(self.port_handle, self.generator_config.devices): + # there should be 1 stream config per chain + stream_configs = device.get_stream_configs() + chain_count = len(stream_configs) ctx = self.client.create_service_ctx(port=port) - + # all dest macs on this port indexed by chain ID + dst_macs = [None] * chain_count + dst_macs_count = 0 + # the index in the list is the chain id arps = [ STLServiceARP(ctx, src_ip=cfg['ip_src_tg_gw'], dst_ip=cfg['mac_discovery_gw'], vlan=device.vlan_tag if device.vlan_tagging else None) - for cfg in device.get_stream_configs(self.config.generator_config.service_chain) + for cfg in stream_configs() ] - for _ in xrange(self.config.generic_retry_count): - attempt += 1 + for attempt in range(self.config.generic_retry_count): try: ctx.run(arps) except STLError: LOG.error(traceback.format_exc()) continue - self.arps[port] = [arp.get_record().dst_mac for arp in arps - if arp.get_record().dst_mac is not None] - - if len(self.arps[port]) == self.config.service_chain_count: - resolved += 1 + unresolved = [] + for chain_id, mac in enumerate(dst_macs): + if not mac: + arp_record = arps[chain_id].get_record() + if arp_record.dest_mac: + dst_macs[chain_id] = arp_record.dst_mac + dst_macs_count += 1 + LOG.info(' ARP: port=%d chain=%d IP=%s -> MAC=%s', + port, chain_id, + arp_record.dst_ip, arp_record.dst_mac) + else: + unresolved.append(arp_record.dst_ip) + if dst_macs_count == chain_count: + arps[port] = dst_macs LOG.info('ARP resolved successfully for port %s', port) break else: - failed = [arp.get_record().dst_ip for arp in arps - if arp.get_record().dst_mac is None] - LOG.info('Retrying ARP for: %s (%d / %d)', 
- failed, attempt, self.config.generic_retry_count) - time.sleep(self.config.generic_poll_sec) + retry = attempt + 1 + LOG.info('Retrying ARP for: %s (retry %d/%d)', + unresolved, retry, self.config.generic_retry_count) + if retry < self.config.generic_retry_count: + time.sleep(self.config.generic_poll_sec) + else: + LOG.error('ARP timed out for port %s (resolved %d out of %d)', + port, + dst_macs_count, + chain_count) + break self.client.set_service_mode(ports=self.port_handle, enabled=False) - return resolved == len(self.port_handle) - - def config_interface(self): - pass + if len(arps) == len(self.port_handle): + self.arps = arps + return True + return False def __is_rate_enough(self, l2frame_size, rates, bidirectional, latency): """Check if rate provided by user is above requirements. Applies only if latency is True.""" - intf_speed = self.config.generator_config.intf_speed + intf_speed = self.generator_config.intf_speed if latency: if bidirectional: mult = 2 @@ -392,6 +541,14 @@ class TRex(AbstractTrafficGenerator): return {'result': True} def create_traffic(self, l2frame_size, rates, bidirectional, latency=True): + """Program all the streams in Trex server. 
+ + l2frame_size: L2 frame size or IMIX + rates: a list of 2 rates to run each direction + each rate is a dict like {'rate_pps': '10kpps'} + bidirectional: True if bidirectional + latency: True if latency measurement is needed + """ r = self.__is_rate_enough(l2frame_size, rates, bidirectional, latency) if not r['result']: raise TrafficGeneratorException( @@ -399,59 +556,89 @@ class TRex(AbstractTrafficGenerator): .format(pps=r['rate_pps'], bps=r['rate_bps'], load=r['rate_percent'])) - - stream_cfgs = [d.get_stream_configs(self.config.generator_config.service_chain) - for d in self.config.generator_config.devices] + # a dict of list of streams indexed by port# + # in case of fixed size, has self.chain_count * 2 * 2 streams + # (1 normal + 1 latency stream per direction per chain) + # for IMIX, has self.chain_count * 2 * 4 streams + # (3 normal + 1 latency stream per direction per chain) + streamblock = {} + for port in self.port_handle: + streamblock[port] = [] + stream_cfgs = [d.get_stream_configs() for d in self.generator_config.devices] self.rates = [utils.to_rate_str(rate) for rate in rates] - - for ph in self.port_handle: - # generate one pg_id for each direction - self.stream_ids[ph] = self.id.next() - - for i, (fwd_stream_cfg, rev_stream_cfg) in enumerate(zip(*stream_cfgs)): - if self.config.service_chain == ChainType.EXT and not self.config.no_arp: - fwd_stream_cfg['mac_dst'] = self.arps[self.port_handle[0]][i] - rev_stream_cfg['mac_dst'] = self.arps[self.port_handle[1]][i] - - self.streamblock[0].extend(self.generate_streams(self.port_handle[0], - fwd_stream_cfg, - l2frame_size, - latency=latency)) + for chain_id, (fwd_stream_cfg, rev_stream_cfg) in enumerate(zip(*stream_cfgs)): + if self.arps: + # in case of external chain with ARP, fill in the proper dest MAC + # based on the 2 ARP replies for each chain + fwd_stream_cfg['mac_dst'] = self.arps[self.port_handle[0]][chain_id] + rev_stream_cfg['mac_dst'] = self.arps[self.port_handle[1]][chain_id] + + 
streamblock[0].extend(self.generate_streams(self.port_handle[0], + chain_id, + fwd_stream_cfg, + l2frame_size, + latency=latency)) if len(self.rates) > 1: - self.streamblock[1].extend(self.generate_streams(self.port_handle[1], - rev_stream_cfg, - l2frame_size, - isg=10.0, - latency=bidirectional and latency)) + streamblock[1].extend(self.generate_streams(self.port_handle[1], + chain_id, + rev_stream_cfg, + l2frame_size, + latency=bidirectional and latency)) - for ph in self.port_handle: - self.client.add_streams(self.streamblock[ph], ports=ph) - LOG.info('Created traffic stream for port %s.', ph) + for port in self.port_handle: + self.client.add_streams(streamblock[port], ports=port) + LOG.info('Created %d traffic streams for port %s.', len(streamblock[port]), port) def clear_streamblock(self): - self.streamblock = defaultdict(list) - self.latencies = defaultdict(list) - self.stream_ids = defaultdict(list) + """Clear all streams from TRex.""" self.rates = [] self.client.reset(self.port_handle) - LOG.info('Cleared all existing streams.') + LOG.info('Cleared all existing streams') def get_stats(self): + """Get stats from Trex.""" stats = self.client.get_stats() return self.extract_stats(stats) def get_macs(self): - return [self.client.get_port_attr(port=port)['src_mac'] for port in self.port_handle] + """Return the Trex local port MAC addresses. + + return: a list of MAC addresses indexed by the port# + """ + return [port['src_mac'] for port in self.port_info] + + def get_port_speed_gbps(self): + """Return the Trex local port MAC addresses. + + return: a list of speed in Gbps indexed by the port# + """ + return [port['speed'] for port in self.port_info] + + def get_dest_macs(self): + """Return the dest MAC for all chains for both ports for the current traffic setup. + + return: a list of MAC addresses indexed by the port# [[m00, m01...], [m10, m11...]] + + If ARP are used, resolve_arp() must be called prior to calling this method. 
+ """ + # if ARP was used, return the dest MACs resolved by ARP + if self.arps: + return [self.arps[port] for port in self.port_handle] + # no ARP, use the dest MACs as configured in the devices + return [d.dest_macs for d in self.generator_config.devices] def clear_stats(self): + """Clear all stats in the traffic gneerator.""" if self.port_handle: self.client.clear_stats() def start_traffic(self): + """Start generating traffic in all ports.""" for port, rate in zip(self.port_handle, self.rates): self.client.start(ports=port, mult=rate, duration=self.config.duration_sec, force=True) def stop_traffic(self): + """Stop generating traffic.""" self.client.stop(ports=self.port_handle) def start_capture(self): @@ -468,18 +655,21 @@ class TRex(AbstractTrafficGenerator): bpf_filter=bpf_filter) def fetch_capture_packets(self): + """Fetch capture packets in capture mode.""" if self.capture_id: self.packet_list = [] self.client.fetch_capture_packets(capture_id=self.capture_id['id'], output=self.packet_list) def stop_capture(self): + """Stop capturing packets.""" if self.capture_id: self.client.stop_capture(capture_id=self.capture_id['id']) self.capture_id = None self.client.set_service_mode(ports=self.port_handle, enabled=False) def cleanup(self): + """Cleanup Trex driver.""" if self.client: try: self.client.reset(self.port_handle) diff --git a/nfvbench/traffic_server.py b/nfvbench/traffic_server.py index dcb83fb..2239ec3 100644 --- a/nfvbench/traffic_server.py +++ b/nfvbench/traffic_server.py @@ -34,17 +34,17 @@ class TRexTrafficServer(TrafficServer): assert len(contents) == 1 self.trex_dir = os.path.join(trex_base_dir, contents[0]) - def run_server(self, traffic_profile, vlan_tagging, filename='/etc/trex_cfg.yaml'): + def run_server(self, generator_config, filename='/etc/trex_cfg.yaml'): """ Runs TRex server for specified traffic profile. 
:param traffic_profile: traffic profile object based on config file :param filename: path where to save TRex config file """ - cfg = self.__save_config(traffic_profile, filename) - cores = traffic_profile.cores - sw_mode = "--software" if traffic_profile.generator_config.software_mode else "" - vlan_opt = "--vlan" if vlan_tagging else "" + cfg = self.__save_config(generator_config, filename) + cores = generator_config.cores + sw_mode = "--software" if generator_config.software_mode else "" + vlan_opt = "--vlan" if generator_config.vlan_tagging else "" subprocess.Popen(['nohup', '/bin/bash', '-c', './t-rex-64 -i -c {} --iom 0 --no-scapy-server --close-at-end {} ' '{} --cfg {} &> /tmp/trex.log & disown'.format(cores, sw_mode, @@ -52,10 +52,10 @@ class TRexTrafficServer(TrafficServer): cwd=self.trex_dir) LOG.info('TRex server is running...') - def __save_config(self, traffic_profile, filename): - ifs = ",".join([repr(pci) for pci in traffic_profile.pcis]) + def __save_config(self, generator_config, filename): + ifs = ",".join([repr(pci) for pci in generator_config.pcis]) - result = """# Config generated by NFVBench tool + result = """# Config generated by NFVbench - port_limit : 2 version : 2 interfaces : [{ifs}]""".format(ifs=ifs) -- cgit 1.2.3-korg