diff options
Diffstat (limited to 'nfvbench')
31 files changed, 6443 insertions, 0 deletions
diff --git a/nfvbench/__init__.py b/nfvbench/__init__.py new file mode 100644 index 0000000..6e88400 --- /dev/null +++ b/nfvbench/__init__.py @@ -0,0 +1,18 @@ +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +import pbr.version + +__version__ = pbr.version.VersionInfo('nfvbench').version_string_with_vcs() diff --git a/nfvbench/cfg.default.yaml b/nfvbench/cfg.default.yaml new file mode 100644 index 0000000..795ed5d --- /dev/null +++ b/nfvbench/cfg.default.yaml @@ -0,0 +1,337 @@ +# +# NFVbench default configuration file +# +# This configuration file is ALWAYS loaded by NFVbench and should never be modified by users. +# To specify your own property values, always define them in a separate config file +# and pass that file to the script using -c or --config <file> +# Property values in that config file will override the default values in the current file +# +--- +# IMPORTANT CUSTOMIZATION NOTES +# There are roughly 2 types of NFVbench config based on the OpenStack encaps used: +# - VLAN (OVS, OVS-DPDK, ML2/VPP) +# Many of the fields to customize are relevant to only 1 of the 2 encaps +# These will be clearly labeled "VxLAN only" or "VLAN only" +# Fields that are not applicable will not be used by NFVbench and can be left empty +# +# All fields are applicable to all encaps/traffic generators unless explicitly marked otherwise. +# Fields that can be over-ridden at the command line are marked with the corresponding +# option, e.g. "--interval" + + +# Name of the image to use for launching the loopback VMs. This name must be +# the exact same name used in OpenStack (as shown from 'nova image-list') +# Can be overridden by --image or -i +image_name: 'nfvbenchvm' +# Forwarder to use in nfvbenchvm image. Available options: ['vpp', 'testpmd'] +vm_forwarder: testpmd + +# NFVbench can automatically upload a VM image if the image named by +# image_name is missing, for that you need to specify a file location where +# the image can be retrieved +# +# To upload the image as a file, download it to preferred location +# and prepend it with file:// like in this example: +# file://<location of the image> +# NFVbench (the image must have the same name as defined in image_name above). +vm_image_file: + +# Name of the flavor to use for the loopback VMs +# +# If the provided name is an exact match to a flavor name known by OpenStack +# (as shown from 'nova flavor-list'), that flavor will be reused. +# Otherwise, a new flavor will be created with attributes listed below. +flavor_type: 'nfvbench.medium' + +# Custom flavor attributes +flavor: + # Number of vCPUs for the flavor + vcpus: 2 + # Memory for the flavor in MB + ram: 8192 + # Size of local disk in GB + disk: 0 + # metadata are supported and can be added if needed, optional + # note that if your openstack does not have NUMA optimization + # (cpu pinning and huge pages) + # you must comment out extra_specs completely otherwise + # loopback VM creation will fail + extra_specs: + "hw:cpu_policy": dedicated + "hw:mem_page_size": large + +# Name of the availability zone to use for the test VMs +# Must be one of the zones listed by 'nova availability-zone-list' +# If the selected zone contains only 1 compute node and PVVP inter-node flow is selected, +# application will use intra-node PVVP flow. +# List of compute nodes can be specified, must be in given availability zone if not empty +#availability_zone: 'nova' +availability_zone: +compute_nodes: + + +# Credentials for SSH connection to TOR switches. +tor: + # Leave type empty or switch list empty to skip TOR switches configuration. + # Preferably use 'no_tor_access' to achieve the same behavior. + # (skipping TOR config will require the user to pre-stitch the traffic generator interfaces + # to the service chain under test, needed only if configured in access mode) + type: + # Switches are only needed if type is not empty. + # You can configure 0, 1 or 2 switches + # no switch: in this case NFVbench will not attempt to ssh to the switch + # and stitching of traffic must be done externally + # 1 switch: this assumes that both traffic generator interfaces are wired to the same switch + # 2 switches: this is the recommended setting wuth redundant switches, in this case each + # traffic generator interface must be wired to a different switch + switches: + - host: + username: + password: + port: + +# Skip TOR switch configuration and retrieving of stats +# Can be overriden by --no-tor-access +no_tor_access: false + +# Skip vswitch configuration and retrieving of stats +# Can be overriden by --no-vswitch-access +no_vswitch_access: false + +# Type of service chain to run, possible options are PVP, PVVP and EXT +# PVP - port to VM to port +# PVVP - port to VM to VM to port +# EXT - external chain used only for running traffic and checking traffic generator counters, +# all other parts of chain must be configured manually +# Can be overriden by --service-chain +service_chain: 'PVP' + +# Total number of service chains, every chain has own traffic stream +# Can be overriden by --service-chain-count +service_chain_count: 1 + +# Total number of traffic flows for all chains and directions generated by the traffic generator. +# Minimum is '2 * service_chain_count', it is automatically adjusted if too small +# value was configured. Must be even. +# Every flow has packets with different IPs in headers +# Can be overriden by --flow-count +flow_count: 2 + +# Used by PVVP chain to spawn VMs on different compute nodes +# Can be overriden by --inter-node +inter_node: false + +# set to true if service chains should use SRIOV +# This requires SRIOV to be available on compute nodes +sriov: false + +# Skip interfaces config on EXT service chain +# Can be overriden by --no-int-config +no_int_config: false + +# Resources created by NFVbench will not be removed +# Can be overriden by --no-cleanup +no_cleanup: false + +# Configuration for traffic generator +traffic_generator: + # Name of the traffic generator, only for informational purposes + host_name: 'nfvbench_tg' + # this is the default traffic generator profile to use + # the name must be defined under generator_profile + # you can override the traffic generator to use using the + # -g or --traffic-gen option at the command line + default_profile: trex-local + + # IP addresses for L3 traffic. + # All of the IPs are used as base for IP sequence computed based on chain or flow count. + # + # `ip_addrs` base IPs used as src and dst in packet header, quantity depends on flow count + # `ip_addrs_step`: step for generating IP sequence. Use "random" for random patterns, default is 0.0.0.1. + # `tg_gateway_ip_addrs` base IPs for traffic generator ports, quantity depends on chain count + # `tg_gateway_ip_addrs__step`: step for generating traffic generator gateway sequences. default is 0.0.0.1 + # `gateway_ip_addrs`: base IPs of router gateways on both networks, quantity depends on chain count + # `gateway_ip_addrs_step`: step for generating router gateway sequences. default is 0.0.0.1 + ip_addrs: ['10.0.0.0/8', '20.0.0.0/8'] + ip_addrs_step: 0.0.0.1 + tg_gateway_ip_addrs: ['1.1.0.100', '2.2.0.100'] + tg_gateway_ip_addrs_step: 0.0.0.1 + gateway_ip_addrs: ['1.1.0.2', '2.2.0.2'] + gateway_ip_addrs_step: 0.0.0.1 + + # Traffic Generator Profiles + # In case you have multiple testbeds or traffic generators, + # you can define one traffic generator profile per testbed/traffic generator. + # + # Generator profiles are listed in the following format: + # `name`: Traffic generator profile name (use a unique name, no space or special character) + # `tool`: Traffic generator tool to be used (currently supported is `TRex`). + # `ip`: IP address of the traffic generator. + # `cores`: Specify the number of cores for TRex traffic generator. ONLY applies to trex-local. + # `interfaces`: Configuration of traffic generator interfaces. + # `interfaces.port`: The port of the traffic generator to be used (leave as 0 and 1 resp.) + # `interfaces.switch_port`: Leave empty (reserved for advanced use cases) + # `interfaces.pci`: The PCI address of the intel NIC interface associated to this port + # `intf_speed`: The speed of the interfaces used by the traffic generator (per direction). + # + generator_profile: + - name: trex-local + tool: TRex + ip: 127.0.0.1 + cores: 3 + interfaces: + - port: 0 + switch_port: + pci: + - port: 1 + switch_port: + pci: + intf_speed: 10Gbps + +# ----------------------------------------------------------------------------- +# These variables are not likely to be changed + +# The openrc file +openrc_file: + +# General retry count +generic_retry_count: 100 + +# General poll period +generic_poll_sec: 2 + +# name of the loop VM +loop_vm_name: 'nfvbench-loop-vm' + +# Default names, subnets and CIDRs for internal networks used by the script. +# If a network with given name already exists it will be reused. +# Otherwise a new internal network will be created with that name, subnet and CIDR. +internal_networks: + # Required only when segmentation_id specified + physical_network: + left: + name: 'nfvbench-net0' + subnet: 'nfvbench-subnet0' + cidr: '192.168.1.0/24' + network_type: 'vlan' + segmentation_id: + right: + name: 'nfvbench-net1' + subnet: 'nfvbench-subnet1' + cidr: '192.168.2.0/24' + network_type: 'vlan' + segmentation_id: + middle: + name: 'nfvbench-net2' + subnet: 'nfvbench-subnet2' + cidr: '192.168.3.0/24' + network_type: 'vlan' + segmentation_id: + +# EXT chain only. Names of edge networks which will be used to send traffic via traffic generator. +external_networks: + left: 'nfvbench-net0' + right: 'nfvbench-net1' + +# Use 'true' to enable VLAN tagging of packets coming from traffic generator +# Leave empty if VLAN tagging is enabled on switch or if you want to hook directly to a NIC +# Else by default is set to true (which is the nominal use case with TOR and trunk mode to Trex) +vlan_tagging: true + +# Specify only when you want to override VLAN IDs used for tagging with own values (exactly 2). +# Default behavior of VLAN tagging is to retrieve VLAN IDs from OpenStack networks provided above. +# In case of VxLAN this setting is ignored and only vtep_vlan from traffic generator profile is used. +# Example: [1998, 1999] +vlans: [] + +# Used only with EXT chain. MAC addresses of traffic generator ports are used as destination +# if 'no_arp' is set to 'true'. Otherwise ARP requests are sent to find out destination MAC addresses. +no_arp: false + +# Traffic Profiles +# You can add here more profiles as needed +# `l2frame_size` can be specified in any none zero integer value to represent the size in bytes +# of the L2 frame, or "IMIX" to represent the standard 3-packet size mixed sequence (IMIX1). +traffic_profile: + - name: traffic_profile_64B + l2frame_size: ['64'] + - name: traffic_profile_IMIX + l2frame_size: ['IMIX'] + - name: traffic_profile_1518B + l2frame_size: ['1518'] + - name: traffic_profile_3sizes + l2frame_size: ['64', 'IMIX', '1518'] + +# Traffic Configuration +# bidirectional: to have traffic generated from both direction, set bidirectional to true +# profile: must be one of the profiles defined in traffic_profile +# The traffic profile can be overriden with the options --frame-size and --uni-dir +traffic: + bidirectional: true + profile: traffic_profile_64B + +# Check config and connectivity only - do not generate traffic +# Can be overriden by --no-traffic +no_traffic: false + +# Do not reset tx/rx counters prior to running +# Can be overriden by --no-reset +no_reset: false + +# Test configuration + +# The rate pps for traffic going in reverse direction in case of unidirectional flow. Default to 1. +unidir_reverse_traffic_pps: 1 + +# The rate specifies if NFVbench should determine the NDR/PDR +# or if NFVbench should just generate traffic at a given fixed rate +# for a given duration (called "single run" mode) +# Supported rate format: +# NDR/PDR test: `ndr`, `pdr`, `ndr_pdr` (default) +# Or for single run mode: +# Packet per second: pps (e.g. `50pps`) +# Bits per second: bps, kbps, Mbps, etc (e.g. `1Gbps`, `1000bps`) +# Load percentage: % (e.g. `50%`) +# Can be overridden by --rate +rate: ndr_pdr + +# Default run duration (single run at given rate only) +# Can be overridden by --duration +duration_sec: 60 + +# Interval between intermediate reports when interval reporting is enabled +# Can be overridden by --interval +interval_sec: 10 + +# NDR / PDR configuration ZZ +measurement: + # Drop rates represent the ratio of dropped packet to the total number of packets sent. + # Values provided here are percentages. A value of 0.01 means that at most 0.01% of all + # packets sent are dropped (or 1 packet every 10,000 packets sent) + + # No Drop Rate in percentage; Default to 0.001% + NDR: 0.001 + # Partial Drop Rate in percentage; NDR should always be less than PDR + PDR: 0.1 + # The accuracy of NDR and PDR load percentiles; The actual load percentile that match NDR + # or PDR should be within `load_epsilon` difference than the one calculated. + load_epsilon: 0.1 + +# Location where to store results in a JSON format. Must be container specific path. +# Can be overriden by --json +json: + +# Location where to store results in the NFVbench standard JSON format: +# <service-chain-type>-<service-chain-count>-<flow-count>-<packet-sizes>.json +# Example: PVP-1-10-64-IMIX.json +# Must be container specific path. +# Can be overriden by --std-json +std_json: + +# Prints debug messages (verbose mode) +# Can be overriden by --debug +debug: false + +# Module and class name of factory which will be used to provide classes dynamically for other components. +factory_module: 'nfvbench.factory' +factory_class: 'BasicFactory'
\ No newline at end of file diff --git a/nfvbench/chain_clients.py b/nfvbench/chain_clients.py new file mode 100644 index 0000000..4be050f --- /dev/null +++ b/nfvbench/chain_clients.py @@ -0,0 +1,564 @@ +#!/usr/bin/env python +# Copyright 2016 Cisco Systems, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import compute +from glanceclient.v2 import client as glanceclient +from log import LOG +from neutronclient.neutron import client as neutronclient +from novaclient.client import Client +import os +import time + + +class StageClientException(Exception): + pass + + +class BasicStageClient(object): + """Client for spawning and accessing the VM setup""" + + nfvbenchvm_config_name = 'nfvbenchvm.conf' + + def __init__(self, config, cred): + self.comp = None + self.image_instance = None + self.config = config + self.cred = cred + self.nets = [] + self.vms = [] + self.created_ports = [] + self.ports = {} + self.compute_nodes = set([]) + self.comp = None + self.neutron = None + self.flavor_type = {'is_reuse': True, 'flavor': None} + self.host_ips = None + + def _ensure_vms_active(self): + for _ in range(self.config.generic_retry_count): + for i, instance in enumerate(self.vms): + if instance.status == 'ACTIVE': + continue + is_reuse = getattr(instance, 'is_reuse', True) + instance = self.comp.poll_server(instance) + if instance.status == 'ERROR': + raise StageClientException('Instance creation error: %s' % + instance.fault['message']) + if instance.status == 'ACTIVE': + LOG.info('Created instance: %s', instance.name) + self.vms[i] = instance + setattr(self.vms[i], 'is_reuse', is_reuse) + if all(map(lambda instance: instance.status == 'ACTIVE', self.vms)): + return + time.sleep(self.config.generic_poll_sec) + raise StageClientException('Timed out waiting for VMs to spawn') + + def _setup_openstack_clients(self): + self.session = self.cred.get_session() + nova_client = Client(2, session=self.session) + self.neutron = neutronclient.Client('2.0', session=self.session) + self.glance_client = glanceclient.Client('2', + session=self.session) + self.comp = compute.Compute(nova_client, self.glance_client, self.neutron, self.config) + + def _lookup_network(self, network_name): + networks = self.neutron.list_networks(name=network_name) + return networks['networks'][0] if networks['networks'] else None + + def _create_net(self, name, subnet, cidr, network_type=None, segmentation_id=None): + network = self._lookup_network(name) + if network: + phys_net = self.config.internal_networks.physical_network + if segmentation_id is not None and phys_net is not None: + if network['provider:segmentation_id'] != segmentation_id: + raise StageClientException("Mismatch of 'segmentation_id' for reused " + "network '{net}'. Network has id '{seg_id1}', " + "configuration requires '{seg_id2}'." + .format(net=name, + seg_id1=network['provider:segmentation_id'], + seg_id2=segmentation_id)) + + if network['provider:physical_network'] != phys_net: + raise StageClientException("Mismatch of 'physical_network' for reused " + "network '{net}'. Network has '{phys1}', " + "configuration requires '{phys2}'." + .format(net=name, + phys1=network['provider:physical_network'], + phys2=phys_net)) + + LOG.info('Reusing existing network: ' + name) + network['is_reuse'] = True + return network + + body = { + 'network': { + 'name': name, + 'admin_state_up': True + } + } + + if network_type: + body['network']['provider:network_type'] = network_type + phys_net = self.config.internal_networks.physical_network + if segmentation_id is not None and phys_net is not None: + body['network']['provider:segmentation_id'] = segmentation_id + body['network']['provider:physical_network'] = phys_net + + network = self.neutron.create_network(body)['network'] + body = { + 'subnet': { + 'name': subnet, + 'cidr': cidr, + 'network_id': network['id'], + 'enable_dhcp': False, + 'ip_version': 4, + 'dns_nameservers': [] + } + } + subnet = self.neutron.create_subnet(body)['subnet'] + # add subnet id to the network dict since it has just been added + network['subnets'] = [subnet['id']] + network['is_reuse'] = False + LOG.info('Created network: %s.' % name) + return network + + def _create_port(self, net): + body = { + "port": { + 'network_id': net['id'], + 'binding:vnic_type': 'direct' if self.config.sriov else 'normal' + } + } + port = self.neutron.create_port(body) + return port['port'] + + def __delete_port(self, port): + retry = 0 + while retry < self.config.generic_retry_count: + try: + self.neutron.delete_port(port['id']) + return + except Exception: + retry += 1 + time.sleep(self.config.generic_poll_sec) + LOG.error('Unable to delete port: %s' % (port['id'])) + + def __delete_net(self, network): + retry = 0 + while retry < self.config.generic_retry_count: + try: + self.neutron.delete_network(network['id']) + return + except Exception: + retry += 1 + time.sleep(self.config.generic_poll_sec) + LOG.error('Unable to delete network: %s' % (network['name'])) + + def __get_server_az(self, server): + availability_zone = getattr(server, 'OS-EXT-AZ:availability_zone', None) + host = getattr(server, 'OS-EXT-SRV-ATTR:host', None) + if availability_zone is None: + return None + if host is None: + return None + return availability_zone + ':' + host + + def _lookup_servers(self, name=None, nets=None, az=None, flavor_id=None): + error_msg = 'VM with the same name, but non-matching {} found. Aborting.' + networks = set(map(lambda net: net['name'], nets)) if nets else None + server_list = self.comp.get_server_list() + matching_servers = [] + + for server in server_list: + if name and server.name != name: + continue + + if az and self.__get_server_az(server) != az: + raise StageClientException(error_msg.format('availability zones')) + + if flavor_id and server.flavor['id'] != flavor_id: + raise StageClientException(error_msg.format('flavors')) + + if networks and not set(server.networks.keys()).issuperset(networks): + raise StageClientException(error_msg.format('networks')) + + if server.status != "ACTIVE": + raise StageClientException(error_msg.format('state')) + + # everything matches + matching_servers.append(server) + + return matching_servers + + def _create_server(self, name, ports, az, nfvbenchvm_config): + port_ids = map(lambda port: {'port-id': port['id']}, ports) + nfvbenchvm_config_location = os.path.join('/etc/', self.nfvbenchvm_config_name) + server = self.comp.create_server(name, + self.image_instance, + self.flavor_type['flavor'], + None, + port_ids, + None, + avail_zone=az, + user_data=None, + config_drive=True, + files={nfvbenchvm_config_location: nfvbenchvm_config}) + if server: + setattr(server, 'is_reuse', False) + LOG.info('Creating instance: %s on %s' % (name, az)) + else: + raise StageClientException('Unable to create instance: %s.' % (name)) + return server + + def _setup_resources(self): + if not self.image_instance: + self.image_instance = self.comp.find_image(self.config.image_name) + if self.image_instance is None: + if self.config.vm_image_file: + LOG.info('%s: image for VM not found, trying to upload it ...' + % self.config.image_name) + res = self.comp.upload_image_via_url(self.config.image_name, + self.config.vm_image_file) + + if not res: + raise StageClientException('Error uploading image %s from %s. ABORTING.' + % (self.config.image_name, + self.config.vm_image_file)) + self.image_instance = self.comp.find_image(self.config.image_name) + else: + raise StageClientException('%s: image to launch VM not found. ABORTING.' + % self.config.image_name) + + LOG.info('Found image %s to launch VM' % self.config.image_name) + + self.__setup_flavor() + + def __setup_flavor(self): + if self.flavor_type.get('flavor', False): + return + + self.flavor_type['flavor'] = self.comp.find_flavor(self.config.flavor_type) + if self.flavor_type['flavor']: + self.flavor_type['is_reuse'] = True + else: + flavor_dict = self.config.flavor + extra_specs = flavor_dict.pop('extra_specs', None) + + self.flavor_type['flavor'] = self.comp.create_flavor(self.config.flavor_type, + override=True, + **flavor_dict) + + LOG.info("Flavor '%s' was created." % self.config.flavor_type) + + if extra_specs: + self.flavor_type['flavor'].set_keys(extra_specs) + + self.flavor_type['is_reuse'] = False + + if self.flavor_type['flavor'] is None: + raise StageClientException('%s: flavor to launch VM not found. ABORTING.' + % self.config.flavor_type) + + def __delete_flavor(self, flavor): + if self.comp.delete_flavor(flavor=flavor): + LOG.info("Flavor '%s' deleted" % self.config.flavor_type) + self.flavor_type = {'is_reuse': False, 'flavor': None} + else: + LOG.error('Unable to delete flavor: %s' % self.config.flavor_type) + + def get_config_file(self, chain_index, src_mac, dst_mac): + boot_script_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), + 'nfvbenchvm/', self.nfvbenchvm_config_name) + + with open(boot_script_file, 'r') as boot_script: + content = boot_script.read() + + g1cidr = self.config.generator_config.src_device.gateway_ip_list[chain_index] + '/8' + g2cidr = self.config.generator_config.dst_device.gateway_ip_list[chain_index] + '/8' + + vm_config = { + 'forwarder': self.config.vm_forwarder, + 'tg_gateway1_ip': self.config.traffic_generator.tg_gateway_ip_addrs[0], + 'tg_gateway2_ip': self.config.traffic_generator.tg_gateway_ip_addrs[1], + 'tg_net1': self.config.traffic_generator.ip_addrs[0], + 'tg_net2': self.config.traffic_generator.ip_addrs[1], + 'vnf_gateway1_cidr': g1cidr, + 'vnf_gateway2_cidr': g2cidr, + 'tg_mac1': src_mac, + 'tg_mac2': dst_mac + } + + return content.format(**vm_config) + + def set_ports(self): + """Stores all ports of NFVbench networks.""" + nets = self.get_networks_uuids() + for port in self.neutron.list_ports()['ports']: + if port['network_id'] in nets: + ports = self.ports.setdefault(port['network_id'], []) + ports.append(port) + + def disable_port_security(self): + """ + Disable security at port level. + """ + vm_ids = map(lambda vm: vm.id, self.vms) + for net in self.nets: + for port in self.ports[net['id']]: + if port['device_id'] in vm_ids: + self.neutron.update_port(port['id'], { + 'port': { + 'security_groups': [], + 'port_security_enabled': False, + } + }) + LOG.info('Security disabled on port {}'.format(port['id'])) + + def get_loop_vm_hostnames(self): + return [getattr(vm, 'OS-EXT-SRV-ATTR:hypervisor_hostname') for vm in self.vms] + + def get_host_ips(self): + '''Return the IP adresss(es) of the host compute nodes for this VMclient instance. + Returns a list of 1 IP adress or 2 IP addresses (PVVP inter-node) + ''' + if not self.host_ips: + # get the hypervisor object from the host name + self.host_ips = [self.comp.get_hypervisor( + getattr(vm, 'OS-EXT-SRV-ATTR:hypervisor_hostname')).host_ip + for vm in self.vms] + return self.host_ips + + def get_loop_vm_compute_nodes(self): + compute_nodes = [] + for vm in self.vms: + az = getattr(vm, 'OS-EXT-AZ:availability_zone') + hostname = getattr(vm, 'OS-EXT-SRV-ATTR:hypervisor_hostname') + compute_nodes.append(az + ':' + hostname) + return compute_nodes + + def get_reusable_vm(self, name, nets, az): + servers = self._lookup_servers(name=name, nets=nets, az=az, + flavor_id=self.flavor_type['flavor'].id) + if servers: + server = servers[0] + LOG.info('Reusing existing server: ' + name) + setattr(server, 'is_reuse', True) + return server + else: + return None + + def get_networks_uuids(self): + """ + Extract UUID of used networks. Order is important. + + :return: list of UUIDs of created networks + """ + return [net['id'] for net in self.nets] + + def get_vlans(self): + """ + Extract vlans of used networks. Order is important. + + :return: list of UUIDs of created networks + """ + vlans = [] + for net in self.nets: + assert(net['provider:network_type'] == 'vlan') + vlans.append(net['provider:segmentation_id']) + + return vlans + + def setup(self): + """ + Creates two networks and spawn a VM which act as a loop VM connected + with the two networks. + """ + self._setup_openstack_clients() + + def dispose(self, only_vm=False): + """ + Deletes the created two networks and the VM. + """ + for vm in self.vms: + if vm: + if not getattr(vm, 'is_reuse', True): + self.comp.delete_server(vm) + else: + LOG.info('Server %s not removed since it is reused' % vm.name) + + for port in self.created_ports: + self.__delete_port(port) + + if not only_vm: + for net in self.nets: + if 'is_reuse' in net and not net['is_reuse']: + self.__delete_net(net) + else: + LOG.info('Network %s not removed since it is reused' % (net['name'])) + + if not self.flavor_type['is_reuse']: + self.__delete_flavor(self.flavor_type['flavor']) + + +class EXTStageClient(BasicStageClient): + + def __init__(self, config, cred): + super(EXTStageClient, self).__init__(config, cred) + + def setup(self): + super(EXTStageClient, self).setup() + + # Lookup two existing networks + for net_name in [self.config.external_networks.left, self.config.external_networks.right]: + net = self._lookup_network(net_name) + if net: + self.nets.append(net) + else: + raise StageClientException('Existing network {} cannot be found.'.format(net_name)) + + +class PVPStageClient(BasicStageClient): + + def __init__(self, config, cred): + super(PVPStageClient, self).__init__(config, cred) + + def get_end_port_macs(self): + vm_ids = map(lambda vm: vm.id, self.vms) + port_macs = [] + for index, net in enumerate(self.nets): + vm_mac_map = {port['device_id']: port['mac_address'] for port in self.ports[net['id']]} + port_macs.append([vm_mac_map[vm_id] for vm_id in vm_ids]) + return port_macs + + def setup(self): + super(PVPStageClient, self).setup() + self._setup_resources() + + # Create two networks + nets = self.config.internal_networks + self.nets.extend([self._create_net(**n) for n in [nets.left, nets.right]]) + + az_list = self.comp.get_enabled_az_host_list(required_count=1) + if not az_list: + raise Exception('Not enough hosts found.') + + az = az_list[0] + self.compute_nodes.add(az) + for chain_index in xrange(self.config.service_chain_count): + name = self.config.loop_vm_name + str(chain_index) + reusable_vm = self.get_reusable_vm(name, self.nets, az) + if reusable_vm: + self.vms.append(reusable_vm) + else: + config_file = self.get_config_file(chain_index, + self.config.generator_config.src_device.mac, + self.config.generator_config.dst_device.mac) + + ports = [self._create_port(net) for net in self.nets] + self.created_ports.extend(ports) + self.vms.append(self._create_server(name, ports, az, config_file)) + self._ensure_vms_active() + self.set_ports() + + +class PVVPStageClient(BasicStageClient): + + def __init__(self, config, cred): + super(PVVPStageClient, self).__init__(config, cred) + + def get_end_port_macs(self): + port_macs = [] + for index, net in enumerate(self.nets[:2]): + vm_ids = map(lambda vm: vm.id, self.vms[index::2]) + vm_mac_map = {port['device_id']: port['mac_address'] for port in self.ports[net['id']]} + port_macs.append([vm_mac_map[vm_id] for vm_id in vm_ids]) + return port_macs + + def setup(self): + super(PVVPStageClient, self).setup() + self._setup_resources() + + # Create two networks + nets = self.config.internal_networks + self.nets.extend([self._create_net(**n) for n in [nets.left, nets.right, nets.middle]]) + + required_count = 2 if self.config.inter_node else 1 + az_list = self.comp.get_enabled_az_host_list(required_count=required_count) + + if not az_list: + raise Exception('Not enough hosts found.') + + az1 = az2 = az_list[0] + if self.config.inter_node: + if len(az_list) > 1: + az1 = az_list[0] + az2 = az_list[1] + else: + # fallback to intra-node + az1 = az2 = az_list[0] + self.config.inter_node = False + LOG.info('Using intra-node instead of inter-node.') + + self.compute_nodes.add(az1) + self.compute_nodes.add(az2) + + # Create loop VMs + for chain_index in xrange(self.config.service_chain_count): + name0 = self.config.loop_vm_name + str(chain_index) + 'a' + # Attach first VM to net0 and net2 + vm0_nets = self.nets[0::2] + reusable_vm0 = self.get_reusable_vm(name0, vm0_nets, az1) + + name1 = self.config.loop_vm_name + str(chain_index) + 'b' + # Attach second VM to net1 and net2 + vm1_nets = self.nets[1:] + reusable_vm1 = self.get_reusable_vm(name1, vm1_nets, az2) + + if reusable_vm0 and reusable_vm1: + self.vms.extend([reusable_vm0, reusable_vm1]) + else: + vm0_port_net0 = self._create_port(vm0_nets[0]) + vm0_port_net2 = self._create_port(vm0_nets[1]) + + vm1_port_net2 = self._create_port(vm1_nets[1]) + vm1_port_net1 = self._create_port(vm1_nets[0]) + + self.created_ports.extend([vm0_port_net0, + vm0_port_net2, + vm1_port_net2, + vm1_port_net1]) + + # order of ports is important for sections below + # order of MAC addresses needs to follow order of interfaces + # TG0 (net0) -> VM0 (net2) -> VM1 (net2) -> TG1 (net1) + config_file0 = self.get_config_file(chain_index, + self.config.generator_config.src_device.mac, + vm1_port_net2['mac_address']) + config_file1 = self.get_config_file(chain_index, + vm0_port_net2['mac_address'], + self.config.generator_config.dst_device.mac) + + self.vms.append(self._create_server(name0, + [vm0_port_net0, vm0_port_net2], + az1, + config_file0)) + self.vms.append(self._create_server(name1, + [vm1_port_net2, vm1_port_net1], + az2, + config_file1)) + + self._ensure_vms_active() + self.set_ports() diff --git a/nfvbench/chain_managers.py b/nfvbench/chain_managers.py new file mode 100644 index 0000000..fe3a2d4 --- /dev/null +++ b/nfvbench/chain_managers.py @@ -0,0 +1,231 @@ +#!/usr/bin/env python +# Copyright 2016 Cisco Systems, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +from log import LOG +from network import Network +from packet_analyzer import PacketAnalyzer +from specs import ChainType +from stats_collector import IntervalCollector +import time + + +class StageManager(object): + + def __init__(self, config, cred, factory): + self.config = config + self.client = None + # conditions due to EXT chain special cases + if (config.vlan_tagging and not config.vlans) or not config.no_int_config: + VM_CLASS = factory.get_stage_class(config.service_chain) + self.client = VM_CLASS(config, cred) + self.client.setup() + + def get_vlans(self): + return self.client.get_vlans() if self.client else [] + + def get_host_ips(self): + return self.client.get_host_ips() + + def get_networks_uuids(self): + return self.client.get_networks_uuids() + + def disable_port_security(self): + self.client.disable_port_security() + + def get_vms(self): + return self.client.vms + + def get_nets(self): + return self.client.nets + + def get_ports(self): + return self.client.ports + + def get_compute_nodes(self): + return self.client.compute_nodes + + def set_vm_macs(self): + if self.client and self.config.service_chain != ChainType.EXT: + self.config.generator_config.set_vm_mac_list(self.client.get_end_port_macs()) + + def close(self): + if not self.config.no_cleanup and self.client: + self.client.dispose() + + +class StatsManager(object): + + def __init__(self, config, clients, specs, factory, vlans, notifier=None): + self.config = config + self.clients = clients + self.specs = specs + self.notifier = notifier + self.interval_collector = None + self.vlans = vlans + self.factory = factory + self._setup() + + def set_vlan_tag(self, device, vlan): + self.worker.set_vlan_tag(device, vlan) + + def _setup(self): + WORKER_CLASS = self.factory.get_chain_worker(self.specs.openstack.encaps, + self.config.service_chain) + self.worker = WORKER_CLASS(self.config, self.clients, self.specs) + self.worker.set_vlans(self.vlans) + self._config_interfaces() + + def _get_data(self): + return self.worker.get_data() + + def _get_network(self, traffic_port, index=None, reverse=False): + interfaces = [self.clients['traffic'].get_interface(traffic_port)] + interfaces.extend(self.worker.get_network_interfaces(index)) + return Network(interfaces, reverse) + + def _config_interfaces(self): + if self.config.service_chain != ChainType.EXT: + self.clients['vm'].disable_port_security() + + self.worker.config_interfaces() + + def _generate_traffic(self): + if self.config.no_traffic: + return + + self.interval_collector = IntervalCollector(time.time()) + self.interval_collector.attach_notifier(self.notifier) + LOG.info('Starting to generate traffic...') + stats = {} + for stats in self.clients['traffic'].run_traffic(): + self.interval_collector.add(stats) + + LOG.info('...traffic generating ended.') + return stats + + def get_stats(self): + return self.interval_collector.get() if self.interval_collector else [] + + def get_version(self): + return self.worker.get_version() + + def run(self): + """ + Run analysis in both direction and return the analysis + """ + self.worker.run() + + stats = self._generate_traffic() + result = { + 'raw_data': self._get_data(), + 'packet_analysis': {}, + 'stats': stats + } + + LOG.info('Requesting packet analysis on the forward direction...') + result['packet_analysis']['direction-forward'] = \ + self.get_analysis([self._get_network(0, 0), + self._get_network(0, 1, reverse=True)]) + LOG.info('Packet analysis on the forward direction completed') + + LOG.info('Requesting packet analysis on the reverse direction...') + result['packet_analysis']['direction-reverse'] = \ + self.get_analysis([self._get_network(1, 1), + self._get_network(1, 0, reverse=True)]) + + LOG.info('Packet analysis on the reverse direction completed') + return result + + def get_compute_nodes_bios(self): + return self.worker.get_compute_nodes_bios() + + @staticmethod + def get_analysis(nets): + LOG.info('Starting traffic analysis...') + + packet_analyzer = PacketAnalyzer() + # Traffic types are assumed to always alternate in every chain. Add a no stats interface in + # between if that is not the case. + tx = True + for network in nets: + for interface in network.get_interfaces(): + packet_analyzer.record(interface, 'tx' if tx else 'rx') + tx = not tx + + LOG.info('...traffic analysis completed') + return packet_analyzer.get_analysis() + + def close(self): + self.worker.close() + + +class PVPStatsManager(StatsManager): + + def __init__(self, config, clients, specs, factory, vlans, notifier=None): + StatsManager.__init__(self, config, clients, specs, factory, vlans, notifier) + + +class PVVPStatsManager(StatsManager): + + def __init__(self, config, clients, specs, factory, vlans, notifier=None): + StatsManager.__init__(self, config, clients, specs, factory, vlans, notifier) + + def run(self): + """ + Run analysis in both direction and return the analysis + """ + fwd_v2v_net, rev_v2v_net = self.worker.run() + + stats = self._generate_traffic() + result = { + 'raw_data': self._get_data(), + 'packet_analysis': {}, + 'stats': stats + } + + fwd_nets = [self._get_network(0, 0)] + if fwd_v2v_net: + fwd_nets.append(fwd_v2v_net) + fwd_nets.append(self._get_network(0, 1, reverse=True)) + + rev_nets = [self._get_network(1, 1)] + if rev_v2v_net: + rev_nets.append(rev_v2v_net) + rev_nets.append(self._get_network(1, 0, reverse=True)) + + LOG.info('Requesting packet analysis on the forward direction...') + result['packet_analysis']['direction-forward'] = self.get_analysis(fwd_nets) + LOG.info('Packet analysis on the forward direction completed') + + LOG.info('Requesting packet analysis on the reverse direction...') + result['packet_analysis']['direction-reverse'] = self.get_analysis(rev_nets) + + LOG.info('Packet analysis on the reverse direction completed') + return result + + +class EXTStatsManager(StatsManager): + def __init__(self, config, clients, specs, factory, vlans, notifier=None): + StatsManager.__init__(self, config, clients, specs, factory, vlans, notifier) + + def _setup(self): + WORKER_CLASS = self.factory.get_chain_worker(self.specs.openstack.encaps, + self.config.service_chain) + self.worker = WORKER_CLASS(self.config, self.clients, self.specs) + self.worker.set_vlans(self.vlans) + + if not self.config.no_int_config: + self._config_interfaces() diff --git a/nfvbench/chain_runner.py b/nfvbench/chain_runner.py new file mode 100644 index 0000000..2e222de --- /dev/null +++ b/nfvbench/chain_runner.py @@ -0,0 +1,82 @@ +#!/usr/bin/env python +# Copyright 2016 Cisco Systems, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +from log import LOG +from service_chain import ServiceChain +import traceback +from traffic_client import TrafficClient + + +class ChainRunner(object): + """Run selected chain, collect results and analyse them.""" + + def __init__(self, config, clients, cred, specs, factory, notifier=None): + self.config = config + self.clients = clients + self.specs = specs + self.factory = factory + self.chain_name = self.config.service_chain + + try: + TORClass = factory.get_tor_class(self.config.tor.type, self.config.no_tor_access) + except AttributeError: + raise Exception("Requested TOR class '{}' was not found.".format(self.config.tor.type)) + + self.clients['tor'] = TORClass(self.config.tor.switches) + self.clients['traffic'] = TrafficClient(config, notifier) + self.chain = ServiceChain(config, clients, cred, specs, factory, notifier) + + LOG.info('ChainRunner initialized.') + + def run(self): + """ + Run a chain, collect and analyse results. + + :return: dictionary + """ + self.clients['traffic'].start_traffic_generator() + self.clients['traffic'].set_macs() + + return self.chain.run() + + def close(self): + try: + if not self.config.no_cleanup: + LOG.info('Cleaning up...') + else: + LOG.info('Clean up skipped.') + + for client in ['traffic', 'tor']: + try: + self.clients[client].close() + except Exception as e: + traceback.print_exc() + LOG.error(e) + + self.chain.close() + except Exception: + traceback.print_exc() + LOG.error('Cleanup not finished.') + + def get_version(self): + versions = { + 'Traffic Generator': self.clients['traffic'].get_version(), + 'TOR': self.clients['tor'].get_version(), + } + + versions.update(self.chain.get_version()) + + return versions diff --git a/nfvbench/chain_workers.py b/nfvbench/chain_workers.py new file mode 100644 index 0000000..2e36fb1 --- /dev/null +++ b/nfvbench/chain_workers.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python +# Copyright 2017 Cisco Systems, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + + +class BasicWorker(object): + + def __init__(self, config, clients, specs): + self.config = config + self.clients = clients + self.specs = specs + + def set_vlan_tag(self, device, vlan): + device.set_vlan_tag(vlan) + + def set_vlans(self, vlans): + pass + + def config_interfaces(self): + pass + + def get_data(self): + return {} + + def get_network_interfaces(self, index): + return [] + + def clear_interfaces(self): + pass + + def run(self): + return None, None + + def get_compute_nodes_bios(self): + return {} + + def get_version(self): + return {} + + def close(self): + pass diff --git a/nfvbench/compute.py b/nfvbench/compute.py new file mode 100644 index 0000000..c8ec383 --- /dev/null +++ b/nfvbench/compute.py @@ -0,0 +1,483 @@ +# Copyright 2016 Cisco Systems, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +'''Module for Openstack compute operations''' +from glanceclient import exc as glance_exception +import keystoneauth1 +from log import LOG +import novaclient +import os +import time +import traceback + + +try: + from glanceclient.openstack.common.apiclient.exceptions import NotFound as GlanceImageNotFound +except ImportError: + from glanceclient.v1.apiclient.exceptions import NotFound as GlanceImageNotFound + + +class Compute(object): + + def __init__(self, nova_client, glance_client, neutron_client, config): + self.novaclient = nova_client + self.glance_client = glance_client + self.neutronclient = neutron_client + self.config = config + + def find_image(self, image_name): + try: + return next(self.glance_client.images.list(filters={'name': image_name}), None) + except (novaclient.exceptions.NotFound, keystoneauth1.exceptions.http.NotFound, + GlanceImageNotFound): + pass + return None + + def upload_image_via_url(self, final_image_name, image_file, retry_count=60): + ''' + Directly uploads image to Nova via URL if image is not present + ''' + retry = 0 + try: + # check image is file/url based. + file_prefix = "file://" + image_location = image_file.split(file_prefix)[1] + with open(image_location) as f_image: + img = self.glance_client.images.create(name=str(final_image_name), + disk_format="qcow2", + container_format="bare", + visibility="public") + self.glance_client.images.upload(img.id, image_data=f_image) + # Check for the image in glance + while img.status in ['queued', 'saving'] and retry < retry_count: + img = self.glance_client.images.get(img.id) + retry += 1 + LOG.debug("Image not yet active, retrying %s of %s...", retry, retry_count) + time.sleep(self.config.generic_poll_sec) + if img.status != 'active': + LOG.error("Image uploaded but too long to get to active state") + raise Exception("Image update active state timeout") + except glance_exception.HTTPForbidden: + LOG.error("Cannot upload image without admin access. Please make " + "sure the image is uploaded and is either public or owned by you.") + return False + except IOError: + # catch the exception for file based errors. + LOG.error("Failed while uploading the image. Please make sure the " + "image at the specified location %s is correct.", image_file) + return False + except keystoneauth1.exceptions.http.NotFound as exc: + LOG.error("Authentication error while uploading the image:" + str(exc)) + return False + except Exception: + LOG.error(traceback.format_exc()) + LOG.error("Failed while uploading the image, please make sure the " + "cloud under test has the access to file: %s.", image_file) + return False + return True + + def delete_image(self, img_name): + try: + LOG.log("Deleting image %s...", img_name) + img = self.glance_client.images.find(name=img_name) + self.glance_client.images.delete(img.id) + except Exception: + LOG.error("Failed to delete the image %s.", img_name) + return False + + return True + + # Remove keypair name from openstack if exists + def remove_public_key(self, name): + keypair_list = self.novaclient.keypairs.list() + for key in keypair_list: + if key.name == name: + self.novaclient.keypairs.delete(name) + LOG.info('Removed public key %s', name) + break + + # Test if keypair file is present if not create it + def create_keypair(self, name, private_key_pair_file): + self.remove_public_key(name) + keypair = self.novaclient.keypairs.create(name) + # Now write the keypair to the file if requested + if private_key_pair_file: + kpf = os.open(private_key_pair_file, + os.O_WRONLY | os.O_CREAT, 0o600) + with os.fdopen(kpf, 'w') as kpf: + kpf.write(keypair.private_key) + return keypair + + # Add an existing public key to openstack + def add_public_key(self, name, public_key_file): + self.remove_public_key(name) + # extract the public key from the file + public_key = None + try: + with open(os.path.expanduser(public_key_file)) as pkf: + public_key = pkf.read() + except IOError as exc: + LOG.error('Cannot open public key file %s: %s', public_key_file, exc) + return None + keypair = self.novaclient.keypairs.create(name, public_key) + return keypair + + def init_key_pair(self, kp_name, ssh_access): + '''Initialize the key pair for all test VMs + if a key pair is specified in access, use that key pair else + create a temporary key pair + ''' + if ssh_access.public_key_file: + return self.add_public_key(kp_name, ssh_access.public_key_file) + else: + keypair = self.create_keypair(kp_name, None) + ssh_access.private_key = keypair.private_key + return keypair + + def find_network(self, label): + net = self.novaclient.networks.find(label=label) + return net + + # Create a server instance with name vmname + # and check that it gets into the ACTIVE state + def create_server(self, vmname, image, flavor, key_name, + nic, sec_group, avail_zone=None, user_data=None, + config_drive=None, files=None): + + if sec_group: + security_groups = [sec_group['id']] + else: + security_groups = None + + # Also attach the created security group for the test + instance = self.novaclient.servers.create(name=vmname, + image=image, + flavor=flavor, + key_name=key_name, + nics=nic, + availability_zone=avail_zone, + userdata=user_data, + config_drive=config_drive, + files=files, + security_groups=security_groups) + return instance + + def poll_server(self, instance): + return self.novaclient.servers.get(instance.id) + + def get_server_list(self): + servers_list = self.novaclient.servers.list() + return servers_list + + def find_floating_ips(self): + floating_ip = self.novaclient.floating_ips.list() + return floating_ip + + def create_floating_ips(self, pool): + return self.novaclient.floating_ips.create(pool) + + # Return the server network for a server + def find_server_network(self, vmname): + servers_list = self.get_server_list() + for server in servers_list: + if server.name == vmname and server.status == "ACTIVE": + return server.networks + return None + + # Returns True if server is present false if not. + # Retry for a few seconds since after VM creation sometimes + # it takes a while to show up + def find_server(self, vmname, retry_count): + for retry_attempt in range(retry_count): + servers_list = self.get_server_list() + for server in servers_list: + if server.name == vmname and server.status == "ACTIVE": + return True + # Sleep between retries + LOG.debug("[%s] VM not yet found, retrying %s of %s...", + vmname, (retry_attempt + 1), retry_count) + time.sleep(self.config.generic_poll_sec) + LOG.error("[%s] VM not found, after %s attempts", vmname, retry_count) + return False + + # Returns True if server is found and deleted/False if not, + # retry the delete if there is a delay + def delete_server_by_name(self, vmname): + servers_list = self.get_server_list() + for server in servers_list: + if server.name == vmname: + LOG.info('Deleting server %s', server) + self.novaclient.servers.delete(server) + return True + return False + + def delete_server(self, server): + self.novaclient.servers.delete(server) + + def find_flavor(self, flavor_type): + try: + flavor = self.novaclient.flavors.find(name=flavor_type) + return flavor + except Exception: + return None + + def create_flavor(self, name, ram, vcpus, disk, ephemeral=0, override=False): + if override: + self.delete_flavor(name) + return self.novaclient.flavors.create(name=name, ram=ram, vcpus=vcpus, disk=disk, + ephemeral=ephemeral) + + def delete_flavor(self, flavor=None, name=None): + try: + if not flavor: + flavor = self.find_flavor(name) + flavor.delete() + return True + except Exception: + return False + + def normalize_az_host(self, az, host): + if not az: + az = self.config.availability_zone + return az + ':' + host + + def auto_fill_az(self, host_list, host): + ''' + no az provided, if there is a host list we can auto-fill the az + else we use the configured az if available + else we return an error + ''' + if host_list: + for hyp in host_list: + if hyp.host == host: + return self.normalize_az_host(hyp.zone, host) + # no match on host + LOG.error('Passed host name does not exist: ' + host) + return None + if self.config.availability_zone: + return self.normalize_az_host(None, host) + LOG.error('--hypervisor passed without an az and no az configured') + return None + + def sanitize_az_host(self, host_list, az_host): + ''' + host_list: list of hosts as retrieved from openstack (can be empty) + az_host: either a host or a az:host string + if a host, will check host is in the list, find the corresponding az and + return az:host + if az:host is passed will check the host is in the list and az matches + if host_list is empty, will return the configured az if there is no + az passed + ''' + if ':' in az_host: + # no host_list, return as is (no check) + if not host_list: + return az_host + # if there is a host_list, extract and verify the az and host + az_host_list = az_host.split(':') + zone = az_host_list[0] + host = az_host_list[1] + for hyp in host_list: + if hyp.host == host: + if hyp.zone == zone: + # matches + return az_host + # else continue - another zone with same host name? + # no match + LOG.error('No match for availability zone and host ' + az_host) + return None + else: + return self.auto_fill_az(host_list, az_host) + + # + # Return a list of 0, 1 or 2 az:host + # + # The list is computed as follows: + # The list of all hosts is retrieved first from openstack + # if this fails, checks and az auto-fill are disabled + # + # If the user provides a list of hypervisors (--hypervisor) + # that list is checked and returned + # + # If the user provides a configured az name (config.availability_zone) + # up to the first 2 hosts from the list that match the az are returned + # + # If the user did not configure an az name + # up to the first 2 hosts from the list are returned + # Possible return values: + # [ az ] + # [ az:hyp ] + # [ az1:hyp1, az2:hyp2 ] + # [] if an error occurred (error message printed to console) + # + def get_az_host_list(self): + avail_list = [] + host_list = [] + + try: + host_list = self.novaclient.services.list() + except novaclient.exceptions.Forbidden: + LOG.warning('Operation Forbidden: could not retrieve list of hosts' + ' (likely no permission)') + + for host in host_list: + # this host must be a compute node + if host.binary != 'nova-compute' or host.state != 'up': + continue + candidate = None + if self.config.availability_zone: + if host.zone == self.config.availability_zone: + candidate = self.normalize_az_host(None, host.host) + else: + candidate = self.normalize_az_host(host.zone, host.host) + if candidate: + avail_list.append(candidate) + # pick first 2 matches at most + if len(avail_list) == 2: + break + + # if empty we insert the configured az + if not avail_list: + + if not self.config.availability_zone: + LOG.error('Availability_zone must be configured') + elif host_list: + LOG.error('No host matching the selection for availability zone: ' + + self.config.availability_zone) + avail_list = [] + else: + avail_list = [self.config.availability_zone] + return avail_list + + def get_enabled_az_host_list(self, required_count=1): + """ + Check which hypervisors are enabled and on which compute nodes they are running. + Pick required count of hosts. + + :param required_count: count of compute-nodes to return + :return: list of enabled available compute nodes + """ + host_list = [] + hypervisor_list = [] + + try: + hypervisor_list = self.novaclient.hypervisors.list() + host_list = self.novaclient.services.list() + except novaclient.exceptions.Forbidden: + LOG.warning('Operation Forbidden: could not retrieve list of hypervisors' + ' (likely no permission)') + + hypervisor_list = filter(lambda h: h.status == 'enabled' and h.state == 'up', + hypervisor_list) + if self.config.availability_zone: + host_list = filter(lambda h: h.zone == self.config.availability_zone, host_list) + + if self.config.compute_nodes: + host_list = filter(lambda h: h.host in self.config.compute_nodes, host_list) + + hosts = [h.hypervisor_hostname for h in hypervisor_list] + host_list = filter(lambda h: h.host in hosts, host_list) + + avail_list = [] + for host in host_list: + candidate = self.normalize_az_host(host.zone, host.host) + if candidate: + avail_list.append(candidate) + if len(avail_list) == required_count: + return avail_list + + return avail_list + + def get_hypervisor(self, hyper_name): + # can raise novaclient.exceptions.NotFound + # first get the id from name + hyper = self.novaclient.hypervisors.search(hyper_name)[0] + # get full hypervisor object + return self.novaclient.hypervisors.get(hyper.id) + + # Given 2 VMs test if they are running on same Host or not + def check_vm_placement(self, vm_instance1, vm_instance2): + try: + server_instance_1 = self.novaclient.servers.get(vm_instance1) + server_instance_2 = self.novaclient.servers.get(vm_instance2) + if server_instance_1.hostId == server_instance_2.hostId: + return True + else: + return False + except novaclient.exceptions: + LOG.warning("Exception in retrieving the hostId of servers") + + # Create a new security group with appropriate rules + def security_group_create(self): + # check first the security group exists + sec_groups = self.neutronclient.list_security_groups()['security_groups'] + group = [x for x in sec_groups if x['name'] == self.config.security_group_name] + if len(group) > 0: + return group[0] + + body = { + 'security_group': { + 'name': self.config.security_group_name, + 'description': 'PNS Security Group' + } + } + group = self.neutronclient.create_security_group(body)['security_group'] + self.security_group_add_rules(group) + + return group + + # Delete a security group + def security_group_delete(self, group): + if group: + LOG.info("Deleting security group") + self.neutronclient.delete_security_group(group['id']) + + # Add rules to the security group + def security_group_add_rules(self, group): + body = { + 'security_group_rule': { + 'direction': 'ingress', + 'security_group_id': group['id'], + 'remote_group_id': None + } + } + if self.config.ipv6_mode: + body['security_group_rule']['ethertype'] = 'IPv6' + body['security_group_rule']['remote_ip_prefix'] = '::/0' + else: + body['security_group_rule']['ethertype'] = 'IPv4' + body['security_group_rule']['remote_ip_prefix'] = '0.0.0.0/0' + + # Allow ping traffic + body['security_group_rule']['protocol'] = 'icmp' + body['security_group_rule']['port_range_min'] = None + body['security_group_rule']['port_range_max'] = None + self.neutronclient.create_security_group_rule(body) + + # Allow SSH traffic + body['security_group_rule']['protocol'] = 'tcp' + body['security_group_rule']['port_range_min'] = 22 + body['security_group_rule']['port_range_max'] = 22 + self.neutronclient.create_security_group_rule(body) + + # Allow TCP/UDP traffic for perf tools like iperf/nuttcp + # 5001: Data traffic (standard iperf data port) + # 5002: Control traffic (non standard) + # note that 5000/tcp is already picked by openstack keystone + body['security_group_rule']['protocol'] = 'tcp' + body['security_group_rule']['port_range_min'] = 5001 + body['security_group_rule']['port_range_max'] = 5002 + self.neutronclient.create_security_group_rule(body) + body['security_group_rule']['protocol'] = 'udp' + self.neutronclient.create_security_group_rule(body) diff --git a/nfvbench/config.py b/nfvbench/config.py new file mode 100644 index 0000000..b2972dd --- /dev/null +++ b/nfvbench/config.py @@ -0,0 +1,56 @@ +# Copyright 2016 Cisco Systems, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +from attrdict import AttrDict +import yaml + + +def config_load(file_name, from_cfg=None): + """Load a yaml file into a config dict, merge with from_cfg if not None + The config file content taking precedence in case of duplicate + """ + try: + with open(file_name) as fileobj: + cfg = AttrDict(yaml.safe_load(fileobj)) + except IOError: + raise Exception("Configuration file at '{}' was not found. Please use correct path " + "and verify it is visible to container if you run nfvbench in container." + .format(file_name)) + + if from_cfg: + cfg = from_cfg + cfg + + return cfg + + +def config_loads(cfg_text, from_cfg=None): + """Same as config_load but load from a string + """ + try: + cfg = AttrDict(yaml.load(cfg_text)) + except TypeError: + # empty string + cfg = AttrDict() + if from_cfg: + return from_cfg + cfg + return cfg + + +def test_config(): + cfg = config_load('a1.yaml') + cfg = config_load('a2.yaml', cfg) + cfg = config_loads('color: 500', cfg) + config_loads('') + config_loads('#') diff --git a/nfvbench/config_plugin.py b/nfvbench/config_plugin.py new file mode 100644 index 0000000..ed6b3c6 --- /dev/null +++ b/nfvbench/config_plugin.py @@ -0,0 +1,87 @@ +#!/usr/bin/env python +# Copyright 2016 Cisco Systems, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import abc +import specs + + +class ConfigPluginBase(object): + """Base class for config plugins. Need to implement public interfaces.""" + __metaclass__ = abc.ABCMeta + + class InitializationFailure(Exception): + pass + + def __init__(self, config): + if not config: + raise ConfigPluginBase.InitializationFailure( + 'Initialization parameters need to be assigned.') + + self.config = config + + @abc.abstractmethod + def get_config(self): + """Returns updated default configuration file.""" + + @abc.abstractmethod + def get_openstack_spec(self): + """Returns OpenStack specs for host.""" + + @abc.abstractmethod + def get_run_spec(self, openstack_spec): + """Returns RunSpec for given platform.""" + + @abc.abstractmethod + def validate_config(self, cfg): + """Validate config file.""" + + @abc.abstractmethod + def prepare_results_config(self, cfg): + """This function is called before running configuration is copied. + Example usage is to remove sensitive information like switch credentials. + """ + + @abc.abstractmethod + def get_version(self): + """Returns platform version.""" + + +class ConfigPlugin(ConfigPluginBase): + """No-op config plugin class. Does not change anything.""" + + def __init__(self, config): + ConfigPluginBase.__init__(self, config) + + def get_config(self): + """Public interface for updating config file. Just returns given config.""" + return self.config + + def get_openstack_spec(self): + """Returns OpenStack specs for host.""" + return specs.OpenStackSpec() + + def get_run_spec(self, openstack_spec): + """Returns RunSpec for given platform.""" + return specs.RunSpec(self.config.no_vswitch_access, openstack_spec) + + def validate_config(self, config): + pass + + def prepare_results_config(self, cfg): + return cfg + + def get_version(self): + return {} diff --git a/nfvbench/connection.py b/nfvbench/connection.py new file mode 100644 index 0000000..0ef994f --- /dev/null +++ b/nfvbench/connection.py @@ -0,0 +1,725 @@ +# Copyright 2013: Mirantis Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +"""High level ssh library. +Usage examples: +Execute command and get output: + ssh = sshclient.SSH('root', 'example.com', port=33) + status, stdout, stderr = ssh.execute('ps ax') + if status: + raise Exception('Command failed with non-zero status.') + print stdout.splitlines() +Execute command with huge output: + class PseudoFile(object): + def write(chunk): + if 'error' in chunk: + email_admin(chunk) + ssh = sshclient.SSH('root', 'example.com') + ssh.run('tail -f /var/log/syslog', stdout=PseudoFile(), timeout=False) +Execute local script on remote side: + ssh = sshclient.SSH('user', 'example.com') + status, out, err = ssh.execute('/bin/sh -s arg1 arg2', + stdin=open('~/myscript.sh', 'r')) +Upload file: + ssh = sshclient.SSH('user', 'example.com') + ssh.run('cat > ~/upload/file.gz', stdin=open('/store/file.gz', 'rb')) +Eventlet: + eventlet.monkey_patch(select=True, time=True) + or + eventlet.monkey_patch() + or + sshclient = eventlet.import_patched("opentstack.common.sshclient") +""" + +import re +import select +import shlex +import socket +import StringIO +import subprocess +import sys +import threading +import time + +from log import LOG +import paramiko + +# from rally.openstack.common.gettextutils import _ + + +class ConnectionError(Exception): + pass + + +class Connection(object): + + ''' + A base connection class. Not intended to be constructed. + ''' + + def __init__(self): + self.distro_id = None + self.distro_id_like = None + self.distro_version = None + self.__get_distro() + + def close(self): + pass + + def execute(self, cmd, stdin=None, timeout=3600): + pass + + def __extract_property(self, name, input_str): + expr = name + r'="?([\w\.]*)"?' + match = re.search(expr, input_str) + if match: + return match.group(1) + return 'Unknown' + + # Get the linux distro + def __get_distro(self): + '''cat /etc/*-release | grep ID + Ubuntu: + DISTRIB_ID=Ubuntu + ID=ubuntu + ID_LIKE=debian + VERSION_ID="14.04" + RHEL: + ID="rhel" + ID_LIKE="fedora" + VERSION_ID="7.0" + ''' + distro_cmd = "grep ID /etc/*-release" + (status, distro_out, _) = self.execute(distro_cmd) + if status: + distro_out = '' + self.distro_id = self.__extract_property('ID', distro_out) + self.distro_id_like = self.__extract_property('ID_LIKE', distro_out) + self.distro_version = self.__extract_property('VERSION_ID', distro_out) + + def pidof(self, proc_name): + ''' + Return a list containing the pids of all processes of a given name + the list is empty if there is no pid + ''' + # the path update is necessary for RHEL + cmd = "PATH=$PATH:/usr/sbin pidof " + proc_name + (status, cmd_output, _) = self.execute(cmd) + if status: + return [] + cmd_output = cmd_output.strip() + result = cmd_output.split() + return result + + # kill pids in the given list of pids + def kill_proc(self, pid_list): + cmd = "kill -9 " + ' '.join(pid_list) + self.execute(cmd) + + # check stats for a given path + def stat(self, path): + (status, cmd_output, _) = self.execute('stat ' + path) + if status: + return None + return cmd_output + + def ping_check(self, target_ip, ping_count=2, pass_threshold=80): + '''helper function to ping from one host to an IP address, + for a given count and pass_threshold; + Steps: + ssh to the host and then ping to the target IP + then match the output and verify that the loss% is + less than the pass_threshold% + Return 1 if the criteria passes + Return 0, if it fails + ''' + cmd = "ping -c " + str(ping_count) + " " + str(target_ip) + (_, cmd_output, _) = self.execute(cmd) + + match = re.search(r'(\d*)% packet loss', cmd_output) + pkt_loss = match.group(1) + if int(pkt_loss) < int(pass_threshold): + return 1 + else: + LOG.error('Ping to %s failed: %s', target_ip, cmd_output) + return 0 + + def read_remote_file(self, from_path): + ''' + Read a remote file and save it to a buffer. + ''' + cmd = "cat " + from_path + (status, cmd_output, _) = self.execute(cmd) + if status: + return None + return cmd_output + + def get_host_os_version(self): + ''' + Identify the host distribution/relase. + ''' + os_release_file = "/etc/os-release" + sys_release_file = "/etc/system-release" + name = "" + version = "" + + if self.stat(os_release_file): + data = self.read_remote_file(os_release_file) + if data is None: + LOG.error("Failed to read file %s", os_release_file) + return None + + for line in data.splitlines(): + mobj = re.match(r'NAME=(.*)', line) + if mobj: + name = mobj.group(1).strip("\"") + + mobj = re.match(r'VERSION_ID=(.*)', line) + if mobj: + version = mobj.group(1).strip("\"") + + os_name = name + " " + version + return os_name + + if self.stat(sys_release_file): + data = self.read_remote_file(sys_release_file) + if data is None: + LOG.error("Failed to read file %s", sys_release_file) + return None + + for line in data.splitlines(): + mobj = re.match(r'Red Hat.*', line) + if mobj: + return mobj.group(0) + + return None + + def check_rpm_package_installed(self, rpm_pkg): + ''' + Given a host and a package name, check if it is installed on the + system. + ''' + check_pkg_cmd = "rpm -qa | grep " + rpm_pkg + + (status, cmd_output, _) = self.execute(check_pkg_cmd) + if status: + return None + + pkg_pattern = ".*" + rpm_pkg + ".*" + rpm_pattern = re.compile(pkg_pattern, re.IGNORECASE) + + for line in cmd_output.splitlines(): + mobj = rpm_pattern.match(line) + if mobj: + return mobj.group(0) + + LOG.info("%s pkg installed ", rpm_pkg) + + return None + + def get_openstack_release(self, ver_str): + ''' + Get the release series name from the package version + Refer to here for release tables: + https://wiki.openstack.org/wiki/Releases + ''' + ver_table = {"2015.1": "Kilo", + "2014.2": "Juno", + "2014.1": "Icehouse", + "2013.2": "Havana", + "2013.1": "Grizzly", + "2012.2": "Folsom", + "2012.1": "Essex", + "2011.3": "Diablo", + "2011.2": "Cactus", + "2011.1": "Bexar", + "2010.1": "Austin"} + + ver_prefix = re.search(r"20\d\d\.\d", ver_str).group(0) + if ver_prefix in ver_table: + return ver_table[ver_prefix] + else: + return "Unknown" + + def check_openstack_version(self): + ''' + Identify the openstack version running on the controller. + ''' + nova_cmd = "nova-manage --version" + (status, _, err_output) = self.execute(nova_cmd) + + if status: + return "Unknown" + + ver_str = err_output.strip() + release_str = self.get_openstack_release(err_output) + return release_str + " (" + ver_str + ")" + + def get_cpu_info(self): + ''' + Get the CPU info of the controller. + Note: Here we are assuming the controller node has the exact + hardware as the compute nodes. + ''' + + cmd = 'cat /proc/cpuinfo | grep -m1 "model name"' + (status, std_output, _) = self.execute(cmd) + if status: + return "Unknown" + model_name = re.search(r":\s(.*)", std_output).group(1) + + cmd = 'cat /proc/cpuinfo | grep "model name" | wc -l' + (status, std_output, _) = self.execute(cmd) + if status: + return "Unknown" + cores = std_output.strip() + + return (cores + " * " + model_name) + + def get_nic_name(self, agent_type, encap, internal_iface_dict): + ''' + Get the NIC info of the controller. + Note: Here we are assuming the controller node has the exact + hardware as the compute nodes. + ''' + + # The internal_ifac_dict is a dictionary contains the mapping between + # hostname and the internal interface name like below: + # {u'hh23-4': u'eth1', u'hh23-5': u'eth1', u'hh23-6': u'eth1'} + + cmd = "hostname" + (status, std_output, _) = self.execute(cmd) + if status: + return "Unknown" + hostname = std_output.strip() + + if hostname in internal_iface_dict: + iface = internal_iface_dict[hostname] + else: + return "Unknown" + + # Figure out which interface is for internal traffic + if 'Linux bridge' in agent_type: + ifname = iface + elif 'Open vSwitch' in agent_type: + if encap == 'vlan': + # [root@hh23-10 ~]# ovs-vsctl list-ports br-inst + # eth1 + # phy-br-inst + cmd = 'ovs-vsctl list-ports ' + \ + iface + ' | grep -E "^[^phy].*"' + (status, std_output, _) = self.execute(cmd) + if status: + return "Unknown" + ifname = std_output.strip() + elif encap == 'vxlan' or encap == 'gre': + # This is complicated. We need to first get the local IP address on + # br-tun, then do a reverse lookup to get the physical interface. + # + # [root@hh23-4 ~]# ip addr show to "23.23.2.14" + # 3: eth1: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc mq state UP qlen 1000 + # inet 23.23.2.14/24 brd 23.23.2.255 scope global eth1 + # valid_lft forever preferred_lft forever + cmd = "ip addr show to " + iface + " | awk -F: '{print $2}'" + (status, std_output, _) = self.execute(cmd) + if status: + return "Unknown" + ifname = std_output.strip() + else: + return "Unknown" + + cmd = 'ethtool -i ' + ifname + ' | grep bus-info' + (status, std_output, _) = self.execute(cmd) + if status: + return "Unknown" + bus_info = re.search(r":\s(.*)", std_output).group(1) + + cmd = 'lspci -s ' + bus_info + (status, std_output, _) = self.execute(cmd) + if status: + return "Unknown" + nic_name = re.search( + r"Ethernet controller:\s(.*)", + std_output).group(1) + + return (nic_name) + + def get_l2agent_version(self, agent_type): + ''' + Get the L2 agent version of the controller. + Note: Here we are assuming the controller node has the exact + hardware as the compute nodes. + ''' + if 'Linux bridge' in agent_type: + cmd = "brctl --version | awk -F',' '{print $2}'" + ver_string = "Linux Bridge " + elif 'Open vSwitch' in agent_type: + cmd = "ovs-vsctl --version | awk -F')' '{print $2}'" + ver_string = "OVS " + else: + return "Unknown" + + (status, std_output, _) = self.execute(cmd) + if status: + return "Unknown" + + return ver_string + std_output.strip() + + +class SSHError(Exception): + pass + + +class SSHTimeout(SSHError): + pass + +# Check IPv4 address syntax - not completely fool proof but will catch +# some invalid formats + + +def is_ipv4(address): + try: + socket.inet_aton(address) + except socket.error: + return False + return True + + +class SSHAccess(object): + + ''' + A class to contain all the information needed to access a host + (native or virtual) using SSH + ''' + + def __init__(self, arg_value=None): + ''' + decode user@host[:pwd] + 'hugo@1.1.1.1:secret' -> ('hugo', '1.1.1.1', 'secret', None) + 'huggy@2.2.2.2' -> ('huggy', '2.2.2.2', None, None) + None ->(None, None, None, None) + Examples of fatal errors (will call exit): + 'hutch@q.1.1.1' (invalid IP) + '@3.3.3.3' (missing username) + 'hiro@' or 'buggy' (missing host IP) + The error field will be None in case of success or will + contain a string describing the error + ''' + self.username = None + self.host = None + self.password = None + # name of the file that contains the private key + self.private_key_file = None + # this is the private key itself (a long string starting with + # -----BEGIN RSA PRIVATE KEY----- + # used when the private key is not saved in any file + self.private_key = None + self.public_key_file = None + self.port = 22 + self.error = None + + if not arg_value: + return + match = re.search(r'^([^@]+)@([0-9\.]+):?(.*)$', arg_value) + if not match: + self.error = 'Invalid argument: ' + arg_value + return + if not is_ipv4(match.group(2)): + self.error = 'Invalid IPv4 address ' + match.group(2) + return + (self.username, self.host, self.password) = match.groups() + + def copy_from(self, ssh_access): + self.username = ssh_access.username + self.host = ssh_access.host + self.port = ssh_access.port + self.password = ssh_access.password + self.private_key = ssh_access.private_key + self.public_key_file = ssh_access.public_key_file + self.private_key_file = ssh_access.private_key_file + + +class SSH(Connection): + + """Represent ssh connection.""" + + def __init__(self, ssh_access, + connect_timeout=60, + connect_retry_count=30, + connect_retry_wait_sec=2): + """Initialize SSH client. + :param user: ssh username + :param host: hostname or ip address of remote ssh server + :param port: remote ssh port + :param pkey: RSA or DSS private key string or file object + :param key_filename: private key filename + :param password: password + :param connect_timeout: timeout when connecting ssh + :param connect_retry_count: how many times to retry connecting + :param connect_retry_wait_sec: seconds to wait between retries + """ + + self.ssh_access = ssh_access + if ssh_access.private_key: + self.pkey = self._get_pkey(ssh_access.private_key) + else: + self.pkey = None + self._client = False + self.connect_timeout = connect_timeout + self.connect_retry_count = connect_retry_count + self.connect_retry_wait_sec = connect_retry_wait_sec + super(SSH, self).__init__() + + def _get_pkey(self, key): + '''Get the binary form of the private key + from the text form + ''' + if isinstance(key, basestring): + key = StringIO.StringIO(key) + errors = [] + for key_class in (paramiko.rsakey.RSAKey, paramiko.dsskey.DSSKey): + try: + return key_class.from_private_key(key) + except paramiko.SSHException as exc: + errors.append(exc) + raise SSHError('Invalid pkey: %s' % (errors)) + + def _is_active(self): + if self._client: + try: + transport = self._client.get_transport() + session = transport.open_session() + session.close() + return True + except Exception: + return False + else: + return False + + def _get_client(self, force=False): + if not force and self._is_active(): + return self._client + if self._client: + LOG.info('Re-establishing ssh connection with %s' % (self.ssh_access.host)) + self._client.close() + self._client = paramiko.SSHClient() + self._client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + for _ in range(self.connect_retry_count): + try: + self._client.connect(self.ssh_access.host, + username=self.ssh_access.username, + port=self.ssh_access.port, + pkey=self.pkey, + key_filename=self.ssh_access.private_key_file, + password=self.ssh_access.password, + timeout=self.connect_timeout) + self._client.get_transport().set_keepalive(5) + return self._client + except (paramiko.AuthenticationException, + paramiko.BadHostKeyException, + paramiko.SSHException, + socket.error, + Exception): + time.sleep(self.connect_retry_wait_sec) + + self._client = None + msg = '[%s] SSH Connection failed after %s attempts' % (self.ssh_access.host, + self.connect_retry_count) + raise SSHError(msg) + + def _get_session(self): + client = self._get_client() + for _ in range(self.connect_retry_count): + try: + transport = client.get_transport() + session = transport.open_session() + return session + except Exception: + client = self._get_client(force=True) + return None + + def close(self): + super(SSH, self).close() + if self._client: + self._client.close() + self._client = False + + def run(self, cmd, stdin=None, stdout=None, stderr=None, + raise_on_error=True, timeout=3600, sudo=False): + """Execute specified command on the server. + :param cmd: Command to be executed. + :param stdin: Open file or string to pass to stdin. + :param stdout: Open file to connect to stdout. + :param stderr: Open file to connect to stderr. + :param raise_on_error: If False then exit code will be return. If True + then exception will be raized if non-zero code. + :param timeout: Timeout in seconds for command execution. + Default 1 hour. No timeout if set to 0. + :param sudo: Executes command as sudo with default password + """ + + if isinstance(stdin, basestring): + stdin = StringIO.StringIO(stdin) + + return self._run(cmd, stdin=stdin, stdout=stdout, + stderr=stderr, raise_on_error=raise_on_error, + timeout=timeout, sudo=sudo) + + def _run(self, cmd, stdin=None, stdout=None, stderr=None, + raise_on_error=True, timeout=3600, sudo=False): + + session = self._get_session() + + if session is None: + raise SSHError('Unable to open session to ssh connection') + + if sudo: + cmd = "echo " + self.ssh_access.password + " | sudo -S -p '' " + cmd + session.get_pty() + + session.exec_command(cmd) + start_time = time.time() + + data_to_send = '' + stderr_data = None + + # If we have data to be sent to stdin then `select' should also + # check for stdin availability. + if stdin and not stdin.closed: + writes = [session] + else: + writes = [] + + while True: + # Block until data can be read/write. + select.select([session], writes, [session], 1) + + if session.recv_ready(): + data = session.recv(4096) + if stdout is not None: + stdout.write(data) + continue + + if session.recv_stderr_ready(): + stderr_data = session.recv_stderr(4096) + if stderr is not None: + stderr.write(stderr_data) + continue + + if session.send_ready(): + if stdin is not None and not stdin.closed: + if not data_to_send: + data_to_send = stdin.read(4096) + if not data_to_send: + stdin.close() + session.shutdown_write() + writes = [] + continue + sent_bytes = session.send(data_to_send) + data_to_send = data_to_send[sent_bytes:] + + if session.exit_status_ready(): + break + + if timeout and (time.time() - timeout) > start_time: + args = {'cmd': cmd, 'host': self.ssh_access.host} + raise SSHTimeout(('Timeout executing command ' + '"%(cmd)s" on host %(host)s') % args) + # if e: + # raise SSHError('Socket error.') + + exit_status = session.recv_exit_status() + if 0 != exit_status and raise_on_error: + fmt = ('Command "%(cmd)s" failed with exit_status %(status)d.') + details = fmt % {'cmd': cmd, 'status': exit_status} + if stderr_data: + details += (' Last stderr data: "%s".') % stderr_data + raise SSHError(details) + return exit_status + + def execute(self, cmd, stdin=None, timeout=3600, sudo=False): + """Execute the specified command on the server. + :param cmd: Command to be executed. + :param stdin: Open file to be sent on process stdin. + :param timeout: Timeout for execution of the command. + Return tuple (exit_status, stdout, stderr) + """ + stdout = StringIO.StringIO() + stderr = StringIO.StringIO() + + exit_status = self.run(cmd, stderr=stderr, + stdout=stdout, stdin=stdin, + timeout=timeout, raise_on_error=False, sudo=sudo) + stdout.seek(0) + stderr.seek(0) + return (exit_status, stdout.read(), stderr.read()) + + def wait(self, timeout=120, interval=1): + """Wait for the host will be available via ssh.""" + start_time = time.time() + while True: + try: + return self.execute('uname') + except (socket.error, SSHError): + time.sleep(interval) + if time.time() > (start_time + timeout): + raise SSHTimeout( + ('Timeout waiting for "%s"') % + self.ssh_access.host) + + +class SubprocessTimeout(Exception): + pass + + +class Subprocess(Connection): + + """Represent subprocess connection.""" + + def execute(self, cmd, stdin=None, timeout=3600): + process = subprocess.Popen(shlex.split(cmd), stderr=subprocess.PIPE, + stdout=subprocess.PIPE, + shell=True) + timer = threading.Timer(timeout, process.kill) + stdout, stderr = process.communicate(input=stdin) + status = process.wait() + if timer.is_alive(): + timer.cancel() + raise SubprocessTimeout('Timeout executing command "%(cmd)s"') + return (status, stdout, stderr) + + +################################################## +# Only invoke the module directly for test purposes. Should be +# invoked from pns script. +################################################## +def main(): + # As argument pass the SSH access string, e.g. "localadmin@1.1.1.1:secret" + test_ssh = SSH(SSHAccess(sys.argv[1])) + + print 'ID=' + test_ssh.distro_id + print 'ID_LIKE=' + test_ssh.distro_id_like + print 'VERSION_ID=' + test_ssh.distro_version + + # ssh.wait() + # print ssh.pidof('bash') + # print ssh.stat('/tmp') + print test_ssh.check_openstack_version() + print test_ssh.get_cpu_info() + print test_ssh.get_l2agent_version("Open vSwitch agent") + +if __name__ == "__main__": + main() diff --git a/nfvbench/credentials.py b/nfvbench/credentials.py new file mode 100644 index 0000000..0c8470e --- /dev/null +++ b/nfvbench/credentials.py @@ -0,0 +1,166 @@ +# Copyright 2016 Cisco Systems, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +# Module for credentials in Openstack +import getpass +from keystoneauth1.identity import v2 +from keystoneauth1.identity import v3 +from keystoneauth1 import session +import os +import re + +from log import LOG + + +class Credentials(object): + + def get_session(self): + dct = { + 'username': self.rc_username, + 'password': self.rc_password, + 'auth_url': self.rc_auth_url + } + auth = None + + if self.rc_identity_api_version == 3: + dct.update({ + 'project_name': self.rc_project_name, + 'project_domain_name': self.rc_project_domain_name, + 'user_domain_name': self.rc_user_domain_name + }) + auth = v3.Password(**dct) + else: + dct.update({ + 'tenant_name': self.rc_tenant_name + }) + auth = v2.Password(**dct) + return session.Session(auth=auth, verify=self.rc_cacert) + + def __parse_openrc(self, file): + export_re = re.compile('export OS_([A-Z_]*)="?(.*)') + for line in file: + line = line.strip() + mstr = export_re.match(line) + if mstr: + # get rif of posible trailing double quote + # the first one was removed by the re + name = mstr.group(1) + value = mstr.group(2) + if value.endswith('"'): + value = value[:-1] + # get rid of password assignment + # echo "Please enter your OpenStack Password: " + # read -sr OS_PASSWORD_INPUT + # export OS_PASSWORD=$OS_PASSWORD_INPUT + if value.startswith('$'): + continue + # Check if api version is provided + # Default is keystone v2 + if name == 'IDENTITY_API_VERSION': + self.rc_identity_api_version = int(value) + + # now match against wanted variable names + elif name == 'USERNAME': + self.rc_username = value + elif name == 'AUTH_URL': + self.rc_auth_url = value + elif name == 'TENANT_NAME': + self.rc_tenant_name = value + elif name == "CACERT": + self.rc_cacert = value + elif name == "REGION_NAME": + self.rc_region_name = value + elif name == "PASSWORD": + self.rc_password = value + elif name == "USER_DOMAIN_NAME": + self.rc_user_domain_name = value + elif name == "PROJECT_NAME": + self.rc_project_name = value + elif name == "PROJECT_DOMAIN_NAME": + self.rc_project_domain_name = value + + # + # Read a openrc file and take care of the password + # The 2 args are passed from the command line and can be None + # + def __init__(self, openrc_file, pwd=None, no_env=False): + self.rc_password = None + self.rc_username = None + self.rc_tenant_name = None + self.rc_auth_url = None + self.rc_cacert = None + self.rc_region_name = None + self.rc_user_domain_name = None + self.rc_project_domain_name = None + self.rc_project_name = None + self.rc_identity_api_version = 2 + success = True + + if openrc_file: + if isinstance(openrc_file, str): + if os.path.exists(openrc_file): + self.__parse_openrc(open(openrc_file)) + else: + LOG.error('Error: rc file does not exist %s', openrc_file) + success = False + else: + self.__parse_openrc(openrc_file) + elif not no_env: + # no openrc file passed - we assume the variables have been + # sourced by the calling shell + # just check that they are present + if 'OS_IDENTITY_API_VERSION' in os.environ: + self.rc_identity_api_version = int(os.environ['OS_IDENTITY_API_VERSION']) + + if self.rc_identity_api_version == 2: + for varname in ['OS_USERNAME', 'OS_AUTH_URL', 'OS_TENANT_NAME']: + if varname not in os.environ: + LOG.warning('%s is missing', varname) + success = False + if success: + self.rc_username = os.environ['OS_USERNAME'] + self.rc_auth_url = os.environ['OS_AUTH_URL'] + self.rc_tenant_name = os.environ['OS_TENANT_NAME'] + if 'OS_REGION_NAME' in os.environ: + self.rc_region_name = os.environ['OS_REGION_NAME'] + elif self.rc_identity_api_version == 3: + for varname in ['OS_USERNAME', 'OS_AUTH_URL', 'OS_PROJECT_NAME', + 'OS_PROJECT_DOMAIN_NAME', 'OS_USER_DOMAIN_NAME']: + if varname not in os.environ: + LOG.warning('%s is missing', varname) + success = False + if success: + self.rc_username = os.environ['OS_USERNAME'] + self.rc_auth_url = os.environ['OS_AUTH_URL'] + self.rc_project_name = os.environ['OS_PROJECT_NAME'] + self.rc_project_domain_id = os.environ['OS_PROJECT_DOMAIN_NAME'] + self.rc_user_domain_id = os.environ['OS_USER_DOMAIN_NAME'] + if 'OS_CACERT' in os.environ: + self.rc_cacert = os.environ['OS_CACERT'] + + + # always override with CLI argument if provided + if pwd: + self.rc_password = pwd + # if password not know, check from env variable + elif self.rc_auth_url and not self.rc_password and success: + if 'OS_PASSWORD' in os.environ and not no_env: + self.rc_password = os.environ['OS_PASSWORD'] + else: + # interactively ask for password + self.rc_password = getpass.getpass( + 'Please enter your OpenStack Password: ') + if not self.rc_password: + self.rc_password = "" diff --git a/nfvbench/factory.py b/nfvbench/factory.py new file mode 100644 index 0000000..35a8c1b --- /dev/null +++ b/nfvbench/factory.py @@ -0,0 +1,70 @@ +#!/usr/bin/env python +# Copyright 2017 Cisco Systems, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +from chain_clients import EXTStageClient +from chain_clients import PVPStageClient +from chain_clients import PVVPStageClient +from chain_managers import EXTStatsManager +from chain_managers import PVPStatsManager +from chain_managers import PVVPStatsManager +import chain_workers as workers +from config_plugin import ConfigPlugin +from specs import ChainType +import tor_client + + +class BasicFactory(object): + + chain_classes = [ChainType.EXT, ChainType.PVP, ChainType.PVVP] + + chain_stats_classes = { + ChainType.EXT: EXTStatsManager, + ChainType.PVP: PVPStatsManager, + ChainType.PVVP: PVVPStatsManager, + } + + stage_clients_classes = { + ChainType.EXT: EXTStageClient, + ChainType.PVP: PVPStageClient, + ChainType.PVVP: PVVPStageClient, + } + + def get_stats_class(self, service_chain): + CLASS = self.chain_stats_classes.get(service_chain, None) + if CLASS is None: + raise Exception("Service chain '{}' not supported.".format(service_chain)) + + return CLASS + + def get_stage_class(self, service_chain): + CLASS = self.stage_clients_classes.get(service_chain, None) + if CLASS is None: + raise Exception("VM Client for chain '{}' not supported.".format(service_chain)) + + return CLASS + + def get_chain_worker(self, encaps, service_chain): + return workers.BasicWorker + + def get_tor_class(self, tor_type, no_tor_access): + if no_tor_access or not tor_type: + # if no TOR access is required, use basic no-op client + tor_type = 'BasicTORClient' + + return getattr(tor_client, tor_type) + + def get_config_plugin_class(self): + return ConfigPlugin diff --git a/nfvbench/log.py b/nfvbench/log.py new file mode 100644 index 0000000..22afefe --- /dev/null +++ b/nfvbench/log.py @@ -0,0 +1,40 @@ +# Copyright 2016 Cisco Systems, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging + + +def setup(product_name): + # logging.basicConfig() + formatter_str = '%(asctime)s %(levelname)s %(message)s' + handler = logging.StreamHandler() + handler.setFormatter(logging.Formatter(formatter_str)) + + # Add handler to logger + logger = logging.getLogger(product_name) + logger.addHandler(handler) + + +def set_level(product, debug=False): + log_level = logging.DEBUG if debug else logging.INFO + logger = logging.getLogger(product) + logger.setLevel(log_level) + + +def getLogger(product): + logger = logging.getLogger(product) + + return logger + +LOG = getLogger('nfvbench') diff --git a/nfvbench/network.py b/nfvbench/network.py new file mode 100644 index 0000000..e097c2b --- /dev/null +++ b/nfvbench/network.py @@ -0,0 +1,62 @@ +# Copyright 2016 Cisco Systems, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + + +class Interface(object): + + def __init__(self, name, device, tx_packets, rx_packets): + self.name = name + self.device = device + self.packets = { + 'tx': tx_packets, + 'rx': rx_packets + } + + def set_packets(self, tx, rx): + self.packets = { + 'tx': tx, + 'rx': rx + } + + def set_packets_diff(self, tx, rx): + self.packets = { + 'tx': tx - self.packets['tx'], + 'rx': rx - self.packets['rx'], + } + + def is_no_op(self): + return self.name is None + + def get_packet_count(self, traffic_type): + return self.packets.get(traffic_type, 0) + + @staticmethod + def no_op(): + return Interface(None, None, 0, 0) + + +class Network(object): + + def __init__(self, interfaces=None, reverse=False): + if interfaces is None: + interfaces = [] + self.interfaces = interfaces + self.reverse = reverse + + def add_interface(self, interface): + self.interfaces.append(interface) + + def get_interfaces(self): + return self.interfaces[::-1] if self.reverse else self.interfaces diff --git a/nfvbench/nfvbench.py b/nfvbench/nfvbench.py new file mode 100644 index 0000000..0dcf2f1 --- /dev/null +++ b/nfvbench/nfvbench.py @@ -0,0 +1,491 @@ +#!/usr/bin/env python +# Copyright 2016 Cisco Systems, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +from __init__ import __version__ +import argparse +from attrdict import AttrDict +from chain_runner import ChainRunner +from collections import defaultdict +from config import config_load +from config import config_loads +import credentials +import datetime +from factory import BasicFactory +import importlib +import json +import log +from log import LOG +from nfvbenchd import WebSocketIoServer +import os +import pbr.version +from pkg_resources import resource_string +from specs import Specs +from summarizer import NFVBenchSummarizer +import sys +import traceback +from traffic_client import TrafficGeneratorFactory +import utils + + +class NFVBench(object): + """Main class of NFV benchmarking tool.""" + STATUS_OK = 'OK' + STATUS_ERROR = 'ERROR' + + def __init__(self, config, openstack_spec, config_plugin, factory, notifier=None): + self.base_config = config + self.config = None + self.config_plugin = config_plugin + self.factory = factory + self.notifier = notifier + self.cred = credentials.Credentials(config.openrc_file, None, False) + self.chain_runner = None + self.specs = Specs() + self.specs.set_openstack_spec(openstack_spec) + self.clients = defaultdict(lambda: None) + self.vni_ports = [] + sys.stdout.flush() + + def setup(self): + self.specs.set_run_spec(self.config_plugin.get_run_spec(self.specs.openstack)) + self.chain_runner = ChainRunner(self.config, + self.clients, + self.cred, + self.specs, + self.factory, + self.notifier) + + def set_notifier(self, notifier): + self.notifier = notifier + + def run(self, opts): + status = NFVBench.STATUS_OK + result = None + message = '' + try: + self.update_config(opts) + self.setup() + + result = { + "date": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), + "nfvbench_version": __version__, + "openstack_spec": { + "vswitch": self.specs.openstack.vswitch, + "encaps": self.specs.openstack.encaps + }, + "config": self.config_plugin.prepare_results_config(dict(self.config)), + "benchmarks": { + "network": { + "service_chain": self.chain_runner.run(), + "versions": self.chain_runner.get_version(), + } + } + } + result['benchmarks']['network']['versions'].update(self.config_plugin.get_version()) + except Exception: + status = NFVBench.STATUS_ERROR + message = traceback.format_exc() + except KeyboardInterrupt: + status = NFVBench.STATUS_ERROR + message = traceback.format_exc() + finally: + if self.chain_runner: + self.chain_runner.close() + + if status == NFVBench.STATUS_OK: + result = utils.dict_to_json_dict(result) + return { + 'status': status, + 'result': result + } + else: + return { + 'status': status, + 'error_message': message + } + + def print_summary(self, result): + """Print summary of the result""" + print NFVBenchSummarizer(result) + sys.stdout.flush() + + def save(self, result): + """Save results in json format file.""" + utils.save_json_result(result, + self.config.json_file, + self.config.std_json_path, + self.config.service_chain, + self.config.service_chain_count, + self.config.flow_count, + self.config.frame_sizes) + + def update_config(self, opts): + self.config = AttrDict(dict(self.base_config)) + self.config.update(opts) + + self.config.service_chain = self.config.service_chain.upper() + self.config.service_chain_count = int(self.config.service_chain_count) + self.config.flow_count = utils.parse_flow_count(self.config.flow_count) + required_flow_count = self.config.service_chain_count * 2 + if self.config.flow_count < required_flow_count: + LOG.info("Flow count '{}' has been set to minimum value of '{}' " + "for current configuration".format(self.config.flow_count, + required_flow_count)) + self.config.flow_count = required_flow_count + + if self.config.flow_count % 2 != 0: + self.config.flow_count += 1 + + self.config.duration_sec = float(self.config.duration_sec) + self.config.interval_sec = float(self.config.interval_sec) + + # Get traffic generator profile config + if not self.config.generator_profile: + self.config.generator_profile = self.config.traffic_generator.default_profile + + generator_factory = TrafficGeneratorFactory(self.config) + self.config.generator_config = \ + generator_factory.get_generator_config(self.config.generator_profile) + + if not any(self.config.generator_config.pcis): + raise Exception("PCI addresses configuration for selected traffic generator profile " + "({tg_profile}) are missing. Please specify them in configuration file." + .format(tg_profile=self.config.generator_profile)) + + if self.config.traffic is None or len(self.config.traffic) == 0: + raise Exception("No traffic profile found in traffic configuration, " + "please fill 'traffic' section in configuration file.") + + if isinstance(self.config.traffic, tuple): + self.config.traffic = self.config.traffic[0] + + self.config.frame_sizes = generator_factory.get_frame_sizes(self.config.traffic.profile) + + self.config.ipv6_mode = False + self.config.no_dhcp = True + self.config.same_network_only = True + if self.config.openrc_file: + self.config.openrc_file = os.path.expanduser(self.config.openrc_file) + + self.config.ndr_run = (not self.config.no_traffic + and 'ndr' in self.config.rate.strip().lower().split('_')) + self.config.pdr_run = (not self.config.no_traffic + and 'pdr' in self.config.rate.strip().lower().split('_')) + self.config.single_run = (not self.config.no_traffic + and not (self.config.ndr_run or self.config.pdr_run)) + + if self.config.vlans and len(self.config.vlans) != 2: + raise Exception('Number of configured VLAN IDs for VLAN tagging must be exactly 2.') + + self.config.json_file = self.config.json if self.config.json else None + if self.config.json_file: + (path, filename) = os.path.split(self.config.json) + if not os.path.exists(path): + raise Exception('Please provide existing path for storing results in JSON file. ' + 'Path used: {path}'.format(path=path)) + + self.config.std_json_path = self.config.std_json if self.config.std_json else None + if self.config.std_json_path: + if not os.path.exists(self.config.std_json): + raise Exception('Please provide existing path for storing results in JSON file. ' + 'Path used: {path}'.format(path=self.config.std_json_path)) + + self.config_plugin.validate_config(self.config) + + +def parse_opts_from_cli(): + parser = argparse.ArgumentParser() + + parser.add_argument('-c', '--config', dest='config', + action='store', + help='Override default values with a config file or ' + 'a yaml/json config string', + metavar='<file_name_or_yaml>') + + parser.add_argument('--server', dest='server', + default=None, + action='store', + metavar='<http_root_pathname>', + help='Run nfvbench in server mode and pass' + ' the HTTP root folder full pathname') + + parser.add_argument('--host', dest='host', + action='store', + default='0.0.0.0', + help='Host IP address on which server will be listening (default 0.0.0.0)') + + parser.add_argument('-p', '--port', dest='port', + action='store', + default=7555, + help='Port on which server will be listening (default 7555)') + + parser.add_argument('-sc', '--service-chain', dest='service_chain', + choices=BasicFactory.chain_classes, + action='store', + help='Service chain to run') + + parser.add_argument('-scc', '--service-chain-count', dest='service_chain_count', + action='store', + help='Set number of service chains to run', + metavar='<service_chain_count>') + + parser.add_argument('-fc', '--flow-count', dest='flow_count', + action='store', + help='Set number of total flows for all chains and all directions', + metavar='<flow_count>') + + parser.add_argument('--rate', dest='rate', + action='store', + help='Specify rate in pps, bps or %% as total for all directions', + metavar='<rate>') + + parser.add_argument('--duration', dest='duration_sec', + action='store', + help='Set duration to run traffic generator (in seconds)', + metavar='<duration_sec>') + + parser.add_argument('--interval', dest='interval_sec', + action='store', + help='Set interval to record traffic generator stats (in seconds)', + metavar='<interval_sec>') + + parser.add_argument('--inter-node', dest='inter_node', + default=None, + action='store_true', + help='run VMs in different compute nodes (PVVP only)') + + parser.add_argument('--sriov', dest='sriov', + default=None, + action='store_true', + help='Use SRIOV (no vswitch - requires SRIOV support in compute nodes)') + + parser.add_argument('-d', '--debug', dest='debug', + action='store_true', + default=None, + help='print debug messages (verbose)') + + parser.add_argument('-g', '--traffic-gen', dest='generator_profile', + action='store', + help='Traffic generator profile to use') + + parser.add_argument('-i', '--image', dest='image_name', + action='store', + help='VM image name to use') + + parser.add_argument('-0', '--no-traffic', dest='no_traffic', + default=None, + action='store_true', + help='Check config and connectivity only - do not generate traffic') + + parser.add_argument('--no-arp', dest='no_arp', + default=None, + action='store_true', + help='Do not use ARP to find MAC addresses, ' + 'instead use values in config file') + + parser.add_argument('--no-reset', dest='no_reset', + default=None, + action='store_true', + help='Do not reset counters prior to running') + + parser.add_argument('--no-int-config', dest='no_int_config', + default=None, + action='store_true', + help='Skip interfaces config on EXT service chain') + + parser.add_argument('--no-tor-access', dest='no_tor_access', + default=None, + action='store_true', + help='Skip TOR switch configuration and retrieving of stats') + + parser.add_argument('--no-vswitch-access', dest='no_vswitch_access', + default=None, + action='store_true', + help='Skip vswitch configuration and retrieving of stats') + + parser.add_argument('--no-cleanup', dest='no_cleanup', + default=None, + action='store_true', + help='no cleanup after run') + + parser.add_argument('--json', dest='json', + action='store', + help='store results in json format file', + metavar='<path>/<filename>') + + parser.add_argument('--std-json', dest='std_json', + action='store', + help='store results in json format file with nfvbench standard filename: ' + '<service-chain-type>-<service-chain-count>-<flow-count>' + '-<packet-sizes>.json', + metavar='<path>') + + parser.add_argument('--show-default-config', dest='show_default_config', + default=None, + action='store_true', + help='print the default config in yaml format (unedited)') + + parser.add_argument('--show-config', dest='show_config', + default=None, + action='store_true', + help='print the running config in json format') + + parser.add_argument('-ss', '--show-summary', dest='summary', + action='store', + help='Show summary from nfvbench json file', + metavar='<json>') + + parser.add_argument('-v', '--version', dest='version', + default=None, + action='store_true', + help='Show version') + + parser.add_argument('-fs', '--frame-size', dest='frame_sizes', + action='append', + help='Override traffic profile frame sizes', + metavar='<frame_size_bytes or IMIX>') + + parser.add_argument('--unidir', dest='unidir', + action='store_true', + default=None, + help='Override traffic profile direction (requires -fs)') + + opts, unknown_opts = parser.parse_known_args() + return opts, unknown_opts + + +def load_default_config(): + default_cfg = resource_string(__name__, "cfg.default.yaml") + config = config_loads(default_cfg) + config.name = '(built-in default config)' + return config, default_cfg + + +def override_custom_traffic(config, frame_sizes, unidir): + """Override the traffic profiles with a custom one + """ + if frame_sizes is not None: + traffic_profile_name = "custom_traffic_profile" + config.traffic_profile = [ + { + "l2frame_size": frame_sizes, + "name": traffic_profile_name + } + ] + else: + traffic_profile_name = config.traffic["profile"] + + bidirectional = config.traffic['bidirectional'] if unidir is None else not unidir + config.traffic = { + "bidirectional": bidirectional, + "profile": traffic_profile_name + } + + +def main(): + try: + log.setup('nfvbench') + # load default config file + config, default_cfg = load_default_config() + # create factory for platform specific classes + try: + factory_module = importlib.import_module(config['factory_module']) + factory = getattr(factory_module, config['factory_class'])() + except AttributeError: + raise Exception("Requested factory module '{m}' or class '{c}' was not found." + .format(m=config['factory_module'], c=config['factory_class'])) + # create config plugin for this platform + config_plugin = factory.get_config_plugin_class()(config) + config = config_plugin.get_config() + openstack_spec = config_plugin.get_openstack_spec() + + opts, unknown_opts = parse_opts_from_cli() + log.set_level('nfvbench', debug=opts.debug) + + if opts.version: + print pbr.version.VersionInfo('nfvbench').version_string_with_vcs() + sys.exit(0) + + if opts.summary: + with open(opts.summary) as json_data: + print NFVBenchSummarizer(json.load(json_data)) + sys.exit(0) + + # show default config in text/yaml format + if opts.show_default_config: + print default_cfg + sys.exit(0) + + config.name = '' + if opts.config: + # override default config options with start config at path parsed from CLI + # check if it is an inline yaml/json config or a file name + if os.path.isfile(opts.config): + print opts.config + config = config_load(opts.config, config) + config.name = os.path.basename(opts.config) + else: + config = config_loads(opts.config, config) + + # traffic profile override options + override_custom_traffic(config, opts.frame_sizes, opts.unidir) + + # copy over cli options that are used in config + config.generator_profile = opts.generator_profile + + # show running config in json format + if opts.show_config: + print json.dumps(config, sort_keys=True, indent=4) + sys.exit(0) + + nfvbench = NFVBench(config, openstack_spec, config_plugin, factory) + + if opts.server: + if os.path.isdir(opts.server): + server = WebSocketIoServer(opts.server, nfvbench) + nfvbench.set_notifier(server) + try: + port = int(opts.port) + except ValueError: + server.run(host=opts.host) + else: + server.run(host=opts.host, port=port) + else: + print 'Invalid HTTP root directory: ' + opts.server + sys.exit(1) + else: + with utils.RunLock(): + if unknown_opts: + LOG.warning('Unknown options: ' + ' '.join(unknown_opts)) + + # remove unfilled values + opts = {k: v for k, v in vars(opts).iteritems() if v is not None} + result = nfvbench.run(opts) + if 'error_message' in result: + raise Exception(result['error_message']) + + if 'result' in result and result['status']: + nfvbench.save(result['result']) + nfvbench.print_summary(result['result']) + except Exception: + LOG.error({ + 'status': NFVBench.STATUS_ERROR, + 'error_message': traceback.format_exc() + }) + sys.exit(1) + +if __name__ == '__main__': + main() diff --git a/nfvbench/nfvbenchd.py b/nfvbench/nfvbenchd.py new file mode 100644 index 0000000..aef896a --- /dev/null +++ b/nfvbench/nfvbenchd.py @@ -0,0 +1,251 @@ +#!/usr/bin/env python +# Copyright 2017 Cisco Systems, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +from flask import Flask +from flask import jsonify +from flask import render_template +from flask import request + +from flask_socketio import emit +from flask_socketio import SocketIO + +import json +import Queue +import traceback +from utils import byteify +from utils import RunLock +import uuid + +# this global cannot reside in Ctx because of the @app and @socketio decorators +app = None +socketio = None + +STATUS_OK = 'OK' +STATUS_ERROR = 'ERROR' +STATUS_PENDING = 'PENDING' +STATUS_NOT_FOUND = 'NOT_FOUND' + + +def result_json(status, message, request_id=None): + body = { + 'status': status, + 'error_message': message + } + + if request_id is not None: + body['request_id'] = request_id + + return body + + +def load_json(data): + return json.loads(json.dumps(data), object_hook=byteify) + + +def get_uuid(): + return uuid.uuid4().hex + + +class Ctx(object): + MAXLEN = 5 + run_queue = Queue.Queue() + busy = False + result = None + request_from_socketio = False + results = {} + ids = [] + current_id = None + + @staticmethod + def enqueue(config, request_id, from_socketio=False): + Ctx.busy = True + Ctx.request_from_socketio = from_socketio + config['request_id'] = request_id + Ctx.run_queue.put(config) + + if len(Ctx.ids) >= Ctx.MAXLEN: + try: + del Ctx.results[Ctx.ids.pop(0)] + except KeyError: + pass + Ctx.ids.append(request_id) + + @staticmethod + def dequeue(): + config = Ctx.run_queue.get() + Ctx.current_id = config['request_id'] + return config + + @staticmethod + def release(): + Ctx.current_id = None + Ctx.busy = False + + @staticmethod + def set_result(res): + res['request_id'] = Ctx.current_id + Ctx.results[Ctx.current_id] = res + Ctx.result = res + + @staticmethod + def get_result(request_id=None): + if request_id: + try: + res = Ctx.results[request_id] + except KeyError: + return None + + if Ctx.result and request_id == Ctx.result['request_id']: + Ctx.result = None + + return res + else: + res = Ctx.result + if res: + Ctx.result = None + return res + + @staticmethod + def is_busy(): + return Ctx.busy + + @staticmethod + def get_current_request_id(): + return Ctx.current_id + + +def setup_flask(root_path): + global socketio + global app + app = Flask(__name__) + app.root_path = root_path + socketio = SocketIO(app, async_mode='threading') + busy_json = result_json(STATUS_ERROR, 'there is already an NFVbench request running') + not_busy_json = result_json(STATUS_ERROR, 'no pending NFVbench run') + not_found_msg = 'results not found' + pending_msg = 'NFVbench run still pending' + + # --------- socketio requests ------------ + + @socketio.on('start_run') + def socketio_start_run(config): + if not Ctx.is_busy(): + Ctx.enqueue(config, get_uuid(), from_socketio=True) + else: + emit('error', {'reason': 'there is already an NFVbench request running'}) + + @socketio.on('echo') + def socketio_echo(config): + emit('echo', config) + + # --------- HTTP requests ------------ + + @app.route('/') + def index(): + return render_template('index.html') + + @app.route('/echo', methods=['GET']) + def echo(): + config = request.json + return jsonify(config) + + @app.route('/start_run', methods=['POST']) + def start_run(): + config = load_json(request.json) + if Ctx.is_busy(): + return jsonify(busy_json) + else: + request_id = get_uuid() + Ctx.enqueue(config, request_id) + return jsonify(result_json(STATUS_PENDING, pending_msg, request_id)) + + @app.route('/status', defaults={'request_id': None}, methods=['GET']) + @app.route('/status/<request_id>', methods=['GET']) + def get_status(request_id): + if request_id: + if Ctx.is_busy() and request_id == Ctx.get_current_request_id(): + # task with request_id still pending + return jsonify(result_json(STATUS_PENDING, pending_msg, request_id)) + + res = Ctx.get_result(request_id) + if res: + # found result for given request_id + return jsonify(res) + else: + # result for given request_id not found + return jsonify(result_json(STATUS_NOT_FOUND, not_found_msg, request_id)) + else: + if Ctx.is_busy(): + # task still pending, return with request_id + return jsonify(result_json(STATUS_PENDING, + pending_msg, + Ctx.get_current_request_id())) + + res = Ctx.get_result() + if res: + return jsonify(res) + else: + return jsonify(not_busy_json) + + +class WebSocketIoServer(object): + """This class takes care of the web socketio server, accepts websocket events, and sends back + notifications using websocket events (send_ methods). Caller should simply create an instance + of this class and pass a runner object then invoke the run method + """ + def __init__(self, http_root, runner): + self.nfvbench_runner = runner + setup_flask(http_root) + + def run(self, host='127.0.0.1', port=7556): + + # socketio.run will not return so we need to run it in a background thread so that + # the calling thread (main thread) can keep doing work + socketio.start_background_task(target=socketio.run, app=app, host=host, port=port) + + # wait for run requests + # the runner must be executed from the main thread (Trex client library requirement) + while True: + # print 'main thread waiting for requests...' + config = Ctx.dequeue() + # print 'main thread processing request...' + print config + try: + # remove unfilled values as we do not want them to override default values with None + config = {k: v for k, v in config.items() if v is not None} + with RunLock(): + results = self.nfvbench_runner.run(config) + except Exception as exc: + print 'NFVbench runner exception:' + traceback.print_exc() + results = result_json(STATUS_ERROR, str(exc)) + + if Ctx.request_from_socketio: + socketio.emit('run_end', results) + else: + # this might overwrite a previously unfetched result + Ctx.set_result(results) + Ctx.release() + + def send_interval_stats(self, time_ms, tx_pps, rx_pps, drop_pct): + stats = {'time_ms': time_ms, 'tx_pps': tx_pps, 'rx_pps': rx_pps, 'drop_pct': drop_pct} + socketio.emit('run_interval_stats', stats) + + def send_ndr_found(self, ndr_pps): + socketio.emit('ndr_found', {'rate_pps': ndr_pps}) + + def send_pdr_found(self, pdr_pps): + socketio.emit('pdr_found', {'rate_pps': pdr_pps}) diff --git a/nfvbench/nfvbenchvm/nfvbenchvm.conf b/nfvbench/nfvbenchvm/nfvbenchvm.conf new file mode 100644 index 0000000..0b76244 --- /dev/null +++ b/nfvbench/nfvbenchvm/nfvbenchvm.conf @@ -0,0 +1,9 @@ +FORWARDER={forwarder} +TG_MAC1={tg_mac1} +TG_MAC2={tg_mac2} +VNF_GATEWAY1_CIDR={vnf_gateway1_cidr} +VNF_GATEWAY2_CIDR={vnf_gateway2_cidr} +TG_NET1={tg_net1} +TG_NET2={tg_net2} +TG_GATEWAY1_IP={tg_gateway1_ip} +TG_GATEWAY2_IP={tg_gateway2_ip} diff --git a/nfvbench/packet_analyzer.py b/nfvbench/packet_analyzer.py new file mode 100644 index 0000000..c01675b --- /dev/null +++ b/nfvbench/packet_analyzer.py @@ -0,0 +1,64 @@ +#!/usr/bin/env python +# Copyright 2016 Cisco Systems, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +from collections import OrderedDict +from log import LOG + + +class PacketAnalyzer(object): + """Analyze packet drop counter in a chain""" + + def __init__(self): + self.last_packet_count = 0 + self.chain = [] + + def record(self, interface, traffic_type): + """Records the counter of the next interface with the corresponding traffic type""" + if interface.is_no_op(): + return + packet_count = interface.get_packet_count(traffic_type) + packet_drop_count = self.last_packet_count - packet_count + path_data = OrderedDict() + path_data['interface'] = interface.name + path_data['device'] = interface.device + path_data['packet_count'] = packet_count + + if self.chain: + path_data['packet_drop_count'] = packet_drop_count + + self.chain.append(path_data) + self.last_packet_count = packet_count + + def get_analysis(self): + """Gets the analysis of packet drops""" + transmitted_packets = self.chain[0]['packet_count'] + + for (index, path_data) in enumerate(self.chain): + LOG.info('[Packet Analyze] Interface: %s' % (path_data['interface'])) + LOG.info('[Packet Analyze] > Count: %d' % (path_data['packet_count'])) + + if index: + if transmitted_packets: + self.chain[index]['packet_drop_percentage'] = \ + 100.0 * path_data['packet_drop_count'] / transmitted_packets + else: + self.chain[index]['packet_drop_percentage'] = float('nan') + LOG.info('[Packet Analyze] > Packet Drops: %d' % + (path_data['packet_drop_count'])) + LOG.info('[Packet Analyze] > Percentage: %s' % + (path_data['packet_drop_percentage'])) + + return self.chain diff --git a/nfvbench/service_chain.py b/nfvbench/service_chain.py new file mode 100644 index 0000000..104cfb4 --- /dev/null +++ b/nfvbench/service_chain.py @@ -0,0 +1,138 @@ +#!/usr/bin/env python +# Copyright 2016 Cisco Systems, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +from chain_managers import StageManager +from collections import OrderedDict +from log import LOG +from specs import ChainType +import time + + +class ServiceChain(object): + + def __init__(self, config, clients, cred, specs, factory, notifier=None): + self.config = config + self.clients = clients + self.cred = cred + self.specs = specs + self.factory = factory + self.notifier = notifier + self.chain_name = self.config.service_chain + self.vlans = None + self.stage_manager = None + self.stats_manager = None + LOG.info('ServiceChain initialized.') + + def __set_helpers(self): + self.stage_manager = StageManager(self.config, self.cred, self.factory) + self.clients['vm'] = self.stage_manager + self.vlans = self.stage_manager.get_vlans() + + STATS_CLASS = self.factory.get_stats_class(self.config.service_chain) + self.stats_manager = STATS_CLASS(self.config, + self.clients, + self.specs, + self.factory, + self.vlans, + self.notifier) + + def __set_vlan_tags(self): + if self.config.vlan_tagging: + # override with user-specified vlans if configured + vlans = self.config.vlans if self.config.vlans else self.vlans[:2] + for vlan, device in zip(vlans, self.config.generator_config.devices): + self.stats_manager.set_vlan_tag(device, vlan) + + def __get_result_per_frame_size(self, frame_size, bidirectional): + start_time = time.time() + traffic_result = { + frame_size: {} + } + result = {} + if not self.config.no_traffic: + self.clients['traffic'].set_traffic(frame_size, bidirectional) + + if self.config.single_run: + result = self.stats_manager.run() + else: + results = self.clients['traffic'].get_ndr_and_pdr() + + for dr in ['pdr', 'ndr']: + if dr in results: + traffic_result[frame_size][dr] = results[dr] + if 'warning' in results[dr]['stats'] and results[dr]['stats']['warning']: + traffic_result['warning'] = results[dr]['stats']['warning'] + traffic_result[frame_size]['iteration_stats'] = results['iteration_stats'] + + result['analysis_duration_sec'] = time.time() - start_time + if self.config.single_run: + result['run_config'] = self.clients['traffic'].get_run_config(result) + required = result['run_config']['direction-total']['orig']['rate_pps'] + actual = result['stats']['total_tx_rate'] + warning = self.clients['traffic'].compare_tx_rates(required, actual) + if warning is not None: + result['run_config']['warning'] = warning + + traffic_result[frame_size].update(result) + return traffic_result + + def __get_chain_result(self): + result = OrderedDict() + for fs in self.config.frame_sizes: + result.update(self.__get_result_per_frame_size(fs, self.config.traffic.bidirectional)) + + chain_result = { + 'flow_count': self.config.flow_count, + 'service_chain_count': self.config.service_chain_count, + 'bidirectional': self.config.traffic.bidirectional, + 'profile': self.config.traffic.profile, + 'compute_nodes': self.stats_manager.get_compute_nodes_bios(), + 'result': result + } + + return chain_result + + def __setup_traffic(self): + self.clients['traffic'].setup() + if not self.config.no_traffic: + if self.config.service_chain == ChainType.EXT and not self.config.no_arp: + self.clients['traffic'].ensure_arp_successful() + self.clients['traffic'].ensure_end_to_end() + + def run(self): + LOG.info('Starting {} chain...'.format(self.chain_name)) + LOG.info('Dry run: {}'.format(self.config.no_traffic)) + results = {} + + self.__set_helpers() + self.__set_vlan_tags() + self.stage_manager.set_vm_macs() + self.__setup_traffic() + results[self.chain_name] = {'result': self.__get_chain_result()} + + if self.config.service_chain == ChainType.PVVP: + results[self.chain_name]['mode'] = 'inter-node' \ + if self.config.inter_node else 'intra-node' + + LOG.info("Service chain '{}' run completed.".format(self.chain_name)) + return results + + def get_version(self): + return self.stats_manager.get_version() + + def close(self): + self.stage_manager.close() + self.stats_manager.close() diff --git a/nfvbench/specs.py b/nfvbench/specs.py new file mode 100644 index 0000000..3f93df6 --- /dev/null +++ b/nfvbench/specs.py @@ -0,0 +1,93 @@ +# Copyright 2016 Cisco Systems, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + + +class Encaps(object): + VLAN = "VLAN" + VxLAN = "VxLAN" + BASIC = "BASIC" + + encaps_mapping = { + 'VLAN': VLAN, + 'VXLAN': VxLAN + } + + @classmethod + def get(cls, network_type): + return cls.encaps_mapping.get(network_type.upper(), None) + + +class ChainType(object): + PVP = "PVP" + PVVP = "PVVP" + EXT = "EXT" + + chain_mapping = { + 'PVP': PVP, + 'PVVP': PVVP, + 'EXT': EXT + } + + @classmethod + def get_chain_type(cls, chain): + return cls.chain_mapping.get(chain.upper(), None) + + +class OpenStackSpec(object): + + def __init__(self): + self.__vswitch = "BASIC" + self.__encaps = Encaps.BASIC + + @property + def vswitch(self): + return self.__vswitch + + @vswitch.setter + def vswitch(self, vsw): + if vsw is None: + raise Exception('Trying to set vSwitch as None.') + + self.__vswitch = vsw.upper() + + @property + def encaps(self): + return self.__encaps + + @encaps.setter + def encaps(self, enc): + if enc is None: + raise Exception('Trying to set Encaps as None.') + + self.__encaps = enc + + +class RunSpec(object): + + def __init__(self, no_vswitch_access, openstack_spec): + self.use_vswitch = (not no_vswitch_access) and openstack_spec.vswitch != "BASIC" + + +class Specs(object): + + def __init__(self): + self.openstack = None + self.run_spec = None + + def set_openstack_spec(self, openstack_spec): + self.openstack = openstack_spec + + def set_run_spec(self, run_spec): + self.run_spec = run_spec diff --git a/nfvbench/stats_collector.py b/nfvbench/stats_collector.py new file mode 100644 index 0000000..964d704 --- /dev/null +++ b/nfvbench/stats_collector.py @@ -0,0 +1,145 @@ +# Copyright 2016 Cisco Systems, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +import time + + +class StatsCollector(object): + """Base class for all stats collector classes.""" + + def __init__(self, start_time): + self.start_time = start_time + self.stats = [] + + def get(self): + return self.stats + + def peek(self): + return self.stats[-1] + + @staticmethod + def _get_drop_percentage(drop_pkts, total_pkts): + return float(drop_pkts * 100) / total_pkts + + @staticmethod + def _get_rx_pps(tx_pps, drop_percentage): + return (tx_pps * (100 - drop_percentage)) / 100 + + def _get_current_time_diff(self): + return int((time.time() - self.start_time) * 1000) + + +class IntervalCollector(StatsCollector): + """Collects stats while traffic is running. Frequency is specified by 'interval_sec' setting.""" + + last_tx_pkts = 0 + last_rx_pkts = 0 + last_time = 0 + + def __init__(self, start_time): + StatsCollector.__init__(self, start_time) + self.notifier = None + + def attach_notifier(self, notifier): + self.notifier = notifier + + def add(self, stats): + if self.notifier: + current_stats = self.__compute_tx_rx_diff(stats) + self.notifier.send_interval_stats(**current_stats) + + def reset(self): + # don't reset time! + self.last_rx_pkts = 0 + self.last_tx_pkts = 0 + + def add_ndr_pdr(self, tag, stats): + if self.notifier: + + current_time = self._get_current_time_diff() + rx_pps = self._get_rx_pps(stats['tx_pps'], stats['drop_percentage']) + + self.last_tx_pkts = stats['tx_pps'] / 1000 * (current_time - self.last_time) + self.last_rx_pkts = rx_pps / 1000 * (current_time - self.last_time) + self.last_time = current_time + + # 'drop_pct' key is an unfortunate name, since in iteration stats it means + # number of the packets. More suitable would be 'drop_percentage'. + # FDS frontend depends on this key + current_stats = { + '{}_pps'.format(tag): stats['tx_pps'], + 'tx_pps': stats['tx_pps'], + 'rx_pps': rx_pps, + 'drop_pct': stats['drop_percentage'], + 'time_ms': current_time + } + + self.notifier.send_interval_stats(time_ms=current_stats['time_ms'], + tx_pps=current_stats['tx_pps'], + rx_pps=current_stats['rx_pps'], + drop_pct=current_stats['drop_pct']) + if tag == 'ndr': + self.notifier.send_ndr_found(stats['tx_pps']) + else: + self.notifier.send_pdr_found(stats['tx_pps']) + + def __compute_tx_rx_diff(self, stats): + current_time = self._get_current_time_diff() + tx_diff = stats['overall']['tx']['total_pkts'] - self.last_tx_pkts + tx_pps = (tx_diff * 1000) / (current_time - self.last_time) + rx_diff = stats['overall']['rx']['total_pkts'] - self.last_rx_pkts + rx_pps = (rx_diff * 1000) / (current_time - self.last_time) + + self.last_rx_pkts = stats['overall']['rx']['total_pkts'] + self.last_tx_pkts = stats['overall']['tx']['total_pkts'] + self.last_time = current_time + + return { + 'tx_pps': tx_pps, + 'rx_pps': rx_pps, + 'drop_pct': max(0.0, (1 - (float(rx_pps) / tx_pps)) * 100), + 'time_ms': current_time + } + + +class IterationCollector(StatsCollector): + """Collects stats after traffic is stopped. Frequency is specified by 'duration_sec' setting.""" + + def __init__(self, start_time): + StatsCollector.__init__(self, start_time) + + def add(self, stats, tx_pps): + drop_percentage = self._get_drop_percentage(stats['overall']['rx']['dropped_pkts'], + stats['overall']['tx']['total_pkts']) + + record = { + 'total_tx_pps': int(stats['total_tx_rate']), + 'tx_pps': tx_pps, + 'tx_pkts': stats['overall']['tx']['total_pkts'], + 'rx_pps': self._get_rx_pps(tx_pps, drop_percentage), + 'rx_pkts': stats['overall']['rx']['total_pkts'], + 'drop_pct': stats['overall']['rx']['dropped_pkts'], + 'drop_percentage': drop_percentage, + 'time_ms': int(time.time() * 1000) + } + + if 'warning' in stats: + record['warning'] = stats['warning'] + + self.stats.append(record) + + def add_ndr_pdr(self, tag, rate): + last_stats = self.peek() + last_stats['{}_pps'.format(tag)] = rate diff --git a/nfvbench/summarizer.py b/nfvbench/summarizer.py new file mode 100644 index 0000000..4ee2426 --- /dev/null +++ b/nfvbench/summarizer.py @@ -0,0 +1,402 @@ +#!/usr/bin/env python +# Copyright 2016 Cisco Systems, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import bitmath +from contextlib import contextmanager +import math +from specs import ChainType +from tabulate import tabulate + + +class Formatter(object): + """Collection of string formatter methods""" + + @staticmethod + def fixed(data): + return data + + @staticmethod + def int(data): + return '{:,}'.format(data) + + @staticmethod + def float(decimal): + return lambda data: '%.{}f'.format(decimal) % (data) + + @staticmethod + def standard(data): + if type(data) == int: + return Formatter.int(data) + elif type(data) == float: + return Formatter.float(4)(data) + else: + return Formatter.fixed(data) + + @staticmethod + def suffix(suffix_str): + return lambda data: Formatter.standard(data) + suffix_str + + @staticmethod + def bits(data): + # By default, `best_prefix` returns a value in byte format, this hack (multiply by 8.0) + # will convert it into bit format. + bit = 8.0 * bitmath.Bit(float(data)) + bit = bit.best_prefix(bitmath.SI) + byte_to_bit_classes = { + 'kB': bitmath.kb, + 'MB': bitmath.Mb, + 'GB': bitmath.Gb, + 'TB': bitmath.Tb, + 'PB': bitmath.Pb, + 'EB': bitmath.Eb, + 'ZB': bitmath.Zb, + 'YB': bitmath.Yb, + } + bps = byte_to_bit_classes.get(bit.unit, bitmath.Bit).from_other(bit) / 8.0 + if bps.unit != 'Bit': + return bps.format("{value:.4f} {unit}ps") + else: + return bps.format("{value:.4f} bps") + + @staticmethod + def percentage(data): + if data is None: + return '' + elif math.isnan(data): + return '-' + else: + return Formatter.suffix('%')(Formatter.float(4)(data)) + + +class Table(object): + """ASCII readable table class""" + + def __init__(self, header): + header_row, self.formatters = zip(*header) + self.data = [header_row] + self.columns = len(header_row) + + def add_row(self, row): + assert(self.columns == len(row)) + formatted_row = [] + for entry, formatter in zip(row, self.formatters): + formatted_row.append(formatter(entry)) + self.data.append(formatted_row) + + def get_string(self, indent=0): + spaces = ' ' * indent + table = tabulate(self.data, + headers='firstrow', + tablefmt='grid', + stralign='center', + floatfmt='.2f') + return table.replace('\n', '\n' + spaces) + + +class Summarizer(object): + """Generic summarizer class""" + + indent_per_level = 2 + + def __init__(self): + self.indent_size = 0 + self.marker_stack = [False] + self.str = '' + + def __indent(self, marker): + self.indent_size += self.indent_per_level + self.marker_stack.append(marker) + + def __unindent(self): + assert(self.indent_size >= self.indent_per_level) + self.indent_size -= self.indent_per_level + self.marker_stack.pop() + + def __get_indent_string(self): + current_str = ' ' * self.indent_size + if self.marker_stack[-1]: + current_str = current_str[:-2] + '> ' + return current_str + + def _put(self, *args): + self.str += self.__get_indent_string() + if len(args) and type(args[-1]) == dict: + self.str += ' '.join(map(str, args[:-1])) + '\n' + self._put_dict(args[-1]) + else: + self.str += ' '.join(map(str, args)) + '\n' + + def _put_dict(self, data): + with self._create_block(False): + for key, value in data.iteritems(): + if type(value) == dict: + self._put(key + ':') + self._put_dict(value) + else: + self._put(key + ':', value) + + def _put_table(self, table): + self.str += self.__get_indent_string() + self.str += table.get_string(self.indent_size) + '\n' + + def __str__(self): + return self.str + + @contextmanager + def _create_block(self, marker=True): + self.__indent(marker) + yield + self.__unindent() + + +class NFVBenchSummarizer(Summarizer): + """Summarize nfvbench json result""" + + ndr_pdr_header = [ + ('-', Formatter.fixed), + ('L2 Frame Size', Formatter.standard), + ('Rate (fwd+rev)', Formatter.bits), + ('Rate (fwd+rev)', Formatter.suffix(' pps')), + ('Avg Drop Rate', Formatter.suffix('%')), + ('Avg Latency (usec)', Formatter.standard), + ('Min Latency (usec)', Formatter.standard), + ('Max Latency (usec)', Formatter.standard) + ] + + single_run_header = [ + ('L2 Frame Size', Formatter.standard), + ('Drop Rate', Formatter.suffix('%')), + ('Avg Latency (usec)', Formatter.standard), + ('Min Latency (usec)', Formatter.standard), + ('Max Latency (usec)', Formatter.standard) + ] + + config_header = [ + ('Direction', Formatter.standard), + ('Requested TX Rate (bps)', Formatter.bits), + ('Actual TX Rate (bps)', Formatter.bits), + ('RX Rate (bps)', Formatter.bits), + ('Requested TX Rate (pps)', Formatter.suffix(' pps')), + ('Actual TX Rate (pps)', Formatter.suffix(' pps')), + ('RX Rate (pps)', Formatter.suffix(' pps')) + ] + + chain_analysis_header = [ + ('Interface', Formatter.standard), + ('Device', Formatter.standard), + ('Packets (fwd)', Formatter.standard), + ('Drops (fwd)', Formatter.standard), + ('Drop% (fwd)', Formatter.percentage), + ('Packets (rev)', Formatter.standard), + ('Drops (rev)', Formatter.standard), + ('Drop% (rev)', Formatter.percentage) + ] + + direction_keys = ['direction-forward', 'direction-reverse', 'direction-total'] + direction_names = ['Forward', 'Reverse', 'Total'] + + def __init__(self, result): + Summarizer.__init__(self) + self.result = result + self.config = self.result['config'] + self.__summarize() + + def __summarize(self): + self._put() + self._put('========== NFVBench Summary ==========') + self._put('Date:', self.result['date']) + self._put('NFVBench version', self.result['nfvbench_version']) + self._put('Openstack Neutron:', { + 'vSwitch': self.result['openstack_spec']['vswitch'], + 'Encapsulation': self.result['openstack_spec']['encaps'] + }) + self._put('Benchmarks:') + with self._create_block(): + self._put('Networks:') + with self._create_block(): + network_benchmark = self.result['benchmarks']['network'] + + self._put('Components:') + with self._create_block(): + self._put('TOR:') + with self._create_block(False): + self._put('Type:', self.config['tor']['type']) + self._put('Traffic Generator:') + with self._create_block(False): + self._put('Profile:', self.config['generator_config']['name']) + self._put('Tool:', self.config['generator_config']['tool']) + if network_benchmark['versions']: + self._put('Versions:') + with self._create_block(): + for component, version in network_benchmark['versions'].iteritems(): + self._put(component + ':', version) + + if self.config['ndr_run'] or self.config['pdr_run']: + self._put('Measurement Parameters:') + with self._create_block(False): + if self.config['ndr_run']: + self._put('NDR:', self.config['measurement']['NDR']) + if self.config['pdr_run']: + self._put('PDR:', self.config['measurement']['PDR']) + + self._put('Service chain:') + for result in network_benchmark['service_chain'].iteritems(): + with self._create_block(): + self.__chain_summarize(*result) + + def __chain_summarize(self, chain_name, chain_benchmark): + self._put(chain_name + ':') + if chain_name == ChainType.PVVP: + self._put('Mode:', chain_benchmark.get('mode')) + with self._create_block(): + self._put('Traffic:') + with self._create_block(False): + self.__traffic_summarize(chain_benchmark['result']) + + def __traffic_summarize(self, traffic_benchmark): + self._put('Profile:', traffic_benchmark['profile']) + self._put('Bidirectional:', traffic_benchmark['bidirectional']) + self._put('Flow count:', traffic_benchmark['flow_count']) + self._put('Service chains count:', traffic_benchmark['service_chain_count']) + self._put('Compute nodes:', traffic_benchmark['compute_nodes'].keys()) + with self._create_block(False): + self._put() + if not self.config['no_traffic']: + self._put('Run Summary:') + self._put() + with self._create_block(False): + self._put_table(self.__get_summary_table(traffic_benchmark['result'])) + try: + self._put() + self._put(traffic_benchmark['result']['warning']) + except KeyError: + pass + + for entry in traffic_benchmark['result'].iteritems(): + if 'warning' in entry: + continue + self.__chain_analysis_summarize(*entry) + + def __chain_analysis_summarize(self, frame_size, analysis): + self._put() + self._put('L2 frame size:', frame_size) + if 'analysis_duration_sec' in analysis: + self._put('Chain analysis duration:', + Formatter.float(3)(analysis['analysis_duration_sec']), 'seconds') + if self.config['ndr_run']: + self._put('NDR search duration:', Formatter.float(0)(analysis['ndr']['time_taken_sec']), + 'seconds') + if self.config['pdr_run']: + self._put('PDR search duration:', Formatter.float(0)(analysis['pdr']['time_taken_sec']), + 'seconds') + self._put() + + if not self.config['no_traffic'] and self.config['single_run']: + self._put('Run Config:') + self._put() + with self._create_block(False): + self._put_table(self.__get_config_table(analysis['run_config'])) + if 'warning' in analysis['run_config'] and analysis['run_config']['warning']: + self._put() + self._put(analysis['run_config']['warning']) + self._put() + + if 'packet_analysis' in analysis: + self._put('Chain Analysis:') + self._put() + with self._create_block(False): + self._put_table(self.__get_chain_analysis_table(analysis['packet_analysis'])) + self._put() + + def __get_summary_table(self, traffic_result): + if self.config['single_run']: + summary_table = Table(self.single_run_header) + else: + summary_table = Table(self.ndr_pdr_header) + + if self.config['ndr_run']: + for frame_size, analysis in traffic_result.iteritems(): + if frame_size == 'warning': + continue + summary_table.add_row([ + 'NDR', + frame_size, + analysis['ndr']['rate_bps'], + int(analysis['ndr']['rate_pps']), + analysis['ndr']['stats']['overall']['drop_percentage'], + analysis['ndr']['stats']['overall']['avg_delay_usec'], + analysis['ndr']['stats']['overall']['min_delay_usec'], + analysis['ndr']['stats']['overall']['max_delay_usec'] + ]) + if self.config['pdr_run']: + for frame_size, analysis in traffic_result.iteritems(): + if frame_size == 'warning': + continue + summary_table.add_row([ + 'PDR', + frame_size, + analysis['pdr']['rate_bps'], + int(analysis['pdr']['rate_pps']), + analysis['pdr']['stats']['overall']['drop_percentage'], + analysis['pdr']['stats']['overall']['avg_delay_usec'], + analysis['pdr']['stats']['overall']['min_delay_usec'], + analysis['pdr']['stats']['overall']['max_delay_usec'] + ]) + if self.config['single_run']: + for frame_size, analysis in traffic_result.iteritems(): + summary_table.add_row([ + frame_size, + analysis['stats']['overall']['drop_rate_percent'], + analysis['stats']['overall']['rx']['avg_delay_usec'], + analysis['stats']['overall']['rx']['min_delay_usec'], + analysis['stats']['overall']['rx']['max_delay_usec'] + ]) + return summary_table + + def __get_config_table(self, run_config): + config_table = Table(self.config_header) + for key, name in zip(self.direction_keys, self.direction_names): + if key not in run_config: + continue + config_table.add_row([ + name, + run_config[key]['orig']['rate_bps'], + run_config[key]['tx']['rate_bps'], + run_config[key]['rx']['rate_bps'], + int(run_config[key]['orig']['rate_pps']), + int(run_config[key]['tx']['rate_pps']), + int(run_config[key]['rx']['rate_pps']), + ]) + return config_table + + def __get_chain_analysis_table(self, packet_analysis): + chain_analysis_table = Table(self.chain_analysis_header) + forward_analysis = packet_analysis['direction-forward'] + reverse_analysis = packet_analysis['direction-reverse'] + reverse_analysis.reverse() + + for fwd, rev in zip(forward_analysis, reverse_analysis): + chain_analysis_table.add_row([ + fwd['interface'], + fwd['device'], + fwd['packet_count'], + fwd.get('packet_drop_count', None), + fwd.get('packet_drop_percentage', None), + rev['packet_count'], + rev.get('packet_drop_count', None), + rev.get('packet_drop_percentage', None), + ]) + return chain_analysis_table diff --git a/nfvbench/tor_client.py b/nfvbench/tor_client.py new file mode 100644 index 0000000..c8214c8 --- /dev/null +++ b/nfvbench/tor_client.py @@ -0,0 +1,52 @@ +#!/usr/bin/env python +# Copyright 2016 Cisco Systems, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + + +class TORClientException(Exception): + pass + + +class BasicTORClient(object): + + def __init__(self, config): + pass + + def get_int_counters(self): + return {} + + def get_vni_counters(self, vni): + return {} + + def get_vni_interface(self, vni, counters): + return None + + def get_vni_for_vlan(self, vlans): + return [] + + def attach_tg_interfaces(self, network_vlans, switch_ports): + pass + + def clear_nve(self): + pass + + def clear_interface(self, vni): + pass + + def close(self): + pass + + def get_version(self): + return {} diff --git a/nfvbench/traffic_client.py b/nfvbench/traffic_client.py new file mode 100644 index 0000000..8bfcd76 --- /dev/null +++ b/nfvbench/traffic_client.py @@ -0,0 +1,790 @@ +# Copyright 2016 Cisco Systems, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from attrdict import AttrDict +import bitmath +from datetime import datetime +from log import LOG +from netaddr import IPNetwork +from network import Interface +import socket +from specs import ChainType +from stats_collector import IntervalCollector +from stats_collector import IterationCollector +import struct +import time +import traffic_gen.traffic_utils as utils + + +class TrafficClientException(Exception): + pass + + +class TrafficRunner(object): + + def __init__(self, client, duration_sec, interval_sec=0): + self.client = client + self.start_time = None + self.duration_sec = duration_sec + self.interval_sec = interval_sec + + def run(self): + LOG.info('Running traffic generator') + self.client.gen.clear_stats() + self.client.gen.start_traffic() + self.start_time = time.time() + return self.poll_stats() + + def stop(self): + if self.is_running(): + self.start_time = None + self.client.gen.stop_traffic() + + def is_running(self): + return self.start_time is not None + + def time_elapsed(self): + if self.is_running(): + return time.time() - self.start_time + else: + return self.duration_sec + + def poll_stats(self): + if not self.is_running(): + return None + time_elapsed = self.time_elapsed() + if time_elapsed > self.duration_sec: + self.stop() + return None + time_left = self.duration_sec - time_elapsed + if self.interval_sec > 0.0: + if time_left <= self.interval_sec: + time.sleep(time_left) + self.stop() + else: + time.sleep(self.interval_sec) + else: + time.sleep(self.duration_sec) + self.stop() + return self.client.get_stats() + + +class Device(object): + + def __init__(self, port, pci, switch_port=None, vtep_vlan=None, ip=None, tg_gateway_ip=None, + gateway_ip=None, ip_addrs_step=None, tg_gateway_ip_addrs_step=None, + gateway_ip_addrs_step=None, chain_count=1, flow_count=1, vlan_tagging=False): + self.chain_count = chain_count + self.flow_count = flow_count + self.dst = None + self.port = port + self.switch_port = switch_port + self.vtep_vlan = vtep_vlan + self.vlan_tag = None + self.vlan_tagging = vlan_tagging + self.pci = pci + self.mac = None + self.vm_mac_list = None + subnet = IPNetwork(ip) + self.ip = subnet.ip.format() + self.ip_prefixlen = subnet.prefixlen + self.ip_addrs_step = ip_addrs_step + self.tg_gateway_ip_addrs_step = tg_gateway_ip_addrs_step + self.gateway_ip_addrs_step = gateway_ip_addrs_step + self.ip_list = self.expand_ip(self.ip, self.ip_addrs_step, self.flow_count) + self.gateway_ip = gateway_ip + self.gateway_ip_list = self.expand_ip(self.gateway_ip, + self.gateway_ip_addrs_step, + self.chain_count) + self.tg_gateway_ip = tg_gateway_ip + self.tg_gateway_ip_list = self.expand_ip(self.tg_gateway_ip, + self.tg_gateway_ip_addrs_step, + self.chain_count) + + def set_mac(self, mac): + if mac is None: + raise TrafficClientException('Trying to set traffic generator MAC address as None') + self.mac = mac + + def set_destination(self, dst): + self.dst = dst + + def set_vm_mac_list(self, vm_mac_list): + self.vm_mac_list = map(str, vm_mac_list) + + def set_vlan_tag(self, vlan_tag): + if self.vlan_tagging and vlan_tag is None: + raise TrafficClientException('Trying to set VLAN tag as None') + self.vlan_tag = vlan_tag + + def get_stream_configs(self, service_chain): + configs = [] + flow_idx = 0 + for chain_idx in xrange(self.chain_count): + current_flow_count = (self.flow_count - flow_idx) / (self.chain_count - chain_idx) + max_idx = flow_idx + current_flow_count - 1 + ip_src_count = self.ip_to_int(self.ip_list[max_idx]) - \ + self.ip_to_int(self.ip_list[flow_idx]) + 1 + ip_dst_count = self.ip_to_int(self.dst.ip_list[max_idx]) - \ + self.ip_to_int(self.dst.ip_list[flow_idx]) + 1 + + configs.append({ + 'count': current_flow_count, + 'mac_src': self.mac, + 'mac_dst': self.dst.mac if service_chain == ChainType.EXT + else self.vm_mac_list[chain_idx], + 'ip_src_addr': self.ip_list[flow_idx], + 'ip_src_addr_max': self.ip_list[max_idx], + 'ip_src_count': ip_src_count, + 'ip_dst_addr': self.dst.ip_list[flow_idx], + 'ip_dst_addr_max': self.dst.ip_list[max_idx], + 'ip_dst_count': ip_dst_count, + 'ip_addrs_step': self.ip_addrs_step, + 'mac_discovery_gw': self.gateway_ip_list[chain_idx], + 'ip_src_tg_gw': self.tg_gateway_ip_list[chain_idx], + 'ip_dst_tg_gw': self.dst.tg_gateway_ip_list[chain_idx], + 'vlan_tag': self.vlan_tag if self.vlan_tagging else None + }) + + flow_idx += current_flow_count + return configs + + @classmethod + def expand_ip(cls, ip, step_ip, count): + if step_ip == 'random': + # Repeatable Random will used in the stream src/dst IP pairs, but we still need + # to expand the IP based on the number of chains and flows configured. So we use + # "0.0.0.1" as the step to have the exact IP flow ranges for every chain. + step_ip = '0.0.0.1' + + step_ip_in_int = cls.ip_to_int(step_ip) + subnet = IPNetwork(ip) + ip_list = [] + for _ in xrange(count): + ip_list.append(subnet.ip.format()) + subnet = subnet.next(step_ip_in_int) + return ip_list + + @staticmethod + def mac_to_int(mac): + return int(mac.translate(None, ":.- "), 16) + + @staticmethod + def int_to_mac(i): + mac = format(i, 'x').zfill(12) + blocks = [mac[x:x + 2] for x in xrange(0, len(mac), 2)] + return ':'.join(blocks) + + @staticmethod + def ip_to_int(addr): + return struct.unpack("!I", socket.inet_aton(addr))[0] + + +class RunningTrafficProfile(object): + """Represents traffic configuration for currently running traffic profile.""" + + DEFAULT_IP_STEP = '0.0.0.1' + DEFAULT_SRC_DST_IP_STEP = '0.0.0.1' + + def __init__(self, config, generator_profile): + generator_config = self.__match_generator_profile(config.traffic_generator, + generator_profile) + self.generator_config = generator_config + self.service_chain = config.service_chain + self.service_chain_count = config.service_chain_count + self.flow_count = config.flow_count + self.host_name = generator_config.host_name + self.name = generator_config.name + self.tool = generator_config.tool + self.cores = generator_config.get('cores', 1) + self.ip_addrs_step = generator_config.ip_addrs_step or self.DEFAULT_SRC_DST_IP_STEP + self.tg_gateway_ip_addrs_step = \ + generator_config.tg_gateway_ip_addrs_step or self.DEFAULT_IP_STEP + self.gateway_ip_addrs_step = generator_config.gateway_ip_addrs_step or self.DEFAULT_IP_STEP + self.gateway_ips = generator_config.gateway_ip_addrs + self.ip = generator_config.ip + self.intf_speed = bitmath.parse_string(generator_config.intf_speed.replace('ps', '')).bits + self.vlan_tagging = config.vlan_tagging + self.no_arp = config.no_arp + self.src_device = None + self.dst_device = None + self.vm_mac_list = None + self.__prep_interfaces(generator_config) + + def to_json(self): + return dict(self.generator_config) + + def set_vm_mac_list(self, vm_mac_list): + self.src_device.set_vm_mac_list(vm_mac_list[0]) + self.dst_device.set_vm_mac_list(vm_mac_list[1]) + + @staticmethod + def __match_generator_profile(traffic_generator, generator_profile): + generator_config = AttrDict(traffic_generator) + generator_config.pop('default_profile') + generator_config.pop('generator_profile') + matching_profile = filter(lambda profile: profile.name == generator_profile, + traffic_generator.generator_profile) + if len(matching_profile) != 1: + raise Exception('Traffic generator profile not found: ' + generator_profile) + + generator_config.update(matching_profile[0]) + + return generator_config + + def __prep_interfaces(self, generator_config): + src_config = { + 'chain_count': self.service_chain_count, + 'flow_count': self.flow_count / 2, + 'ip': generator_config.ip_addrs[0], + 'ip_addrs_step': self.ip_addrs_step, + 'gateway_ip': self.gateway_ips[0], + 'gateway_ip_addrs_step': self.gateway_ip_addrs_step, + 'tg_gateway_ip': generator_config.tg_gateway_ip_addrs[0], + 'tg_gateway_ip_addrs_step': self.tg_gateway_ip_addrs_step, + 'vlan_tagging': self.vlan_tagging + } + dst_config = { + 'chain_count': self.service_chain_count, + 'flow_count': self.flow_count / 2, + 'ip': generator_config.ip_addrs[1], + 'ip_addrs_step': self.ip_addrs_step, + 'gateway_ip': self.gateway_ips[1], + 'gateway_ip_addrs_step': self.gateway_ip_addrs_step, + 'tg_gateway_ip': generator_config.tg_gateway_ip_addrs[1], + 'tg_gateway_ip_addrs_step': self.tg_gateway_ip_addrs_step, + 'vlan_tagging': self.vlan_tagging + } + + self.src_device = Device(**dict(src_config, **generator_config.interfaces[0])) + self.dst_device = Device(**dict(dst_config, **generator_config.interfaces[1])) + self.src_device.set_destination(self.dst_device) + self.dst_device.set_destination(self.src_device) + + if self.service_chain == ChainType.EXT and not self.no_arp \ + and not self.__are_unique(self.src_device.ip_list, self.dst_device.ip_list): + raise Exception('Computed IP addresses are not unique, choose different base. ' + 'Start IPs: {start}. End IPs: {end}' + .format(start=self.src_device.ip_list, + end=self.dst_device.ip_list)) + + def __are_unique(self, list1, list2): + return set(list1).isdisjoint(set(list2)) + + @property + def devices(self): + return [self.src_device, self.dst_device] + + @property + def vlans(self): + return [self.src_device.vtep_vlan, self.dst_device.vtep_vlan] + + @property + def ports(self): + return [self.src_device.port, self.dst_device.port] + + @property + def switch_ports(self): + return [self.src_device.switch_port, self.dst_device.switch_port] + + @property + def pcis(self): + return [self.src_device.pci, self.dst_device.pci] + + +class TrafficGeneratorFactory(object): + + def __init__(self, config): + self.config = config + + def get_tool(self): + return self.config.generator_config.tool + + def get_generator_client(self): + tool = self.get_tool().lower() + if tool == 'trex': + from traffic_gen import trex + return trex.TRex(self.config) + elif tool == 'dummy': + from traffic_gen import dummy + return dummy.DummyTG(self.config) + else: + return None + + def list_generator_profile(self): + return [profile.name for profile in self.config.traffic_generator.generator_profile] + + def get_generator_config(self, generator_profile): + return RunningTrafficProfile(self.config, generator_profile) + + def get_matching_profile(self, traffic_profile_name): + matching_profile = filter(lambda profile: profile.name == traffic_profile_name, + self.config.traffic_profile) + + if len(matching_profile) > 1: + raise Exception('Multiple traffic profiles with the same name found.') + elif len(matching_profile) == 0: + raise Exception('No traffic profile found.') + + return matching_profile[0] + + def get_frame_sizes(self, traffic_profile): + matching_profile = self.get_matching_profile(traffic_profile) + return matching_profile.l2frame_size + + +class TrafficClient(object): + + PORTS = [0, 1] + + def __init__(self, config, notifier=None): + generator_factory = TrafficGeneratorFactory(config) + self.gen = generator_factory.get_generator_client() + self.tool = generator_factory.get_tool() + self.config = config + self.notifier = notifier + self.interval_collector = None + self.iteration_collector = None + self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec) + if self.gen is None: + raise TrafficClientException('%s is not a supported traffic generator' % self.tool) + + self.run_config = { + 'l2frame_size': None, + 'duration_sec': self.config.duration_sec, + 'bidirectional': True, + 'rates': None + } + self.current_total_rate = {'rate_percent': '10'} + if self.config.single_run: + self.current_total_rate = utils.parse_rate_str(self.config.rate) + + def set_macs(self): + for mac, device in zip(self.gen.get_macs(), self.config.generator_config.devices): + device.set_mac(mac) + + def start_traffic_generator(self): + self.gen.init() + self.gen.connect() + + def setup(self): + self.gen.set_mode() + self.gen.config_interface() + self.gen.clear_stats() + + def get_version(self): + return self.gen.get_version() + + def ensure_end_to_end(self): + """ + Ensure traffic generator receives packets it has transmitted. + This ensures end to end connectivity and also waits until VMs are ready to forward packets. + + At this point all VMs are in active state, but forwarding does not have to work. + Small amount of traffic is sent to every chain. Then total of sent and received packets + is compared. If ratio between received and transmitted packets is higher than (N-1)/N, + N being number of chains, traffic flows through every chain and real measurements can be + performed. + + Example: + PVP chain (1 VM per chain) + N = 10 (number of chains) + threshold = (N-1)/N = 9/10 = 0.9 (acceptable ratio ensuring working conditions) + if total_received/total_sent > 0.9, traffic is flowing to more than 9 VMs meaning + all 10 VMs are in operational state. + """ + LOG.info('Starting traffic generator to ensure end-to-end connectivity') + rate_pps = {'rate_pps': str(self.config.service_chain_count * 100)} + self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False) + + # ensures enough traffic is coming back + threshold = (self.config.service_chain_count - 1) / float(self.config.service_chain_count) + + for it in xrange(self.config.generic_retry_count): + self.gen.clear_stats() + self.gen.start_traffic() + LOG.info('Waiting for packets to be received back... ({} / {})'.format(it + 1, + self.config.generic_retry_count)) + time.sleep(self.config.generic_poll_sec) + self.gen.stop_traffic() + stats = self.gen.get_stats() + + # compute total sent and received traffic on both ports + total_rx = 0 + total_tx = 0 + for port in self.PORTS: + total_rx += float(stats[port]['rx'].get('total_pkts', 0)) + total_tx += float(stats[port]['tx'].get('total_pkts', 0)) + + # how much of traffic came back + ratio = total_rx / total_tx if total_tx else 0 + + if ratio > threshold: + self.gen.clear_stats() + self.gen.clear_streamblock() + LOG.info('End-to-end connectivity ensured') + return + + time.sleep(self.config.generic_poll_sec) + + raise TrafficClientException('End-to-end connectivity cannot be ensured') + + def ensure_arp_successful(self): + if not self.gen.resolve_arp(): + raise TrafficClientException('ARP cannot be resolved') + + def set_traffic(self, frame_size, bidirectional): + self.run_config['bidirectional'] = bidirectional + self.run_config['l2frame_size'] = frame_size + self.run_config['rates'] = [self.get_per_direction_rate()] + if bidirectional: + self.run_config['rates'].append(self.get_per_direction_rate()) + else: + unidir_reverse_pps = int(self.config.unidir_reverse_traffic_pps) + if unidir_reverse_pps > 0: + self.run_config['rates'].append({'rate_pps': str(unidir_reverse_pps)}) + + self.gen.clear_streamblock() + self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional, latency=True) + + def modify_load(self, load): + self.current_total_rate = {'rate_percent': str(load)} + rate_per_direction = self.get_per_direction_rate() + + self.gen.modify_rate(rate_per_direction, False) + self.run_config['rates'][0] = rate_per_direction + if self.run_config['bidirectional']: + self.gen.modify_rate(rate_per_direction, True) + self.run_config['rates'][1] = rate_per_direction + + def get_ndr_and_pdr(self): + dst = 'Bidirectional' if self.run_config['bidirectional'] else 'Unidirectional' + targets = {} + if self.config.ndr_run: + LOG.info('*** Searching NDR for %s (%s)...', self.run_config['l2frame_size'], dst) + targets['ndr'] = self.config.measurement.NDR + if self.config.pdr_run: + LOG.info('*** Searching PDR for %s (%s)...', self.run_config['l2frame_size'], dst) + targets['pdr'] = self.config.measurement.PDR + + self.run_config['start_time'] = time.time() + self.interval_collector = IntervalCollector(self.run_config['start_time']) + self.interval_collector.attach_notifier(self.notifier) + self.iteration_collector = IterationCollector(self.run_config['start_time']) + results = {} + self.__range_search(0.0, 200.0, targets, results) + + results['iteration_stats'] = { + 'ndr_pdr': self.iteration_collector.get() + } + + if self.config.ndr_run: + LOG.info('NDR load: %s', results['ndr']['rate_percent']) + results['ndr']['time_taken_sec'] = \ + results['ndr']['timestamp_sec'] - self.run_config['start_time'] + if self.config.pdr_run: + LOG.info('PDR load: %s', results['pdr']['rate_percent']) + results['pdr']['time_taken_sec'] = \ + results['pdr']['timestamp_sec'] - results['ndr']['timestamp_sec'] + else: + LOG.info('PDR load: %s', results['pdr']['rate_percent']) + results['pdr']['time_taken_sec'] = \ + results['pdr']['timestamp_sec'] - self.run_config['start_time'] + return results + + def __get_dropped_rate(self, result): + dropped_pkts = result['rx']['dropped_pkts'] + total_pkts = result['tx']['total_pkts'] + if not total_pkts: + return float('inf') + else: + return float(dropped_pkts) / total_pkts * 100 + + def get_stats(self): + stats = self.gen.get_stats() + retDict = {'total_tx_rate': stats['total_tx_rate']} + for port in self.PORTS: + retDict[port] = {'tx': {}, 'rx': {}} + + tx_keys = ['total_pkts', 'total_pkt_bytes', 'pkt_rate', 'pkt_bit_rate'] + rx_keys = tx_keys + ['dropped_pkts'] + + for port in self.PORTS: + for key in tx_keys: + retDict[port]['tx'][key] = int(stats[port]['tx'][key]) + for key in rx_keys: + try: + retDict[port]['rx'][key] = int(stats[port]['rx'][key]) + except ValueError: + retDict[port]['rx'][key] = 0 + retDict[port]['rx']['avg_delay_usec'] = float(stats[port]['rx']['avg_delay_usec']) + retDict[port]['rx']['min_delay_usec'] = float(stats[port]['rx']['min_delay_usec']) + retDict[port]['rx']['max_delay_usec'] = float(stats[port]['rx']['max_delay_usec']) + retDict[port]['drop_rate_percent'] = self.__get_dropped_rate(retDict[port]) + + ports = sorted(retDict.keys()) + if self.run_config['bidirectional']: + retDict['overall'] = {'tx': {}, 'rx': {}} + for key in tx_keys: + retDict['overall']['tx'][key] = \ + retDict[ports[0]]['tx'][key] + retDict[ports[1]]['tx'][key] + for key in rx_keys: + retDict['overall']['rx'][key] = \ + retDict[ports[0]]['rx'][key] + retDict[ports[1]]['rx'][key] + total_pkts = [retDict[ports[0]]['rx']['total_pkts'], + retDict[ports[1]]['rx']['total_pkts']] + avg_delays = [retDict[ports[0]]['rx']['avg_delay_usec'], + retDict[ports[1]]['rx']['avg_delay_usec']] + max_delays = [retDict[ports[0]]['rx']['max_delay_usec'], + retDict[ports[1]]['rx']['max_delay_usec']] + min_delays = [retDict[ports[0]]['rx']['min_delay_usec'], + retDict[ports[1]]['rx']['min_delay_usec']] + retDict['overall']['rx']['avg_delay_usec'] = utils.weighted_avg(total_pkts, avg_delays) + retDict['overall']['rx']['min_delay_usec'] = min(min_delays) + retDict['overall']['rx']['max_delay_usec'] = max(max_delays) + for key in ['pkt_bit_rate', 'pkt_rate']: + for dirc in ['tx', 'rx']: + retDict['overall'][dirc][key] /= 2.0 + else: + retDict['overall'] = retDict[ports[0]] + retDict['overall']['drop_rate_percent'] = self.__get_dropped_rate(retDict['overall']) + return retDict + + def __convert_rates(self, rate): + return utils.convert_rates(self.run_config['l2frame_size'], + rate, + self.config.generator_config.intf_speed) + + def __ndr_pdr_found(self, tag, load): + rates = self.__convert_rates({'rate_percent': load}) + self.iteration_collector.add_ndr_pdr(tag, rates['rate_pps']) + last_stats = self.iteration_collector.peek() + self.interval_collector.add_ndr_pdr(tag, last_stats) + + def __format_output_stats(self, stats): + for key in (self.PORTS + ['overall']): + interface = stats[key] + stats[key] = { + 'tx_pkts': interface['tx']['total_pkts'], + 'rx_pkts': interface['rx']['total_pkts'], + 'drop_percentage': interface['drop_rate_percent'], + 'drop_pct': interface['rx']['dropped_pkts'], + 'avg_delay_usec': interface['rx']['avg_delay_usec'], + 'max_delay_usec': interface['rx']['max_delay_usec'], + 'min_delay_usec': interface['rx']['min_delay_usec'], + } + + return stats + + def __targets_found(self, rate, targets, results): + for tag, target in targets.iteritems(): + LOG.info('Found {} ({}) load: {}'.format(tag, target, rate)) + self.__ndr_pdr_found(tag, rate) + results[tag]['timestamp_sec'] = time.time() + + def __range_search(self, left, right, targets, results): + '''Perform a binary search for a list of targets inside a [left..right] range or rate + + left the left side of the range to search as a % the line rate (100 = 100% line rate) + indicating the rate to send on each interface + right the right side of the range to search as a % of line rate + indicating the rate to send on each interface + targets a dict of drop rates to search (0.1 = 0.1%), indexed by the DR name or "tag" ('ndr', 'pdr') + results a dict to store results + ''' + if len(targets) == 0: + return + LOG.info('Range search [{} .. {}] targets: {}'.format(left, right, targets)) + + # Terminate search when gap is less than load epsilon + if right - left < self.config.measurement.load_epsilon: + self.__targets_found(left, targets, results) + return + + # Obtain the average drop rate in for middle load + middle = (left + right) / 2.0 + stats, rates = self.__run_search_iteration(middle) + + # Split target dicts based on the avg drop rate + left_targets = {} + right_targets = {} + for tag, target in targets.iteritems(): + if stats['overall']['drop_rate_percent'] <= target: + # record the best possible rate found for this target + results[tag] = rates + results[tag].update({ + 'load_percent_per_direction': middle, + 'stats': self.__format_output_stats(dict(stats)), + 'timestamp_sec': None + }) + right_targets[tag] = target + else: + left_targets[tag] = target + + # search lower half + self.__range_search(left, middle, left_targets, results) + + # search upper half only if the upper rate does not exceed + # 100%, this only happens when the first search at 100% + # yields a DR that is < target DR + if middle >= 100: + self.__targets_found(100, right_targets, results) + else: + self.__range_search(middle, right, right_targets, results) + + def __run_search_iteration(self, rate): + # set load + self.modify_load(rate) + + # poll interval stats and collect them + for stats in self.run_traffic(): + self.interval_collector.add(stats) + time_elapsed_ratio = self.runner.time_elapsed() / self.run_config['duration_sec'] + if time_elapsed_ratio >= 1: + self.cancel_traffic() + self.interval_collector.reset() + + # get stats from the run + stats = self.runner.client.get_stats() + current_traffic_config = self.get_traffic_config() + warning = self.compare_tx_rates(current_traffic_config['direction-total']['rate_pps'], + stats['total_tx_rate']) + if warning is not None: + stats['warning'] = warning + + # save reliable stats from whole iteration + self.iteration_collector.add(stats, current_traffic_config['direction-total']['rate_pps']) + LOG.info('Average drop rate: {}'.format(stats['overall']['drop_rate_percent'])) + + return stats, current_traffic_config['direction-total'] + + @staticmethod + def log_stats(stats): + report = { + 'datetime': str(datetime.now()), + 'tx_packets': stats['overall']['tx']['total_pkts'], + 'rx_packets': stats['overall']['rx']['total_pkts'], + 'drop_packets': stats['overall']['rx']['dropped_pkts'], + 'drop_rate_percent': stats['overall']['drop_rate_percent'] + } + LOG.info('TX: %(tx_packets)d; ' + 'RX: %(rx_packets)d; ' + 'Dropped: %(drop_packets)d; ' + 'Drop rate: %(drop_rate_percent).4f%%', + report) + + def run_traffic(self): + stats = self.runner.run() + while self.runner.is_running: + self.log_stats(stats) + yield stats + stats = self.runner.poll_stats() + if stats is None: + return + self.log_stats(stats) + LOG.info('Drop rate: {}'.format(stats['overall']['drop_rate_percent'])) + yield stats + + def cancel_traffic(self): + self.runner.stop() + + def get_interface(self, port_index): + port = self.gen.port_handle[port_index] + tx, rx = 0, 0 + if not self.config.no_traffic: + stats = self.get_stats() + if port in stats: + tx, rx = int(stats[port]['tx']['total_pkts']), int(stats[port]['rx']['total_pkts']) + return Interface('traffic-generator', self.tool.lower(), tx, rx) + + def get_traffic_config(self): + config = {} + load_total = 0.0 + bps_total = 0.0 + pps_total = 0.0 + for idx, rate in enumerate(self.run_config['rates']): + key = 'direction-forward' if idx == 0 else 'direction-reverse' + config[key] = { + 'l2frame_size': self.run_config['l2frame_size'], + 'duration_sec': self.run_config['duration_sec'] + } + config[key].update(rate) + config[key].update(self.__convert_rates(rate)) + load_total += float(config[key]['rate_percent']) + bps_total += float(config[key]['rate_bps']) + pps_total += float(config[key]['rate_pps']) + config['direction-total'] = dict(config['direction-forward']) + config['direction-total'].update({ + 'rate_percent': load_total, + 'rate_pps': pps_total, + 'rate_bps': bps_total + }) + + return config + + def get_run_config(self, results): + """Returns configuration which was used for the last run.""" + r = {} + for idx, key in enumerate(["direction-forward", "direction-reverse"]): + tx_rate = results["stats"][idx]["tx"]["total_pkts"] / self.config.duration_sec + rx_rate = results["stats"][idx]["rx"]["total_pkts"] / self.config.duration_sec + r[key] = { + "orig": self.__convert_rates(self.run_config['rates'][idx]), + "tx": self.__convert_rates({'rate_pps': tx_rate}), + "rx": self.__convert_rates({'rate_pps': rx_rate}) + } + + total = {} + for direction in ['orig', 'tx', 'rx']: + total[direction] = {} + for unit in ['rate_percent', 'rate_bps', 'rate_pps']: + total[direction][unit] = sum(map(lambda x: float(x[direction][unit]), r.values())) + + r['direction-total'] = total + return r + + @staticmethod + def compare_tx_rates(required, actual): + threshold = 0.9 + are_different = False + try: + if float(actual) / required < threshold: + are_different = True + except ZeroDivisionError: + are_different = True + + if are_different: + msg = "WARNING: There is a significant difference between requested TX rate ({r}) " \ + "and actual TX rate ({a}). The traffic generator may not have sufficient CPU " \ + "to achieve the requested TX rate.".format(r=required, a=actual) + LOG.info(msg) + return msg + + return None + + def get_per_direction_rate(self): + divisor = 2 if self.run_config['bidirectional'] else 1 + if 'rate_percent' in self.current_total_rate: + # don't split rate if it's percentage + divisor = 1 + + return utils.divide_rate(self.current_total_rate, divisor) + + def close(self): + try: + self.gen.stop_traffic() + except Exception: + pass + self.gen.clear_stats() + self.gen.cleanup() diff --git a/nfvbench/traffic_gen/__init__.py b/nfvbench/traffic_gen/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/nfvbench/traffic_gen/__init__.py diff --git a/nfvbench/traffic_gen/dummy.py b/nfvbench/traffic_gen/dummy.py new file mode 100644 index 0000000..dabdc71 --- /dev/null +++ b/nfvbench/traffic_gen/dummy.py @@ -0,0 +1,95 @@ +# Copyright 2016 Cisco Systems, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from nfvbench.log import LOG + +from traffic_base import AbstractTrafficGenerator +import traffic_utils as utils + + + +class DummyTG(AbstractTrafficGenerator): + """Experimental dummy traffic generator. + + This traffic generator will pretend to generate traffic and return fake data. + Useful for unit testing without actually generating any traffic. + """ + + def __init__(self, runner): + AbstractTrafficGenerator.__init__(self, runner) + self.port_handle = [] + self.rates = [] + + def get_version(self): + return "0.1" + + def init(self): + pass + + def connect(self): + ports = list(self.config.generator_config.ports) + self.port_handle = ports + + def is_arp_successful(self): + return True + + def config_interface(self): + pass + + def create_traffic(self, l2frame_size, rates, bidirectional, latency=True): + pass + + def modify_rate(self, rate, reverse): + port_index = int(reverse) + port = self.port_handle[port_index] + self.rates[port_index] = utils.to_rate_str(rate) + LOG.info('Modified traffic stream for %s, new rate=%s.' % (port, utils.to_rate_str(rate))) + + def clear_streamblock(self): + pass + + def get_stats(self): + result = {} + for ph in self.port_handle: + result[ph] = { + 'tx': { + 'total_pkts': 1000, + 'total_pkt_bytes': 100000, + 'pkt_rate': 100, + 'pkt_bit_rate': 1000000 + }, + 'rx': { + 'total_pkts': 1000, + 'total_pkt_bytes': 100000, + 'pkt_rate': 100, + 'pkt_bit_rate': 1000000, + 'dropped_pkts': 0 + } + } + result[ph]['rx']['max_delay_usec'] = 10.0 + result[ph]['rx']['min_delay_usec'] = 1.0 + result[ph]['rx']['avg_delay_usec'] = 2.0 + return result + + def clear_stats(self): + pass + + def start_traffic(self): + pass + + def stop_traffic(self): + pass + + def cleanup(self): + pass diff --git a/nfvbench/traffic_gen/traffic_base.py b/nfvbench/traffic_gen/traffic_base.py new file mode 100644 index 0000000..064b2a2 --- /dev/null +++ b/nfvbench/traffic_gen/traffic_base.py @@ -0,0 +1,89 @@ +# Copyright 2016 Cisco Systems, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc + +class TrafficGeneratorException(Exception): + pass + + +class AbstractTrafficGenerator(object): + + # src_mac (6) + dst_mac (6) + mac_type (2) + frame_check (4) = 18 + l2_header_size = 18 + + imix_l2_sizes = [64, 594, 1518] + imix_l3_sizes = [size - l2_header_size for size in imix_l2_sizes] + imix_ratios = [7, 4, 1] + imix_avg_l2_size = sum(map( + lambda imix: 1.0 * imix[0] * imix[1], + zip(imix_l2_sizes, imix_ratios))) / sum(imix_ratios) + + def __init__(self, config): + self.config = config + + @abc.abstractmethod + def get_version(): + # Must be implemented by sub classes + return None + + @abc.abstractmethod + def init(): + # Must be implemented by sub classes + return None + + @abc.abstractmethod + def connect(): + # Must be implemented by sub classes + return None + + @abc.abstractmethod + def config_interface(): + # Must be implemented by sub classes + return None + + @abc.abstractmethod + def create_traffic(): + # Must be implemented by sub classes + return None + + @abc.abstractmethod + def modify_traffic(): + # Must be implemented by sub classes + return None + + @abc.abstractmethod + def get_stats(): + # Must be implemented by sub classes + return None + + @abc.abstractmethod + def clear_traffic(): + # Must be implemented by sub classes + return None + + @abc.abstractmethod + def start_traffic(): + # Must be implemented by sub classes + return None + + @abc.abstractmethod + def stop_traffic(): + # Must be implemented by sub classes + return None + + @abc.abstractmethod + def cleanup(): + # Must be implemented by sub classes + return None diff --git a/nfvbench/traffic_gen/traffic_utils.py b/nfvbench/traffic_gen/traffic_utils.py new file mode 100644 index 0000000..e5dc463 --- /dev/null +++ b/nfvbench/traffic_gen/traffic_utils.py @@ -0,0 +1,160 @@ +# Copyright 2016 Cisco Systems, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +import bitmath +from traffic_base import AbstractTrafficGenerator + + +def convert_rates(l2frame_size, rate, intf_speed): + avg_packet_size = get_average_packet_size(l2frame_size) + if 'rate_pps' in rate: + initial_rate_type = 'rate_pps' + pps = rate['rate_pps'] + bps = pps_to_bps(pps, avg_packet_size) + load = bps_to_load(bps, intf_speed) + elif 'rate_bps' in rate: + initial_rate_type = 'rate_bps' + bps = rate['rate_bps'] + load = bps_to_load(bps, intf_speed) + pps = bps_to_pps(bps, avg_packet_size) + elif 'rate_percent' in rate: + initial_rate_type = 'rate_percent' + load = rate['rate_percent'] + bps = load_to_bps(load, intf_speed) + pps = bps_to_pps(bps, avg_packet_size) + else: + raise Exception('Traffic config needs to have a rate type key') + + return { + 'initial_rate_type': initial_rate_type, + 'rate_pps': pps, + 'rate_percent': load, + 'rate_bps': bps + } + + +def get_average_packet_size(l2frame_size): + if l2frame_size.upper() == 'IMIX': + return AbstractTrafficGenerator.imix_avg_l2_size + else: + return float(l2frame_size) + + +def load_to_bps(load_percentage, intf_speed): + return float(load_percentage) / 100.0 * intf_speed + + +def bps_to_load(bps, intf_speed): + return float(bps) / intf_speed * 100.0 + + +def bps_to_pps(bps, avg_packet_size): + return float(bps) / (avg_packet_size + 20.0) / 8 + + +def pps_to_bps(pps, avg_packet_size): + return float(pps) * (avg_packet_size + 20.0) * 8 + + +def weighted_avg(weight, count): + if sum(weight): + return sum(map(lambda x: x[0] * x[1], zip(weight, count))) / sum(weight) + else: + return float('nan') + +multiplier_map = { + 'K': 1000, + 'M': 1000000, + 'G': 1000000000 +} + +def parse_rate_str(rate_str): + if rate_str.endswith('pps'): + rate_pps = rate_str[:-3] + if not rate_pps: + raise Exception('%s is missing a numeric value' % rate_str) + try: + multiplier = multiplier_map[rate_pps[-1].upper()] + rate_pps = rate_pps[:-1] + except KeyError: + multiplier = 1 + rate_pps = int(rate_pps.strip()) * multiplier + if rate_pps <= 0: + raise Exception('%s is out of valid range' % rate_str) + return {'rate_pps': str(rate_pps)} + elif rate_str.endswith('ps'): + rate = rate_str.replace('ps', '').strip() + bit_rate = bitmath.parse_string(rate).bits + if bit_rate <= 0: + raise Exception('%s is out of valid range' % rate_str) + return {'rate_bps': str(int(bit_rate))} + elif rate_str.endswith('%'): + rate_percent = float(rate_str.replace('%', '').strip()) + if rate_percent <= 0 or rate_percent > 100.0: + raise Exception('%s is out of valid range (must be 1-100%%)' % rate_str) + return {'rate_percent': str(rate_percent)} + else: + raise Exception('Unknown rate string format %s' % rate_str) + + +def divide_rate(rate, divisor): + if 'rate_pps' in rate: + key = 'rate_pps' + value = int(rate[key]) + elif 'rate_bps' in rate: + key = 'rate_bps' + value = int(rate[key]) + else: + key = 'rate_percent' + value = float(rate[key]) + value /= divisor + rate = dict(rate) + rate[key] = str(value) if value else str(1) + return rate + + +def to_rate_str(rate): + if 'rate_pps' in rate: + pps = rate['rate_pps'] + return '{}pps'.format(pps) + elif 'rate_bps' in rate: + bps = rate['rate_bps'] + return '{}bps'.format(bps) + elif 'rate_percent' in rate: + load = rate['rate_percent'] + return '{}%'.format(load) + else: + assert False + + +def nan_replace(d): + """Replaces every occurence of 'N/A' with float nan.""" + for k, v in d.iteritems(): + if isinstance(v, dict): + nan_replace(v) + elif v == 'N/A': + d[k] = float('nan') + + +def mac_to_int(mac): + """Converts MAC address to integer representation.""" + return int(mac.translate(None, ":.- "), 16) + + +def int_to_mac(i): + """Converts integer representation of MAC address to hex string.""" + mac = format(i, 'x').zfill(12) + blocks = [mac[x:x + 2] for x in xrange(0, len(mac), 2)] + return ':'.join(blocks) diff --git a/nfvbench/traffic_gen/trex.py b/nfvbench/traffic_gen/trex.py new file mode 100644 index 0000000..6c2a304 --- /dev/null +++ b/nfvbench/traffic_gen/trex.py @@ -0,0 +1,456 @@ +# Copyright 2016 Cisco Systems, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from collections import defaultdict +from itertools import count +from nfvbench.log import LOG +from nfvbench.specs import ChainType +from nfvbench.traffic_server import TRexTrafficServer +from nfvbench.utils import timeout +from nfvbench.utils import TimeoutError +import os +import random +import time +import traceback +from traffic_base import AbstractTrafficGenerator +from traffic_base import TrafficGeneratorException +import traffic_utils as utils + + +from trex_stl_lib.api import CTRexVmInsFixHwCs +from trex_stl_lib.api import Dot1Q +from trex_stl_lib.api import Ether +from trex_stl_lib.api import IP +from trex_stl_lib.api import STLClient +from trex_stl_lib.api import STLError +from trex_stl_lib.api import STLFlowLatencyStats +from trex_stl_lib.api import STLFlowStats +from trex_stl_lib.api import STLPktBuilder +from trex_stl_lib.api import STLScVmRaw +from trex_stl_lib.api import STLStream +from trex_stl_lib.api import STLTXCont +from trex_stl_lib.api import STLVmFixChecksumHw +from trex_stl_lib.api import STLVmFlowVar +from trex_stl_lib.api import STLVmFlowVarRepetableRandom +from trex_stl_lib.api import STLVmWrFlowVar +from trex_stl_lib.api import UDP +from trex_stl_lib.services.trex_stl_service_arp import STLServiceARP + + +class TRex(AbstractTrafficGenerator): + + LATENCY_PPS = 1000 + + def __init__(self, runner): + AbstractTrafficGenerator.__init__(self, runner) + self.client = None + self.id = count() + self.latencies = defaultdict(list) + self.stream_ids = defaultdict(list) + self.port_handle = [] + self.streamblock = defaultdict(list) + self.rates = [] + self.arps = {} + + def get_version(self): + return self.client.get_server_version() + + def extract_stats(self, in_stats): + utils.nan_replace(in_stats) + LOG.debug(in_stats) + + result = {} + for ph in self.port_handle: + stats = self.__combine_stats(in_stats, ph) + result[ph] = { + 'tx': { + 'total_pkts': stats['tx_pkts']['total'], + 'total_pkt_bytes': stats['tx_bytes']['total'], + 'pkt_rate': stats['tx_pps']['total'], + 'pkt_bit_rate': stats['tx_bps']['total'] + }, + 'rx': { + 'total_pkts': stats['rx_pkts']['total'], + 'total_pkt_bytes': stats['rx_bytes']['total'], + 'pkt_rate': stats['rx_pps']['total'], + 'pkt_bit_rate': stats['rx_bps']['total'], + 'dropped_pkts': stats['tx_pkts']['total'] - stats['rx_pkts']['total'] + } + } + + lat = self.__combine_latencies(in_stats, ph) + result[ph]['rx']['max_delay_usec'] = lat.get('total_max', float('nan')) + result[ph]['rx']['min_delay_usec'] = lat.get('total_min', float('nan')) + result[ph]['rx']['avg_delay_usec'] = lat.get('average', float('nan')) + + total_tx_pkts = result[0]['tx']['total_pkts'] + result[1]['tx']['total_pkts'] + result["total_tx_rate"] = total_tx_pkts / self.config.duration_sec + return result + + def __combine_stats(self, in_stats, port_handle): + """Traverses TRex result dictionary and combines stream stats. Used for combining latency + and regular streams together. + """ + result = defaultdict(lambda: defaultdict(float)) + + for pg_id in [self.stream_ids[port_handle]] + self.latencies[port_handle]: + record = in_stats['flow_stats'][pg_id] + for stat_type, stat_type_values in record.iteritems(): + for ph, value in stat_type_values.iteritems(): + result[stat_type][ph] += value + + return result + + def __combine_latencies(self, in_stats, port_handle): + """Traverses TRex result dictionary and combines chosen latency stats.""" + if not len(self.latencies[port_handle]): + return {} + + result = defaultdict(float) + result['total_min'] = float("inf") + for lat_id in self.latencies[port_handle]: + lat = in_stats['latency'][lat_id] + result['dropped_pkts'] += lat['err_cntrs']['dropped'] + result['total_max'] = max(lat['latency']['total_max'], result['total_max']) + result['total_min'] = min(lat['latency']['total_min'], result['total_min']) + result['average'] += lat['latency']['average'] + + result['average'] /= len(self.latencies[port_handle]) + + return result + + def create_pkt(self, stream_cfg, l2frame_size): + # 46 = 14 (Ethernet II) + 4 (CRC Checksum) + 20 (IPv4) + 8 (UDP) + payload = 'x' * (max(64, int(l2frame_size)) - 46) + + pkt_base = Ether(src=stream_cfg['mac_src'], dst=stream_cfg['mac_dst']) + + if stream_cfg['vlan_tag'] is not None: + pkt_base /= Dot1Q(vlan=stream_cfg['vlan_tag']) + + pkt_base /= IP() / UDP() + + if stream_cfg['ip_addrs_step'] == 'random': + src_fv = STLVmFlowVarRepetableRandom( + name="ip_src", + min_value=stream_cfg['ip_src_addr'], + max_value=stream_cfg['ip_src_addr_max'], + size=4, + seed=random.randint(0, 32767), + limit=stream_cfg['ip_src_count']) + dst_fv = STLVmFlowVarRepetableRandom( + name="ip_dst", + min_value=stream_cfg['ip_dst_addr'], + max_value=stream_cfg['ip_dst_addr_max'], + size=4, + seed=random.randint(0, 32767), + limit=stream_cfg['ip_dst_count']) + else: + src_fv = STLVmFlowVar( + name="ip_src", + min_value=stream_cfg['ip_src_addr'], + max_value=stream_cfg['ip_src_addr'], + size=4, + op="inc", + step=stream_cfg['ip_addrs_step']) + dst_fv = STLVmFlowVar( + name="ip_dst", + min_value=stream_cfg['ip_dst_addr'], + max_value=stream_cfg['ip_dst_addr_max'], + size=4, + op="inc", + step=stream_cfg['ip_addrs_step']) + + vm_param = [ + src_fv, + STLVmWrFlowVar(fv_name="ip_src", pkt_offset="IP.src"), + dst_fv, + STLVmWrFlowVar(fv_name="ip_dst", pkt_offset="IP.dst"), + STLVmFixChecksumHw(l3_offset="IP", + l4_offset="UDP", + l4_type=CTRexVmInsFixHwCs.L4_TYPE_UDP) + ] + + return STLPktBuilder(pkt=pkt_base / payload, vm=STLScVmRaw(vm_param)) + + def generate_streams(self, port_handle, stream_cfg, l2frame, isg=0.0, latency=True): + idx_lat = None + streams = [] + if l2frame == 'IMIX': + for t, (ratio, l2_frame_size) in enumerate(zip(self.imix_ratios, self.imix_l2_sizes)): + pkt = self.create_pkt(stream_cfg, l2_frame_size) + streams.append(STLStream(packet=pkt, + isg=0.1 * t, + flow_stats=STLFlowStats( + pg_id=self.stream_ids[port_handle]), + mode=STLTXCont(pps=ratio))) + + if latency: + idx_lat = self.id.next() + sl = STLStream(packet=pkt, + isg=isg, + flow_stats=STLFlowLatencyStats(pg_id=idx_lat), + mode=STLTXCont(pps=self.LATENCY_PPS)) + streams.append(sl) + else: + pkt = self.create_pkt(stream_cfg, l2frame) + streams.append(STLStream(packet=pkt, + flow_stats=STLFlowStats(pg_id=self.stream_ids[port_handle]), + mode=STLTXCont())) + + if latency: + idx_lat = self.id.next() + streams.append(STLStream(packet=pkt, + flow_stats=STLFlowLatencyStats(pg_id=idx_lat), + mode=STLTXCont(pps=self.LATENCY_PPS))) + + if latency: + self.latencies[port_handle].append(idx_lat) + + return streams + + def init(self): + pass + + @timeout(5) + def __connect(self, client): + client.connect() + + def __connect_after_start(self): + # after start, Trex may take a bit of time to initialize + # so we need to retry a few times + for it in xrange(self.config.generic_retry_count): + try: + time.sleep(1) + self.client.connect() + break + except Exception as ex: + if it == (self.config.generic_retry_count - 1): + raise ex + LOG.info("Retrying connection to TRex (%s)...", ex.message) + + def connect(self): + LOG.info("Connecting to TRex...") + server_ip = self.config.generator_config.ip + + # Connect to TRex server + self.client = STLClient(server=server_ip) + try: + self.__connect(self.client) + except (TimeoutError, STLError) as e: + if server_ip == '127.0.0.1': + try: + self.__start_server() + self.__connect_after_start() + except (TimeoutError, STLError) as e: + LOG.error('Cannot connect to TRex') + LOG.error(traceback.format_exc()) + logpath = '/tmp/trex.log' + if os.path.isfile(logpath): + # Wait for TRex to finish writing error message + last_size = 0 + for it in xrange(self.config.generic_retry_count): + size = os.path.getsize(logpath) + if size == last_size: + # probably not writing anymore + break + last_size = size + time.sleep(1) + with open(logpath, 'r') as f: + message = f.read() + else: + message = e.message + raise TrafficGeneratorException(message) + else: + raise TrafficGeneratorException(e.message) + + ports = list(self.config.generator_config.ports) + self.port_handle = ports + # Prepare the ports + self.client.reset(ports) + + def set_mode(self): + if self.config.service_chain == ChainType.EXT and not self.config.no_arp: + self.__set_l3_mode() + else: + self.__set_l2_mode() + + def __set_l3_mode(self): + self.client.set_service_mode(ports=self.port_handle, enabled=True) + for port, device in zip(self.port_handle, self.config.generator_config.devices): + try: + self.client.set_l3_mode(port=port, + src_ipv4=device.tg_gateway_ip, + dst_ipv4=device.dst.gateway_ip, + vlan=device.vlan_tag if device.vlan_tagging else None) + except STLError: + # TRex tries to resolve ARP already, doesn't have to be successful yet + continue + self.client.set_service_mode(ports=self.port_handle, enabled=False) + + def __set_l2_mode(self): + self.client.set_service_mode(ports=self.port_handle, enabled=True) + for port, device in zip(self.port_handle, self.config.generator_config.devices): + for cfg in device.get_stream_configs(self.config.generator_config.service_chain): + self.client.set_l2_mode(port=port, dst_mac=cfg['mac_dst']) + self.client.set_service_mode(ports=self.port_handle, enabled=False) + + def __start_server(self): + server = TRexTrafficServer() + server.run_server(self.config.generator_config) + + def resolve_arp(self): + self.client.set_service_mode(ports=self.port_handle) + LOG.info('Polling ARP until successful') + resolved = 0 + attempt = 0 + for port, device in zip(self.port_handle, self.config.generator_config.devices): + ctx = self.client.create_service_ctx(port=port) + + arps = [ + STLServiceARP(ctx, + src_ip=cfg['ip_src_tg_gw'], + dst_ip=cfg['mac_discovery_gw'], + vlan=device.vlan_tag if device.vlan_tagging else None) + for cfg in device.get_stream_configs(self.config.generator_config.service_chain) + ] + + for _ in xrange(self.config.generic_retry_count): + attempt += 1 + try: + ctx.run(arps) + except STLError: + LOG.error(traceback.format_exc()) + continue + + self.arps[port] = [arp.get_record().dst_mac for arp in arps + if arp.get_record().dst_mac is not None] + + if len(self.arps[port]) == self.config.service_chain_count: + resolved += 1 + LOG.info('ARP resolved successfully for port {}'.format(port)) + break + else: + failed = [arp.get_record().dst_ip for arp in arps + if arp.get_record().dst_mac is None] + LOG.info('Retrying ARP for: {} ({} / {})'.format( + failed, attempt, self.config.generic_retry_count)) + time.sleep(self.config.generic_poll_sec) + + self.client.set_service_mode(ports=self.port_handle, enabled=False) + return resolved == len(self.port_handle) + + def config_interface(self): + pass + + def __is_rate_enough(self, l2frame_size, rates, bidirectional, latency): + """Check if rate provided by user is above requirements. Applies only if latency is True.""" + intf_speed = self.config.generator_config.intf_speed + if latency: + if bidirectional: + mult = 2 + total_rate = 0 + for rate in rates: + r = utils.convert_rates(l2frame_size, rate, intf_speed) + total_rate += int(r['rate_pps']) + else: + mult = 1 + total_rate = utils.convert_rates(l2frame_size, rates[0], intf_speed) + # rate must be enough for latency stream and at least 1 pps for base stream per chain + required_rate = (self.LATENCY_PPS + 1) * self.config.service_chain_count * mult + result = utils.convert_rates(l2frame_size, + {'rate_pps': required_rate}, + intf_speed * mult) + result['result'] = total_rate >= required_rate + return result + + return {'result': True} + + def create_traffic(self, l2frame_size, rates, bidirectional, latency=True): + r = self.__is_rate_enough(l2frame_size, rates, bidirectional, latency) + if not r['result']: + raise TrafficGeneratorException( + 'Required rate in total is at least one of: \n{pps}pps \n{bps}bps \n{load}%.' + .format(pps=r['rate_pps'], + bps=r['rate_bps'], + load=r['rate_percent'])) + + stream_cfgs = [d.get_stream_configs(self.config.generator_config.service_chain) + for d in self.config.generator_config.devices] + self.rates = map(lambda rate: utils.to_rate_str(rate), rates) + + for ph in self.port_handle: + # generate one pg_id for each direction + self.stream_ids[ph] = self.id.next() + + for i, (fwd_stream_cfg, rev_stream_cfg) in enumerate(zip(*stream_cfgs)): + if self.config.service_chain == ChainType.EXT and not self.config.no_arp: + fwd_stream_cfg['mac_dst'] = self.arps[self.port_handle[0]][i] + rev_stream_cfg['mac_dst'] = self.arps[self.port_handle[1]][i] + + self.streamblock[0].extend(self.generate_streams(self.port_handle[0], + fwd_stream_cfg, + l2frame_size, + latency=latency)) + if len(self.rates) > 1: + self.streamblock[1].extend(self.generate_streams(self.port_handle[1], + rev_stream_cfg, + l2frame_size, + isg=10.0, + latency=bidirectional and latency)) + + for ph in self.port_handle: + self.client.add_streams(self.streamblock[ph], ports=ph) + LOG.info('Created traffic stream for port %s.' % ph) + + def modify_rate(self, rate, reverse): + port_index = int(reverse) + port = self.port_handle[port_index] + self.rates[port_index] = utils.to_rate_str(rate) + LOG.info('Modified traffic stream for %s, new rate=%s.' % (port, utils.to_rate_str(rate))) + + def clear_streamblock(self): + self.streamblock = defaultdict(list) + self.latencies = defaultdict(list) + self.stream_ids = defaultdict(list) + self.rates = [] + self.client.reset(self.port_handle) + LOG.info('Cleared all existing streams.') + + def get_stats(self): + stats = self.client.get_pgid_stats() + return self.extract_stats(stats) + + def get_macs(self): + return [self.client.get_port_attr(port=port)['src_mac'] for port in self.port_handle] + + def clear_stats(self): + if self.port_handle: + self.client.clear_stats() + + def start_traffic(self): + for port, rate in zip(self.port_handle, self.rates): + self.client.start(ports=port, mult=rate, duration=self.config.duration_sec, force=True) + + def stop_traffic(self): + self.client.stop(ports=self.port_handle) + + def cleanup(self): + if self.client: + try: + self.client.reset(self.port_handle) + self.client.disconnect() + except STLError: + # TRex does not like a reset while in disconnected state + pass diff --git a/nfvbench/traffic_server.py b/nfvbench/traffic_server.py new file mode 100644 index 0000000..05f20e5 --- /dev/null +++ b/nfvbench/traffic_server.py @@ -0,0 +1,64 @@ +# Copyright 2016 Cisco Systems, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from log import LOG +import os +import subprocess +import yaml + +class TrafficServerException(Exception): + pass + +class TrafficServer(object): + """Base class for traffic servers.""" + +class TRexTrafficServer(TrafficServer): + """Creates configuration file for TRex and runs server.""" + + def __init__(self, trex_base_dir='/opt/trex'): + contents = os.listdir(trex_base_dir) + # only one version of TRex should be supported in container + assert(len(contents) == 1) + self.trex_dir = os.path.join(trex_base_dir, contents[0]) + + def run_server(self, traffic_profile, filename='/etc/trex_cfg.yaml'): + """ + Runs TRex server for specified traffic profile. + + :param traffic_profile: traffic profile object based on config file + :param filename: path where to save TRex config file + """ + cfg = self.__save_config(traffic_profile, filename) + cores = traffic_profile.cores + subprocess.Popen(['nohup', '/bin/bash', '-c', + './t-rex-64 -i -c {} --iom 0 --no-scapy-server --close-at-end --vlan' + ' --cfg {} &> /tmp/trex.log & disown'.format(cores, cfg)], + cwd=self.trex_dir) + LOG.info('TRex server is running...') + + def __save_config(self, traffic_profile, filename): + ifs = ",".join([repr(pci) for pci in traffic_profile.pcis]) + + result = """# Config generated by NFVBench tool + - port_limit : 2 + version : 2 + interfaces : [{ifs}]""".format(ifs=ifs) + + yaml.safe_load(result) + if os.path.exists(filename): + os.remove(filename) + with open(filename, 'w') as f: + f.write(result) + + return filename diff --git a/nfvbench/utils.py b/nfvbench/utils.py new file mode 100644 index 0000000..4d9749c --- /dev/null +++ b/nfvbench/utils.py @@ -0,0 +1,170 @@ +# Copyright 2016 Cisco Systems, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import errno +import fcntl +from functools import wraps +import json +from log import LOG +import os +import re +import signal +import subprocess + + +class TimeoutError(Exception): + pass + + +def timeout(seconds=10, error_message=os.strerror(errno.ETIME)): + def decorator(func): + def _handle_timeout(signum, frame): + raise TimeoutError(error_message) + + def wrapper(*args, **kwargs): + signal.signal(signal.SIGALRM, _handle_timeout) + signal.alarm(seconds) + try: + result = func(*args, **kwargs) + finally: + signal.alarm(0) + return result + + return wraps(func)(wrapper) + + return decorator + + +def save_json_result(result, json_file, std_json_path, service_chain, service_chain_count, + flow_count, frame_sizes): + """Save results in json format file.""" + filepaths = [] + if json_file: + filepaths.append(json_file) + if std_json_path: + name_parts = [service_chain, str(service_chain_count), str(flow_count)] + list(frame_sizes) + filename = '-'.join(name_parts) + '.json' + filepaths.append(os.path.join(std_json_path, filename)) + + if filepaths: + for file_path in filepaths: + LOG.info('Saving results in json file: ' + file_path + "...") + with open(file_path, 'w') as jfp: + json.dump(result, + jfp, + indent=4, + sort_keys=True, + separators=(',', ': '), + default=lambda obj: obj.to_json()) + + +def byteify(data, ignore_dicts=False): + # if this is a unicode string, return its string representation + if isinstance(data, unicode): + return data.encode('utf-8') + # if this is a list of values, return list of byteified values + if isinstance(data, list): + return [byteify(item, ignore_dicts=ignore_dicts) for item in data] + # if this is a dictionary, return dictionary of byteified keys and values + # but only if we haven't already byteified it + if isinstance(data, dict) and not ignore_dicts: + return {byteify(key, ignore_dicts=ignore_dicts): byteify(value, ignore_dicts=ignore_dicts) + for key, value in data.iteritems()} + # if it's anything else, return it in its original form + return data + + +def dict_to_json_dict(record): + return json.loads(json.dumps(record, default=lambda obj: obj.to_json())) + + +def get_intel_pci(nic_ports): + """Returns the first two PCI addresses of sorted PCI list for Intel NIC (i40e, ixgbe)""" + hx = r'[0-9a-fA-F]' + regex = r'{hx}{{4}}:({hx}{{2}}:{hx}{{2}}\.{hx}{{1}}).*(drv={driver}|.*unused=.*{driver})' + + try: + trex_base_dir = '/opt/trex' + contents = os.listdir(trex_base_dir) + trex_dir = os.path.join(trex_base_dir, contents[0]) + process = subprocess.Popen(['python', 'dpdk_setup_ports.py', '-s'], + cwd=trex_dir, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + devices, _ = process.communicate() + except Exception: + devices = '' + + for driver in ['i40e', 'ixgbe']: + matches = re.findall(regex.format(hx=hx, driver=driver), devices) + if matches: + pcis = map(lambda x: x[0], matches) + if len(pcis) < 2: + continue + pcis.sort() + return [pcis[port_index] for port_index in nic_ports] + + return [] + + +multiplier_map = { + 'K': 1000, + 'M': 1000000, + 'G': 1000000000 +} + + +def parse_flow_count(flow_count): + flow_count = str(flow_count) + input_fc = flow_count + multiplier = 1 + if flow_count[-1].upper() in multiplier_map: + multiplier = multiplier_map[flow_count[-1].upper()] + flow_count = flow_count[:-1] + + try: + flow_count = int(flow_count) + except ValueError: + raise Exception("Unknown flow count format '{}'".format(input_fc)) + + return flow_count * multiplier + + +class RunLock(object): + """ + Attempts to lock file and run current instance of NFVbench as the first, + otherwise raises exception. + """ + + def __init__(self, path='/tmp/nfvbench.lock'): + self._path = path + self._fd = None + + def __enter__(self): + try: + self._fd = os.open(self._path, os.O_CREAT) + fcntl.flock(self._fd, fcntl.LOCK_EX | fcntl.LOCK_NB) + except (OSError, IOError): + raise Exception('Other NFVbench process is running. Please wait') + + def __exit__(self, *args): + fcntl.flock(self._fd, fcntl.LOCK_UN) + os.close(self._fd) + self._fd = None + + # Try to remove the lock file, but don't try too hard because it is unnecessary. + try: + os.unlink(self._path) + except (OSError, IOError): + pass |