aboutsummaryrefslogtreecommitdiffstats
path: root/yardstick/network_services/vnf_generic
diff options
context:
space:
mode:
Diffstat (limited to 'yardstick/network_services/vnf_generic')
-rw-r--r--yardstick/network_services/vnf_generic/vnf/base.py234
-rw-r--r--yardstick/network_services/vnf_generic/vnf/sample_vnf.py994
-rw-r--r--yardstick/network_services/vnf_generic/vnf/tg_ping.py155
-rw-r--r--yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py303
-rw-r--r--yardstick/network_services/vnf_generic/vnf/tg_trex.py345
-rw-r--r--yardstick/network_services/vnf_generic/vnf/vpe_vnf.py545
-rw-r--r--yardstick/network_services/vnf_generic/vnfdgen.py64
7 files changed, 1570 insertions, 1070 deletions
diff --git a/yardstick/network_services/vnf_generic/vnf/base.py b/yardstick/network_services/vnf_generic/vnf/base.py
index 2df6037f3..955f9f03d 100644
--- a/yardstick/network_services/vnf_generic/vnf/base.py
+++ b/yardstick/network_services/vnf_generic/vnf/base.py
@@ -15,10 +15,6 @@
from __future__ import absolute_import
import logging
-import ipaddress
-import six
-
-from yardstick.network_services.utils import get_nsb_option
LOG = logging.getLogger(__name__)
@@ -61,192 +57,69 @@ class QueueFileWrapper(object):
self.q_out.get()
-class GenericVNF(object):
+class VnfdHelper(dict):
+
+ @property
+ def mgmt_interface(self):
+ return self["mgmt-interface"]
+
+ @property
+ def vdu(self):
+ return self['vdu']
+
+ @property
+ def vdu0(self):
+ return self.vdu[0]
+
+ @property
+ def interfaces(self):
+ return self.vdu0['external-interface']
+
+ @property
+ def kpi(self):
+ return self['benchmark']['kpi']
+
+ def find_virtual_interface(self, **kwargs):
+ key, value = next(iter(kwargs.items()))
+ for interface in self.interfaces:
+ virtual_intf = interface["virtual-interface"]
+ if virtual_intf[key] == value:
+ return interface
+
+ def find_interface(self, **kwargs):
+ key, value = next(iter(kwargs.items()))
+ for interface in self.interfaces:
+ if interface[key] == value:
+ return interface
+
+
+class VNFObject(object):
+
+ def __init__(self, name, vnfd):
+ super(VNFObject, self).__init__()
+ self.name = name
+ self.vnfd_helper = VnfdHelper(vnfd) # fixme: parse this into a structure
+
+
+class GenericVNF(VNFObject):
+
""" Class providing file-like API for generic VNF implementation """
- def __init__(self, vnfd):
- super(GenericVNF, self).__init__()
- self.vnfd = vnfd # fixme: parse this into a structure
+ def __init__(self, name, vnfd):
+ super(GenericVNF, self).__init__(name, vnfd)
# List of statistics we can obtain from this VNF
# - ETSI MANO 6.3.1.1 monitoring_parameter
- self.kpi = self._get_kpi_definition(vnfd)
+ self.kpi = self._get_kpi_definition()
# Standard dictionary containing params like thread no, buffer size etc
self.config = {}
self.runs_traffic = False
- self.name = "vnf__1" # name in topology file
- self.bin_path = get_nsb_option("bin_path", "")
- @classmethod
- def _get_kpi_definition(cls, vnfd):
+ def _get_kpi_definition(self):
""" Get list of KPIs defined in VNFD
:param vnfd:
:return: list of KPIs, e.g. ['throughput', 'latency']
"""
- return vnfd['benchmark']['kpi']
-
- @classmethod
- def get_ip_version(cls, ip_addr):
- """ get ip address version v6 or v4 """
- try:
- address = ipaddress.ip_address(six.text_type(ip_addr))
- except ValueError:
- LOG.error(ip_addr, " is not valid")
- return
- else:
- return address.version
-
- def _ip_to_hex(self, ip_addr):
- ip_x = ip_addr
- if self.get_ip_version(ip_addr) == 4:
- ip_to_convert = ip_addr.split(".")
- ip_octect = [int(octect) for octect in ip_to_convert]
- ip_x = "{0[0]:02X}{0[1]:02X}{0[2]:02X}{0[3]:02X}".format(ip_octect)
- return ip_x
-
- def _get_dpdk_port_num(self, name):
- for intf in self.vnfd['vdu'][0]['external-interface']:
- if name == intf['name']:
- return intf['virtual-interface']['dpdk_port_num']
-
- def _append_routes(self, ip_pipeline_cfg):
- if 'routing_table' in self.vnfd['vdu'][0]:
- routing_table = self.vnfd['vdu'][0]['routing_table']
-
- where = ip_pipeline_cfg.find("arp_route_tbl")
- link = ip_pipeline_cfg[:where]
- route_add = ip_pipeline_cfg[where:]
-
- tmp = route_add.find('\n')
- route_add = route_add[tmp:]
-
- cmds = "arp_route_tbl ="
-
- for route in routing_table:
- net = self._ip_to_hex(route['network'])
- net_nm = self._ip_to_hex(route['netmask'])
- net_gw = self._ip_to_hex(route['gateway'])
- port = self._get_dpdk_port_num(route['if'])
- cmd = \
- " ({port0_local_ip_hex},{port0_netmask_hex},{dpdk_port},"\
- "{port1_local_ip_hex})".format(port0_local_ip_hex=net,
- port0_netmask_hex=net_nm,
- dpdk_port=port,
- port1_local_ip_hex=net_gw)
- cmds += cmd
-
- cmds += '\n'
- ip_pipeline_cfg = link + cmds + route_add
-
- return ip_pipeline_cfg
-
- def _append_nd_routes(self, ip_pipeline_cfg):
- if 'nd_route_tbl' in self.vnfd['vdu'][0]:
- routing_table = self.vnfd['vdu'][0]['nd_route_tbl']
-
- where = ip_pipeline_cfg.find("nd_route_tbl")
- link = ip_pipeline_cfg[:where]
- route_nd = ip_pipeline_cfg[where:]
-
- tmp = route_nd.find('\n')
- route_nd = route_nd[tmp:]
-
- cmds = "nd_route_tbl ="
-
- for route in routing_table:
- net = route['network']
- net_nm = route['netmask']
- net_gw = route['gateway']
- port = self._get_dpdk_port_num(route['if'])
- cmd = \
- " ({port0_local_ip_hex},{port0_netmask_hex},{dpdk_port},"\
- "{port1_local_ip_hex})".format(port0_local_ip_hex=net,
- port0_netmask_hex=net_nm,
- dpdk_port=port,
- port1_local_ip_hex=net_gw)
- cmds += cmd
-
- cmds += '\n'
- ip_pipeline_cfg = link + cmds + route_nd
-
- return ip_pipeline_cfg
-
- def _get_port0localip6(self):
- return_value = ""
- if 'nd_route_tbl' in self.vnfd['vdu'][0]:
- routing_table = self.vnfd['vdu'][0]['nd_route_tbl']
-
- inc = 0
- for route in routing_table:
- inc += 1
- if inc == 1:
- return_value = route['network']
- LOG.info("_get_port0localip6 : %s", return_value)
- return return_value
-
- def _get_port1localip6(self):
- return_value = ""
- if 'nd_route_tbl' in self.vnfd['vdu'][0]:
- routing_table = self.vnfd['vdu'][0]['nd_route_tbl']
-
- inc = 0
- for route in routing_table:
- inc += 1
- if inc == 2:
- return_value = route['network']
- LOG.info("_get_port1localip6 : %s", return_value)
- return return_value
-
- def _get_port0prefixlen6(self):
- return_value = ""
- if 'nd_route_tbl' in self.vnfd['vdu'][0]:
- routing_table = self.vnfd['vdu'][0]['nd_route_tbl']
-
- inc = 0
- for route in routing_table:
- inc += 1
- if inc == 1:
- return_value = route['netmask']
- LOG.info("_get_port0prefixlen6 : %s", return_value)
- return return_value
-
- def _get_port1prefixlen6(self):
- return_value = ""
- if 'nd_route_tbl' in self.vnfd['vdu'][0]:
- routing_table = self.vnfd['vdu'][0]['nd_route_tbl']
-
- inc = 0
- for route in routing_table:
- inc += 1
- if inc == 2:
- return_value = route['netmask']
- LOG.info("_get_port1prefixlen6 : %s", return_value)
- return return_value
-
- def _get_port0gateway6(self):
- return_value = ""
- if 'nd_route_tbl' in self.vnfd['vdu'][0]:
- routing_table = self.vnfd['vdu'][0]['nd_route_tbl']
-
- inc = 0
- for route in routing_table:
- inc += 1
- if inc == 1:
- return_value = route['network']
- LOG.info("_get_port0gateway6 : %s", return_value)
- return return_value
-
- def _get_port1gateway6(self):
- return_value = ""
- if 'nd_route_tbl' in self.vnfd['vdu'][0]:
- routing_table = self.vnfd['vdu'][0]['nd_route_tbl']
-
- inc = 0
- for route in routing_table:
- inc += 1
- if inc == 2:
- return_value = route['network']
- LOG.info("_get_port1gateway6 : %s", return_value)
- return return_value
+ return self.vnfd_helper.kpi
def instantiate(self, scenario_cfg, context_cfg):
""" Prepare VNF for operation and start the VNF process/VM
@@ -284,11 +157,10 @@ class GenericVNF(object):
class GenericTrafficGen(GenericVNF):
""" Class providing file-like API for generic traffic generator """
- def __init__(self, vnfd):
- super(GenericTrafficGen, self).__init__(vnfd)
+ def __init__(self, name, vnfd):
+ super(GenericTrafficGen, self).__init__(name, vnfd)
self.runs_traffic = True
self.traffic_finished = False
- self.name = "tgen__1" # name in topology file
def run_traffic(self, traffic_profile):
""" Generate traffic on the wire according to the given params.
diff --git a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py
new file mode 100644
index 000000000..89c086d97
--- /dev/null
+++ b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py
@@ -0,0 +1,994 @@
+# Copyright (c) 2016-2017 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+""" Base class implementation for generic vnf implementation """
+
+from __future__ import absolute_import
+
+import posixpath
+import time
+import logging
+import os
+import re
+import subprocess
+from collections import Mapping
+
+from multiprocessing import Queue, Value, Process
+
+from six.moves import cStringIO
+
+from yardstick.benchmark.contexts.base import Context
+from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
+from yardstick.network_services.helpers.cpu import CpuSysCores
+from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
+from yardstick.network_services.nfvi.resource import ResourceProfile
+from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
+from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
+from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
+from yardstick.network_services.utils import get_nsb_option
+
+from stl.trex_stl_lib.trex_stl_client import STLClient
+from stl.trex_stl_lib.trex_stl_client import LoggerApi
+from stl.trex_stl_lib.trex_stl_exceptions import STLError, STLStateError
+
+from yardstick.ssh import AutoConnectSSH
+
+DPDK_VERSION = "dpdk-16.07"
+
+LOG = logging.getLogger(__name__)
+
+
+REMOTE_TMP = "/tmp"
+
+
+class VnfSshHelper(AutoConnectSSH):
+
+ def __init__(self, node, bin_path, wait=None):
+ self.node = node
+ kwargs = self.args_from_node(self.node)
+ if wait:
+ kwargs.setdefault('wait', wait)
+
+ super(VnfSshHelper, self).__init__(**kwargs)
+ self.bin_path = bin_path
+
+ @staticmethod
+ def get_class():
+ # must return static class name, anything else refers to the calling class
+ # i.e. the subclass, not the superclass
+ return VnfSshHelper
+
+ def copy(self):
+ # this copy constructor is different from SSH classes, since it uses node
+ return self.get_class()(self.node, self.bin_path)
+
+ def upload_config_file(self, prefix, content):
+ cfg_file = os.path.join(REMOTE_TMP, prefix)
+ LOG.debug(content)
+ file_obj = cStringIO(content)
+ self.put_file_obj(file_obj, cfg_file)
+ return cfg_file
+
+ def join_bin_path(self, *args):
+ return os.path.join(self.bin_path, *args)
+
+ def provision_tool(self, tool_path=None, tool_file=None):
+ if tool_path is None:
+ tool_path = self.bin_path
+ return super(VnfSshHelper, self).provision_tool(tool_path, tool_file)
+
+
+class SetupEnvHelper(object):
+
+ CFG_CONFIG = os.path.join(REMOTE_TMP, "sample_config")
+ CFG_SCRIPT = os.path.join(REMOTE_TMP, "sample_script")
+ CORES = []
+ DEFAULT_CONFIG_TPL_CFG = "sample.cfg"
+ PIPELINE_COMMAND = ''
+ VNF_TYPE = "SAMPLE"
+
+ def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
+ super(SetupEnvHelper, self).__init__()
+ self.vnfd_helper = vnfd_helper
+ self.ssh_helper = ssh_helper
+ self.scenario_helper = scenario_helper
+
+ def _get_ports_gateway(self, name):
+ routing_table = self.vnfd_helper.vdu0.get('routing_table', [])
+ for route in routing_table:
+ if name == route['if']:
+ return route['gateway']
+ return None
+
+ def build_config(self):
+ raise NotImplementedError
+
+ def setup_vnf_environment(self):
+ pass
+ # raise NotImplementedError
+
+ def tear_down(self):
+ raise NotImplementedError
+
+
+class DpdkVnfSetupEnvHelper(SetupEnvHelper):
+
+ APP_NAME = 'DpdkVnf'
+ DPDK_BIND_CMD = "sudo {dpdk_nic_bind} {force} -b {driver} {vpci}"
+ DPDK_UNBIND_CMD = "sudo {dpdk_nic_bind} --force -b {driver} {vpci}"
+ FIND_NET_CMD = "find /sys/class/net -lname '*{}*' -printf '%f'"
+
+ HW_DEFAULT_CORE = 3
+ SW_DEFAULT_CORE = 2
+
+ DPDK_STATUS_DRIVER_RE = re.compile(r"(\d{2}:\d{2}\.\d).*drv=([-\w]+)")
+
+ @staticmethod
+ def _update_packet_type(ip_pipeline_cfg, traffic_options):
+ match_str = 'pkt_type = ipv4'
+ replace_str = 'pkt_type = {0}'.format(traffic_options['pkt_type'])
+ pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
+ return pipeline_config_str
+
+ @classmethod
+ def _update_traffic_type(cls, ip_pipeline_cfg, traffic_options):
+ traffic_type = traffic_options['traffic_type']
+
+ if traffic_options['vnf_type'] is not cls.APP_NAME:
+ match_str = 'traffic_type = 4'
+ replace_str = 'traffic_type = {0}'.format(traffic_type)
+
+ elif traffic_type == 4:
+ match_str = 'pkt_type = ipv4'
+ replace_str = 'pkt_type = ipv4'
+
+ else:
+ match_str = 'pkt_type = ipv4'
+ replace_str = 'pkt_type = ipv6'
+
+ pipeline_config_str = ip_pipeline_cfg.replace(match_str, replace_str)
+ return pipeline_config_str
+
+ def __init__(self, vnfd_helper, ssh_helper, scenario_helper):
+ super(DpdkVnfSetupEnvHelper, self).__init__(vnfd_helper, ssh_helper, scenario_helper)
+ self.all_ports = None
+ self.bound_pci = None
+ self._dpdk_nic_bind = None
+ self.socket = None
+
+ @property
+ def dpdk_nic_bind(self):
+ if self._dpdk_nic_bind is None:
+ self._dpdk_nic_bind = self.ssh_helper.provision_tool(tool_file="dpdk-devbind.py")
+ return self._dpdk_nic_bind
+
+ def _setup_hugepages(self):
+ cmd = "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo"
+ hugepages = self.ssh_helper.execute(cmd)[1].rstrip()
+
+ memory_path = \
+ '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
+ self.ssh_helper.execute("awk -F: '{ print $1 }' < %s" % memory_path)
+
+ if hugepages == "2048kB":
+ pages = 16384
+ else:
+ pages = 16
+
+ self.ssh_helper.execute("echo %s | sudo tee %s" % (pages, memory_path))
+
+ def _get_dpdk_port_num(self, name):
+ interface = self.vnfd_helper.find_interface(name=name)
+ return interface['virtual-interface']['dpdk_port_num']
+
+ def build_config(self):
+ vnf_cfg = self.scenario_helper.vnf_cfg
+ task_path = self.scenario_helper.task_path
+
+ lb_count = vnf_cfg.get('lb_count', 3)
+ lb_config = vnf_cfg.get('lb_config', 'SW')
+ worker_config = vnf_cfg.get('worker_config', '1C/1T')
+ worker_threads = vnf_cfg.get('worker_threads', 3)
+
+ traffic_type = self.scenario_helper.all_options.get('traffic_type', 4)
+ traffic_options = {
+ 'traffic_type': traffic_type,
+ 'pkt_type': 'ipv%s' % traffic_type,
+ 'vnf_type': self.VNF_TYPE,
+ }
+
+ config_tpl_cfg = find_relative_file(self.DEFAULT_CONFIG_TPL_CFG, task_path)
+ config_basename = posixpath.basename(self.CFG_CONFIG)
+ script_basename = posixpath.basename(self.CFG_SCRIPT)
+ multiport = MultiPortConfig(self.scenario_helper.topology,
+ config_tpl_cfg,
+ config_basename,
+ self.vnfd_helper.interfaces,
+ self.VNF_TYPE,
+ lb_count,
+ worker_threads,
+ worker_config,
+ lb_config,
+ self.socket)
+
+ multiport.generate_config()
+ with open(self.CFG_CONFIG) as handle:
+ new_config = handle.read()
+
+ new_config = self._update_traffic_type(new_config, traffic_options)
+ new_config = self._update_packet_type(new_config, traffic_options)
+
+ self.ssh_helper.upload_config_file(config_basename, new_config)
+ self.ssh_helper.upload_config_file(script_basename,
+ multiport.generate_script(self.vnfd_helper))
+ self.all_ports = multiport.port_pair_list
+
+ LOG.info("Provision and start the %s", self.APP_NAME)
+ self._build_pipeline_kwargs()
+ return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)
+
+ def _build_pipeline_kwargs(self):
+ tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
+ ports_len_hex = hex(2 ** (len(self.all_ports) + 1) - 1)
+ self.pipeline_kwargs = {
+ 'cfg_file': self.CFG_CONFIG,
+ 'script': self.CFG_SCRIPT,
+ 'ports_len_hex': ports_len_hex,
+ 'tool_path': tool_path,
+ }
+
+ def _get_app_cpu(self):
+ if self.CORES:
+ return self.CORES
+
+ vnf_cfg = self.scenario_helper.vnf_cfg
+ sys_obj = CpuSysCores(self.ssh_helper)
+ self.sys_cpu = sys_obj.get_core_socket()
+ num_core = int(vnf_cfg["worker_threads"])
+ if vnf_cfg.get("lb_config", "SW") == 'HW':
+ num_core += self.HW_DEFAULT_CORE
+ else:
+ num_core += self.SW_DEFAULT_CORE
+ app_cpu = self.sys_cpu[str(self.socket)][:num_core]
+ return app_cpu
+
+ def _get_cpu_sibling_list(self, cores=None):
+ if cores is None:
+ cores = self._get_app_cpu()
+ sys_cmd_template = "%s/cpu%s/topology/thread_siblings_list"
+ awk_template = "awk -F: '{ print $1 }' < %s"
+ sys_path = "/sys/devices/system/cpu/"
+ cpu_topology = []
+ try:
+ for core in cores:
+ sys_cmd = sys_cmd_template % (sys_path, core)
+ cpu_id = self.ssh_helper.execute(awk_template % sys_cmd)[1]
+ cpu_topology.extend(cpu.strip() for cpu in cpu_id.split(','))
+
+ return cpu_topology
+ except Exception:
+ return []
+
+ def _validate_cpu_cfg(self):
+ return self._get_cpu_sibling_list()
+
+ def _find_used_drivers(self):
+ cmd = "{0} -s".format(self.dpdk_nic_bind)
+ rc, dpdk_status, _ = self.ssh_helper.execute(cmd)
+
+ self.used_drivers = {
+ vpci: (index, driver)
+ for index, (vpci, driver)
+ in enumerate(self.DPDK_STATUS_DRIVER_RE.findall(dpdk_status))
+ if any(b.endswith(vpci) for b in self.bound_pci)
+ }
+
+ def setup_vnf_environment(self):
+ self._setup_dpdk()
+ resource = self._setup_resources()
+ self._kill_vnf()
+ self._detect_drivers()
+ return resource
+
+ def _kill_vnf(self):
+ self.ssh_helper.execute("sudo pkill %s" % self.APP_NAME)
+
+ def _setup_dpdk(self):
+ """ setup dpdk environment needed for vnf to run """
+
+ self._setup_hugepages()
+ self.ssh_helper.execute("sudo modprobe uio && sudo modprobe igb_uio")
+
+ exit_status = self.ssh_helper.execute("lsmod | grep -i igb_uio")[0]
+ if exit_status == 0:
+ return
+
+ dpdk = self.ssh_helper.join_bin_path(DPDK_VERSION)
+ dpdk_setup = self.ssh_helper.provision_tool(tool_file="nsb_setup.sh")
+ exit_status = self.ssh_helper.execute("which {} >/dev/null 2>&1".format(dpdk))[0]
+ if exit_status != 0:
+ self.ssh_helper.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup)
+
+ def _setup_resources(self):
+ interfaces = self.vnfd_helper.interfaces
+ self.bound_pci = [v['virtual-interface']["vpci"] for v in interfaces]
+
+ # what is this magic? how do we know which socket is for which port?
+ # what about quad-socket?
+ if any(v[5] == "0" for v in self.bound_pci):
+ self.socket = 0
+ else:
+ self.socket = 1
+
+ cores = self._validate_cpu_cfg()
+ return ResourceProfile(self.vnfd_helper, cores)
+
+ def _detect_drivers(self):
+ interfaces = self.vnfd_helper.interfaces
+
+ self._find_used_drivers()
+ for vpci, (index, _) in self.used_drivers.items():
+ try:
+ intf1 = next(v for v in interfaces if vpci == v['virtual-interface']['vpci'])
+ except StopIteration:
+ pass
+ else:
+ intf1['dpdk_port_num'] = index
+
+ for vpci in self.bound_pci:
+ self._bind_dpdk('igb_uio', vpci)
+ time.sleep(2)
+
+ def _bind_dpdk(self, driver, vpci, force=True):
+ if force:
+ force = '--force '
+ else:
+ force = ''
+ cmd = self.DPDK_BIND_CMD.format(force=force,
+ dpdk_nic_bind=self.dpdk_nic_bind,
+ driver=driver,
+ vpci=vpci)
+ self.ssh_helper.execute(cmd)
+
+ def _detect_and_bind_dpdk(self, vpci, driver):
+ find_net_cmd = self.FIND_NET_CMD.format(vpci)
+ exit_status, _, _ = self.ssh_helper.execute(find_net_cmd)
+ if exit_status == 0:
+ # already bound
+ return None
+ self._bind_dpdk(driver, vpci)
+ exit_status, stdout, _ = self.ssh_helper.execute(find_net_cmd)
+ if exit_status != 0:
+ # failed to bind
+ return None
+ return stdout
+
+ def _bind_kernel_devices(self):
+ for intf in self.vnfd_helper.interfaces:
+ vi = intf["virtual-interface"]
+ stdout = self._detect_and_bind_dpdk(vi["vpci"], vi["driver"])
+ if stdout is not None:
+ vi["local_iface_name"] = posixpath.basename(stdout)
+
+ def tear_down(self):
+ for vpci, (_, driver) in self.used_drivers.items():
+ self.ssh_helper.execute(self.DPDK_UNBIND_CMD.format(dpdk_nic_bind=self.dpdk_nic_bind,
+ driver=driver,
+ vpci=vpci))
+
+
+class ResourceHelper(object):
+
+ COLLECT_KPI = ''
+ MAKE_INSTALL = 'cd {0} && make && sudo make install'
+ RESOURCE_WORD = 'sample'
+
+ COLLECT_MAP = {}
+
+ def __init__(self, setup_helper):
+ super(ResourceHelper, self).__init__()
+ self.resource = None
+ self.setup_helper = setup_helper
+ self.ssh_helper = setup_helper.ssh_helper
+
+ def setup(self):
+ self.resource = self.setup_helper.setup_vnf_environment()
+
+ def generate_cfg(self):
+ pass
+
+ def _collect_resource_kpi(self):
+ result = {}
+ status = self.resource.check_if_sa_running("collectd")[0]
+ if status:
+ result = self.resource.amqp_collect_nfvi_kpi()
+
+ result = {"core": result}
+ return result
+
+ def start_collect(self):
+ self.resource.initiate_systemagent(self.ssh_helper.bin_path)
+ self.resource.start()
+ self.resource.amqp_process_for_nfvi_kpi()
+
+ def stop_collect(self):
+ if self.resource:
+ self.resource.stop()
+
+ def collect_kpi(self):
+ return self._collect_resource_kpi()
+
+
+class ClientResourceHelper(ResourceHelper):
+
+ RUN_DURATION = 60
+ QUEUE_WAIT_TIME = 5
+ SYNC_PORT = 1
+ ASYNC_PORT = 2
+
+ def __init__(self, setup_helper):
+ super(ClientResourceHelper, self).__init__(setup_helper)
+ self.vnfd_helper = setup_helper.vnfd_helper
+ self.scenario_helper = setup_helper.scenario_helper
+
+ self.client = None
+ self.client_started = Value('i', 0)
+ self.my_ports = None
+ self._queue = Queue()
+ self._result = {}
+ self._terminated = Value('i', 0)
+ self._vpci_ascending = None
+
+ def _build_ports(self):
+ self.my_ports = [0, 1]
+
+ def get_stats(self, *args, **kwargs):
+ try:
+ return self.client.get_stats(*args, **kwargs)
+ except STLStateError:
+ LOG.exception("TRex client not connected")
+ return {}
+
+ def generate_samples(self, key=None, default=None):
+ last_result = self.get_stats(self.my_ports)
+ key_value = last_result.get(key, default)
+
+ if not isinstance(last_result, Mapping): # added for mock unit test
+ self._terminated.value = 1
+ return {}
+
+ samples = {}
+ for vpci_idx, vpci in enumerate(self._vpci_ascending):
+ name = self.vnfd_helper.find_virtual_interface(vpci=vpci)["name"]
+ # fixme: VNFDs KPIs values needs to be mapped to TRex structure
+ xe_value = last_result.get(vpci_idx, {})
+ samples[name] = {
+ "rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
+ "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
+ "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
+ "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
+ "in_packets": int(xe_value.get("ipackets", 0)),
+ "out_packets": int(xe_value.get("opackets", 0)),
+ }
+ if key:
+ samples[name][key] = key_value
+ return samples
+
+ def _run_traffic_once(self, traffic_profile):
+ traffic_profile.execute(self)
+ self.client_started.value = 1
+ time.sleep(self.RUN_DURATION)
+ samples = self.generate_samples()
+ time.sleep(self.QUEUE_WAIT_TIME)
+ self._queue.put(samples)
+
+ def run_traffic(self, traffic_profile):
+ # fixme: fix passing correct trex config file,
+ # instead of searching the default path
+ self._build_ports()
+ self.client = self._connect()
+ self.client.reset(ports=self.my_ports)
+ self.client.remove_all_streams(self.my_ports) # remove all streams
+ traffic_profile.register_generator(self)
+
+ while self._terminated.value == 0:
+ self._run_traffic_once(traffic_profile)
+
+ self.client.stop(self.my_ports)
+ self.client.disconnect()
+ self._terminated.value = 0
+
+ def terminate(self):
+ self._terminated.value = 1 # stop client
+
+ def clear_stats(self, ports=None):
+ if ports is None:
+ ports = self.my_ports
+ self.client.clear_stats(ports=ports)
+
+ def start(self, ports=None, *args, **kwargs):
+ if ports is None:
+ ports = self.my_ports
+ self.client.start(ports=ports, *args, **kwargs)
+
+ def collect_kpi(self):
+ if not self._queue.empty():
+ kpi = self._queue.get()
+ self._result.update(kpi)
+ LOG.debug("Collect {0} KPIs {1}".format(self.RESOURCE_WORD, self._result))
+ return self._result
+
+ def _connect(self, client=None):
+ if client is None:
+ client = STLClient(username=self.vnfd_helper.mgmt_interface["user"],
+ server=self.vnfd_helper.mgmt_interface["ip"],
+ verbose_level=LoggerApi.VERBOSE_QUIET)
+
+ # try to connect with 5s intervals, 30s max
+ for idx in range(6):
+ try:
+ client.connect()
+ break
+ except STLError:
+ LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
+ time.sleep(5)
+ return client
+
+
+class Rfc2544ResourceHelper(object):
+
+ DEFAULT_CORRELATED_TRAFFIC = False
+ DEFAULT_LATENCY = False
+ DEFAULT_TOLERANCE = '0.0001 - 0.0001'
+
+ def __init__(self, scenario_helper):
+ super(Rfc2544ResourceHelper, self).__init__()
+ self.scenario_helper = scenario_helper
+ self._correlated_traffic = None
+ self.iteration = Value('i', 0)
+ self._latency = None
+ self._rfc2544 = None
+ self._tolerance_low = None
+ self._tolerance_high = None
+
+ @property
+ def rfc2544(self):
+ if self._rfc2544 is None:
+ self._rfc2544 = self.scenario_helper.all_options['rfc2544']
+ return self._rfc2544
+
+ @property
+ def tolerance_low(self):
+ if self._tolerance_low is None:
+ self.get_rfc_tolerance()
+ return self._tolerance_low
+
+ @property
+ def tolerance_high(self):
+ if self._tolerance_high is None:
+ self.get_rfc_tolerance()
+ return self._tolerance_high
+
+ @property
+ def correlated_traffic(self):
+ if self._correlated_traffic is None:
+ self._correlated_traffic = \
+ self.get_rfc2544('correlated_traffic', self.DEFAULT_CORRELATED_TRAFFIC)
+
+ return self._correlated_traffic
+
+ @property
+ def latency(self):
+ if self._latency is None:
+ self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
+ return self._latency
+
+ def get_rfc2544(self, name, default=None):
+ return self.rfc2544.get(name, default)
+
+ def get_rfc_tolerance(self):
+ tolerance_str = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
+ tolerance_iter = iter(sorted(float(t.strip()) for t in tolerance_str.split('-')))
+ self._tolerance_low = next(tolerance_iter)
+ self._tolerance_high = next(tolerance_iter, self.tolerance_low)
+
+
+class SampleVNFDeployHelper(object):
+
+ SAMPLE_VNF_REPO = 'https://gerrit.opnfv.org/gerrit/samplevnf'
+ REPO_NAME = posixpath.basename(SAMPLE_VNF_REPO)
+ SAMPLE_REPO_DIR = os.path.join('~/', REPO_NAME)
+
+ def __init__(self, vnfd_helper, ssh_helper):
+ super(SampleVNFDeployHelper, self).__init__()
+ self.ssh_helper = ssh_helper
+ self.vnfd_helper = vnfd_helper
+
+ DISABLE_DEPLOY = True
+
+ def deploy_vnfs(self, app_name):
+ # temp disable for now
+ if self.DISABLE_DEPLOY:
+ return
+
+ vnf_bin = self.ssh_helper.join_bin_path(app_name)
+ exit_status = self.ssh_helper.execute("which %s" % vnf_bin)[0]
+ if not exit_status:
+ return
+
+ subprocess.check_output(["rm", "-rf", self.REPO_NAME])
+ subprocess.check_output(["git", "clone", self.SAMPLE_VNF_REPO])
+ time.sleep(2)
+ self.ssh_helper.execute("rm -rf %s" % self.SAMPLE_REPO_DIR)
+ self.ssh_helper.put(self.REPO_NAME, self.SAMPLE_REPO_DIR, True)
+
+ build_script = os.path.join(self.SAMPLE_REPO_DIR, 'tools/vnf_build.sh')
+ time.sleep(2)
+ http_proxy = os.environ.get('http_proxy', '')
+ https_proxy = os.environ.get('https_proxy', '')
+ cmd = "sudo -E %s --silent '%s' '%s'" % (build_script, http_proxy, https_proxy)
+ LOG.debug(cmd)
+ self.ssh_helper.execute(cmd)
+ vnf_bin_loc = os.path.join(self.SAMPLE_REPO_DIR, "VNFs", app_name, "build", app_name)
+ self.ssh_helper.execute("sudo mkdir -p %s" % self.ssh_helper.bin_path)
+ self.ssh_helper.execute("sudo cp %s %s" % (vnf_bin_loc, vnf_bin))
+
+
+class ScenarioHelper(object):
+
+ DEFAULT_VNF_CFG = {
+ 'lb_config': 'SW',
+ 'lb_count': 1,
+ 'worker_config': '1C/1T',
+ 'worker_threads': 1,
+ }
+
+ def __init__(self, name):
+ self.name = name
+ self.scenario_cfg = None
+
+ @property
+ def task_path(self):
+ return self.scenario_cfg["task_path"]
+
+ @property
+ def nodes(self):
+ return self.scenario_cfg['nodes']
+
+ @property
+ def all_options(self):
+ return self.scenario_cfg["options"]
+
+ @property
+ def options(self):
+ return self.all_options[self.name]
+
+ @property
+ def vnf_cfg(self):
+ return self.options.get('vnf_config', self.DEFAULT_VNF_CFG)
+
+ @property
+ def topology(self):
+ return self.scenario_cfg['topology']
+
+
+class SampleVNF(GenericVNF):
+ """ Class providing file-like API for generic VNF implementation """
+
+ VNF_PROMPT = "pipeline>"
+ WAIT_TIME = 1
+
+ def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ super(SampleVNF, self).__init__(name, vnfd)
+ self.bin_path = get_nsb_option('bin_path', '')
+
+ self.scenario_helper = ScenarioHelper(self.name)
+ self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)
+
+ if setup_env_helper_type is None:
+ setup_env_helper_type = SetupEnvHelper
+
+ self.setup_helper = setup_env_helper_type(self.vnfd_helper,
+ self.ssh_helper,
+ self.scenario_helper)
+
+ self.deploy_helper = SampleVNFDeployHelper(vnfd, self.ssh_helper)
+
+ if resource_helper_type is None:
+ resource_helper_type = ResourceHelper
+
+ self.resource_helper = resource_helper_type(self.setup_helper)
+
+ self.all_ports = None
+ self.context_cfg = None
+ self.nfvi_context = None
+ self.pipeline_kwargs = {}
+ self.priv_ports = None
+ self.pub_ports = None
+ # TODO(esm): make QueueFileWrapper invert-able so that we
+ # never have to manage the queues
+ self.q_in = Queue()
+ self.q_out = Queue()
+ self.queue_wrapper = None
+ self.run_kwargs = {}
+ self.scenario_cfg = None
+ self.tg_port_pairs = None
+ self.used_drivers = {}
+ self.vnf_port_pairs = None
+ self._vnf_process = None
+
+ def _get_route_data(self, route_index, route_type):
+ route_iter = iter(self.vnfd_helper.vdu0.get('nd_route_tbl', []))
+ for _ in range(route_index):
+ next(route_iter, '')
+ return next(route_iter, {}).get(route_type, '')
+
+ def _get_port0localip6(self):
+ return_value = self._get_route_data(0, 'network')
+ LOG.info("_get_port0localip6 : %s", return_value)
+ return return_value
+
+ def _get_port1localip6(self):
+ return_value = self._get_route_data(1, 'network')
+ LOG.info("_get_port1localip6 : %s", return_value)
+ return return_value
+
+ def _get_port0prefixlen6(self):
+ return_value = self._get_route_data(0, 'netmask')
+ LOG.info("_get_port0prefixlen6 : %s", return_value)
+ return return_value
+
+ def _get_port1prefixlen6(self):
+ return_value = self._get_route_data(1, 'netmask')
+ LOG.info("_get_port1prefixlen6 : %s", return_value)
+ return return_value
+
+ def _get_port0gateway6(self):
+ return_value = self._get_route_data(0, 'network')
+ LOG.info("_get_port0gateway6 : %s", return_value)
+ return return_value
+
+ def _get_port1gateway6(self):
+ return_value = self._get_route_data(1, 'network')
+ LOG.info("_get_port1gateway6 : %s", return_value)
+ return return_value
+
+ def _start_vnf(self):
+ self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
+ self._vnf_process = Process(target=self._run)
+ self._vnf_process.start()
+
+ def _vnf_up_post(self):
+ pass
+
+ def instantiate(self, scenario_cfg, context_cfg):
+ self.scenario_helper.scenario_cfg = scenario_cfg
+ self.context_cfg = context_cfg
+ self.nfvi_context = Context.get_context_from_server(self.scenario_helper.nodes[self.name])
+ # self.nfvi_context = None
+
+ self.deploy_helper.deploy_vnfs(self.APP_NAME)
+ self.resource_helper.setup()
+ self._start_vnf()
+
+ def wait_for_instantiate(self):
+ buf = []
+ time.sleep(self.WAIT_TIME) # Give some time for config to load
+ while True:
+ if not self._vnf_process.is_alive():
+ raise RuntimeError("%s VNF process died." % self.APP_NAME)
+
+ # TODO(esm): move to QueueFileWrapper
+ while self.q_out.qsize() > 0:
+ buf.append(self.q_out.get())
+ message = ''.join(buf)
+ if self.VNF_PROMPT in message:
+ LOG.info("%s VNF is up and running.", self.APP_NAME)
+ self._vnf_up_post()
+ self.queue_wrapper.clear()
+ self.resource_helper.start_collect()
+ return self._vnf_process.exitcode
+
+ if "PANIC" in message:
+ raise RuntimeError("Error starting %s VNF." %
+ self.APP_NAME)
+
+ LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
+ time.sleep(1)
+
+ def _build_run_kwargs(self):
+ self.run_kwargs = {
+ 'stdin': self.queue_wrapper,
+ 'stdout': self.queue_wrapper,
+ 'keep_stdin_open': True,
+ 'pty': True,
+ }
+
+ def _build_config(self):
+ return self.setup_helper.build_config()
+
+ def _run(self):
+ # we can't share ssh paramiko objects to force new connection
+ self.ssh_helper.drop_connection()
+ cmd = self._build_config()
+ # kill before starting
+ self.ssh_helper.execute("pkill {}".format(self.APP_NAME))
+
+ LOG.debug(cmd)
+ self._build_run_kwargs()
+ self.ssh_helper.run(cmd, **self.run_kwargs)
+
+ def vnf_execute(self, cmd, wait_time=2):
+ """ send cmd to vnf process """
+
+ LOG.info("%s command: %s", self.APP_NAME, cmd)
+ self.q_in.put("{}\r\n".format(cmd))
+ time.sleep(wait_time)
+ output = []
+ while self.q_out.qsize() > 0:
+ output.append(self.q_out.get())
+ return "".join(output)
+
+ def _tear_down(self):
+ pass
+
+ def terminate(self):
+ self.vnf_execute("quit")
+ if self._vnf_process:
+ self._vnf_process.terminate()
+ self.ssh_helper.execute("sudo pkill %s" % self.APP_NAME)
+ self._tear_down()
+ self.resource_helper.stop_collect()
+
+ def get_stats(self, *args, **kwargs):
+ """
+ Method for checking the statistics
+
+ :return:
+ VNF statistics
+ """
+ cmd = 'p {0} stats'.format(self.APP_WORD)
+ out = self.vnf_execute(cmd)
+ return out
+
+ def collect_kpi(self):
+ stats = self.get_stats()
+ m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
+ if m:
+ result = {k: int(m.group(v)) for k, v in self.COLLECT_MAP.items()}
+ result["collect_stats"] = self.resource_helper.collect_kpi()
+ else:
+ result = {
+ "packets_in": 0,
+ "packets_fwd": 0,
+ "packets_dropped": 0,
+ }
+ LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
+ return result
+
+
+class SampleVNFTrafficGen(GenericTrafficGen):
+ """ Class providing file-like API for generic traffic generator """
+
+ APP_NAME = 'Sample'
+ RUN_WAIT = 1
+
+ def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ super(SampleVNFTrafficGen, self).__init__(name, vnfd)
+ self.bin_path = get_nsb_option('bin_path', '')
+ self.name = "tgen__1" # name in topology file
+
+ self.scenario_helper = ScenarioHelper(self.name)
+ self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
+
+ if setup_env_helper_type is None:
+ setup_env_helper_type = SetupEnvHelper
+
+ self.setup_helper = setup_env_helper_type(self.vnfd_helper,
+ self.ssh_helper,
+ self.scenario_helper)
+
+ if resource_helper_type is None:
+ resource_helper_type = ClientResourceHelper
+
+ self.resource_helper = resource_helper_type(self.setup_helper)
+
+ self.runs_traffic = True
+ self.traffic_finished = False
+ self.tg_port_pairs = None
+ self._tg_process = None
+ self._traffic_process = None
+
+ def _start_server(self):
+ # we can't share ssh paramiko objects to force new connection
+ self.ssh_helper.drop_connection()
+
+ def instantiate(self, scenario_cfg, context_cfg):
+ self.scenario_helper.scenario_cfg = scenario_cfg
+ self.resource_helper.generate_cfg()
+ self.setup_helper.setup_vnf_environment()
+ self.resource_helper.setup()
+
+ LOG.info("Starting %s server...", self.APP_NAME)
+ self._tg_process = Process(target=self._start_server)
+ self._tg_process.start()
+
+ def wait_for_instantiate(self):
+ return self._wait_for_process()
+
+ def _check_status(self):
+ raise NotImplementedError
+
+ def _wait_for_process(self):
+ while True:
+ if not self._tg_process.is_alive():
+ raise RuntimeError("%s traffic generator process died." % self.APP_NAME)
+ LOG.info("Waiting for %s TG Server to start.. ", self.APP_NAME)
+ time.sleep(1)
+ status = self._check_status()
+ if status == 0:
+ LOG.info("%s TG Server is up and running.", self.APP_NAME)
+ return self._tg_process.exitcode
+
+ def _traffic_runner(self, traffic_profile):
+ LOG.info("Starting %s client...", self.APP_NAME)
+ self.resource_helper.run_traffic(traffic_profile)
+
+ def run_traffic(self, traffic_profile):
+ """ Generate traffic on the wire according to the given params.
+ Method is non-blocking, returns immediately when traffic process
+ is running. Mandatory.
+
+ :param traffic_profile:
+ :return: True/False
+ """
+ self._traffic_process = Process(target=self._traffic_runner,
+ args=(traffic_profile,))
+ self._traffic_process.start()
+ # Wait for traffic process to start
+ while self.resource_helper.client_started.value == 0:
+ time.sleep(self.RUN_WAIT)
+
+ return self._traffic_process.is_alive()
+
+ def listen_traffic(self, traffic_profile):
+ """ Listen to traffic with the given parameters.
+ Method is non-blocking, returns immediately when traffic process
+ is running. Optional.
+
+ :param traffic_profile:
+ :return: True/False
+ """
+ pass
+
+ def verify_traffic(self, traffic_profile):
+ """ Verify captured traffic after it has ended. Optional.
+
+ :param traffic_profile:
+ :return: dict
+ """
+ pass
+
+ def collect_kpi(self):
+ result = self.resource_helper.collect_kpi()
+ LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
+ return result
+
+ def terminate(self):
+ """ After this method finishes, all traffic processes should stop. Mandatory.
+
+ :return: True/False
+ """
+ self.traffic_finished = True
+ if self._traffic_process is not None:
+ self._traffic_process.terminate()
diff --git a/yardstick/network_services/vnf_generic/vnf/tg_ping.py b/yardstick/network_services/vnf_generic/vnf/tg_ping.py
index 000a91db4..e65296287 100644
--- a/yardstick/network_services/vnf_generic/vnf/tg_ping.py
+++ b/yardstick/network_services/vnf_generic/vnf/tg_ping.py
@@ -16,14 +16,13 @@
from __future__ import absolute_import
from __future__ import print_function
import logging
-import multiprocessing
import re
-import time
-import os
-from yardstick import ssh
-from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
-from yardstick.network_services.utils import provision_tool
+from multiprocessing import Queue
+from ipaddress import IPv4Interface
+
+from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNFTrafficGen
+from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper
LOG = logging.getLogger(__name__)
@@ -42,77 +41,59 @@ class PingParser(object):
if match:
# IMPORTANT: in order for the data to be properly taken
# in by InfluxDB, it needs to be converted to numeric types
- self.queue.put({"packets_received": float(match.group(1)),
- "rtt": float(match.group(2))})
+ self.queue.put({
+ "packets_received": float(match.group(1)),
+ "rtt": float(match.group(2)),
+ })
def close(self):
- ''' close the ssh connection '''
- pass
+ """ close the ssh connection """
+ self.closed = True
def clear(self):
- ''' clear queue till Empty '''
+ """ clear queue till Empty """
while self.queue.qsize() > 0:
self.queue.get()
-class PingTrafficGen(GenericTrafficGen):
+class PingSetupEnvHelper(DpdkVnfSetupEnvHelper):
+
+ def setup_vnf_environment(self):
+ self._bind_kernel_devices()
+
+
+class PingTrafficGen(SampleVNFTrafficGen):
"""
This traffic generator can ping a single IP with pingsize
and target given in traffic profile
"""
- def __init__(self, vnfd):
- super(PingTrafficGen, self).__init__(vnfd)
+ TG_NAME = 'Ping'
+ RUN_WAIT = 4
+
+ def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ if setup_env_helper_type is None:
+ setup_env_helper_type = PingSetupEnvHelper
+
+ super(PingTrafficGen, self).__init__(name, vnfd, setup_env_helper_type,
+ resource_helper_type)
+ self._queue = Queue()
+ self._parser = PingParser(self._queue)
self._result = {}
- self._parser = None
- self._queue = None
- self._traffic_process = None
-
- mgmt_interface = vnfd["mgmt-interface"]
- self.connection = ssh.SSH.from_node(mgmt_interface)
- self.connection.wait()
-
- def _bind_device_kernel(self, connection):
- dpdk_nic_bind = \
- provision_tool(self.connection,
- os.path.join(self.bin_path, "dpdk_nic_bind.py"))
-
- drivers = {intf["virtual-interface"]["vpci"]:
- intf["virtual-interface"]["driver"]
- for intf in self.vnfd["vdu"][0]["external-interface"]}
-
- commands = \
- ['"{0}" --force -b "{1}" "{2}"'.format(dpdk_nic_bind, value, key)
- for key, value in drivers.items()]
- for command in commands:
- connection.execute(command)
-
- for index, out in enumerate(self.vnfd["vdu"][0]["external-interface"]):
- vpci = out["virtual-interface"]["vpci"]
- net = "find /sys/class/net -lname '*{}*' -printf '%f'".format(vpci)
- out = connection.execute(net)[1]
- ifname = out.split('/')[-1].strip('\n')
- self.vnfd["vdu"][0]["external-interface"][index][
- "virtual-interface"]["local_iface_name"] = ifname
def scale(self, flavor=""):
- ''' scale vnfbased on flavor input '''
- super(PingTrafficGen, self).scale(flavor)
+ """ scale vnf-based on flavor input """
+ pass
- def instantiate(self, scenario_cfg, context_cfg):
- self._result = {"packets_received": 0, "rtt": 0}
- self._bind_device_kernel(self.connection)
+ def _check_status(self):
+ return self._tg_process.is_alive()
- def run_traffic(self, traffic_profile):
- self._queue = multiprocessing.Queue()
- self._parser = PingParser(self._queue)
- self._traffic_process = \
- multiprocessing.Process(target=self._traffic_runner,
- args=(traffic_profile, self._parser))
- self._traffic_process.start()
- # Wait for traffic process to start
- time.sleep(4)
- return self._traffic_process.is_alive()
+ def instantiate(self, scenario_cfg, context_cfg):
+ self._result = {
+ "packets_received": 0,
+ "rtt": 0,
+ }
+ self.setup_helper.setup_vnf_environment()
def listen_traffic(self, traffic_profile):
""" Not needed for ping
@@ -122,38 +103,26 @@ class PingTrafficGen(GenericTrafficGen):
"""
pass
- def _traffic_runner(self, traffic_profile, filewrapper):
-
- mgmt_interface = self.vnfd["mgmt-interface"]
- self.connection = ssh.SSH.from_node(mgmt_interface)
- self.connection.wait()
- external_interface = self.vnfd["vdu"][0]["external-interface"]
- virtual_interface = external_interface[0]["virtual-interface"]
- target_ip = virtual_interface["dst_ip"].split('/')[0]
- local_ip = virtual_interface["local_ip"].split('/')[0]
- local_if_name = \
- virtual_interface["local_iface_name"].split('/')[0]
- packet_size = traffic_profile.params["traffic_profile"]["frame_size"]
-
- run_cmd = []
-
- run_cmd.append("ip addr flush %s" % local_if_name)
- run_cmd.append("ip addr add %s/24 dev %s" % (local_ip, local_if_name))
- run_cmd.append("ip link set %s up" % local_if_name)
-
- for cmd in run_cmd:
- self.connection.execute(cmd)
-
- ping_cmd = ("ping -s %s %s" % (packet_size, target_ip))
- self.connection.run(ping_cmd, stdout=filewrapper,
+ def _traffic_runner(self, traffic_profile):
+ intf = self.vnfd_helper.interfaces[0]["virtual-interface"]
+ profile = traffic_profile.params["traffic_profile"]
+ cmd_kwargs = {
+ 'target_ip': IPv4Interface(intf["dst_ip"]).ip.exploded,
+ 'local_ip': IPv4Interface(intf["local_ip"]).ip.exploded,
+ 'local_if_name': intf["local_iface_name"].split('/')[0],
+ 'packet_size': profile["frame_size"],
+ }
+
+ cmd_list = [
+ "sudo ip addr flush {local_if_name}",
+ "sudo ip addr add {local_ip}/24 dev {local_if_name}",
+ "sudo ip link set {local_if_name} up",
+ ]
+
+ for cmd in cmd_list:
+ self.ssh_helper.execute(cmd.format(**cmd_kwargs))
+
+ ping_cmd = "ping -s {packet_size} {target_ip}"
+ self.ssh_helper.run(ping_cmd.format(**cmd_kwargs),
+ stdout=self._parser,
keep_stdin_open=True, pty=True)
-
- def collect_kpi(self):
- if not self._queue.empty():
- kpi = self._queue.get()
- self._result.update(kpi)
- return self._result
-
- def terminate(self):
- if self._traffic_process is not None:
- self._traffic_process.terminate()
diff --git a/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py b/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py
index 7da4b31e9..79e42e0a8 100644
--- a/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py
+++ b/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_trex.py
@@ -15,267 +15,98 @@
from __future__ import absolute_import
from __future__ import print_function
-import multiprocessing
import time
import logging
-import os
-import yaml
+from collections import Mapping
+from itertools import chain
-from yardstick import ssh
-from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
-from yardstick.network_services.utils import get_nsb_option
-from stl.trex_stl_lib.trex_stl_client import STLClient
-from stl.trex_stl_lib.trex_stl_client import LoggerApi
-from stl.trex_stl_lib.trex_stl_exceptions import STLError
+from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
+from yardstick.network_services.vnf_generic.vnf.tg_trex import TrexTrafficGen
+from yardstick.network_services.vnf_generic.vnf.sample_vnf import Rfc2544ResourceHelper
+from yardstick.network_services.vnf_generic.vnf.tg_trex import TrexResourceHelper
LOGGING = logging.getLogger(__name__)
-DURATION = 30
-WAIT_TIME = 3
-TREX_SYNC_PORT = 4500
-TREX_ASYNC_PORT = 4501
+class TrexRfc2544ResourceHelper(Rfc2544ResourceHelper):
-class TrexTrafficGenRFC(GenericTrafficGen):
- """
- This class handles mapping traffic profile and generating
- traffic for rfc2544 testcase.
- """
-
- def __init__(self, vnfd):
- super(TrexTrafficGenRFC, self).__init__(vnfd)
- self._result = {}
- self._terminated = multiprocessing.Value('i', 0)
- self._queue = multiprocessing.Queue()
- self._terminated = multiprocessing.Value('i', 0)
- self._traffic_process = None
- self._vpci_ascending = None
- self.tc_file_name = None
- self.client = None
- self.my_ports = None
-
- mgmt_interface = self.vnfd["mgmt-interface"]
-
- self.connection = ssh.SSH.from_node(mgmt_interface)
- self.connection.wait()
-
- @classmethod
- def _split_mac_address_into_list(cls, mac):
- octets = mac.split(':')
- for i, elem in enumerate(octets):
- octets[i] = "0x" + str(elem)
- return octets
-
- def _generate_trex_cfg(self, vnfd):
- """
-
- :param vnfd: vnfd.yaml
- :return: trex_cfg.yaml file
- """
- trex_cfg = dict(
- port_limit=0,
- version='2',
- interfaces=[],
- port_info=list(dict(
- ))
- )
- trex_cfg["port_limit"] = len(vnfd["vdu"][0]["external-interface"])
- trex_cfg["version"] = '2'
-
- cfg_file = []
- vpci = []
- port = {}
-
- ext_intf = vnfd["vdu"][0]["external-interface"]
- for interface in ext_intf:
- virt_intf = interface["virtual-interface"]
- vpci.append(virt_intf["vpci"])
-
- port["src_mac"] = \
- self._split_mac_address_into_list(virt_intf["local_mac"])
-
- time.sleep(WAIT_TIME)
- port["dest_mac"] = \
- self._split_mac_address_into_list(virt_intf["dst_mac"])
- if virt_intf["dst_mac"]:
- trex_cfg["port_info"].append(port.copy())
-
- trex_cfg["interfaces"] = vpci
- cfg_file.append(trex_cfg)
-
- with open('/tmp/trex_cfg.yaml', 'w') as outfile:
- outfile.write(yaml.safe_dump(cfg_file, default_flow_style=False))
- self.connection.put('/tmp/trex_cfg.yaml', '/etc')
-
- self._vpci_ascending = sorted(vpci)
-
- def scale(self, flavor=""):
- ''' scale vnfbased on flavor input '''
- super(TrexTrafficGenRFC, self).scale(flavor)
-
- def instantiate(self, scenario_cfg, context_cfg):
- self._generate_trex_cfg(self.vnfd)
- self.tc_file_name = '{0}.yaml'.format(scenario_cfg['tc'])
- trex = os.path.join(self.bin_path, "trex")
- err, _, _ = \
- self.connection.execute("ls {} >/dev/null 2>&1".format(trex))
- if err != 0:
- self.connection.put(trex, trex, True)
+ def is_done(self):
+ return self.latency and self.iteration.value > 10
- LOGGING.debug("Starting TRex server...")
- _tg_server = \
- multiprocessing.Process(target=self._start_server)
- _tg_server.start()
- while True:
- LOGGING.info("Waiting for TG Server to start.. ")
- time.sleep(WAIT_TIME)
- status = \
- self.connection.execute("lsof -i:%s" % TREX_SYNC_PORT)[0]
- if status == 0:
- LOGGING.info("TG server is up and running.")
- return _tg_server.exitcode
- if not _tg_server.is_alive():
- raise RuntimeError("Traffic Generator process died.")
+class TrexRfcResourceHelper(TrexResourceHelper):
- def listen_traffic(self, traffic_profile):
- pass
+ LATENCY_TIME_SLEEP = 120
+ RUN_DURATION = 30
+ WAIT_TIME = 3
- def _get_logical_if_name(self, vpci):
- ext_intf = self.vnfd["vdu"][0]["external-interface"]
- for interface in range(len(self.vnfd["vdu"][0]["external-interface"])):
- virtual_intf = ext_intf[interface]["virtual-interface"]
- if virtual_intf["vpci"] == vpci:
- return ext_intf[interface]["name"]
+ def __init__(self, setup_helper, rfc_helper_type=None):
+ super(TrexRfcResourceHelper, self).__init__(setup_helper)
- def run_traffic(self, traffic_profile,
- client_started=multiprocessing.Value('i', 0)):
+ if rfc_helper_type is None:
+ rfc_helper_type = TrexRfc2544ResourceHelper
- self._traffic_process = \
- multiprocessing.Process(target=self._traffic_runner,
- args=(traffic_profile, self._queue,
- client_started, self._terminated))
- self._traffic_process.start()
- # Wait for traffic process to start
- while client_started.value == 0:
- time.sleep(1)
+ self.rfc2544_helper = rfc_helper_type(self.scenario_helper)
+ # self.tg_port_pairs = []
- return self._traffic_process.is_alive()
+ def _build_ports(self):
+ self.tg_port_pairs, self.networks = MultiPortConfig.get_port_pairs(
+ self.vnfd_helper.interfaces)
+ self.priv_ports = [int(x[0][-1]) for x in self.tg_port_pairs]
+ self.pub_ports = [int(x[1][-1]) for x in self.tg_port_pairs]
+ self.my_ports = list(set(chain(self.priv_ports, self.pub_ports)))
- def _start_server(self):
- mgmt_interface = self.vnfd["mgmt-interface"]
-
- _server = ssh.SSH.from_node(mgmt_interface)
- _server.wait()
-
- _server.execute("fuser -n tcp %s %s -k > /dev/null 2>&1" %
- (TREX_SYNC_PORT, TREX_ASYNC_PORT))
- _server.execute("pkill -9 rex > /dev/null 2>&1")
-
- trex_path = os.path.join(self.bin_path, "trex/scripts")
- path = get_nsb_option("trex_path", trex_path)
- trex_cmd = "cd " + path + "; sudo ./t-rex-64 -i > /dev/null 2>&1"
-
- _server.execute(trex_cmd)
-
- def _connect_client(self, client=None):
- if client is None:
- client = STLClient(username=self.vnfd["mgmt-interface"]["user"],
- server=self.vnfd["mgmt-interface"]["ip"],
- verbose_level=LoggerApi.VERBOSE_QUIET)
- for idx in range(6):
- try:
- client.connect()
- break
- except STLError:
- LOGGING.info("Unable to connect to Trex. Attempt %s", idx)
- time.sleep(WAIT_TIME)
- return client
-
- @classmethod
- def _get_rfc_tolerance(cls, tc_yaml):
- tolerance = '0.8 - 1.0'
- if 'tc_options' in tc_yaml['scenarios'][0]:
- tc_options = tc_yaml['scenarios'][0]['tc_options']
- if 'rfc2544' in tc_options:
- tolerance = \
- tc_options['rfc2544'].get('allowed_drop_rate', '0.8 - 1.0')
-
- tolerance = tolerance.split('-')
- min_tol = float(tolerance[0])
- if len(tolerance) == 2:
- max_tol = float(tolerance[1])
- else:
- max_tol = float(tolerance[0])
-
- return [min_tol, max_tol]
-
- def _traffic_runner(self, traffic_profile, queue,
- client_started, terminated):
- LOGGING.info("Starting TRex client...")
- tc_yaml = {}
-
- with open(self.tc_file_name) as tc_file:
- tc_yaml = yaml.load(tc_file.read())
+ def _run_traffic_once(self, traffic_profile):
+ traffic_profile.execute(self)
+ self.client_started.value = 1
+ time.sleep(self.RUN_DURATION)
+ self.client.stop(self.my_ports)
+ time.sleep(self.WAIT_TIME)
+ samples = traffic_profile.get_drop_percentage(self)
+ self._queue.put(samples)
- tolerance = self._get_rfc_tolerance(tc_yaml)
+ if not self.rfc2544_helper.is_done():
+ return
- # fixme: fix passing correct trex config file,
- # instead of searching the default path
- self.my_ports = [0, 1]
- self.client = self._connect_client()
+ self.client.stop(self.my_ports)
self.client.reset(ports=self.my_ports)
- self.client.remove_all_streams(self.my_ports) # remove all streams
- while not terminated.value:
- traffic_profile.execute(self)
- client_started.value = 1
- time.sleep(DURATION)
+ self.client.remove_all_streams(self.my_ports)
+ traffic_profile.execute_latency(samples=samples)
+ multiplier = traffic_profile.calculate_pps(samples)[1]
+ for _ in range(5):
+ time.sleep(self.LATENCY_TIME_SLEEP)
self.client.stop(self.my_ports)
- time.sleep(WAIT_TIME)
+ time.sleep(self.WAIT_TIME)
last_res = self.client.get_stats(self.my_ports)
- samples = {}
- for vpci_idx in range(len(self._vpci_ascending)):
- name = \
- self._get_logical_if_name(self._vpci_ascending[vpci_idx])
- # fixme: VNFDs KPIs values needs to be mapped to TRex structure
- if not isinstance(last_res, dict):
- terminated.value = 1
- last_res = {}
+ if not isinstance(last_res, Mapping):
+ self._terminated.value = 1
+ continue
+ self.generate_samples('latency', {})
+ self._queue.put(samples)
+ self.client.start(mult=str(multiplier),
+ ports=self.my_ports,
+ duration=120, force=True)
- samples[name] = \
- {"rx_throughput_fps":
- float(last_res.get(vpci_idx, {}).get("rx_pps", 0.0)),
- "tx_throughput_fps":
- float(last_res.get(vpci_idx, {}).get("tx_pps", 0.0)),
- "rx_throughput_mbps":
- float(last_res.get(vpci_idx, {}).get("rx_bps", 0.0)),
- "tx_throughput_mbps":
- float(last_res.get(vpci_idx, {}).get("tx_bps", 0.0)),
- "in_packets":
- last_res.get(vpci_idx, {}).get("ipackets", 0),
- "out_packets":
- last_res.get(vpci_idx, {}).get("opackets", 0)}
+ def start_client(self, mult, duration, force=True):
+ self.client.start(ports=self.my_ports, mult=mult, duration=duration, force=force)
- samples = \
- traffic_profile.get_drop_percentage(self, samples,
- tolerance[0], tolerance[1])
- queue.put(samples)
- self.client.stop(self.my_ports)
- self.client.disconnect()
- queue.put(samples)
+ def clear_client_stats(self):
+ self.client.clear_stats(ports=self.my_ports)
def collect_kpi(self):
- if not self._queue.empty():
- result = self._queue.get()
- self._result.update(result)
- LOGGING.debug("trex collect Kpis %s", self._result)
- return self._result
+ self.rfc2544_helper.iteration.value += 1
+ super(TrexRfcResourceHelper, self).collect_kpi()
+
- def terminate(self):
- self._terminated.value = 1 # stop Trex clinet
+class TrexTrafficGenRFC(TrexTrafficGen):
+ """
+ This class handles mapping traffic profile and generating
+ traffic for rfc2544 testcase.
+ """
- self.connection.execute("fuser -n tcp %s %s -k > /dev/null 2>&1" %
- (TREX_SYNC_PORT, TREX_ASYNC_PORT))
+ def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ if resource_helper_type is None:
+ resource_helper_type = TrexRfcResourceHelper
- if self._traffic_process:
- self._traffic_process.terminate()
+ super(TrexTrafficGenRFC, self).__init__(name, vnfd, setup_env_helper_type,
+ resource_helper_type)
diff --git a/yardstick/network_services/vnf_generic/vnf/tg_trex.py b/yardstick/network_services/vnf_generic/vnf/tg_trex.py
index 058b715fe..616b331ba 100644
--- a/yardstick/network_services/vnf_generic/vnf/tg_trex.py
+++ b/yardstick/network_services/vnf_generic/vnf/tg_trex.py
@@ -15,261 +15,136 @@
from __future__ import absolute_import
from __future__ import print_function
-import multiprocessing
-import time
+
import logging
import os
+
import yaml
-from yardstick import ssh
-from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
+from yardstick.common.utils import mac_address_to_hex_list
from yardstick.network_services.utils import get_nsb_option
-from yardstick.network_services.utils import provision_tool
-from stl.trex_stl_lib.trex_stl_client import STLClient
-from stl.trex_stl_lib.trex_stl_client import LoggerApi
-from stl.trex_stl_lib.trex_stl_exceptions import STLError
+from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNFTrafficGen
+from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
LOG = logging.getLogger(__name__)
-DURATION = 30
-WAIT_QUEUE = 1
-TREX_SYNC_PORT = 4500
-TREX_ASYNC_PORT = 4501
-class TrexTrafficGen(GenericTrafficGen):
+class TrexResourceHelper(ClientResourceHelper):
+
+ CONF_FILE = '/tmp/trex_cfg.yaml'
+ QUEUE_WAIT_TIME = 1
+ RESOURCE_WORD = 'trex'
+ RUN_DURATION = 0
+
+ SYNC_PORT = 4500
+ ASYNC_PORT = 4501
+
+ def generate_cfg(self):
+ ext_intf = self.vnfd_helper.interfaces
+ vpci_list = []
+ port_list = []
+ trex_cfg = {
+ 'port_limit': 0,
+ 'version': '2',
+ 'interfaces': vpci_list,
+ 'port_info': port_list,
+ "port_limit": len(ext_intf),
+ "version": '2',
+ }
+ cfg_file = [trex_cfg]
+
+ for interface in ext_intf:
+ virtual_interface = interface['virtual-interface']
+ vpci_list.append(virtual_interface["vpci"])
+ dst_mac = virtual_interface["dst_mac"]
+
+ if not dst_mac:
+ continue
+
+ local_mac = virtual_interface["local_mac"]
+ port_list.append({
+ "src_mac": mac_address_to_hex_list(local_mac),
+ "dest_mac": mac_address_to_hex_list(dst_mac),
+ })
+
+ cfg_str = yaml.safe_dump(cfg_file, default_flow_style=False, explicit_start=True)
+ self.ssh_helper.upload_config_file(os.path.basename(self.CONF_FILE), cfg_str)
+ self._vpci_ascending = sorted(vpci_list)
+
+ def check_status(self):
+ status, _, _ = self.ssh_helper.execute("sudo lsof -i:%s" % self.SYNC_PORT)
+ return status
+
+ # temp disable
+ DISABLE_DEPLOY = True
+
+ def setup(self):
+ if self.DISABLE_DEPLOY:
+ return
+
+ trex_path = self.ssh_helper.join_bin_path('trex')
+
+ err = self.ssh_helper.execute("which {}".format(trex_path))[0]
+ if err == 0:
+ return
+
+ LOG.info("Copying %s to destination...", self.RESOURCE_WORD)
+ self.ssh_helper.run("sudo mkdir -p '{}'".format(os.path.dirname(trex_path)))
+ self.ssh_helper.put("~/.bash_profile", "~/.bash_profile")
+ self.ssh_helper.put(trex_path, trex_path, True)
+ ko_src = os.path.join(trex_path, "scripts/ko/src/")
+ self.ssh_helper.execute(self.MAKE_INSTALL.format(ko_src))
+
+ def start(self, ports=None, *args, **kwargs):
+ cmd = "sudo fuser -n tcp {0.SYNC_PORT} {0.ASYNC_PORT} -k > /dev/null 2>&1"
+ self.ssh_helper.execute(cmd.format(self))
+
+ self.ssh_helper.execute("sudo pkill -9 rex > /dev/null 2>&1")
+
+ trex_path = self.ssh_helper.join_bin_path("trex", "scripts")
+ path = get_nsb_option("trex_path", trex_path)
+
+ # cmd = "sudo ./t-rex-64 -i --cfg %s > /dev/null 2>&1" % self.CONF_FILE
+ cmd = "./t-rex-64 -i --cfg '{}'".format(self.CONF_FILE)
+
+ # if there are errors we want to see them
+ # we have to sudo cd because the path might be owned by root
+ trex_cmd = """sudo bash -c "cd '{}' ; {}" >/dev/null""".format(path, cmd)
+ self.ssh_helper.execute(trex_cmd)
+
+ def terminate(self):
+ super(TrexResourceHelper, self).terminate()
+ cmd = "sudo fuser -n tcp %s %s -k > /dev/null 2>&1"
+ self.ssh_helper.execute(cmd % (self.SYNC_PORT, self.ASYNC_PORT))
+
+
+class TrexTrafficGen(SampleVNFTrafficGen):
"""
This class handles mapping traffic profile and generating
traffic for given testcase
"""
- def __init__(self, vnfd):
- super(TrexTrafficGen, self).__init__(vnfd)
- self._result = {}
- self._queue = multiprocessing.Queue()
- self._terminated = multiprocessing.Value('i', 0)
- self._traffic_process = None
- self._vpci_ascending = None
- self.client = None
- self.my_ports = None
- self.client_started = multiprocessing.Value('i', 0)
-
- mgmt_interface = vnfd["mgmt-interface"]
-
- self.connection = ssh.SSH.from_node(mgmt_interface)
- self.connection.wait()
-
- @classmethod
- def _split_mac_address_into_list(cls, mac):
- octets = mac.split(':')
- for i, elem in enumerate(octets):
- octets[i] = "0x" + str(elem)
- return octets
-
- def _generate_trex_cfg(self, vnfd):
- """
-
- :param vnfd: vnfd.yaml
- :return: trex_cfg.yaml file
- """
- trex_cfg = dict(
- port_limit=0,
- version='2',
- interfaces=[],
- port_info=list(dict(
- ))
- )
- trex_cfg["port_limit"] = len(vnfd["vdu"][0]["external-interface"])
- trex_cfg["version"] = '2'
-
- cfg_file = []
- vpci = []
- port = {}
-
- for interface in range(len(vnfd["vdu"][0]["external-interface"])):
- ext_intrf = vnfd["vdu"][0]["external-interface"]
- virtual_interface = ext_intrf[interface]["virtual-interface"]
- vpci.append(virtual_interface["vpci"])
-
- port["src_mac"] = self._split_mac_address_into_list(
- virtual_interface["local_mac"])
- port["dest_mac"] = self._split_mac_address_into_list(
- virtual_interface["dst_mac"])
-
- trex_cfg["port_info"].append(port.copy())
-
- trex_cfg["interfaces"] = vpci
- cfg_file.append(trex_cfg)
-
- with open('/tmp/trex_cfg.yaml', 'w') as outfile:
- outfile.write(yaml.safe_dump(cfg_file, default_flow_style=False))
- self.connection.put('/tmp/trex_cfg.yaml', '/etc')
-
- self._vpci_ascending = sorted(vpci)
-
- @classmethod
- def __setup_hugepages(cls, connection):
- hugepages = \
- connection.execute(
- "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo")[1]
- hugepages = hugepages.rstrip()
-
- memory_path = \
- '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
- connection.execute("awk -F: '{ print $1 }' < %s" % memory_path)
-
- pages = 16384 if hugepages.rstrip() == "2048kB" else 16
- connection.execute("echo %s > %s" % (pages, memory_path))
-
- def setup_vnf_environment(self, connection):
- ''' setup dpdk environment needed for vnf to run '''
-
- self.__setup_hugepages(connection)
- connection.execute("modprobe uio && modprobe igb_uio")
-
- exit_status = connection.execute("lsmod | grep -i igb_uio")[0]
- if exit_status == 0:
- return
+ APP_NAME = 'TRex'
- dpdk = os.path.join(self.bin_path, "dpdk-16.07")
- dpdk_setup = \
- provision_tool(self.connection,
- os.path.join(self.bin_path, "nsb_setup.sh"))
- status = connection.execute("ls {} >/dev/null 2>&1".format(dpdk))[0]
- if status:
- connection.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup)
+ def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ if resource_helper_type is None:
+ resource_helper_type = TrexResourceHelper
- def scale(self, flavor=""):
- ''' scale vnfbased on flavor input '''
- super(TrexTrafficGen, self).scale(flavor)
-
- def instantiate(self, scenario_cfg, context_cfg):
- self._generate_trex_cfg(self.vnfd)
- self.setup_vnf_environment(self.connection)
-
- trex = os.path.join(self.bin_path, "trex")
- err = \
- self.connection.execute("ls {} >/dev/null 2>&1".format(trex))[0]
- if err != 0:
- LOG.info("Copying trex to destination...")
- self.connection.put("/root/.bash_profile", "/root/.bash_profile")
- self.connection.put(trex, trex, True)
- ko_src = os.path.join(trex, "scripts/ko/src/")
- self.connection.execute("cd %s && make && make install" % ko_src)
-
- LOG.info("Starting TRex server...")
- _tg_process = \
- multiprocessing.Process(target=self._start_server)
- _tg_process.start()
- while True:
- if not _tg_process.is_alive():
- raise RuntimeError("Traffic Generator process died.")
- LOG.info("Waiting for TG Server to start.. ")
- time.sleep(1)
- status = \
- self.connection.execute("lsof -i:%s" % TREX_SYNC_PORT)[0]
- if status == 0:
- LOG.info("TG server is up and running.")
- return _tg_process.exitcode
-
- def listen_traffic(self, traffic_profile):
- pass
+ super(TrexTrafficGen, self).__init__(name, vnfd, setup_env_helper_type,
+ resource_helper_type)
- def _get_logical_if_name(self, vpci):
- ext_intf = self.vnfd["vdu"][0]["external-interface"]
- for interface in range(len(self.vnfd["vdu"][0]["external-interface"])):
- virtual_intf = ext_intf[interface]["virtual-interface"]
- if virtual_intf["vpci"] == vpci:
- return ext_intf[interface]["name"]
-
- def run_traffic(self, traffic_profile):
- self._traffic_process = \
- multiprocessing.Process(target=self._traffic_runner,
- args=(traffic_profile, self._queue,
- self.client_started,
- self._terminated))
- self._traffic_process.start()
- # Wait for traffic process to start
- while self.client_started.value == 0:
- time.sleep(1)
-
- return self._traffic_process.is_alive()
+ def _check_status(self):
+ return self.resource_helper.check_status()
def _start_server(self):
- mgmt_interface = self.vnfd["mgmt-interface"]
-
- _server = ssh.SSH.from_node(mgmt_interface)
- _server.wait()
+ super(TrexTrafficGen, self)._start_server()
+ self.resource_helper.start()
- _server.execute("fuser -n tcp %s %s -k > /dev/null 2>&1" %
- (TREX_SYNC_PORT, TREX_ASYNC_PORT))
+ def scale(self, flavor=""):
+ pass
- trex_path = os.path.join(self.bin_path, "trex/scripts")
- path = get_nsb_option("trex_path", trex_path)
- trex_cmd = "cd " + path + "; sudo ./t-rex-64 -i > /dev/null 2>&1"
-
- _server.execute(trex_cmd)
-
- def _connect_client(self, client=None):
- if client is None:
- client = STLClient(username=self.vnfd["mgmt-interface"]["user"],
- server=self.vnfd["mgmt-interface"]["ip"],
- verbose_level=LoggerApi.VERBOSE_QUIET)
- # try to connect with 5s intervals, 30s max
- for idx in range(6):
- try:
- client.connect()
- break
- except STLError:
- LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
- time.sleep(5)
- return client
-
- def _traffic_runner(self, traffic_profile, queue,
- client_started, terminated):
- LOG.info("Starting TRex client...")
-
- self.my_ports = [0, 1]
- self.client = self._connect_client()
- self.client.reset(ports=self.my_ports)
-
- self.client.remove_all_streams(self.my_ports) # remove all streams
-
- while not terminated.value:
- traffic_profile.execute(self)
- client_started.value = 1
- last_res = self.client.get_stats(self.my_ports)
- if not isinstance(last_res, dict): # added for mock unit test
- terminated.value = 1
- last_res = {}
-
- samples = {}
- for vpci_idx in range(len(self._vpci_ascending)):
- name = \
- self._get_logical_if_name(self._vpci_ascending[vpci_idx])
- # fixme: VNFDs KPIs values needs to be mapped to TRex structure
- xe_value = last_res.get(vpci_idx, {})
- samples[name] = \
- {"rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
- "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
- "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
- "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
- "in_packets": xe_value.get("ipackets", 0),
- "out_packets": xe_value.get("opackets", 0)}
- time.sleep(WAIT_QUEUE)
- queue.put(samples)
-
- self.client.disconnect()
- terminated.value = 0
-
- def collect_kpi(self):
- if not self._queue.empty():
- self._result.update(self._queue.get())
- LOG.debug("trex collect Kpis %s", self._result)
- return self._result
+ def listen_traffic(self, traffic_profile):
+ pass
def terminate(self):
- self.connection.execute("fuser -n tcp %s %s -k > /dev/null 2>&1" %
- (TREX_SYNC_PORT, TREX_ASYNC_PORT))
- self.traffic_finished = True
- if self._traffic_process:
- self._traffic_process.terminate()
+ self.resource_helper.terminate()
diff --git a/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py b/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py
index e9e80bdfb..310ab67cb 100644
--- a/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py
+++ b/yardstick/network_services/vnf_generic/vnf/vpe_vnf.py
@@ -15,313 +15,268 @@
from __future__ import absolute_import
from __future__ import print_function
-import tempfile
-import time
import os
import logging
import re
-from multiprocessing import Queue
-import multiprocessing
-import ipaddress
-import six
+import posixpath
-from yardstick import ssh
-from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
-from yardstick.network_services.utils import provision_tool
-from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
-from yardstick.network_services.nfvi.resource import ResourceProfile
+from six.moves import configparser, zip
-LOG = logging.getLogger(__name__)
-VPE_PIPELINE_COMMAND = '{tool_path} -p 0x3 -f {cfg_file} -s {script}'
-CORES = ['0', '1', '2']
-WAIT_TIME = 20
+from yardstick.network_services.pipeline import PipelineRules
+from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF, DpdkVnfSetupEnvHelper
+LOG = logging.getLogger(__name__)
-class VpeApproxVnf(GenericVNF):
+VPE_PIPELINE_COMMAND = """sudo {tool_path} -p {ports_len_hex} -f {cfg_file} -s {script}"""
+
+VPE_COLLECT_KPI = """\
+Pkts in:\s(\d+)\r\n\
+\tPkts dropped by Pkts in:\s(\d+)\r\n\
+\tPkts dropped by AH:\s(\d+)\r\n\\
+\tPkts dropped by other:\s(\d+)\
+"""
+
+
+class ConfigCreate(object):
+
+ @staticmethod
+ def vpe_tmq(config, index):
+ tm_q = 'TM{0}'.format(index)
+ config.add_section(tm_q)
+ config.set(tm_q, 'burst_read', '24')
+ config.set(tm_q, 'burst_write', '32')
+ config.set(tm_q, 'cfg', '/tmp/full_tm_profile_10G.cfg')
+ return config
+
+ def __init__(self, priv_ports, pub_ports, socket):
+ super(ConfigCreate, self).__init__()
+ self.sw_q = -1
+ self.sink_q = -1
+ self.n_pipeline = 1
+ self.priv_ports = priv_ports
+ self.pub_ports = pub_ports
+ self.pipeline_per_port = 9
+ self.socket = socket
+
+ def vpe_initialize(self, config):
+ config.add_section('EAL')
+ config.set('EAL', 'log_level', '0')
+
+ config.add_section('PIPELINE0')
+ config.set('PIPELINE0', 'type', 'MASTER')
+ config.set('PIPELINE0', 'core', 's%sC0' % self.socket)
+
+ config.add_section('MEMPOOL0')
+ config.set('MEMPOOL0', 'pool_size', '256K')
+
+ config.add_section('MEMPOOL1')
+ config.set('MEMPOOL1', 'pool_size', '2M')
+ return config
+
+ def vpe_rxq(self, config):
+ for port in self.pub_ports:
+ new_section = 'RXQ{0}.0'.format(port)
+ config.add_section(new_section)
+ config.set(new_section, 'mempool', 'MEMPOOL1')
+
+ return config
+
+ def get_sink_swq(self, parser, pipeline, k, index):
+ sink = ""
+ pktq = parser.get(pipeline, k)
+ if "SINK" in pktq:
+ self.sink_q += 1
+ sink = " SINK{0}".format(self.sink_q)
+ if "TM" in pktq:
+ sink = " TM{0}".format(index)
+ pktq = "SWQ{0}{1}".format(self.sw_q, sink)
+ return pktq
+
+ def vpe_upstream(self, vnf_cfg, intf):
+ parser = configparser.ConfigParser()
+ parser.read(os.path.join(vnf_cfg, 'vpe_upstream'))
+ for pipeline in parser.sections():
+ for k, v in parser.items(pipeline):
+ if k == "pktq_in":
+ index = intf['index']
+ if "RXQ" in v:
+ value = "RXQ{0}.0".format(index)
+ else:
+ value = self.get_sink_swq(parser, pipeline, k, index)
+
+ parser.set(pipeline, k, value)
+
+ elif k == "pktq_out":
+ index = intf['peer_intf']['index']
+ if "TXQ" in v:
+ value = "TXQ{0}.0".format(index)
+ else:
+ self.sw_q += 1
+ value = self.get_sink_swq(parser, pipeline, k, index)
+
+ parser.set(pipeline, k, value)
+
+ new_pipeline = 'PIPELINE{0}'.format(self.n_pipeline)
+ if new_pipeline != pipeline:
+ parser._sections[new_pipeline] = parser._sections[pipeline]
+ parser._sections.pop(pipeline)
+ self.n_pipeline += 1
+ return parser
+
+ def vpe_downstream(self, vnf_cfg, intf):
+ parser = configparser.ConfigParser()
+ parser.read(os.path.join(vnf_cfg, 'vpe_downstream'))
+ for pipeline in parser.sections():
+ for k, v in parser.items(pipeline):
+ index = intf['dpdk_port_num']
+ peer_index = intf['peer_intf']['dpdk_port_num']
+
+ if k == "pktq_in":
+ if "RXQ" not in v:
+ value = self.get_sink_swq(parser, pipeline, k, index)
+ elif "TM" in v:
+ value = "RXQ{0}.0 TM{1}".format(peer_index, index)
+ else:
+ value = "RXQ{0}.0".format(peer_index)
+
+ parser.set(pipeline, k, value)
+
+ if k == "pktq_out":
+ if "TXQ" not in v:
+ self.sw_q += 1
+ value = self.get_sink_swq(parser, pipeline, k, index)
+ elif "TM" in v:
+ value = "TXQ{0}.0 TM{1}".format(peer_index, index)
+ else:
+ value = "TXQ{0}.0".format(peer_index)
+
+ parser.set(pipeline, k, value)
+
+ new_pipeline = 'PIPELINE{0}'.format(self.n_pipeline)
+ if new_pipeline != pipeline:
+ parser._sections[new_pipeline] = parser._sections[pipeline]
+ parser._sections.pop(pipeline)
+ self.n_pipeline += 1
+ return parser
+
+ def create_vpe_config(self, vnf_cfg):
+ config = configparser.ConfigParser()
+ vpe_cfg = os.path.join("/tmp/vpe_config")
+ with open(vpe_cfg, 'w') as cfg_file:
+ config = self.vpe_initialize(config)
+ config = self.vpe_rxq(config)
+ config.write(cfg_file)
+ for index, priv_port in enumerate(self.priv_ports):
+ config = self.vpe_upstream(vnf_cfg, priv_port)
+ config.write(cfg_file)
+ config = self.vpe_downstream(vnf_cfg, priv_port)
+ config = self.vpe_tmq(config, index)
+ config.write(cfg_file)
+
+ def generate_vpe_script(self, interfaces):
+ rules = PipelineRules(pipeline_id=1)
+ for priv_port, pub_port in zip(self.priv_ports, self.pub_ports):
+ priv_intf = interfaces[priv_port]["virtual-interface"]
+ pub_intf = interfaces[pub_port]["virtual-interface"]
+
+ dst_port0_ip = priv_intf["dst_ip"]
+ dst_port1_ip = pub_intf["dst_ip"]
+ dst_port0_mac = priv_intf["dst_mac"]
+ dst_port1_mac = pub_intf["dst_mac"]
+
+ rules.add_firewall_script(dst_port0_ip)
+ rules.next_pipeline()
+ rules.add_flow_classification_script()
+ rules.next_pipeline()
+ rules.add_flow_action()
+ rules.next_pipeline()
+ rules.add_flow_action2()
+ rules.next_pipeline()
+ rules.add_route_script(dst_port1_ip, dst_port1_mac)
+ rules.next_pipeline()
+ rules.add_route_script2(dst_port0_ip, dst_port0_mac)
+ rules.next_pipeline(num=4)
+
+ return rules.get_string()
+
+
+class VpeApproxSetupEnvHelper(DpdkVnfSetupEnvHelper):
+
+ CFG_CONFIG = "/tmp/vpe_config"
+ CFG_SCRIPT = "/tmp/vpe_script"
+ CORES = ['0', '1', '2', '3', '4', '5']
+ PIPELINE_COMMAND = VPE_PIPELINE_COMMAND
+
+ def build_config(self):
+ vpe_vars = {
+ "bin_path": self.ssh_helper.bin_path,
+ "socket": self.socket,
+ }
+
+ all_ports = []
+ priv_ports = []
+ pub_ports = []
+ for interface in self.vnfd_helper.interfaces:
+ all_ports.append(interface['name'])
+ vld_id = interface['virtual-interface']['vld_id']
+ if vld_id.startswith('private'):
+ priv_ports.append(interface)
+ elif vld_id.startswith('public'):
+ pub_ports.append(interface)
+
+ vpe_conf = ConfigCreate(priv_ports, pub_ports, self.socket)
+ vpe_conf.create_vpe_config(self.scenario_helper.vnf_cfg)
+
+ config_basename = posixpath.basename(self.CFG_CONFIG)
+ script_basename = posixpath.basename(self.CFG_SCRIPT)
+ with open(self.CFG_CONFIG) as handle:
+ vpe_config = handle.read()
+
+ self.ssh_helper.upload_config_file(config_basename, vpe_config.format(**vpe_vars))
+
+ vpe_script = vpe_conf.generate_vpe_script(self.vnfd_helper.interfaces)
+ self.ssh_helper.upload_config_file(script_basename, vpe_script.format(**vpe_vars))
+
+
+class VpeApproxVnf(SampleVNF):
""" This class handles vPE VNF model-driver definitions """
- def __init__(self, vnfd):
- super(VpeApproxVnf, self).__init__(vnfd)
- self.socket = None
- self.q_in = Queue()
- self.q_out = Queue()
- self.vnf_cfg = None
- self._vnf_process = None
- self.connection = None
- self.resource = None
-
- def _resource_collect_start(self):
- self.resource.initiate_systemagent(self.bin_path)
- self.resource.start()
+ APP_NAME = 'vPE_vnf'
+ APP_WORD = 'vpe'
+ COLLECT_KPI = VPE_COLLECT_KPI
+ WAIT_TIME = 20
- def _resource_collect_stop(self):
- self.resource.stop()
+ def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+ if setup_env_helper_type is None:
+ setup_env_helper_type = VpeApproxSetupEnvHelper
- def _collect_resource_kpi(self):
- result = {}
+ super(VpeApproxVnf, self).__init__(name, vnfd, setup_env_helper_type, resource_helper_type)
- status = self.resource.check_if_sa_running("collectd")[0]
- if status:
- result = self.resource.amqp_collect_nfvi_kpi()
-
- result = {"core": result}
-
- return result
-
- @classmethod
- def __setup_hugepages(cls, connection):
- hugepages = \
- connection.execute(
- "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo")[1]
- hugepages = hugepages.rstrip()
-
- memory_path = \
- '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
- connection.execute("awk -F: '{ print $1 }' < %s" % memory_path)
-
- pages = 16384 if hugepages.rstrip() == "2048kB" else 16
- connection.execute("echo %s > %s" % (pages, memory_path))
-
- def setup_vnf_environment(self, connection):
- ''' setup dpdk environment needed for vnf to run '''
-
- self.__setup_hugepages(connection)
- connection.execute("modprobe uio && modprobe igb_uio")
-
- exit_status = connection.execute("lsmod | grep -i igb_uio")[0]
- if exit_status == 0:
- return
-
- dpdk = os.path.join(self.bin_path, "dpdk-16.07")
- dpdk_setup = \
- provision_tool(self.connection,
- os.path.join(self.bin_path, "nsb_setup.sh"))
- status = connection.execute("ls {} >/dev/null 2>&1".format(dpdk))[0]
- if status:
- connection.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup)
-
- def _get_cpu_sibling_list(self):
- cpu_topo = []
- for core in CORES:
- sys_cmd = \
- "/sys/devices/system/cpu/cpu%s/topology/thread_siblings_list" \
- % core
- cpuid = \
- self.connection.execute("awk -F: '{ print $1 }' < %s" %
- sys_cmd)[1]
- cpu_topo += \
- [(idx) if idx.isdigit() else idx for idx in cpuid.split(',')]
-
- return [cpu.strip() for cpu in cpu_topo]
-
- def scale(self, flavor=""):
- ''' scale vnfbased on flavor input '''
- super(VpeApproxVnf, self).scale(flavor)
-
- def instantiate(self, scenario_cfg, context_cfg):
- vnf_cfg = scenario_cfg['vnf_options']['vpe']['cfg']
-
- mgmt_interface = self.vnfd["mgmt-interface"]
- self.connection = ssh.SSH.from_node(mgmt_interface)
-
- self.tc_file_name = '{0}.yaml'.format(scenario_cfg['tc'])
-
- self.setup_vnf_environment(self.connection)
-
- cores = self._get_cpu_sibling_list()
- self.resource = ResourceProfile(self.vnfd, cores)
-
- self.connection.execute("pkill vPE_vnf")
- dpdk_nic_bind = \
- provision_tool(self.connection,
- os.path.join(self.bin_path, "dpdk_nic_bind.py"))
-
- interfaces = self.vnfd["vdu"][0]['external-interface']
- self.socket = \
- next((0 for v in interfaces
- if v['virtual-interface']["vpci"][5] == "0"), 1)
-
- bound_pci = [v['virtual-interface']["vpci"] for v in interfaces]
- for vpci in bound_pci:
- self.connection.execute(
- "%s --force -b igb_uio %s" % (dpdk_nic_bind, vpci))
- queue_wrapper = \
- QueueFileWrapper(self.q_in, self.q_out, "pipeline>")
- self._vnf_process = multiprocessing.Process(target=self._run_vpe,
- args=(queue_wrapper,
- vnf_cfg,))
- self._vnf_process.start()
- buf = []
- time.sleep(WAIT_TIME) # Give some time for config to load
- while True:
- message = ''
- while self.q_out.qsize() > 0:
- buf.append(self.q_out.get())
- message = ''.join(buf)
- if "pipeline>" in message:
- LOG.info("VPE VNF is up and running.")
- queue_wrapper.clear()
- self._resource_collect_start()
- return self._vnf_process.exitcode
- if "PANIC" in message:
- raise RuntimeError("Error starting vPE VNF.")
-
- LOG.info("Waiting for VNF to start.. ")
- time.sleep(3)
- if not self._vnf_process.is_alive():
- raise RuntimeError("vPE VNF process died.")
-
- def _get_ports_gateway(self, name):
- if 'routing_table' in self.vnfd['vdu'][0]:
- routing_table = self.vnfd['vdu'][0]['routing_table']
-
- for route in routing_table:
- if name == route['if']:
- return route['gateway']
-
- def terminate(self):
- self.execute_command("quit")
- if self._vnf_process:
- self._vnf_process.terminate()
-
- def _run_vpe(self, filewrapper, vnf_cfg):
- mgmt_interface = self.vnfd["mgmt-interface"]
-
- self.connection = ssh.SSH.from_node(mgmt_interface)
- self.connection.wait()
-
- interfaces = self.vnfd["vdu"][0]['external-interface']
- port0_ip = ipaddress.ip_interface(six.text_type(
- "%s/%s" % (interfaces[0]["virtual-interface"]["local_ip"],
- interfaces[0]["virtual-interface"]["netmask"])))
- port1_ip = ipaddress.ip_interface(six.text_type(
- "%s/%s" % (interfaces[1]["virtual-interface"]["local_ip"],
- interfaces[1]["virtual-interface"]["netmask"])))
- dst_port0_ip = ipaddress.ip_interface(
- u"%s/%s" % (interfaces[0]["virtual-interface"]["dst_ip"],
- interfaces[0]["virtual-interface"]["netmask"]))
- dst_port1_ip = ipaddress.ip_interface(
- u"%s/%s" % (interfaces[1]["virtual-interface"]["dst_ip"],
- interfaces[1]["virtual-interface"]["netmask"]))
-
- vpe_vars = {"port0_local_ip": port0_ip.ip.exploded,
- "port0_dst_ip": dst_port0_ip.ip.exploded,
- "port0_local_ip_hex":
- self._ip_to_hex(port0_ip.ip.exploded),
- "port0_prefixlen": port0_ip.network.prefixlen,
- "port0_netmask": port0_ip.network.netmask.exploded,
- "port0_netmask_hex":
- self._ip_to_hex(port0_ip.network.netmask.exploded),
- "port0_local_mac":
- interfaces[0]["virtual-interface"]["local_mac"],
- "port0_dst_mac":
- interfaces[0]["virtual-interface"]["dst_mac"],
- "port0_gateway":
- self._get_ports_gateway(interfaces[0]["name"]),
- "port0_local_network":
- port0_ip.network.network_address.exploded,
- "port0_prefix": port0_ip.network.prefixlen,
- "port1_local_ip": port1_ip.ip.exploded,
- "port1_dst_ip": dst_port1_ip.ip.exploded,
- "port1_local_ip_hex":
- self._ip_to_hex(port1_ip.ip.exploded),
- "port1_prefixlen": port1_ip.network.prefixlen,
- "port1_netmask": port1_ip.network.netmask.exploded,
- "port1_netmask_hex":
- self._ip_to_hex(port1_ip.network.netmask.exploded),
- "port1_local_mac":
- interfaces[1]["virtual-interface"]["local_mac"],
- "port1_dst_mac":
- interfaces[1]["virtual-interface"]["dst_mac"],
- "port1_gateway":
- self._get_ports_gateway(interfaces[1]["name"]),
- "port1_local_network":
- port1_ip.network.network_address.exploded,
- "port1_prefix": port1_ip.network.prefixlen,
- "port0_local_ip6": self._get_port0localip6(),
- "port1_local_ip6": self._get_port1localip6(),
- "port0_prefixlen6": self._get_port0prefixlen6(),
- "port1_prefixlen6": self._get_port1prefixlen6(),
- "port0_gateway6": self._get_port0gateway6(),
- "port1_gateway6": self._get_port1gateway6(),
- "port0_dst_ip_hex6": self._get_port0localip6(),
- "port1_dst_ip_hex6": self._get_port1localip6(),
- "port0_dst_netmask_hex6": self._get_port0prefixlen6(),
- "port1_dst_netmask_hex6": self._get_port1prefixlen6(),
- "bin_path": self.bin_path,
- "socket": self.socket}
-
- for cfg in os.listdir(vnf_cfg):
- vpe_config = ""
- with open(os.path.join(vnf_cfg, cfg), 'r') as vpe_cfg:
- vpe_config = vpe_cfg.read()
-
- self._provide_config_file(cfg, vpe_config, vpe_vars)
-
- LOG.info("Provision and start the vPE")
- tool_path = provision_tool(self.connection,
- os.path.join(self.bin_path, "vPE_vnf"))
- cmd = VPE_PIPELINE_COMMAND.format(cfg_file="/tmp/vpe_config",
- script="/tmp/vpe_script",
- tool_path=tool_path)
- self.connection.run(cmd, stdin=filewrapper, stdout=filewrapper,
- keep_stdin_open=True, pty=True)
-
- def _provide_config_file(self, prefix, template, args):
- cfg, cfg_content = tempfile.mkstemp()
- cfg = os.fdopen(cfg, "w+")
- cfg.write(template.format(**args))
- cfg.close()
- cfg_file = "/tmp/%s" % prefix
- self.connection.put(cfg_content, cfg_file)
- return cfg_file
-
- def execute_command(self, cmd):
- ''' send cmd to vnf process '''
- LOG.info("VPE command: %s", cmd)
- output = []
- if self.q_in:
- self.q_in.put(cmd + "\r\n")
- time.sleep(3)
- while self.q_out.qsize() > 0:
- output.append(self.q_out.get())
- return "".join(output)
+ def get_stats(self, *args, **kwargs):
+ raise NotImplementedError
def collect_kpi(self):
- result = self.get_stats_vpe()
- collect_stats = self._collect_resource_kpi()
- result["collect_stats"] = collect_stats
- LOG.debug("vPE collet Kpis: %s", result)
- return result
-
- def get_stats_vpe(self):
- ''' get vpe statistics '''
- result = {'pkt_in_up_stream': 0, 'pkt_drop_up_stream': 0,
- 'pkt_in_down_stream': 0, 'pkt_drop_down_stream': 0}
- up_stat_commands = ['p 5 stats port in 0', 'p 5 stats port out 0',
- 'p 5 stats port out 1']
- down_stat_commands = ['p 9 stats port in 0', 'p 9 stats port out 0']
- pattern = \
- "Pkts in:\\s(\\d+)\\r\\n\\tPkts dropped by " \
- "AH:\\s(\\d+)\\r\\n\\tPkts dropped by other:\\s(\\d+)"
-
- for cmd in up_stat_commands:
- stats = self.execute_command(cmd)
- match = re.search(pattern, stats, re.MULTILINE)
- if match:
- result["pkt_in_up_stream"] = \
- result.get("pkt_in_up_stream", 0) + int(match.group(1))
- result["pkt_drop_up_stream"] = \
- result.get("pkt_drop_up_stream", 0) + \
- int(match.group(2)) + int(match.group(3))
-
- for cmd in down_stat_commands:
- stats = self.execute_command(cmd)
- match = re.search(pattern, stats, re.MULTILINE)
- if match:
- result["pkt_in_down_stream"] = \
- result.get("pkt_in_down_stream", 0) + int(match.group(1))
- result["pkt_drop_down_stream"] = \
- result.get("pkt_drop_down_stream", 0) + \
- int(match.group(2)) + int(match.group(3))
+ result = {
+ 'pkt_in_up_stream': 0,
+ 'pkt_drop_up_stream': 0,
+ 'pkt_in_down_stream': 0,
+ 'pkt_drop_down_stream': 0,
+ 'collect_stats': self.resource_helper.collect_kpi(),
+ }
+
+ indexes_in = [1]
+ indexes_drop = [2, 3]
+ command = 'p {0} stats port {1} 0'
+ for index, direction in ((5, 'up'), (9, 'down')):
+ key_in = "pkt_in_{0}_stream".format(direction)
+ key_drop = "pkt_drop_{0}_stream".format(direction)
+ for mode in ('in', 'out'):
+ stats = self.vnf_execute(command.format(index, mode))
+ match = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
+ if not match:
+ continue
+ result[key_in] += sum(int(match.group(x)) for x in indexes_in)
+ result[key_drop] += sum(int(match.group(x)) for x in indexes_drop)
+
+ LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
return result
diff --git a/yardstick/network_services/vnf_generic/vnfdgen.py b/yardstick/network_services/vnf_generic/vnfdgen.py
index b56a91915..0120b493e 100644
--- a/yardstick/network_services/vnf_generic/vnfdgen.py
+++ b/yardstick/network_services/vnf_generic/vnfdgen.py
@@ -14,11 +14,16 @@
""" Generic file to map and build vnf discriptor """
from __future__ import absolute_import
-import collections
+from functools import reduce
import jinja2
+import logging
import yaml
+from yardstick.common.utils import try_int
+
+LOG = logging.getLogger(__name__)
+
def render(vnf_model, **kwargs):
"""Render jinja2 VNF template
@@ -40,7 +45,8 @@ def generate_vnfd(vnf_model, node):
as input for GenericVNF.__init__
"""
# get is unused as global method inside template
- node["get"] = get
+ # node["get"] = key_flatten_get
+ node["get"] = deepgetitem
# Set Node details to default if not defined in pod file
# we CANNOT use TaskTemplate.render because it does not allow
# for missing variables, we need to allow password for key_filename
@@ -52,36 +58,34 @@ def generate_vnfd(vnf_model, node):
return filled_vnfd
-def dict_key_flatten(data):
- """ Convert nested dict structure to dotted key
- (e.g. {"a":{"b":1}} -> {"a.b":1}
-
- :param data: nested dictionary
- :return: flat dicrionary
- """
- next_data = {}
-
- # check for non-string iterables
- if not any((isinstance(v, collections.Iterable) and not isinstance(v, str))
- for v in data.values()):
- return data
+# dict_flatten was causing recursion errors with Jinja2 so we removed and replaced
+# which this function from stackoverflow that doesn't require generating entire dictionaries
+# each time we query a key
+def deepgetitem(obj, item, default=None):
+ """Steps through an item chain to get the ultimate value.
- for key, val in data.items():
- if isinstance(val, collections.Mapping):
- for n_k, n_v in val.items():
- next_data["%s.%s" % (key, n_k)] = n_v
- elif isinstance(val, collections.Iterable) and not isinstance(val,
- str):
- for index, item in enumerate(val):
- next_data["%s%d" % (key, index)] = item
- else:
- next_data[key] = val
+ If ultimate value or path to value does not exist, does not raise
+ an exception and instead returns `fallback`.
- return dict_key_flatten(next_data)
+ Based on
+ https://stackoverflow.com/a/38623359
+ https://stackoverflow.com/users/1820042/donny-winston
+ add try_int to work with sequences
-def get(obj, key, *args):
- """ Get template key from dictionary, get default value or raise an exception
+ >>> d = {'snl_final': {'about': {'_icsd': {'icsd_id': 1, 'fr': [2, 3]}}}}
+ >>> deepgetitem(d, 'snl_final.about._icsd.icsd_id')
+ 1
+ >>> deepgetitem(d, 'snl_final.about._sandbox.sbx_id')
+ >>>
+ >>> deepgetitem(d, 'snl_final.about._icsd.fr.1')
+ 3
"""
- data = dict_key_flatten(obj)
- return data.get(key, *args)
+ def getitem(obj, name):
+ # if integer then list index
+ name = try_int(name)
+ try:
+ return obj[name]
+ except (KeyError, TypeError, IndexError):
+ return default
+ return reduce(getitem, item.split('.'), obj)