diff options
Diffstat (limited to 'nfvbench')
-rwxr-xr-x | nfvbench/cfg.default.yaml | 18 | ||||
-rw-r--r-- | nfvbench/chain_clients.py | 121 | ||||
-rw-r--r-- | nfvbench/chain_managers.py | 75 | ||||
-rw-r--r-- | nfvbench/cleanup.py | 179 | ||||
-rw-r--r-- | nfvbench/config_plugin.py | 6 | ||||
-rw-r--r-- | nfvbench/fluentd.py | 11 | ||||
-rw-r--r-- | nfvbench/network.py | 29 | ||||
-rw-r--r-- | nfvbench/nfvbench.py | 124 | ||||
-rw-r--r-- | nfvbench/nfvbenchd.py | 13 | ||||
-rw-r--r-- | nfvbench/service_chain.py | 15 | ||||
-rw-r--r-- | nfvbench/summarizer.py | 8 | ||||
-rwxr-xr-x | nfvbench/traffic_client.py | 109 | ||||
-rw-r--r-- | nfvbench/traffic_gen/dummy.py | 14 | ||||
-rw-r--r-- | nfvbench/traffic_gen/traffic_base.py | 25 | ||||
-rw-r--r-- | nfvbench/traffic_gen/trex.py | 93 | ||||
-rw-r--r-- | nfvbench/traffic_server.py | 8 | ||||
-rw-r--r-- | nfvbench/utils.py | 73 |
17 files changed, 691 insertions, 230 deletions
diff --git a/nfvbench/cfg.default.yaml b/nfvbench/cfg.default.yaml index e1c05c3..2af6d63 100755 --- a/nfvbench/cfg.default.yaml +++ b/nfvbench/cfg.default.yaml @@ -131,6 +131,11 @@ sriov: false # Can be overriden by --no-int-config no_int_config: false +# Perform port to port loopback (direct or through switch) +# Should be used with EXT service chain and no ARP (no_arp: true) +# Can be overriden by --l2-loopback +l2_loopback: false + # Resources created by NFVbench will not be removed # Can be overriden by --no-cleanup no_cleanup: false @@ -231,8 +236,8 @@ loop_vm_name: 'nfvbench-loop-vm' # - PVVP uses left, middle and right # - for EXT chains, this structure is not relevant - refer to external_networks # Otherwise a new internal network will be created with that name, subnet and CIDR. -# -# segmentation_id can be set to enforce a specific VLAN id - by default (empty) the VLAN id +# +# segmentation_id can be set to enforce a specific VLAN id - by default (empty) the VLAN id # will be assigned by Neutron. # Must be unique for each network # physical_network can be set to pick a specific phsyical network - by default (empty) the @@ -284,14 +289,14 @@ external_networks: right: 'nfvbench-net1' # Use 'true' to enable VLAN tagging of packets generated and sent by the traffic generator -# Leave empty you do not want the traffic generator to insert the VLAN tag. This is +# Leave empty you do not want the traffic generator to insert the VLAN tag. This is # needed for example if VLAN tagging is enabled on switch (trunk mode) or if you want to hook directly to a NIC # By default is set to true (which is the nominal use case with TOR and trunk mode to Trex) vlan_tagging: true # Specify only when you want to override VLAN IDs used for tagging with own values (exactly 2). -# Default behavior of VLAN tagging is to retrieve VLAN IDs from OpenStack networks provided above. -# In case of VxLAN this setting is ignored and only vtep_vlan from traffic generator profile is used. +# Default behavior (empty list) is to retrieve VLAN IDs from OpenStack networks described in external_networks. +# This property is ignored in the case of l2-loopback # Example: [1998, 1999] vlans: [] @@ -354,6 +359,9 @@ duration_sec: 60 # Can be overridden by --interval interval_sec: 10 +# Default pause between iterations of a binary search (NDR/PDR) +pause_sec: 2 + # NDR / PDR configuration measurement: # Drop rates represent the ratio of dropped packet to the total number of packets sent. diff --git a/nfvbench/chain_clients.py b/nfvbench/chain_clients.py index faf7c2a..71c6c97 100644 --- a/nfvbench/chain_clients.py +++ b/nfvbench/chain_clients.py @@ -185,7 +185,7 @@ class BasicStageClient(object): return None return availability_zone + ':' + host - def _lookup_servers(self, name=None, nets=None, az=None, flavor_id=None): + def _lookup_servers(self, name=None, nets=None, flavor_id=None): error_msg = 'VM with the same name, but non-matching {} found. Aborting.' networks = set([net['name'] for net in nets]) if nets else None server_list = self.comp.get_server_list() @@ -224,7 +224,10 @@ class BasicStageClient(object): files={nfvbenchvm_config_location: nfvbenchvm_config}) if server: setattr(server, 'is_reuse', False) - LOG.info('Creating instance: %s on %s', name, az) + msg = 'Creating instance: %s' % name + if az: + msg += ' on %s' % az + LOG.info(msg) else: raise StageClientException('Unable to create instance: %s.' % (name)) return server @@ -380,8 +383,8 @@ class BasicStageClient(object): compute_nodes.append(az + ':' + hostname) return compute_nodes - def get_reusable_vm(self, name, nets, az): - servers = self._lookup_servers(name=name, nets=nets, az=az, + def get_reusable_vm(self, name, nets): + servers = self._lookup_servers(name=name, nets=nets, flavor_id=self.flavor_type['flavor'].id) if servers: server = servers[0] @@ -477,17 +480,19 @@ class PVPStageClient(BasicStageClient): nets = self.config.internal_networks self.nets.extend([self._create_net(**n) for n in [nets.left, nets.right]]) - az_list = self.comp.get_enabled_az_host_list(required_count=1) - if not az_list: - raise Exception('Not enough hosts found.') + if self.comp.config.compute_nodes: + az_list = self.comp.get_enabled_az_host_list(required_count=1) + if not az_list: + raise Exception('Not enough hosts found.') + az = az_list[0] + else: + az = None - az = az_list[0] - self.compute_nodes.add(az) for chain_index in xrange(self.config.service_chain_count): name = self.config.loop_vm_name + str(chain_index) - reusable_vm = self.get_reusable_vm(name, self.nets, az) - if reusable_vm: - self.vms.append(reusable_vm) + server = self.get_reusable_vm(name, self.nets) + if server: + self.vms.append(server) else: vnic_type = 'direct' if self.config.sriov else 'normal' ports = [self._create_port(net, vnic_type) for net in self.nets] @@ -497,8 +502,19 @@ class PVPStageClient(BasicStageClient): ports[0]['mac_address'], ports[1]['mac_address']) self.created_ports.extend(ports) - self.vms.append(self._create_server(name, ports, az, config_file)) + server = self._create_server(name, ports, az, config_file) + self.vms.append(server) + + if chain_index == 0: + # First VM, save the hypervisor name. Used in future for + # maintain affinity. + self._ensure_vms_active() + server = self.comp.poll_server(server) + az = "%s:%s" % (getattr(server, 'OS-EXT-AZ:availability_zone'), + getattr(server, 'OS-EXT-SRV-ATTR:hypervisor_hostname')) + self._ensure_vms_active() + self.compute_nodes = set(self.get_loop_vm_compute_nodes()) self.set_ports() @@ -519,37 +535,36 @@ class PVVPStageClient(BasicStageClient): nets = self.config.internal_networks self.nets.extend([self._create_net(**n) for n in [nets.left, nets.right, nets.middle]]) - required_count = 2 if self.config.inter_node else 1 - az_list = self.comp.get_enabled_az_host_list(required_count=required_count) - - if not az_list: - raise Exception('Not enough hosts found.') - - az1 = az2 = az_list[0] - if self.config.inter_node: - if len(az_list) > 1: - az1 = az_list[0] - az2 = az_list[1] - else: - # fallback to intra-node - az1 = az2 = az_list[0] - self.config.inter_node = False - LOG.info('Using intra-node instead of inter-node.') - - self.compute_nodes.add(az1) - self.compute_nodes.add(az2) + if self.comp.config.compute_nodes: + required_count = 2 if self.config.inter_node else 1 + az_list = self.comp.get_enabled_az_host_list(required_count=required_count) + if not az_list: + raise Exception('Not enough hosts found.') + + az1 = az2 = az_list[0] + if self.config.inter_node: + if len(az_list) > 1: + az1 = az_list[0] + az2 = az_list[1] + else: + # fallback to intra-node + az1 = az2 = az_list[0] + self.config.inter_node = False + LOG.info('Using intra-node instead of inter-node.') + else: + az1 = az2 = None # Create loop VMs for chain_index in xrange(self.config.service_chain_count): name0 = self.config.loop_vm_name + str(chain_index) + 'a' # Attach first VM to net0 and net2 vm0_nets = self.nets[0::2] - reusable_vm0 = self.get_reusable_vm(name0, vm0_nets, az1) + reusable_vm0 = self.get_reusable_vm(name0, vm0_nets) name1 = self.config.loop_vm_name + str(chain_index) + 'b' # Attach second VM to net1 and net2 vm1_nets = self.nets[1:] - reusable_vm1 = self.get_reusable_vm(name1, vm1_nets, az2) + reusable_vm1 = self.get_reusable_vm(name1, vm1_nets) if reusable_vm0 and reusable_vm1: self.vms.extend([reusable_vm0, reusable_vm1]) @@ -583,14 +598,36 @@ class PVVPStageClient(BasicStageClient): vm1_port_net2['mac_address'], vm1_port_net1['mac_address']) - self.vms.append(self._create_server(name0, - [vm0_port_net0, vm0_port_net2], - az1, - config_file0)) - self.vms.append(self._create_server(name1, - [vm1_port_net2, vm1_port_net1], - az2, - config_file1)) + vm1 = self._create_server(name0, [vm0_port_net0, vm0_port_net2], az1, config_file0) + self.vms.append(vm1) + if chain_index == 0: + # First VM on first chain, save the hypervisor name. Used + # in future for maintain affinity. + self._ensure_vms_active() + vm1 = self.comp.poll_server(vm1) + az1 = "%s:%s" % (getattr(vm1, 'OS-EXT-AZ:availability_zone'), + getattr(vm1, 'OS-EXT-SRV-ATTR:hypervisor_hostname')) + if not self.config.inter_node: + # By default, NOVA scheduler will try first with + # different hypervisor for workload balance, but when + # inter-node is not configured, use the same AZ to run + # intra-node test case. + az2 = az1 + + vm2 = self._create_server(name1, [vm1_port_net2, vm1_port_net1], az2, config_file1) + self.vms.append(vm2) + if chain_index == 0 and self.config.inter_node: + # Second VM on first chain, save the hypervisor name. Used + # in future for maintain affinity. + self._ensure_vms_active() + vm2 = self.comp.poll_server(vm2) + az2 = "%s:%s" % (getattr(vm2, 'OS-EXT-AZ:availability_zone'), + getattr(vm2, 'OS-EXT-SRV-ATTR:hypervisor_hostname')) + if az1 == az2: + # Configure to run inter-node, but not enough node to run + self.config.inter_node = False + LOG.info('Using intra-node instead of inter-node.') self._ensure_vms_active() + self.compute_nodes = set(self.get_loop_vm_compute_nodes()) self.set_ports() diff --git a/nfvbench/chain_managers.py b/nfvbench/chain_managers.py index 087c751..9cd6c7d 100644 --- a/nfvbench/chain_managers.py +++ b/nfvbench/chain_managers.py @@ -15,7 +15,6 @@ # import time - from log import LOG from network import Network from packet_analyzer import PacketAnalyzer @@ -24,6 +23,7 @@ from stats_collector import IntervalCollector class StageManager(object): + """A class to stage resources in the systenm under test.""" def __init__(self, config, cred, factory): self.config = config @@ -56,7 +56,7 @@ class StageManager(object): return self.client.ports def get_compute_nodes(self): - return self.client.compute_nodes + return self.client.compute_nodes if self.client else {} def set_vm_macs(self): if self.client and self.config.service_chain != ChainType.EXT: @@ -67,7 +67,8 @@ class StageManager(object): self.client.dispose() -class StatsManager(object): +class PVPStatsManager(object): + """A class to generate traffic and extract results for PVP chains.""" def __init__(self, config, clients, specs, factory, vlans, notifier=None): self.config = config @@ -89,19 +90,31 @@ class StatsManager(object): try: self.worker.set_vlans(self.vlans) self._config_interfaces() - except Exception as exc: + except Exception: # since the wrorker is up and running, we need to close it # in case of exception self.close() - raise exc + raise def _get_data(self): return self.worker.get_data() if self.worker else {} - def _get_network(self, traffic_port, index=None, reverse=False): - interfaces = [self.clients['traffic'].get_interface(traffic_port)] + def _get_network(self, traffic_port, stats, reverse=False): + """Get the Network object corresponding to a given TG port. + + :param traffic_port: must be either 0 or 1 + :param stats: TG stats for given traffic port + :param reverse: specifies if the interface list for this network + should go from TG to loopback point (reverse=false) or + from loopback point to TG (reverse=true) + """ + # build the interface list in fwd direction (TG To loopback point) + interfaces = [self.clients['traffic'].get_interface(traffic_port, stats)] if self.worker: - interfaces.extend(self.worker.get_network_interfaces(index)) + # if available, + # interfaces for workers must be aligned on the TG port number + interfaces.extend(self.worker.get_network_interfaces(traffic_port)) + # let Network reverse the interface order if needed return Network(interfaces, reverse) def _config_interfaces(self): @@ -131,9 +144,7 @@ class StatsManager(object): return self.worker.get_version() if self.worker else {} def run(self): - """ - Run analysis in both direction and return the analysis - """ + """Run analysis in both direction and return the analysis.""" if self.worker: self.worker.run() @@ -144,16 +155,18 @@ class StatsManager(object): 'stats': stats } + # fetch latest stats from traffic gen + stats = self.clients['traffic'].get_stats() LOG.info('Requesting packet analysis on the forward direction...') result['packet_analysis']['direction-forward'] = \ - self.get_analysis([self._get_network(0, 0), - self._get_network(0, 1, reverse=True)]) + self.get_analysis([self._get_network(0, stats), + self._get_network(1, stats, reverse=True)]) LOG.info('Packet analysis on the forward direction completed') LOG.info('Requesting packet analysis on the reverse direction...') result['packet_analysis']['direction-reverse'] = \ - self.get_analysis([self._get_network(1, 1), - self._get_network(1, 0, reverse=True)]) + self.get_analysis([self._get_network(1, stats), + self._get_network(0, stats, reverse=True)]) LOG.info('Packet analysis on the reverse direction completed') return result @@ -182,21 +195,14 @@ class StatsManager(object): self.worker.close() -class PVPStatsManager(StatsManager): +class PVVPStatsManager(PVPStatsManager): + """A Class to generate traffic and extract results for PVVP chains.""" def __init__(self, config, clients, specs, factory, vlans, notifier=None): - StatsManager.__init__(self, config, clients, specs, factory, vlans, notifier) - - -class PVVPStatsManager(StatsManager): - - def __init__(self, config, clients, specs, factory, vlans, notifier=None): - StatsManager.__init__(self, config, clients, specs, factory, vlans, notifier) + PVPStatsManager.__init__(self, config, clients, specs, factory, vlans, notifier) def run(self): - """ - Run analysis in both direction and return the analysis - """ + """Run analysis in both direction and return the analysis.""" fwd_v2v_net, rev_v2v_net = self.worker.run() stats = self._generate_traffic() @@ -205,16 +211,17 @@ class PVVPStatsManager(StatsManager): 'packet_analysis': {}, 'stats': stats } - - fwd_nets = [self._get_network(0, 0)] + # fetch latest stats from traffic gen + stats = self.clients['traffic'].get_stats() + fwd_nets = [self._get_network(0, stats)] if fwd_v2v_net: fwd_nets.append(fwd_v2v_net) - fwd_nets.append(self._get_network(0, 1, reverse=True)) + fwd_nets.append(self._get_network(1, stats, reverse=True)) - rev_nets = [self._get_network(1, 1)] + rev_nets = [self._get_network(1, stats)] if rev_v2v_net: rev_nets.append(rev_v2v_net) - rev_nets.append(self._get_network(1, 0, reverse=True)) + rev_nets.append(self._get_network(0, stats, reverse=True)) LOG.info('Requesting packet analysis on the forward direction...') result['packet_analysis']['direction-forward'] = self.get_analysis(fwd_nets) @@ -227,9 +234,11 @@ class PVVPStatsManager(StatsManager): return result -class EXTStatsManager(StatsManager): +class EXTStatsManager(PVPStatsManager): + """A Class to generate traffic and extract results for EXT chains.""" + def __init__(self, config, clients, specs, factory, vlans, notifier=None): - StatsManager.__init__(self, config, clients, specs, factory, vlans, notifier) + PVPStatsManager.__init__(self, config, clients, specs, factory, vlans, notifier) def _setup(self): if self.specs.openstack: diff --git a/nfvbench/cleanup.py b/nfvbench/cleanup.py new file mode 100644 index 0000000..246be3f --- /dev/null +++ b/nfvbench/cleanup.py @@ -0,0 +1,179 @@ +#!/usr/bin/env python +# Copyright 2017 Cisco Systems, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import sys +import time + +from neutronclient.neutron import client as nclient +from novaclient.client import Client +from novaclient.exceptions import NotFound +from tabulate import tabulate + +import credentials as credentials +from log import LOG + +class ComputeCleaner(object): + """A cleaner for compute resources.""" + + def __init__(self, nova_client, instance_prefix): + self.nova_client = nova_client + LOG.info('Discovering instances %s...', instance_prefix) + all_servers = self.nova_client.servers.list() + self.servers = [server for server in all_servers + if server.name.startswith(instance_prefix)] + + def instance_exists(self, server): + try: + self.nova_client.servers.get(server.id) + except NotFound: + return False + return True + + def get_resource_list(self): + return [["Instance", server.name, server.id] for server in self.servers] + + def clean(self): + if self.servers: + for server in self.servers: + try: + LOG.info('Deleting instance %s...', server.name) + self.nova_client.servers.delete(server.id) + except Exception: + LOG.exception("Instance %s deletion failed", server.name) + LOG.info(' Waiting for %d instances to be fully deleted...', len(self.servers)) + retry_count = 5 + len(self.servers) * 2 + while True: + retry_count -= 1 + self.servers = [server for server in self.servers if self.instance_exists(server)] + if not self.servers: + break + + if retry_count: + LOG.info(' %d yet to be deleted by Nova, retries left=%d...', + len(self.servers), retry_count) + time.sleep(2) + else: + LOG.warning(' instance deletion verification timed out: %d not removed', + len(self.servers)) + break + + +class NetworkCleaner(object): + """A cleaner for network resources.""" + + def __init__(self, neutron_client, network_names): + self.neutron_client = neutron_client + LOG.info('Discovering networks...') + all_networks = self.neutron_client.list_networks()['networks'] + self.networks = [] + for net in all_networks: + try: + network_names.remove(net['name']) + self.networks.append(net) + except ValueError: + pass + if not network_names: + break + net_ids = [net['id'] for net in self.networks] + if net_ids: + LOG.info('Discovering ports...') + all_ports = self.neutron_client.list_ports()['ports'] + self.ports = [port for port in all_ports if port['network_id'] in net_ids] + else: + self.ports = [] + + def get_resource_list(self): + res_list = [["Network", net['name'], net['id']] for net in self.networks] + res_list.extend([["Port", port['name'], port['id']] for port in self.ports]) + return res_list + + def clean(self): + for port in self.ports: + LOG.info("Deleting port %s...", port['id']) + try: + self.neutron_client.delete_port(port['id']) + except Exception: + LOG.exception("Port deletion failed") + + for net in self.networks: + LOG.info("Deleting network %s...", net['name']) + try: + self.neutron_client.delete_network(net['id']) + except Exception: + LOG.exception("Network deletion failed") + +class FlavorCleaner(object): + """Cleaner for NFVbench flavor.""" + + def __init__(self, nova_client, name): + self.name = name + LOG.info('Discovering flavor %s...', name) + try: + self.flavor = nova_client.flavors.find(name=name) + except NotFound: + self.flavor = None + + def get_resource_list(self): + if self.flavor: + return [['Flavor', self.name, self.flavor.id]] + return None + + def clean(self): + if self.flavor: + LOG.info("Deleting flavor %s...", self.flavor.name) + try: + self.flavor.delete() + except Exception: + LOG.exception("Flavor deletion failed") + +class Cleaner(object): + """Cleaner for all NFVbench resources.""" + + def __init__(self, config): + cred = credentials.Credentials(config.openrc_file, None, False) + session = cred.get_session() + self.neutron_client = nclient.Client('2.0', session=session) + self.nova_client = Client(2, session=session) + network_names = [inet['name'] for inet in config.internal_networks.values()] + self.cleaners = [ComputeCleaner(self.nova_client, config.loop_vm_name), + FlavorCleaner(self.nova_client, config.flavor_type), + NetworkCleaner(self.neutron_client, network_names)] + + def show_resources(self): + """Show all NFVbench resources.""" + table = [["Type", "Name", "UUID"]] + for cleaner in self.cleaners: + res_list = cleaner.get_resource_list() + if res_list: + table.extend(res_list) + count = len(table) - 1 + if count: + LOG.info('Discovered %d NFVbench resources:', count) + print tabulate(table, headers="firstrow", tablefmt="psql") + else: + LOG.info('No matching NFVbench resources found') + return count + + def clean(self, prompt): + """Clean all resources.""" + LOG.info("NFVbench will delete all resources shown...") + if prompt: + answer = raw_input("Are you sure? (y/n) ") + if answer.lower() != 'y': + LOG.info("Exiting without deleting any resource") + sys.exit(0) + for cleaner in self.cleaners: + cleaner.clean() diff --git a/nfvbench/config_plugin.py b/nfvbench/config_plugin.py index 78c2ebb..f6654eb 100644 --- a/nfvbench/config_plugin.py +++ b/nfvbench/config_plugin.py @@ -49,7 +49,7 @@ class ConfigPluginBase(object): """Returns OpenStack specs for host.""" @abc.abstractmethod - def get_run_spec(self, openstack_spec): + def get_run_spec(self, config, openstack_spec): """Returns RunSpec for given platform.""" @abc.abstractmethod @@ -81,9 +81,9 @@ class ConfigPlugin(ConfigPluginBase): """Returns OpenStack specs for host.""" return specs.OpenStackSpec() - def get_run_spec(self, openstack_spec): + def get_run_spec(self, config, openstack_spec): """Returns RunSpec for given platform.""" - return specs.RunSpec(self.config.no_vswitch_access, openstack_spec) + return specs.RunSpec(config.no_vswitch_access, openstack_spec) def validate_config(self, config, openstack_spec): pass diff --git a/nfvbench/fluentd.py b/nfvbench/fluentd.py index 628b968..ad0ea34 100644 --- a/nfvbench/fluentd.py +++ b/nfvbench/fluentd.py @@ -35,7 +35,7 @@ class FluentLogHandler(logging.Handler): logging.Handler.__init__(self) self.log_senders = [] self.result_senders = [] - self.runlogdate = 0 + self.runlogdate = "1970-01-01T00:00:00.000000+0000" self.formatter = logging.Formatter('%(message)s') for fluentd_config in fluentd_configs: if fluentd_config.logging_tag: @@ -65,8 +65,9 @@ class FluentLogHandler(logging.Handler): "message": self.formatter.format(record), "@timestamp": self.__get_timestamp() } - # if runlogdate is 0, it's a log from server (not an nfvbench run) so do not send runlogdate - if self.runlogdate != 0: + # if runlogdate is Jan 1st 1970, it's a log from server (not an nfvbench run) + # so do not send runlogdate + if self.runlogdate != "1970-01-01T00:00:00.000000+0000": data["runlogdate"] = self.runlogdate self.__update_stats(record.levelno) @@ -103,9 +104,9 @@ class FluentLogHandler(logging.Handler): "numwarnings": self.__warning_counter, "@timestamp": self.__get_timestamp() } - # if runlogdate is 0, it's a log from server (not an nfvbench run) + # if runlogdate is Jan 1st 1970, it's a log from server (not an nfvbench run) # so don't send runlogdate - if self.runlogdate != 0: + if self.runlogdate != "1970-01-01T00:00:00.000000+0000": data["runlogdate"] = self.runlogdate for log_sender in self.log_senders: log_sender.emit(None, data) diff --git a/nfvbench/network.py b/nfvbench/network.py index e097c2b..6c02f04 100644 --- a/nfvbench/network.py +++ b/nfvbench/network.py @@ -15,8 +15,10 @@ class Interface(object): + """A class to hold the RX and TX counters for a virtual or physical interface.""" def __init__(self, name, device, tx_packets, rx_packets): + """Create a new interface instance.""" self.name = name self.device = device self.packets = { @@ -25,38 +27,65 @@ class Interface(object): } def set_packets(self, tx, rx): + """Set tx and rx counters for this interface.""" self.packets = { 'tx': tx, 'rx': rx } def set_packets_diff(self, tx, rx): + """Subtract current counters from new set of counters and update with results.""" self.packets = { 'tx': tx - self.packets['tx'], 'rx': rx - self.packets['rx'], } def is_no_op(self): + """Check if this interface is a no-opn interface.""" return self.name is None def get_packet_count(self, traffic_type): + """Get packet count for given direction.""" return self.packets.get(traffic_type, 0) @staticmethod def no_op(): + """Return an interface that doe snot pass any traffic.""" return Interface(None, None, 0, 0) class Network(object): + """This class holds all interfaces that make up a logical neutron network. + + A loopback packet path has exactly 2 networks. + The first interface is always one of the 2 traffic gen interface. + Subsequent interfaces are sorted along the path from the TG to the loopback point + which could be interfaces in a switch, a vswitch or a VM. + """ def __init__(self, interfaces=None, reverse=False): + """Create a network with initial interface list and direction. + + :param interfaces: initial interface list + :param reverse: specifies the order of interfaces returned by get_interfaces + """ if interfaces is None: interfaces = [] self.interfaces = interfaces self.reverse = reverse def add_interface(self, interface): + """Add one more interface to this network. + + Order if important as interfaces must be added from traffic generator ports towards then + looping back device. + """ self.interfaces.append(interface) def get_interfaces(self): + """Get interfaces associated to this network. + + Returned interface list is ordered from traffic generator port towards looping device if + reverse is false. Else returms the list in the reverse order. + """ return self.interfaces[::-1] if self.reverse else self.interfaces diff --git a/nfvbench/nfvbench.py b/nfvbench/nfvbench.py index 7d2e037..e0b5786 100644 --- a/nfvbench/nfvbench.py +++ b/nfvbench/nfvbench.py @@ -30,6 +30,7 @@ from pkg_resources import resource_string from __init__ import __version__ from chain_runner import ChainRunner +from cleanup import Cleaner from config import config_load from config import config_loads import credentials as credentials @@ -49,6 +50,7 @@ fluent_logger = None class NFVBench(object): """Main class of NFV benchmarking tool.""" + STATUS_OK = 'OK' STATUS_ERROR = 'ERROR' @@ -68,7 +70,7 @@ class NFVBench(object): sys.stdout.flush() def setup(self): - self.specs.set_run_spec(self.config_plugin.get_run_spec(self.specs.openstack)) + self.specs.set_run_spec(self.config_plugin.get_run_spec(self.config, self.specs.openstack)) self.chain_runner = ChainRunner(self.config, self.clients, self.cred, @@ -91,7 +93,19 @@ class NFVBench(object): try: self.update_config(opts) self.setup() - + new_frame_sizes = [] + min_packet_size = "68" if self.config.vlan_tagging else "64" + for frame_size in self.config.frame_sizes: + try: + if int(frame_size) < int(min_packet_size): + new_frame_sizes.append(min_packet_size) + LOG.info("Adjusting frame size %s Bytes to minimum size %s Bytes due to " + + "traffic generator restriction", frame_size, min_packet_size) + else: + new_frame_sizes.append(frame_size) + except ValueError: + new_frame_sizes.append(frame_size) + self.config.actual_frame_sizes = tuple(new_frame_sizes) result = { "date": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), "nfvbench_version": __version__, @@ -129,7 +143,7 @@ class NFVBench(object): } def prepare_summary(self, result): - """Prepares summary of the result to print and send it to logger (eg: fluentd)""" + """Prepare summary of the result to print and send it to logger (eg: fluentd).""" global fluent_logger summary = NFVBenchSummarizer(result, fluent_logger) LOG.info(str(summary)) @@ -163,6 +177,15 @@ class NFVBench(object): self.config.duration_sec = float(self.config.duration_sec) self.config.interval_sec = float(self.config.interval_sec) + self.config.pause_sec = float(self.config.pause_sec) + + # Get traffic generator profile config + if not self.config.generator_profile: + self.config.generator_profile = self.config.traffic_generator.default_profile + + generator_factory = TrafficGeneratorFactory(self.config) + self.config.generator_config = \ + generator_factory.get_generator_config(self.config.generator_profile) # Check length of mac_addrs_left/right for serivce_chain EXT with no_arp if self.config.service_chain == ChainType.EXT and self.config.no_arp: @@ -183,14 +206,6 @@ class NFVBench(object): b=len(self.config.generator_config.mac_addrs_right), c=self.config.service_chain_count)) - # Get traffic generator profile config - if not self.config.generator_profile: - self.config.generator_profile = self.config.traffic_generator.default_profile - - generator_factory = TrafficGeneratorFactory(self.config) - self.config.generator_config = \ - generator_factory.get_generator_config(self.config.generator_profile) - if not any(self.config.generator_config.pcis): raise Exception("PCI addresses configuration for selected traffic generator profile " "({tg_profile}) are missing. Please specify them in configuration file." @@ -211,12 +226,12 @@ class NFVBench(object): if self.config.openrc_file: self.config.openrc_file = os.path.expanduser(self.config.openrc_file) - self.config.ndr_run = (not self.config.no_traffic - and 'ndr' in self.config.rate.strip().lower().split('_')) - self.config.pdr_run = (not self.config.no_traffic - and 'pdr' in self.config.rate.strip().lower().split('_')) - self.config.single_run = (not self.config.no_traffic - and not (self.config.ndr_run or self.config.pdr_run)) + self.config.ndr_run = (not self.config.no_traffic and + 'ndr' in self.config.rate.strip().lower().split('_')) + self.config.pdr_run = (not self.config.no_traffic and + 'pdr' in self.config.rate.strip().lower().split('_')) + self.config.single_run = (not self.config.no_traffic and + not (self.config.ndr_run or self.config.pdr_run)) if self.config.vlans and len(self.config.vlans) != 2: raise Exception('Number of configured VLAN IDs for VLAN tagging must be exactly 2.') @@ -240,6 +255,11 @@ class NFVBench(object): def parse_opts_from_cli(): parser = argparse.ArgumentParser() + parser.add_argument('--status', dest='status', + action='store_true', + default=None, + help='Provide NFVbench status') + parser.add_argument('-c', '--config', dest='config', action='store', help='Override default values with a config file or ' @@ -354,6 +374,16 @@ def parse_opts_from_cli(): action='store_true', help='no cleanup after run') + parser.add_argument('--cleanup', dest='cleanup', + default=None, + action='store_true', + help='Cleanup NFVbench resources (prompt to confirm)') + + parser.add_argument('--force-cleanup', dest='force_cleanup', + default=None, + action='store_true', + help='Cleanup NFVbench resources (do not prompt)') + parser.add_argument('--json', dest='json', action='store', help='store results in json format file', @@ -405,6 +435,11 @@ def parse_opts_from_cli(): action='store', help='Custom label for performance records') + parser.add_argument('--l2-loopback', '--l2loopback', dest='l2_loopback', + action='store', + metavar='<vlan>', + help='Port to port or port to switch to port L2 loopback with VLAN id') + opts, unknown_opts = parser.parse_known_args() return opts, unknown_opts @@ -417,8 +452,7 @@ def load_default_config(): def override_custom_traffic(config, frame_sizes, unidir): - """Override the traffic profiles with a custom one - """ + """Override the traffic profiles with a custom one.""" if frame_sizes is not None: traffic_profile_name = "custom_traffic_profile" config.traffic_profile = [ @@ -445,6 +479,23 @@ def check_physnet(name, netattrs): raise Exception("SRIOV requires segmentation_id to be specified for the {n} network" .format(n=name)) +def status_cleanup(config, cleanup, force_cleanup): + LOG.info('Version: %s', pbr.version.VersionInfo('nfvbench').version_string_with_vcs()) + # check if another run is pending + ret_code = 0 + try: + with utils.RunLock(): + LOG.info('Status: idle') + except Exception: + LOG.info('Status: busy (run pending)') + ret_code = 1 + # check nfvbench resources + if config.openrc_file and config.service_chain != ChainType.EXT: + cleaner = Cleaner(config) + count = cleaner.show_resources() + if count and (cleanup or force_cleanup): + cleaner.clean(not force_cleanup) + sys.exit(ret_code) def main(): global fluent_logger @@ -467,14 +518,6 @@ def main(): opts, unknown_opts = parse_opts_from_cli() log.set_level(debug=opts.debug) - # setup the fluent logger as soon as possible right after the config plugin is called, - # if there is any logging or result tag is set then initialize the fluent logger - for fluentd in config.fluentd: - if fluentd.logging_tag or fluentd.result_tag: - fluent_logger = FluentLogHandler(config.fluentd) - LOG.addHandler(fluent_logger) - break - if opts.version: print pbr.version.VersionInfo('nfvbench').version_string_with_vcs() sys.exit(0) @@ -506,6 +549,14 @@ def main(): LOG.info('Loading configuration string: %s', opts.config) config = config_loads(opts.config, config, whitelist_keys) + # setup the fluent logger as soon as possible right after the config plugin is called, + # if there is any logging or result tag is set then initialize the fluent logger + for fluentd in config.fluentd: + if fluentd.logging_tag or fluentd.result_tag: + fluent_logger = FluentLogHandler(config.fluentd) + LOG.addHandler(fluent_logger) + break + # traffic profile override options override_custom_traffic(config, opts.frame_sizes, opts.unidir) @@ -524,8 +575,22 @@ def main(): if opts.no_int_config: config.no_int_config = opts.no_int_config + # port to port loopback (direct or through switch) + if opts.l2_loopback: + config.l2_loopback = True + if config.service_chain != ChainType.EXT: + LOG.info('Changing service chain type to EXT') + config.service_chain = ChainType.EXT + if not config.no_arp: + LOG.info('Disabling ARP') + config.no_arp = True + config.vlans = [int(opts.l2_loopback), int(opts.l2_loopback)] + # disable any form of interface config since we loop at the switch level + config.no_int_config = True + LOG.info('Running L2 loopback: using EXT chain/no ARP') + if opts.use_sriov_middle_net: - if (not config.sriov) or (not config.service_chain == ChainType.PVVP): + if (not config.sriov) or (config.service_chain != ChainType.PVVP): raise Exception("--use-sriov-middle-net is only valid for PVVP with SRIOV") config.use_sriov_middle_net = True @@ -554,6 +619,9 @@ def main(): # in a copy of the dict (config plugin still holds the original dict) config_plugin.set_config(config) + if opts.status or opts.cleanup or opts.force_cleanup: + status_cleanup(config, opts.cleanup, opts.force_cleanup) + # add file log if requested if config.log_file: log.add_file_logger(config.log_file) diff --git a/nfvbench/nfvbenchd.py b/nfvbench/nfvbenchd.py index 76906c5..fa781af 100644 --- a/nfvbench/nfvbenchd.py +++ b/nfvbench/nfvbenchd.py @@ -226,7 +226,7 @@ class WebSocketIoServer(object): # print 'main thread waiting for requests...' config = Ctx.dequeue() # print 'main thread processing request...' - print config + # print config try: # remove unfilled values as we do not want them to override default values with None config = {k: v for k, v in config.items() if v is not None} @@ -243,8 +243,15 @@ class WebSocketIoServer(object): else: # this might overwrite a previously unfetched result Ctx.set_result(results) - summary = NFVBenchSummarizer(results['result'], self.fluent_logger) - LOG.info(str(summary)) + try: + summary = NFVBenchSummarizer(results['result'], self.fluent_logger) + LOG.info(str(summary)) + except KeyError: + # in case of error, 'result' might be missing + if 'error_message' in results: + LOG.error(results['error_message']) + else: + LOG.error('REST request completed without results or error message') Ctx.release() if self.fluent_logger: self.fluent_logger.send_run_summary(True) diff --git a/nfvbench/service_chain.py b/nfvbench/service_chain.py index 216cc92..7ec1511 100644 --- a/nfvbench/service_chain.py +++ b/nfvbench/service_chain.py @@ -57,14 +57,14 @@ class ServiceChain(object): for vlan, device in zip(vlans, self.config.generator_config.devices): self.stats_manager.set_vlan_tag(device, vlan) - def __get_result_per_frame_size(self, frame_size, bidirectional): + def __get_result_per_frame_size(self, frame_size, actual_frame_size, bidirectional): start_time = time.time() traffic_result = { frame_size: {} } result = {} if not self.config.no_traffic: - self.clients['traffic'].set_traffic(frame_size, bidirectional) + self.clients['traffic'].set_traffic(actual_frame_size, bidirectional) if self.config.single_run: result = self.stats_manager.run() @@ -73,6 +73,9 @@ class ServiceChain(object): for dr in ['pdr', 'ndr']: if dr in results: + if frame_size != actual_frame_size: + results[dr]['l2frame_size'] = frame_size + results[dr]['actual_l2frame_size'] = actual_frame_size traffic_result[frame_size][dr] = results[dr] if 'warning' in results[dr]['stats'] and results[dr]['stats']['warning']: traffic_result['warning'] = results[dr]['stats']['warning'] @@ -83,6 +86,8 @@ class ServiceChain(object): result['run_config'] = self.clients['traffic'].get_run_config(result) required = result['run_config']['direction-total']['orig']['rate_pps'] actual = result['stats']['total_tx_rate'] + if frame_size != actual_frame_size: + result['actual_l2frame_size'] = actual_frame_size warning = self.clients['traffic'].compare_tx_rates(required, actual) if warning is not None: result['run_config']['warning'] = warning @@ -92,8 +97,10 @@ class ServiceChain(object): def __get_chain_result(self): result = OrderedDict() - for fs in self.config.frame_sizes: - result.update(self.__get_result_per_frame_size(fs, self.config.traffic.bidirectional)) + for fs, actual_fs in zip(self.config.frame_sizes, self.config.actual_frame_sizes): + result.update(self.__get_result_per_frame_size(fs, + actual_fs, + self.config.traffic.bidirectional)) chain_result = { 'flow_count': self.config.flow_count, diff --git a/nfvbench/summarizer.py b/nfvbench/summarizer.py index 0ff9c48..b27ed6f 100644 --- a/nfvbench/summarizer.py +++ b/nfvbench/summarizer.py @@ -312,11 +312,17 @@ class NFVBenchSummarizer(Summarizer): if 'warning' in entry: continue self.__chain_analysis_summarize(*entry) - self.__record_send() + self.__record_send() def __chain_analysis_summarize(self, frame_size, analysis): self._put() self._put('L2 frame size:', frame_size) + if 'actual_l2frame_size' in analysis: + self._put('Actual l2 frame size:', analysis['actual_l2frame_size']) + elif self.config['ndr_run'] and 'actual_l2frame_size' in analysis['ndr']: + self._put('Actual l2 frame size:', analysis['ndr']['actual_l2frame_size']) + elif self.config['pdr_run'] and 'actual_l2frame_size' in analysis['pdr']: + self._put('Actual l2 frame size:', analysis['pdr']['actual_l2frame_size']) if 'analysis_duration_sec' in analysis: self._put('Chain analysis duration:', Formatter.float(3)(analysis['analysis_duration_sec']), 'seconds') diff --git a/nfvbench/traffic_client.py b/nfvbench/traffic_client.py index 57141be..ef68fe5 100755 --- a/nfvbench/traffic_client.py +++ b/nfvbench/traffic_client.py @@ -35,10 +35,14 @@ from utils import cast_integer class TrafficClientException(Exception): + """Generic traffic client exception.""" + pass class TrafficRunner(object): + """Serialize various steps required to run traffic.""" + def __init__(self, client, duration_sec, interval_sec=0): self.client = client self.start_time = None @@ -89,6 +93,8 @@ class TrafficRunner(object): class IpBlock(object): + """Manage a block of IP addresses.""" + def __init__(self, base_ip, step_ip, count_ip): self.base_ip_int = Device.ip_to_int(base_ip) self.step = Device.ip_to_int(step_ip) @@ -96,15 +102,13 @@ class IpBlock(object): self.next_free = 0 def get_ip(self, index=0): - '''Return the IP address at given index - ''' + """Return the IP address at given index.""" if index < 0 or index >= self.max_available: raise IndexError('Index out of bounds') return Device.int_to_ip(self.base_ip_int + index * self.step) def reserve_ip_range(self, count): - '''Reserve a range of count consecutive IP addresses spaced by step - ''' + """Reserve a range of count consecutive IP addresses spaced by step.""" if self.next_free + count > self.max_available: raise IndexError('No more IP addresses next free=%d max_available=%d requested=%d' % (self.next_free, @@ -120,6 +124,8 @@ class IpBlock(object): class Device(object): + """Represent a port device and all information associated to it.""" + def __init__(self, port, pci, switch_port=None, vtep_vlan=None, ip=None, tg_gateway_ip=None, gateway_ip=None, ip_addrs_step=None, tg_gateway_ip_addrs_step=None, gateway_ip_addrs_step=None, udp_src_port=None, udp_dst_port=None, @@ -158,6 +164,7 @@ class Device(object): if mac is None: raise TrafficClientException('Trying to set traffic generator MAC address as None') self.mac = mac + LOG.info("Port %d: src MAC %s", self.port, self.mac) def set_destination(self, dst): self.dst = dst @@ -169,10 +176,10 @@ class Device(object): if self.vlan_tagging and vlan_tag is None: raise TrafficClientException('Trying to set VLAN tag as None') self.vlan_tag = vlan_tag + LOG.info("Port %d: VLAN %d", self.port, self.vlan_tag) def get_gw_ip(self, chain_index): - '''Retrieve the IP address assigned for the gateway of a given chain - ''' + """Retrieve the IP address assigned for the gateway of a given chain.""" return self.gw_ip_block.get_ip(chain_index) def get_stream_configs(self, service_chain): @@ -222,8 +229,7 @@ class Device(object): return configs def ip_range_overlaps(self): - '''Check if this device ip range is overlapping with the dst device ip range - ''' + """Check if this device ip range is overlapping with the dst device ip range.""" src_base_ip = Device.ip_to_int(self.ip) dst_base_ip = Device.ip_to_int(self.dst.ip) src_last_ip = src_base_ip + self.flow_count - 1 @@ -367,6 +373,8 @@ class RunningTrafficProfile(object): class TrafficGeneratorFactory(object): + """Factory class to generate a traffic generator.""" + def __init__(self, config): self.config = config @@ -406,6 +414,8 @@ class TrafficGeneratorFactory(object): class TrafficClient(object): + """Traffic generator client.""" + PORTS = [0, 1] def __init__(self, config, notifier=None, skip_sleep=False): @@ -449,55 +459,56 @@ class TrafficClient(object): return self.gen.get_version() def ensure_end_to_end(self): - """ - Ensure traffic generator receives packets it has transmitted. + """Ensure traffic generator receives packets it has transmitted. + This ensures end to end connectivity and also waits until VMs are ready to forward packets. - At this point all VMs are in active state, but forwarding does not have to work. - Small amount of traffic is sent to every chain. Then total of sent and received packets - is compared. If ratio between received and transmitted packets is higher than (N-1)/N, - N being number of chains, traffic flows through every chain and real measurements can be - performed. + VMs that are started and in active state may not pass traffic yet. It is imperative to make + sure that all VMs are passing traffic in both directions before starting any benchmarking. + To verify this, we need to send at a low frequency bi-directional packets and make sure + that we receive all packets back from all VMs. The number of flows is equal to 2 times + the number of chains (1 per direction) and we need to make sure we receive packets coming + from exactly 2 x chain count different source MAC addresses. Example: PVP chain (1 VM per chain) N = 10 (number of chains) - threshold = (N-1)/N = 9/10 = 0.9 (acceptable ratio ensuring working conditions) - if total_received/total_sent > 0.9, traffic is flowing to more than 9 VMs meaning - all 10 VMs are in operational state. + Flow count = 20 (number of flows) + If the number of unique source MAC addresses from received packets is 20 then + all 10 VMs 10 VMs are in operational state. """ LOG.info('Starting traffic generator to ensure end-to-end connectivity') - rate_pps = {'rate_pps': str(self.config.service_chain_count * 100)} + rate_pps = {'rate_pps': str(self.config.service_chain_count * 1)} self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False) # ensures enough traffic is coming back - threshold = (self.config.service_chain_count - 1) / float(self.config.service_chain_count) retry_count = (self.config.check_traffic_time_sec + self.config.generic_poll_sec - 1) / self.config.generic_poll_sec + mac_addresses = set() + ln = 0 + # in case of l2-loopback, we will only have 2 unique src MAC regardless of the + # number of chains configured because there are no VM involved + # otherwise, we expect to see packets coming from 2 unique MAC per chain + unique_src_mac_count = 2 if self.config.l2_loopback else self.config.service_chain_count * 2 for it in xrange(retry_count): self.gen.clear_stats() self.gen.start_traffic() + self.gen.start_capture() LOG.info('Waiting for packets to be received back... (%d / %d)', it + 1, retry_count) if not self.skip_sleep: time.sleep(self.config.generic_poll_sec) self.gen.stop_traffic() - stats = self.gen.get_stats() - - # compute total sent and received traffic on both ports - total_rx = 0 - total_tx = 0 - for port in self.PORTS: - total_rx += float(stats[port]['rx'].get('total_pkts', 0)) - total_tx += float(stats[port]['tx'].get('total_pkts', 0)) - - # how much of traffic came back - ratio = total_rx / total_tx if total_tx else 0 - - if ratio > threshold: - self.gen.clear_stats() - self.gen.clear_streamblock() - LOG.info('End-to-end connectivity ensured') - return + self.gen.fetch_capture_packets() + self.gen.stop_capture() + + for packet in self.gen.packet_list: + mac_addresses.add(packet['binary'][6:12]) + if ln != len(mac_addresses): + ln = len(mac_addresses) + LOG.info('Received unique source MAC %d / %d', ln, unique_src_mac_count) + if len(mac_addresses) == unique_src_mac_count: + LOG.info('End-to-end connectivity ensured') + return if not self.skip_sleep: time.sleep(self.config.generic_poll_sec) @@ -518,6 +529,10 @@ class TrafficClient(object): unidir_reverse_pps = int(self.config.unidir_reverse_traffic_pps) if unidir_reverse_pps > 0: self.run_config['rates'].append({'rate_pps': str(unidir_reverse_pps)}) + # Fix for [NFVBENCH-67], convert the rate string to PPS + for idx, rate in enumerate(self.run_config['rates']): + if 'rate_pps' not in rate: + self.run_config['rates'][idx] = {'rate_pps': self.__convert_rates(rate)['rate_pps']} self.gen.clear_streamblock() self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional, latency=True) @@ -660,7 +675,7 @@ class TrafficClient(object): results[tag]['timestamp_sec'] = time.time() def __range_search(self, left, right, targets, results): - '''Perform a binary search for a list of targets inside a [left..right] range or rate + """Perform a binary search for a list of targets inside a [left..right] range or rate. left the left side of the range to search as a % the line rate (100 = 100% line rate) indicating the rate to send on each interface @@ -669,7 +684,7 @@ class TrafficClient(object): targets a dict of drop rates to search (0.1 = 0.1%), indexed by the DR name or "tag" ('ndr', 'pdr') results a dict to store results - ''' + """ if not targets: return LOG.info('Range search [%s .. %s] targets: %s', left, right, targets) @@ -739,6 +754,7 @@ class TrafficClient(object): time_elapsed_ratio = self.runner.time_elapsed() / self.run_config['duration_sec'] if time_elapsed_ratio >= 1: self.cancel_traffic() + time.sleep(self.config.pause_sec) self.interval_collector.reset() # get stats from the run @@ -785,13 +801,11 @@ class TrafficClient(object): def cancel_traffic(self): self.runner.stop() - def get_interface(self, port_index): + def get_interface(self, port_index, stats): port = self.gen.port_handle[port_index] tx, rx = 0, 0 - if not self.config.no_traffic: - stats = self.get_stats() - if port in stats: - tx, rx = int(stats[port]['tx']['total_pkts']), int(stats[port]['rx']['total_pkts']) + if stats and port in stats: + tx, rx = int(stats[port]['tx']['total_pkts']), int(stats[port]['rx']['total_pkts']) return Interface('traffic-generator', self.tool.lower(), tx, rx) def get_traffic_config(self): @@ -820,11 +834,13 @@ class TrafficClient(object): return config def get_run_config(self, results): - """Returns configuration which was used for the last run.""" + """Return configuration which was used for the last run.""" r = {} + # because we want each direction to have the far end RX rates, + # use the far end index (1-idx) to retrieve the RX rates for idx, key in enumerate(["direction-forward", "direction-reverse"]): tx_rate = results["stats"][idx]["tx"]["total_pkts"] / self.config.duration_sec - rx_rate = results["stats"][idx]["rx"]["total_pkts"] / self.config.duration_sec + rx_rate = results["stats"][1 - idx]["rx"]["total_pkts"] / self.config.duration_sec r[key] = { "orig": self.__convert_rates(self.run_config['rates'][idx]), "tx": self.__convert_rates({'rate_pps': tx_rate}), @@ -835,7 +851,6 @@ class TrafficClient(object): for direction in ['orig', 'tx', 'rx']: total[direction] = {} for unit in ['rate_percent', 'rate_bps', 'rate_pps']: - total[direction][unit] = sum([float(x[direction][unit]) for x in r.values()]) r['direction-total'] = total diff --git a/nfvbench/traffic_gen/dummy.py b/nfvbench/traffic_gen/dummy.py index 6f57f4d..788a53f 100644 --- a/nfvbench/traffic_gen/dummy.py +++ b/nfvbench/traffic_gen/dummy.py @@ -31,6 +31,11 @@ class DummyTG(AbstractTrafficGenerator): self.duration_sec = self.config.duration_sec self.intf_speed = config.generator_config.intf_speed self.set_response_curve() + self.packet_list = [{ + "binary": "01234567890123456789" + }, { + "binary": "98765432109876543210" + }] def get_version(self): return "0.1" @@ -164,9 +169,18 @@ class DummyTG(AbstractTrafficGenerator): def start_traffic(self): pass + def fetch_capture_packets(self): + pass + def stop_traffic(self): pass + def start_capture(self): + pass + + def stop_capture(self): + pass + def cleanup(self): pass diff --git a/nfvbench/traffic_gen/traffic_base.py b/nfvbench/traffic_gen/traffic_base.py index 817ecc8..81537b3 100644 --- a/nfvbench/traffic_gen/traffic_base.py +++ b/nfvbench/traffic_gen/traffic_base.py @@ -23,19 +23,12 @@ class TrafficGeneratorException(Exception): class AbstractTrafficGenerator(object): - # src_mac (6) + dst_mac (6) + mac_type (2) + frame_check (4) = 18 - l2_header_size = 18 - - imix_l2_sizes = [64, 594, 1518] - imix_l3_sizes = [size - l2_header_size for size in imix_l2_sizes] - imix_ratios = [7, 4, 1] - imix_avg_l2_size = sum( - [1.0 * imix[0] * imix[1] for imix in zip(imix_l2_sizes, imix_ratios)]) / sum(imix_ratios) - - traffic_utils.imix_avg_l2_size = imix_avg_l2_size - def __init__(self, config): self.config = config + self.imix_l2_sizes = [64, 594, 1518] + self.imix_ratios = [7, 4, 1] + self.imix_avg_l2_size = 0 + self.adjust_imix_min_size(64) @abc.abstractmethod def get_version(self): @@ -58,7 +51,7 @@ class AbstractTrafficGenerator(object): return None @abc.abstractmethod - def create_traffic(self): + def create_traffic(self, l2frame_size, rates, bidirectional, latency=True): # Must be implemented by sub classes return None @@ -96,3 +89,11 @@ class AbstractTrafficGenerator(object): def cleanup(self): # Must be implemented by sub classes return None + + def adjust_imix_min_size(self, min_size): + # assume the min size is always the first entry + self.imix_l2_sizes[0] = min_size + self.imix_avg_l2_size = sum( + [1.0 * imix[0] * imix[1] for imix in zip(self.imix_l2_sizes, self.imix_ratios)]) / sum( + self.imix_ratios) + traffic_utils.imix_avg_l2_size = self.imix_avg_l2_size diff --git a/nfvbench/traffic_gen/trex.py b/nfvbench/traffic_gen/trex.py index 23faebc..cabf1cb 100644 --- a/nfvbench/traffic_gen/trex.py +++ b/nfvbench/traffic_gen/trex.py @@ -48,6 +48,8 @@ from trex_stl_lib.api import STLVmFlowVarRepetableRandom from trex_stl_lib.api import STLVmWrFlowVar from trex_stl_lib.api import UDP from trex_stl_lib.services.trex_stl_service_arp import STLServiceARP + + # pylint: enable=import-error @@ -64,31 +66,42 @@ class TRex(AbstractTrafficGenerator): self.streamblock = defaultdict(list) self.rates = [] self.arps = {} + self.capture_id = None + self.packet_list = [] def get_version(self): return self.client.get_server_version() def extract_stats(self, in_stats): + """Extract stats from dict returned by Trex API. + + :param in_stats: dict as returned by TRex api + """ utils.nan_replace(in_stats) LOG.debug(in_stats) result = {} + # port_handles should have only 2 elements: [0, 1] + # so (1 - ph) will be the index for the far end port for ph in self.port_handle: - stats = self.__combine_stats(in_stats, ph) + stats = in_stats[ph] + far_end_stats = in_stats[1 - ph] result[ph] = { 'tx': { - 'total_pkts': cast_integer(stats['tx_pkts']['total']), - 'total_pkt_bytes': cast_integer(stats['tx_bytes']['total']), - 'pkt_rate': cast_integer(stats['tx_pps']['total']), - 'pkt_bit_rate': cast_integer(stats['tx_bps']['total']) + 'total_pkts': cast_integer(stats['opackets']), + 'total_pkt_bytes': cast_integer(stats['obytes']), + 'pkt_rate': cast_integer(stats['tx_pps']), + 'pkt_bit_rate': cast_integer(stats['tx_bps']) }, 'rx': { - 'total_pkts': cast_integer(stats['rx_pkts']['total']), - 'total_pkt_bytes': cast_integer(stats['rx_bytes']['total']), - 'pkt_rate': cast_integer(stats['rx_pps']['total']), - 'pkt_bit_rate': cast_integer(stats['rx_bps']['total']), + 'total_pkts': cast_integer(stats['ipackets']), + 'total_pkt_bytes': cast_integer(stats['ibytes']), + 'pkt_rate': cast_integer(stats['rx_pps']), + 'pkt_bit_rate': cast_integer(stats['rx_bps']), + # how many pkts were dropped in RX direction + # need to take the tx counter on the far end port 'dropped_pkts': cast_integer( - stats['tx_pkts']['total'] - stats['rx_pkts']['total']) + far_end_stats['opackets'] - stats['ipackets']) } } @@ -103,20 +116,6 @@ class TRex(AbstractTrafficGenerator): result["total_tx_rate"] = cast_integer(total_tx_pkts / self.config.duration_sec) return result - def __combine_stats(self, in_stats, port_handle): - """Traverses TRex result dictionary and combines stream stats. Used for combining latency - and regular streams together. - """ - result = defaultdict(lambda: defaultdict(float)) - - for pg_id in [self.stream_ids[port_handle]] + self.latencies[port_handle]: - record = in_stats['flow_stats'][pg_id] - for stat_type, stat_type_values in record.iteritems(): - for ph, value in stat_type_values.iteritems(): - result[stat_type][ph] += value - - return result - def __combine_latencies(self, in_stats, port_handle): """Traverses TRex result dictionary and combines chosen latency stats.""" if not self.latencies[port_handle]: @@ -136,14 +135,16 @@ class TRex(AbstractTrafficGenerator): return result def create_pkt(self, stream_cfg, l2frame_size): - # 46 = 14 (Ethernet II) + 4 (CRC Checksum) + 20 (IPv4) + 8 (UDP) - payload = 'x' * (max(64, int(l2frame_size)) - 46) pkt_base = Ether(src=stream_cfg['mac_src'], dst=stream_cfg['mac_dst']) - if stream_cfg['vlan_tag'] is not None: + # 50 = 14 (Ethernet II) + 4 (Vlan tag) + 4 (CRC Checksum) + 20 (IPv4) + 8 (UDP) pkt_base /= Dot1Q(vlan=stream_cfg['vlan_tag']) - + l2payload_size = int(l2frame_size) - 50 + else: + # 46 = 14 (Ethernet II) + 4 (CRC Checksum) + 20 (IPv4) + 8 (UDP) + l2payload_size = int(l2frame_size) - 46 + payload = 'x' * l2payload_size udp_args = {} if stream_cfg['udp_src_port']: udp_args['sport'] = int(stream_cfg['udp_src_port']) @@ -198,6 +199,8 @@ class TRex(AbstractTrafficGenerator): idx_lat = None streams = [] if l2frame == 'IMIX': + min_size = 64 if stream_cfg['vlan_tag'] is None else 68 + self.adjust_imix_min_size(min_size) for t, (ratio, l2_frame_size) in enumerate(zip(self.imix_ratios, self.imix_l2_sizes)): pkt = self.create_pkt(stream_cfg, l2_frame_size) streams.append(STLStream(packet=pkt, @@ -208,6 +211,7 @@ class TRex(AbstractTrafficGenerator): if latency: idx_lat = self.id.next() + pkt = self.create_pkt(stream_cfg, self.imix_avg_l2_size) sl = STLStream(packet=pkt, isg=isg, flow_stats=STLFlowLatencyStats(pg_id=idx_lat), @@ -247,7 +251,7 @@ class TRex(AbstractTrafficGenerator): break except Exception as ex: if it == (self.config.generic_retry_count - 1): - raise ex + raise LOG.info("Retrying connection to TRex (%s)...", ex.message) def connect(self): @@ -318,7 +322,7 @@ class TRex(AbstractTrafficGenerator): def __start_server(self): server = TRexTrafficServer() - server.run_server(self.config.generator_config) + server.run_server(self.config.generator_config, self.config.vlan_tagging) def resolve_arp(self): self.client.set_service_mode(ports=self.port_handle) @@ -354,7 +358,7 @@ class TRex(AbstractTrafficGenerator): else: failed = [arp.get_record().dst_ip for arp in arps if arp.get_record().dst_mac is None] - LOG.info('Retrying ARP for: %d (%d / %d)', + LOG.info('Retrying ARP for: %s (%d / %d)', failed, attempt, self.config.generic_retry_count) time.sleep(self.config.generic_poll_sec) @@ -433,7 +437,7 @@ class TRex(AbstractTrafficGenerator): LOG.info('Cleared all existing streams.') def get_stats(self): - stats = self.client.get_pgid_stats() + stats = self.client.get_stats() return self.extract_stats(stats) def get_macs(self): @@ -450,6 +454,31 @@ class TRex(AbstractTrafficGenerator): def stop_traffic(self): self.client.stop(ports=self.port_handle) + def start_capture(self): + """Capture all packets on both ports that are unicast to us.""" + if self.capture_id: + self.stop_capture() + # Need to filter out unwanted packets so we do not end up counting + # src MACs of frames that are not unicast to us + src_mac_list = self.get_macs() + bpf_filter = "ether dst %s or ether dst %s" % (src_mac_list[0], src_mac_list[1]) + # ports must be set in service in order to enable capture + self.client.set_service_mode(ports=self.port_handle) + self.capture_id = self.client.start_capture(rx_ports=self.port_handle, + bpf_filter=bpf_filter) + + def fetch_capture_packets(self): + if self.capture_id: + self.packet_list = [] + self.client.fetch_capture_packets(capture_id=self.capture_id['id'], + output=self.packet_list) + + def stop_capture(self): + if self.capture_id: + self.client.stop_capture(capture_id=self.capture_id['id']) + self.capture_id = None + self.client.set_service_mode(ports=self.port_handle, enabled=False) + def cleanup(self): if self.client: try: diff --git a/nfvbench/traffic_server.py b/nfvbench/traffic_server.py index fe9edd2..dcb83fb 100644 --- a/nfvbench/traffic_server.py +++ b/nfvbench/traffic_server.py @@ -34,7 +34,7 @@ class TRexTrafficServer(TrafficServer): assert len(contents) == 1 self.trex_dir = os.path.join(trex_base_dir, contents[0]) - def run_server(self, traffic_profile, filename='/etc/trex_cfg.yaml'): + def run_server(self, traffic_profile, vlan_tagging, filename='/etc/trex_cfg.yaml'): """ Runs TRex server for specified traffic profile. @@ -43,10 +43,12 @@ class TRexTrafficServer(TrafficServer): """ cfg = self.__save_config(traffic_profile, filename) cores = traffic_profile.cores - sw_mode = "--software" if traffic_profile.software_mode else "" + sw_mode = "--software" if traffic_profile.generator_config.software_mode else "" + vlan_opt = "--vlan" if vlan_tagging else "" subprocess.Popen(['nohup', '/bin/bash', '-c', './t-rex-64 -i -c {} --iom 0 --no-scapy-server --close-at-end {} ' - '--vlan --cfg {} &> /tmp/trex.log & disown'.format(cores, sw_mode, cfg)], + '{} --cfg {} &> /tmp/trex.log & disown'.format(cores, sw_mode, + vlan_opt, cfg)], cwd=self.trex_dir) LOG.info('TRex server is running...') diff --git a/nfvbench/utils.py b/nfvbench/utils.py index 20dc588..ecbb55a 100644 --- a/nfvbench/utils.py +++ b/nfvbench/utils.py @@ -12,6 +12,7 @@ # License for the specific language governing permissions and limitations # under the License. +import glob from math import isnan import os import re @@ -91,11 +92,45 @@ def dict_to_json_dict(record): return json.loads(json.dumps(record, default=lambda obj: obj.to_json())) -def get_intel_pci(nic_ports): - """Returns the first two PCI addresses of sorted PCI list for Intel NIC (i40e, ixgbe)""" - hx = r'[0-9a-fA-F]' - regex = r'{hx}{{4}}:({hx}{{2}}:{hx}{{2}}\.{hx}{{1}}).*(drv={driver}|.*unused=.*{driver})' +def get_intel_pci(nic_slot=None, nic_ports=None): + """Returns two PCI address that will be used for NFVbench + + @param nic_slot: The physical PCIe slot number in motherboard + @param nic_ports: Array of two integers indicating the ports to use on the NIC + + When nic_slot and nic_ports are both supplied, the function will just return + the PCI addresses for them. The logic used is: + (1) Run "dmidecode -t slot" + (2) Grep for "SlotID:" with given nic_slot, and derive the bus address; + (3) Based on given nic_ports, generate the pci addresses based on above + base address; + + When either nic_slot or nic_ports is not supplied, the function will + traverse all Intel NICs which use i40e or ixgbe driver, sorted by PCI + address, and return first two available ports which are not bonded + (802.11ad). + """ + if nic_slot and nic_ports: + dmidecode = subprocess.check_output(['dmidecode', '-t', 'slot']) + regex = r"(?<=SlotID:%s).*?(....:..:..\..)" % nic_slot + match = re.search(regex, dmidecode, flags=re.DOTALL) + if not match: + return None + + pcis = [] + # On some servers, the "Bus Address" returned by dmidecode is not the + # base pci address of the NIC. So only keeping the bus part of the + # address for better compability. + bus = match.group(1)[:match.group(1).rindex(':') + 1] + "00." + for port in nic_ports: + pcis.append(bus + str(port)) + + return pcis + + hx = r'[0-9a-fA-F]' + regex = r'({hx}{{4}}:({hx}{{2}}:{hx}{{2}}\.{hx}{{1}})).*(drv={driver}|.*unused=.*{driver})' + pcis = [] try: trex_base_dir = '/opt/trex' contents = os.listdir(trex_base_dir) @@ -110,14 +145,28 @@ def get_intel_pci(nic_ports): for driver in ['i40e', 'ixgbe']: matches = re.findall(regex.format(hx=hx, driver=driver), devices) - if matches: - pcis = [x[0] for x in matches] - if len(pcis) < 2: - continue - pcis.sort() - return [pcis[port_index] for port_index in nic_ports] - - return [] + if not matches: + continue + + matches.sort() + for port in matches: + intf_name = glob.glob("/sys/bus/pci/devices/%s/net/*" % port[0]) + if not intf_name: + # Interface is not bind to kernel driver, so take it + pcis.append(port[1]) + else: + intf_name = intf_name[0][intf_name[0].rfind('/') + 1:] + process = subprocess.Popen(['ip', '-o', '-d', 'link', 'show', intf_name], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + intf_info, _ = process.communicate() + if not re.search('team_slave|bond_slave', intf_info): + pcis.append(port[1]) + + if len(pcis) == 2: + break + + return pcis multiplier_map = { |